From 2319b32294b96ea13ac8a79a0cea11900db244c2 Mon Sep 17 00:00:00 2001 From: Peter Luenenschloss <peter.luenenschloss@ufz.de> Date: Tue, 3 Dec 2019 16:14:22 +0100 Subject: [PATCH] flag soil moisture by constants detection finally implemented / small bugfix in flagConstantsVarianceBased --- saqc/funcs/constants_detection.py | 2 +- saqc/funcs/soil_moisture_tests.py | 171 ++++++++----------------- test/funcs/test_constants_detection.py | 5 +- 3 files changed, 60 insertions(+), 118 deletions(-) diff --git a/saqc/funcs/constants_detection.py b/saqc/funcs/constants_detection.py index 8e6b2850a..ff099f91a 100644 --- a/saqc/funcs/constants_detection.py +++ b/saqc/funcs/constants_detection.py @@ -96,7 +96,7 @@ def flagConstant_varianceBased( if plateaus.sum() == 0: return data, flagger - plateaus.fillna(method="bfill", limit=min_periods, inplace=True) + plateaus.fillna(method="bfill", limit=min_periods-1, inplace=True) # result: plateaus = (plateaus[plateaus == 1.0]).index diff --git a/saqc/funcs/soil_moisture_tests.py b/saqc/funcs/soil_moisture_tests.py index f9c232940..37f9ad1de 100644 --- a/saqc/funcs/soil_moisture_tests.py +++ b/saqc/funcs/soil_moisture_tests.py @@ -9,6 +9,7 @@ from scipy.signal import savgol_filter from saqc.funcs.break_detection import flagBreaks_spektrumBased from saqc.funcs.spike_detection import flagSpikes_spektrumBased +from saqc.funcs.constants_detection import flagConstant_varianceBased from saqc.funcs.register import register from saqc.lib.tools import ( estimateSamplingRate, @@ -263,10 +264,12 @@ def flagSoilMoistureByConstantsDetection( flagger, plateau_window_min="12h", plateau_var_limit=0.0005, - rainfall_window="12h", + rainfall_window_range="12h", filter_window_size="3h", - i_start_infimum=0.0025, - i_end_supremum=0, + var_total_nans=np.inf, + var_consec_nans=np.inf, + derivative_maximum_lb=0.0025, + derivative_minimum_ub=0, data_max_tolerance=0.95, **kwargs ): @@ -274,126 +277,62 @@ def flagSoilMoistureByConstantsDetection( """Function is not ready to use yet: we are waiting for response from the author of [Paper] in order of getting able to exclude some sources of confusion. + Note, function has to be harmonized to equidistant freq_grid + + Note, in current implementation, it has to hold that: (rainfall_window_range >= plateau_window_min) + :param data: The pandas dataframe holding the data-to-be flagged. Data must be indexed by a datetime series and be harmonized onto a time raster with seconds precision (skips allowed). :param field: Fieldname of the Soil moisture measurements field in data. :param flagger: A flagger - object. (saqc.flagger.X) """ - dataseries, data_rate = retrieveTrustworthyOriginal(data, field, flagger) - - # abort processing if original series has no valid entries! - if data_rate is np.nan: - return data, flagger - - # get data max - data_max = dataseries.max() - min_periods = int(offset2periods(plateau_window_min, data_rate)) - # identify minimal plateaus: - plateaus = dataseries.rolling(window=plateau_window_min).apply( - lambda x: (x.var() > plateau_var_limit) | (x.size < min_periods), raw=False - ) - plateaus = ~plateaus.astype(bool) + # get plateaus: + _, comp_flagger = flagConstant_varianceBased(data, field, flagger, plateau_window_min=plateau_window_min, + plateau_var_limit=plateau_var_limit, var_total_nans=var_total_nans, + var_consec_nans=var_consec_nans) - # are there any candidates for beeing flagged plateau-ish - if plateaus.sum() == 0: - return data, flagger - - plateaus_reverse = pd.Series(np.flip(plateaus.values), index=plateaus.index) - reverse_check = ( - plateaus_reverse.rolling(window=plateau_window_min) - .apply(lambda x: True if True in x.values else False, raw=False) - .astype(bool) - ) - plateaus = pd.Series(np.flip(reverse_check.values), index=plateaus.index) + new_plateaus = (comp_flagger.getFlags(field)).eq(flagger.getFlags(field)) + # get dataseries at its sampling freq: + dataseries, moist_rate = retrieveTrustworthyOriginal(data, field, flagger) + # get valuse referring to dataseries: + new_plateaus.resample(pd.Timedelta(moist_rate)).asfreq() + # cut out test_slices for min/max derivatives condition check: + # offset 2 periods: + rainfall_window_range = int(np.ceil(pd.Timedelta(rainfall_window_range) / moist_rate)) + plateau_window_min = int(np.ceil(pd.Timedelta(plateau_window_min) / moist_rate)) + period_diff = rainfall_window_range - plateau_window_min + # we cast plateua series to int - because replace has problems with replacing bools by "method". + new_plateaus = new_plateaus.astype(int) + # get plateau groups: + group_counter = new_plateaus.cumsum() + group_counter = group_counter[group_counter.diff() == 0] + group_counter.name = 'group_counter' + plateau_groups = pd.merge(group_counter, dataseries, left_index=True, right_index=True, how='inner') + # test mean-condition on plateau groups: + test_barrier = data_max_tolerance*dataseries.max() + plateau_group_drops = plateau_groups.groupby('group_counter').filter(lambda x: x[field].mean() <= test_barrier) + # discard values that didnt pass the test from plateau candidate series: + new_plateaus[plateau_group_drops.index] = 1 + + # we extend the plateaus to cover condition testing sets + # 1: extend backwards (with a technical "one" added): + cond1_sets = new_plateaus.replace(1, method='bfill', limit=(rainfall_window_range + plateau_window_min)) + # 2. extend forwards: + if period_diff > 0: + cond1_sets = cond1_sets.replace(1, method='ffill', limit=period_diff) + # get first derivative + first_derivative = dataseries.diff() + # cumsum trick to seperate continous plateau groups from each other + group_counter = cond1_sets.cumsum() + group_counter = group_counter[group_counter.diff() == 0] + group_counter.name = 'group_counter' + group_frame = pd.merge(group_counter, first_derivative, left_index=True, right_index=True, how='inner') + group_frame = group_frame.groupby('group_counter') + condition_passed = group_frame.filter( + lambda x: (x[field].max() >= derivative_maximum_lb) & (x[field].min() <= derivative_minimum_ub)) + + flagger = flagger.setFlags(field, loc=condition_passed.index, **kwargs) - # reverse the reversed ts and transform to dataframe, filter for consecutive timestamp values: - plateaus = pd.DataFrame( - {"date": dataseries.index, "mask": np.flip(plateaus.values)}, - index=dataseries.index, - ) - plateaus = plateaus[plateaus["mask"] == True].drop("mask", axis=1) - seperator_stair = plateaus["date"].diff() != pd.Timedelta(data_rate) - plateaus["interval_nr"] = seperator_stair.cumsum() - plateaus = plateaus["interval_nr"] - invalids = pd.Series([]) - # loop through the intervals to be checked: - for interval_2_check in range(1, plateaus.max() + 1): - # how big is the interval? - interval_delimeter = ( - plateaus[plateaus == interval_2_check].index[-1] - - plateaus[plateaus == interval_2_check].index[0] - ) - - # slices of the area for the rainfallsearch - check_start = ( - plateaus[plateaus == interval_2_check].index[0] - - interval_delimeter - - pd.Timedelta(rainfall_window) - ) - check_end = ( - plateaus[plateaus == interval_2_check].index[-1] - - interval_delimeter - + pd.Timedelta(rainfall_window) - ) - - # slices to be smoothed and derivated - smooth_start = check_start - pd.Timedelta(filter_window_size) - smooth_end = check_end + pd.Timedelta(filter_window_size) - data_slice = dataseries[smooth_start:smooth_end] - - # calculate first derivative of testing slice: - smoothing_periods = int(np.ceil(offset2periods(filter_window_size, data_rate))) - if smoothing_periods % 2 == 0: - smoothing_periods += 1 - - # check if the data slice to be checked is sufficiently big for smoothing options: - if data_slice.size < smoothing_periods: - smoothing_periods = data_slice.size - if smoothing_periods % 2 == 0: - smoothing_periods -= 1 - - # calculate the derivative - first_deri_series = pd.Series( - data=savgol_filter( - data_slice, window_length=smoothing_periods, polyorder=2, deriv=1 - ), - index=data_slice.index, - ) - - # get test slice - first_deri_series = first_deri_series[check_start:check_end] - if first_deri_series.empty: - continue - - # check some explicit and implicit conditions: - rainfall_periods = int(offset2periods(rainfall_window, data_rate) * 2) - if rainfall_periods % 2 == 0: - rainfall_periods += 1 - maximums = first_deri_series.rolling( - window=rainfall_periods, center=True, closed="left" - ).max() - minimums = first_deri_series.rolling( - window=rainfall_periods, center=True, closed="left" - ).min() - - maximums = maximums[maximums > i_start_infimum] - minimums = minimums[minimums < i_end_supremum] - - if maximums.empty | minimums.empty: - continue - - i_start_index = maximums.index[0] - i_end_index = minimums.index[-1] - - if i_start_index > i_end_index: - continue - - potential_invalid = data_slice[i_start_index:i_end_index] - # test if the plateau is a high level plateau: - if potential_invalid.mean() > data_max * data_max_tolerance: - invalids = pd.concat([invalids, potential_invalid]) - - flagger = flagger.setFlag(field, invalids.index, **kwargs) return data, flagger diff --git a/test/funcs/test_constants_detection.py b/test/funcs/test_constants_detection.py index 07eaa4ff9..b9b7d3545 100644 --- a/test/funcs/test_constants_detection.py +++ b/test/funcs/test_constants_detection.py @@ -5,6 +5,7 @@ import pytest import numpy as np from saqc.funcs.constants_detection import flagConstant_varianceBased +from saqc.funcs.soil_moisture_tests import flagSoilMoistureByConstantsDetection from test.common import TESTFLAGGER, initData @@ -20,7 +21,7 @@ def data(): @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_flagConstants_varianceBased(data, flagger): - data.iloc[5:25] = 0 + data.iloc[5:25] = 200 expected = np.arange(5, 25) field, *_ = data.columns flagger = flagger.initFlags(data) @@ -30,3 +31,5 @@ def test_flagConstants_varianceBased(data, flagger): flag_result = flagger_result.getFlags(field) test_sum = (flag_result[expected] == flagger.BAD).sum() assert test_sum == len(expected) + + -- GitLab