From 80dc62c22e92e528746d2424f0098a3eb85fbe04 Mon Sep 17 00:00:00 2001
From: Peter Luenenschloss <peter.luenenschloss@ufz.de>
Date: Thu, 11 Jul 2019 11:19:18 +0200
Subject: [PATCH] Soil moisture spike detection integrated at working level /
 minor bugs fixed

---
 saqc/funcs/functions.py | 66 ++++++++++++++++++++++++++++++++---------
 1 file changed, 52 insertions(+), 14 deletions(-)

diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index 2521934f5..b18e0aafe 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -247,27 +247,62 @@ def flagSoilMoistureByPrecipitationEvents(data, flags, field, flagger, prec_refe
     return data, flags
 
 
-def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, raise_factor=0.15, filter_window_size='3h',
-                                     normalized_data=True):
-    """Function
+def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_size='3h',
+                                     normalized_data=True, raise_factor=0.15, dev_cont_factor=0.2, noise_barrier=1,
+                                     noise_window_size='12h', **kwargs):
+    """Function detects and flags spikes in soil moisture data.
+    A datapoint is considered a spike if:
+    (1) the quotient to its preceding datapoint exceeds a certain bound
+        (controlled by param "raise_factor"),
+    (2) the quotient of the data's second derivative at the preceding and subsequent timestamps is close
+        enough to 1 (controlled by param "dev_cont_factor"), and
+    (3) the surrounding data is not too noisy (coefficient of variation [+/- 12h] < 1,
+        controlled by param "noise_barrier").
+
+    Be aware of the following when applying the test:
 
     NOTE1: You should run less complex tests, especially range-tests, the flag-by-precipitation-test and the
     flag-by-frost test previously to this one, since the spike check for any potential, unflagged spike,
     is relatively costly (1 x smoothing + 2 x deviating + 2 x condition application).
 
     NOTE2: Test will only provide meaningful results, if dataseries input of soilmoisture data is projected onto
-    [0,1] interval
+    [0,1] interval, since otherwise the (coefficient of variation < barrier) condition is very likely
+    always true. (Set the normalized_data parameter to False to trigger automatic projection.)
 
-
-    :param data:                        The pandas dataframe holding the data-to-be flagged, as well as the reference
-                                        series. Data must be indexed by a datetime series and be harmonized onto a
+    :param data:                        The pandas dataframe holding the data-to-be-flagged.
+                                        Data must be indexed by a datetime series and be harmonized onto a
                                         time raster with seconds precision.
     :param flags:                       A dataframe holding the flags/flag-entries associated with "data".
     :param field:                       Fieldname of the Soil moisture measurements field in data.
     :param flagger:                     A flagger - object. (saqc.flagger.X)
-
+    :param filter_window_size:          Offset string. For computing the second derivative, a Savitzky-Golay
+                                        filter is applied to the timeseries. This offset string controls the
+                                        size of the smoothing window used.
+    :param normalized_data:             Boolean. If False, the function projects the data-to-be-flagged onto
+                                        the [0, 1] interval before testing for spikes.
+    :param raise_factor:                A float, determining the bound the quotient of two consecutive values
+                                        has to exceed to be regarded as a potential spike. A value of 0.x
+                                        will trigger the spike test for a value y_t, if:
+                                        (y_t)/(y_(t-1)) > 1 + 0.x, or
+                                        (y_t)/(y_(t-1)) < 1 - 0.x.
+    :param dev_cont_factor:            A float, determining the interval the quotient of the data's second
+                                        derivative around a potential spike has to lie in, to trigger spike
+                                        flagging for this value. A datapoint y_t will pass this spike
+                                        condition if, for dev_cont_factor = 0.x and the second derivative y''
+                                        of y, the condition:
+                                        1 - 0.x < abs((y''_(t-1))/(y''_(t+1))) < 1 + 0.x
+                                        holds.
+    :param noise_barrier:               A float, determining the bound the noisiness of the data around a
+                                        potential spike must not exceed, in order to guarantee a justified
+                                        judgement: therefore the coefficient of variation (COVA) of all
+                                        values in a certain window around the datapoint (controlled by param
+                                        "noise_window_size"), but excluding the point itself, is evaluated
+                                        and tested for: COVA < noise_barrier.
+    :param noise_window_size:           Offset string, determining the size of the window over which the
+                                        coefficient of variation is calculated, to assess the noisiness of
+                                        the data around a potential spike. The potential spike y_t will be
+                                        centered in a window spanning:
+                                        [y_t - noise_window_size, y_t + noise_window_size].
     """
 
     # retrieve data series input at its original sampling rate
@@ -283,14 +318,17 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, raise_factor=0
     if ~normalized_data:
         dataseries=dataseries/100
 
-    quotient_series = dataseries/dataseries.shift(-1)
+    quotient_series = dataseries/dataseries.shift(+1)
     spikes = (quotient_series > (1 + raise_factor)) | (quotient_series < (1 - raise_factor))
     spikes = spikes[spikes == True]
     # loop through spikes: (loop may sound ugly - but since the number of spikes is supposed to not exceed the
     # thousands for year data - a loop going through all the spikes instances is much faster than
     # a rolling window, rolling all through a stacked year dataframe )
+    # calculate some values, repeatedly needed in the course of the loop:
     filter_window_seconds = pd.Timedelta.total_seconds(pd.Timedelta(filter_window_size))
     smoothing_periods = int(np.ceil((filter_window_seconds / data_rate.n)))
+    lower_dev_bound = 1 - dev_cont_factor
+    upper_dev_bound = 1 + dev_cont_factor
     if smoothing_periods % 2 == 0:
         smoothing_periods += 1
     for spike in spikes.index:
@@ -302,12 +340,12 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, raise_factor=0
                                       deriv=2)
         length = scnd_derivate.size
         test_ratio_1 = np.abs(scnd_derivate[int((length-1) / 2)] / scnd_derivate[int((length+1) / 2)])
-        if 0.8 < test_ratio_1 < 1.2:
-            start_slice = spike - pd.Timedelta('12h')
-            end_slice = spike + pd.Timedelta('12h')
-            test_slice = dataseries[start_slice:end_slice]
+        if lower_dev_bound < test_ratio_1 < upper_dev_bound:
+            start_slice = spike - pd.Timedelta(noise_window_size)
+            end_slice = spike + pd.Timedelta(noise_window_size)
+            test_slice = dataseries[start_slice:end_slice].drop(spike)
             test_ratio_2 = np.abs(test_slice.var() / test_slice.mean())
-            if test_ratio_2 > 1:
+            if test_ratio_2 > noise_barrier:
                 spikes[spike] = False
         else:
             spikes[spike] = False
--
GitLab
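
For reference, below is a minimal, self-contained sketch of the three-condition spike check this patch
introduces, usable for experimenting outside of saqc. It is an illustration, not the saqc API: the names
detect_sm_spikes and sample_rate_s are made up, and the second-derivative ratio is taken at the timestamps
just before and after the candidate, as the docstring describes. As in the patch, the noise measure is
computed as var/mean (the quantity the docstring calls the coefficient of variation), and there is no guard
against division by zero. (Incidentally, `if ~normalized_data:` in the surrounding code is always truthy
for Python booleans, since ~True == -2; `not normalized_data` is presumably intended.)

import numpy as np
import pandas as pd
from scipy.signal import savgol_filter

def detect_sm_spikes(series, sample_rate_s=600, raise_factor=0.15,
                     dev_cont_factor=0.2, noise_barrier=1.0,
                     filter_window_s=3 * 3600, noise_window_s=12 * 3600):
    """Illustrative re-implementation of the patched spike test.

    series: pd.Series with a regular DatetimeIndex, values already scaled
    to [0, 1]. Returns a boolean Series that is True at confirmed spikes.
    """
    # Condition (1): quotient to the preceding value leaves [1 - rf, 1 + rf].
    quotient = series / series.shift(+1)
    candidates = series.index[(quotient > 1 + raise_factor) |
                              (quotient < 1 - raise_factor)]

    # Savitzky-Golay window length in samples; must be odd.
    periods = int(np.ceil(filter_window_s / sample_rate_s))
    if periods % 2 == 0:
        periods += 1

    confirmed = pd.Series(False, index=series.index)
    for ts in candidates:
        # Condition (2): the second derivative is "continuous" across the
        # spike, i.e. the ratio of its values just before and just after
        # ts stays within [1 - dev_cont_factor, 1 + dev_cont_factor].
        window = series[ts - pd.Timedelta(seconds=filter_window_s):
                        ts + pd.Timedelta(seconds=filter_window_s)]
        if len(window) < periods:
            continue  # not enough data to smooth
        d2 = savgol_filter(window.values, window_length=periods,
                           polyorder=2, deriv=2)
        mid = len(d2) // 2
        ratio = np.abs(d2[mid - 1] / d2[mid + 1])
        if not (1 - dev_cont_factor < ratio < 1 + dev_cont_factor):
            continue

        # Condition (3): the surroundings are not too noisy. As in the
        # patch, the noise measure is var/mean over +/- noise_window_s,
        # with the candidate point itself excluded.
        env = series[ts - pd.Timedelta(seconds=noise_window_s):
                     ts + pd.Timedelta(seconds=noise_window_s)].drop(ts)
        if np.abs(env.var() / env.mean()) < noise_barrier:
            confirmed.loc[ts] = True
    return confirmed

Applied to a 10-minute series, e.g. detect_sm_spikes(sm_series, sample_rate_s=600), the defaults mirror
those of the patched function: a 3h Savitzky-Golay window, a 12h noise window, raise_factor=0.15,
dev_cont_factor=0.2 and noise_barrier=1.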