Commit ea9bac69 authored by Peter Lünenschloß

flagSoilMoistureByBreakDetection implemented

parent 1b84c19e
@@ -210,7 +210,7 @@ def flagSoilMoistureByPrecipitationEvents(data, flags, field, flagger, prec_refe
            'The reference variable {} is either not part of the passed data frame, or the value is not registered to'
            ' the flags frame. To register it to the flags frame and thus, make it available for reference within '
            'tests, you need to run at least one single target test on it. The test will be skipped.'
-           .format(soil_temp_reference))
+           .format(prec_reference))
        return data, flags
    # retrieve input sampling rate (needed to translate ref and data rates into each other):
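The fix above makes the skip message name the variable that was actually checked (prec_reference, rather than the soil_temp_reference apparently copied over from a sibling test). As a standalone illustration of the guard pattern that message belongs to, here is a minimal, hypothetical sketch; referenceIsUsable and the frame layout are assumptions for illustration, not the project's actual API:

import logging

import pandas as pd


def referenceIsUsable(data: pd.DataFrame, flags: pd.DataFrame, prec_reference: str) -> bool:
    # The reference series must be a column of the data frame AND already be
    # registered in the flags frame; otherwise report which variable is missing
    # so the calling test can skip itself.
    if (prec_reference not in data.columns) or (prec_reference not in flags.columns):
        logging.warning('The reference variable {} is not available. The test will be skipped.'
                        .format(prec_reference))
        return False
    return True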
@@ -313,7 +313,7 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_
                              holds
    :param noise_barrier:    A float, determining the bound the data noisiness around a potential spike
                             should not exceed, in order to guarantee a justified judgement:
-                            There for the coefficient of variation (COVA) of all values in a certain window
+                            Therefore the coefficient of variation (COVA) of all values in a certain window
                             around the datapoint (controlled by param noise_window,
                             but excluding the point itself) is evaluated and tested
                             for: COVA < noise_barrier.
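Read as a standalone computation, this noise criterion takes every value in a window around the candidate spike, drops the candidate itself, and compares the resulting statistic against noise_barrier. A minimal sketch under those assumptions; the defaults are placeholders, and note that, as in the function body further down, var()/mean() (an index of dispersion) stands in for the classical std()/mean() coefficient of variation:

import numpy as np
import pandas as pd


def passesNoiseTest(series: pd.Series, point: pd.Timestamp,
                    noise_window_size: str = '12h', noise_barrier: float = 1.0) -> bool:
    # Neighbourhood of the candidate spike, excluding the candidate itself:
    window = series[point - pd.Timedelta(noise_window_size):
                    point + pd.Timedelta(noise_window_size)].drop(point)
    # Dispersion statistic of the surroundings (the "COVA" of the docstring):
    cova = np.abs(window.var() / window.mean())
    # Only a quiet neighbourhood justifies confirming the spike:
    return cova < noise_barrier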
@@ -329,9 +329,11 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_
        dataseries, data_rate = retrieveTrustworthyOriginal(data, flags, flagger)
    else:
        dataseries, data_rate = retrieveTrustworthyOriginal(data[field], flags[field], flagger)
+   # abort processing if original series has no valid entries!
+   if data_rate is np.nan:
+       return data, flags
    # normalize data if necessary
    if not normalized_data:
        dataseries = dataseries / 100
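retrieveTrustworthyOriginal signals a series without valid entries by returning np.nan as the sampling rate, so the newly added early return keeps the later rate arithmetic from failing; the normalization step then rescales percentage readings into the 0..1 range the thresholds expect. A small sketch of both guards (the return contract is taken from this diff, and the helper name is hypothetical):

import numpy as np
import pandas as pd


def prepareSoilMoisture(dataseries: pd.Series, data_rate, normalized_data: bool = True):
    # A NaN sampling rate means the original series had no valid entries -
    # abort before any rate-dependent arithmetic (mirrors the guard above).
    if data_rate is np.nan:
        return None
    # Percentage readings are rescaled to fractions for the downstream thresholds:
    if not normalized_data:
        dataseries = dataseries / 100
    return dataseries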
@@ -347,8 +349,10 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_
    smoothing_periods = int(np.ceil(filter_window_seconds / data_rate.n))
    lower_dev_bound = 1 - dev_cont_factor
    upper_dev_bound = 1 + dev_cont_factor
+   if smoothing_periods % 2 == 0:
+       smoothing_periods += 1
    for spike in spikes.index:
        start_slice = spike - pd.Timedelta(filter_window_size)
        end_slice = spike + pd.Timedelta(filter_window_size)
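The two added lines make smoothing_periods odd because scipy.signal.savgol_filter requires an odd window_length (a centred window needs a middle sample). The window size in samples is the window duration divided by the sampling interval, rounded up. A self-contained sketch of that derivation, using pd.Timedelta arithmetic where the code above divides by data_rate.n; the '15min' rate is just an assumed example:

import numpy as np
import pandas as pd


def oddSmoothingPeriods(filter_window_size: str, data_rate: pd.Timedelta) -> int:
    # Samples covered by the smoothing window at the given sampling rate, rounded up:
    filter_window_seconds = pd.Timedelta(filter_window_size).total_seconds()
    smoothing_periods = int(np.ceil(filter_window_seconds / data_rate.total_seconds()))
    # savgol_filter requires an odd window_length:
    if smoothing_periods % 2 == 0:
        smoothing_periods += 1
    return smoothing_periods


# a 3 hour window sampled every 15 minutes covers 12 points, bumped to 13:
print(oddSmoothingPeriods('3h', pd.Timedelta('15min')))  # -> 13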
@@ -358,13 +362,16 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_
                                        deriv=2)
        length = scnd_derivate.size
        test_ratio_1 = np.abs(scnd_derivate[int((length - 1) / 2)] / scnd_derivate[int((length + 1) / 2)])
        if lower_dev_bound < test_ratio_1 < upper_dev_bound:
            start_slice = spike - pd.Timedelta(noise_window_size)
            end_slice = spike + pd.Timedelta(noise_window_size)
            test_slice = dataseries[start_slice:end_slice].drop(spike)
            test_ratio_2 = np.abs(test_slice.var() / test_slice.mean())
            if test_ratio_2 > noise_barrier:
                spikes[spike] = False
        else:
            spikes[spike] = False
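Condensed, the confirmation logic reads: smooth the slice around the candidate with a Savitzky-Golay filter, take the second derivative, and require the two values straddling the candidate to have nearly the same magnitude (ratio within 1 ± dev_cont_factor); only then does the noise test above get a say. A hypothetical compaction of that first stage (confirmSpike is not part of the codebase; the slice is assumed to be centred on the candidate and at least smoothing_periods long):

import numpy as np
import pandas as pd
from scipy.signal import savgol_filter


def confirmSpike(data_slice: pd.Series, dev_cont_factor: float = 0.05,
                 smoothing_periods: int = 13) -> bool:
    # Second derivative of the smoothed slice; the candidate sits at the centre.
    scnd_derivate = savgol_filter(data_slice,
                                  window_length=smoothing_periods,
                                  polyorder=2,
                                  deriv=2)
    length = scnd_derivate.size
    # Ratio of the two second-derivative values straddling the candidate:
    test_ratio_1 = np.abs(scnd_derivate[int((length - 1) / 2)] / scnd_derivate[int((length + 1) / 2)])
    # A genuine one-point spike produces two extremes of nearly equal magnitude:
    return (1 - dev_cont_factor) < test_ratio_1 < (1 + dev_cont_factor)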
@@ -373,7 +380,7 @@ def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, filter_window_
    return data, flags


-def flagSoilMoistureByBreakDetection(data, flags, field, flagger, **kwargs):
+def flagSoilMoistureByBreakDetection(data, flags, field, flagger, filter_window_size='3h', **kwargs):
    """Function
@@ -395,4 +402,48 @@ def flagSoilMoistureByBreakDetection(data, flags, field, flagger, **kwargs):
    if data_rate is np.nan:
        return data, flags
-   dataseries_shifted = dataseries.shift(+1)
+   # relative-change break criterion:
+   abs_change = np.abs(dataseries.shift(+1) - dataseries)
+   breaks = (abs_change > 0.01) & (abs_change / dataseries > 0.1)
+   breaks = breaks[breaks]
+   # first derivative criterion:
+   filter_window_seconds = pd.Timedelta.total_seconds(pd.Timedelta(filter_window_size))
+   smoothing_periods = int(np.ceil(filter_window_seconds / data_rate.n))
+   if smoothing_periods % 2 == 0:
+       smoothing_periods += 1
+   for brake in breaks.index:
+       # slice out the region to be filtered (with a safety extension of 3 hours):
+       slice_start = brake - pd.Timedelta('12h') - pd.Timedelta('3h')
+       slice_end = brake + pd.Timedelta('12h') + pd.Timedelta('3h')
+       data_slice = dataseries[slice_start:slice_end]
+       first_deri_series = pd.Series(data=savgol_filter(data_slice,
+                                                        window_length=smoothing_periods,
+                                                        polyorder=2,
+                                                        deriv=1),
+                                     index=data_slice.index)
+       # condition construction and testing:
+       test_sum = abs((first_deri_series[brake - pd.Timedelta('12h'):brake + pd.Timedelta('12h')].sum() * 10)
+                      / first_deri_series.size)
+       if abs(first_deri_series[brake]) > test_sum:
+           # second derivative criterion:
+           slice_start = brake - pd.Timedelta('3h')
+           slice_end = brake + pd.Timedelta('3h')
+           data_slice = data_slice[slice_start:slice_end]
+           second_deri_series = pd.Series(data=savgol_filter(data_slice,
+                                                             window_length=smoothing_periods,
+                                                             polyorder=2,
+                                                             deriv=2),
+                                          index=data_slice.index)
+           # criterion evaluation:
+           first_second = 0.95 < abs(second_deri_series[brake] / second_deri_series.shift(-1)[brake]) < 1.05
+           second_second = second_deri_series.shift(-1)[brake] / second_deri_series.shift(-2)[brake] > 10
+           if (not first_second) or (not second_second):
+               breaks[brake] = False
+       else:
+           breaks[brake] = False
+   breaks = breaks[breaks]
+   flags.loc[breaks.index, field] = flagger.setFlag(flags.loc[breaks.index, field], **kwargs)
+   return data, flags
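The entry criterion of the new function is worth isolating: a timestamp becomes a break candidate when the jump from its predecessor is both absolutely (> 0.01) and relatively (> 10 % of the current value) large; only those candidates then face the two derivative tests. A standalone sketch with the thresholds copied from the diff (the example series is made up):

import numpy as np
import pandas as pd


def breakCandidates(dataseries: pd.Series) -> pd.Series:
    # Absolute change of every value with respect to its predecessor:
    abs_change = np.abs(dataseries.shift(+1) - dataseries)
    # Candidates need an absolute jump > 0.01 AND a relative jump > 10 %:
    breaks = (abs_change > 0.01) & (abs_change / dataseries > 0.1)
    return breaks[breaks]


index = pd.date_range('2019-01-01', periods=6, freq='15min')
soil = pd.Series([0.20, 0.21, 0.20, 0.35, 0.35, 0.34], index=index)
print(breakCandidates(soil).index)  # only the 0.20 -> 0.35 jump qualifies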