Skip to content
Snippets Groups Projects
Commit 2319b322 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

flag soil moisture by constants detection finally implemented / small bugfix...

flag soil moisture by constants detection finally implemented / small bugfix in flagConstantsVarianceBased
parent ca34de91
No related branches found
No related tags found
No related merge requests found
...@@ -96,7 +96,7 @@ def flagConstant_varianceBased( ...@@ -96,7 +96,7 @@ def flagConstant_varianceBased(
if plateaus.sum() == 0: if plateaus.sum() == 0:
return data, flagger return data, flagger
plateaus.fillna(method="bfill", limit=min_periods, inplace=True) plateaus.fillna(method="bfill", limit=min_periods-1, inplace=True)
# result: # result:
plateaus = (plateaus[plateaus == 1.0]).index plateaus = (plateaus[plateaus == 1.0]).index
......
...@@ -9,6 +9,7 @@ from scipy.signal import savgol_filter ...@@ -9,6 +9,7 @@ from scipy.signal import savgol_filter
from saqc.funcs.break_detection import flagBreaks_spektrumBased from saqc.funcs.break_detection import flagBreaks_spektrumBased
from saqc.funcs.spike_detection import flagSpikes_spektrumBased from saqc.funcs.spike_detection import flagSpikes_spektrumBased
from saqc.funcs.constants_detection import flagConstant_varianceBased
from saqc.funcs.register import register from saqc.funcs.register import register
from saqc.lib.tools import ( from saqc.lib.tools import (
estimateSamplingRate, estimateSamplingRate,
...@@ -263,10 +264,12 @@ def flagSoilMoistureByConstantsDetection( ...@@ -263,10 +264,12 @@ def flagSoilMoistureByConstantsDetection(
flagger, flagger,
plateau_window_min="12h", plateau_window_min="12h",
plateau_var_limit=0.0005, plateau_var_limit=0.0005,
rainfall_window="12h", rainfall_window_range="12h",
filter_window_size="3h", filter_window_size="3h",
i_start_infimum=0.0025, var_total_nans=np.inf,
i_end_supremum=0, var_consec_nans=np.inf,
derivative_maximum_lb=0.0025,
derivative_minimum_ub=0,
data_max_tolerance=0.95, data_max_tolerance=0.95,
**kwargs **kwargs
): ):
...@@ -274,126 +277,62 @@ def flagSoilMoistureByConstantsDetection( ...@@ -274,126 +277,62 @@ def flagSoilMoistureByConstantsDetection(
"""Function is not ready to use yet: we are waiting for response from the author of [Paper] in order of getting """Function is not ready to use yet: we are waiting for response from the author of [Paper] in order of getting
able to exclude some sources of confusion. able to exclude some sources of confusion.
Note, function has to be harmonized to equidistant freq_grid
Note, in current implementation, it has to hold that: (rainfall_window_range >= plateau_window_min)
:param data: The pandas dataframe holding the data-to-be flagged. :param data: The pandas dataframe holding the data-to-be flagged.
Data must be indexed by a datetime series and be harmonized onto a Data must be indexed by a datetime series and be harmonized onto a
time raster with seconds precision (skips allowed). time raster with seconds precision (skips allowed).
:param field: Fieldname of the Soil moisture measurements field in data. :param field: Fieldname of the Soil moisture measurements field in data.
:param flagger: A flagger - object. (saqc.flagger.X) :param flagger: A flagger - object. (saqc.flagger.X)
""" """
dataseries, data_rate = retrieveTrustworthyOriginal(data, field, flagger)
# abort processing if original series has no valid entries!
if data_rate is np.nan:
return data, flagger
# get data max
data_max = dataseries.max()
min_periods = int(offset2periods(plateau_window_min, data_rate)) # get plateaus:
# identify minimal plateaus: _, comp_flagger = flagConstant_varianceBased(data, field, flagger, plateau_window_min=plateau_window_min,
plateaus = dataseries.rolling(window=plateau_window_min).apply( plateau_var_limit=plateau_var_limit, var_total_nans=var_total_nans,
lambda x: (x.var() > plateau_var_limit) | (x.size < min_periods), raw=False var_consec_nans=var_consec_nans)
)
plateaus = ~plateaus.astype(bool)
# are there any candidates for beeing flagged plateau-ish new_plateaus = (comp_flagger.getFlags(field)).eq(flagger.getFlags(field))
if plateaus.sum() == 0: # get dataseries at its sampling freq:
return data, flagger dataseries, moist_rate = retrieveTrustworthyOriginal(data, field, flagger)
# get valuse referring to dataseries:
plateaus_reverse = pd.Series(np.flip(plateaus.values), index=plateaus.index) new_plateaus.resample(pd.Timedelta(moist_rate)).asfreq()
reverse_check = ( # cut out test_slices for min/max derivatives condition check:
plateaus_reverse.rolling(window=plateau_window_min) # offset 2 periods:
.apply(lambda x: True if True in x.values else False, raw=False) rainfall_window_range = int(np.ceil(pd.Timedelta(rainfall_window_range) / moist_rate))
.astype(bool) plateau_window_min = int(np.ceil(pd.Timedelta(plateau_window_min) / moist_rate))
) period_diff = rainfall_window_range - plateau_window_min
plateaus = pd.Series(np.flip(reverse_check.values), index=plateaus.index) # we cast plateua series to int - because replace has problems with replacing bools by "method".
new_plateaus = new_plateaus.astype(int)
# get plateau groups:
group_counter = new_plateaus.cumsum()
group_counter = group_counter[group_counter.diff() == 0]
group_counter.name = 'group_counter'
plateau_groups = pd.merge(group_counter, dataseries, left_index=True, right_index=True, how='inner')
# test mean-condition on plateau groups:
test_barrier = data_max_tolerance*dataseries.max()
plateau_group_drops = plateau_groups.groupby('group_counter').filter(lambda x: x[field].mean() <= test_barrier)
# discard values that didnt pass the test from plateau candidate series:
new_plateaus[plateau_group_drops.index] = 1
# we extend the plateaus to cover condition testing sets
# 1: extend backwards (with a technical "one" added):
cond1_sets = new_plateaus.replace(1, method='bfill', limit=(rainfall_window_range + plateau_window_min))
# 2. extend forwards:
if period_diff > 0:
cond1_sets = cond1_sets.replace(1, method='ffill', limit=period_diff)
# get first derivative
first_derivative = dataseries.diff()
# cumsum trick to seperate continous plateau groups from each other
group_counter = cond1_sets.cumsum()
group_counter = group_counter[group_counter.diff() == 0]
group_counter.name = 'group_counter'
group_frame = pd.merge(group_counter, first_derivative, left_index=True, right_index=True, how='inner')
group_frame = group_frame.groupby('group_counter')
condition_passed = group_frame.filter(
lambda x: (x[field].max() >= derivative_maximum_lb) & (x[field].min() <= derivative_minimum_ub))
flagger = flagger.setFlags(field, loc=condition_passed.index, **kwargs)
# reverse the reversed ts and transform to dataframe, filter for consecutive timestamp values:
plateaus = pd.DataFrame(
{"date": dataseries.index, "mask": np.flip(plateaus.values)},
index=dataseries.index,
)
plateaus = plateaus[plateaus["mask"] == True].drop("mask", axis=1)
seperator_stair = plateaus["date"].diff() != pd.Timedelta(data_rate)
plateaus["interval_nr"] = seperator_stair.cumsum()
plateaus = plateaus["interval_nr"]
invalids = pd.Series([])
# loop through the intervals to be checked:
for interval_2_check in range(1, plateaus.max() + 1):
# how big is the interval?
interval_delimeter = (
plateaus[plateaus == interval_2_check].index[-1]
- plateaus[plateaus == interval_2_check].index[0]
)
# slices of the area for the rainfallsearch
check_start = (
plateaus[plateaus == interval_2_check].index[0]
- interval_delimeter
- pd.Timedelta(rainfall_window)
)
check_end = (
plateaus[plateaus == interval_2_check].index[-1]
- interval_delimeter
+ pd.Timedelta(rainfall_window)
)
# slices to be smoothed and derivated
smooth_start = check_start - pd.Timedelta(filter_window_size)
smooth_end = check_end + pd.Timedelta(filter_window_size)
data_slice = dataseries[smooth_start:smooth_end]
# calculate first derivative of testing slice:
smoothing_periods = int(np.ceil(offset2periods(filter_window_size, data_rate)))
if smoothing_periods % 2 == 0:
smoothing_periods += 1
# check if the data slice to be checked is sufficiently big for smoothing options:
if data_slice.size < smoothing_periods:
smoothing_periods = data_slice.size
if smoothing_periods % 2 == 0:
smoothing_periods -= 1
# calculate the derivative
first_deri_series = pd.Series(
data=savgol_filter(
data_slice, window_length=smoothing_periods, polyorder=2, deriv=1
),
index=data_slice.index,
)
# get test slice
first_deri_series = first_deri_series[check_start:check_end]
if first_deri_series.empty:
continue
# check some explicit and implicit conditions:
rainfall_periods = int(offset2periods(rainfall_window, data_rate) * 2)
if rainfall_periods % 2 == 0:
rainfall_periods += 1
maximums = first_deri_series.rolling(
window=rainfall_periods, center=True, closed="left"
).max()
minimums = first_deri_series.rolling(
window=rainfall_periods, center=True, closed="left"
).min()
maximums = maximums[maximums > i_start_infimum]
minimums = minimums[minimums < i_end_supremum]
if maximums.empty | minimums.empty:
continue
i_start_index = maximums.index[0]
i_end_index = minimums.index[-1]
if i_start_index > i_end_index:
continue
potential_invalid = data_slice[i_start_index:i_end_index]
# test if the plateau is a high level plateau:
if potential_invalid.mean() > data_max * data_max_tolerance:
invalids = pd.concat([invalids, potential_invalid])
flagger = flagger.setFlag(field, invalids.index, **kwargs)
return data, flagger return data, flagger
...@@ -5,6 +5,7 @@ import pytest ...@@ -5,6 +5,7 @@ import pytest
import numpy as np import numpy as np
from saqc.funcs.constants_detection import flagConstant_varianceBased from saqc.funcs.constants_detection import flagConstant_varianceBased
from saqc.funcs.soil_moisture_tests import flagSoilMoistureByConstantsDetection
from test.common import TESTFLAGGER, initData from test.common import TESTFLAGGER, initData
...@@ -20,7 +21,7 @@ def data(): ...@@ -20,7 +21,7 @@ def data():
@pytest.mark.parametrize("flagger", TESTFLAGGER) @pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagConstants_varianceBased(data, flagger): def test_flagConstants_varianceBased(data, flagger):
data.iloc[5:25] = 0 data.iloc[5:25] = 200
expected = np.arange(5, 25) expected = np.arange(5, 25)
field, *_ = data.columns field, *_ = data.columns
flagger = flagger.initFlags(data) flagger = flagger.initFlags(data)
...@@ -30,3 +31,5 @@ def test_flagConstants_varianceBased(data, flagger): ...@@ -30,3 +31,5 @@ def test_flagConstants_varianceBased(data, flagger):
flag_result = flagger_result.getFlags(field) flag_result = flagger_result.getFlags(field)
test_sum = (flag_result[expected] == flagger.BAD).sum() test_sum = (flag_result[expected] == flagger.BAD).sum()
assert test_sum == len(expected) assert test_sum == len(expected)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment