From ea9bac69858fa6ae1e0377bb55bc30ac4711e2b7 Mon Sep 17 00:00:00 2001 From: Peter Luenenschloss <peter.luenenschloss@ufz.de> Date: Thu, 11 Jul 2019 16:48:28 +0200 Subject: [PATCH] flagSoilMoistureByBreakDetection implemented --- saqc/funcs/functions.py | 59 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 4 deletions(-) diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py index 3557dfbba..aa1cb9388 100644 --- a/saqc/funcs/functions.py +++ b/saqc/funcs/functions.py @@ -210,7 +210,7 @@ def flagSoilMoistureByPrecipitationEvents(data, flags, field, flagger, prec_refe 'The reference variable {} is either not part of the passed data frame, or the value is not registered to' ' the flags frame. To register it to the flags frame and thus, make it available for reference within ' 'tests, you need to run at least one single target test on it. The test will be skipped.' - .format(soil_temp_reference)) + .format(prec_reference)) return data, flags # retrieve input sampling rate (needed to translate ref and data rates into each other): @@ -313,7 +313,7 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_ holds :param noise_barrier: A float, determining the bound, the data noisy-ness around a potential spike should not exceed, in order to guarantee a justifyed judgement: - There for the coefficient of variation (COVA) of all values in a certain window + Therefor the coefficient of variation (COVA) of all values in a certain window around the datapoint (controlled by param noise_window, but excluding the point itself, is evaluated and tested for: COVA < noise_barrier. @@ -329,9 +329,11 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_ dataseries, data_rate = retrieveTrustworthyOriginal(data, flags, flagger) else: dataseries, data_rate = retrieveTrustworthyOriginal(data[field], flags[field], flagger) + # abort processing if original series has no valid entries! if data_rate is np.nan: return data, flags + # normalize data if nessecary if ~normalized_data: dataseries=dataseries/100 @@ -347,8 +349,10 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_ smoothing_periods = int(np.ceil((filter_window_seconds / data_rate.n))) lower_dev_bound = 1 - dev_cont_factor upper_dev_bound = 1 + dev_cont_factor + if smoothing_periods % 2 == 0: smoothing_periods += 1 + for spike in spikes.index: start_slice = spike - pd.Timedelta(filter_window_size) end_slice = spike + pd.Timedelta(filter_window_size) @@ -358,13 +362,16 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_ deriv=2) length = scnd_derivate.size test_ratio_1 = np.abs(scnd_derivate[int((length-1) / 2)] / scnd_derivate[int((length+1) / 2)]) + if lower_dev_bound < test_ratio_1 < upper_dev_bound: start_slice = spike - pd.Timedelta(noise_window_size) end_slice = spike + pd.Timedelta(noise_window_size) test_slice = dataseries[start_slice:end_slice].drop(spike) test_ratio_2 = np.abs(test_slice.var() / test_slice.mean()) + if test_ratio_2 > noise_barrier: spikes[spike] = False + else: spikes[spike] = False @@ -373,7 +380,7 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_ return data, flags -def flagSoilMoistureByBreakDetection(data, flags, field, flagger, **kwargs): +def flagSoilMoistureByBreakDetection(data, flags, field, flagger, filter_window_size='3h', **kwargs): """Function @@ -395,4 +402,48 @@ def flagSoilMoistureByBreakDetection(data, flags, field, flagger, **kwargs): if data_rate is np.nan: return data, flags - dataseries_shifted = dataseries.shift(+1) + # relative - change - break criteria testing: + abs_change = np.abs(dataseries.shift(+1) - dataseries) + breaks = (abs_change > 0.01) & (abs_change / dataseries > 0.1) + breaks = breaks[breaks == True] + + # First derivative criterion + filter_window_seconds = pd.Timedelta.total_seconds(pd.Timedelta(filter_window_size)) + smoothing_periods = int(np.ceil((filter_window_seconds / data_rate.n))) + if smoothing_periods % 2 == 0: + smoothing_periods += 1 + + for brake in breaks.index: + # slice out slice-to-be-filtered (with some safety extension of 3 hours) + slice_start = brake - pd.Timedelta('12h') -pd.Timedelta('3h') + slice_end = brake + pd.Timedelta('12h') + pd.Timedelta('3h') + data_slice = dataseries[slice_start:slice_end] + first_deri_series = pd.Series(data=savgol_filter(data_slice, + window_length=smoothing_periods, + polyorder=2, + deriv=1), + index=data_slice.index) + # condition constructing and testing: + test_sum = abs((first_deri_series[brake - pd.Timedelta('12h'):brake + pd.Timedelta('12h')].sum()*10) + / first_deri_series.size) + if abs(first_deri_series[brake]) > test_sum: + # second derivative criterion: + slice_start = brake - pd.Timedelta('3h') + slice_end = brake + pd.Timedelta('3h') + data_slice = data_slice[slice_start:slice_end] + second_deri_series = pd.Series(data=savgol_filter(data_slice, + window_length=smoothing_periods, + polyorder=2, + deriv=2), + index=data_slice.index) + # criterion evaluation: + first_second = 0.95 < abs((second_deri_series[brake] / second_deri_series.shift(-1)[brake])) < 1.05 + second_second = second_deri_series.shift(-1)[brake] / second_deri_series.shift(-2)[brake] > 10 + if (~ first_second) | (~ second_second): + breaks[brake] = False + else: + breaks[brake] = False + + breaks = breaks[breaks == True] + flags.loc[breaks.index, field] = flagger.setFlag(flags.loc[breaks.index, field], **kwargs) + return data, flags -- GitLab