Skip to content
Snippets Groups Projects
Commit 26c1649c authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

added constants detection function to QC functions arsenal

parent ab9ab09a
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,9 @@ from ..lib.tools import (
slidingWindowIndices,
inferFrequency,
estimateSamplingRate,
retrieveTrustworthyOriginal)
retrieveTrustworthyOriginal,
offset2periods,
offset2seconds)
from ..dsl import evalExpression
from ..core.config import Params
......@@ -447,8 +449,7 @@ def flagSoilMoistureByBreakDetection(data, flags, field, flagger, filter_window_
breaks = breaks[breaks == True]
# First derivative criterion
filter_window_seconds = pd.Timedelta.total_seconds(pd.Timedelta(filter_window_size))
smoothing_periods = int(np.ceil((filter_window_seconds / data_rate.n)))
smoothing_periods = int(np.ceil(offset2periods(filter_window_size, data_rate)))
if smoothing_periods % 2 == 0:
smoothing_periods += 1
......@@ -493,7 +494,9 @@ def flagSoilMoistureByBreakDetection(data, flags, field, flagger, filter_window_
return data, flags
def flagSoilMoistureByConstantsDetection(data, flags, field, flagger, plateau_window_min='12h', plateau_var_limit=0.0005):
def flagSoilMoistureByConstantsDetection(data, flags, field, flagger, plateau_window_min='12h',
plateau_var_limit=0.0005, rainfall_window='12h', filter_window_size='3h',
i_start_infimum=0.0025, i_end_supremum=0, data_max_tolerance=0.95):
"""Function:
:param data: The pandas dataframe holding the data-to-be flagged.
Data must be indexed by a datetime series and be harmonized onto a
......@@ -510,17 +513,88 @@ def flagSoilMoistureByConstantsDetection(data, flags, field, flagger, plateau_wi
if data_rate is np.nan:
return data, flags
#identify minimal plateaus:
# get data max
data_max = dataseries.max()
# identify minimal plateaus:
plateaus = dataseries.rolling(window=plateau_window_min).apply(lambda x: x.var() < plateau_var_limit, raw=False)\
.astype(bool)
# are there any candidates for beeing flagged plateau-ish
if plateaus.sum() == 0:
return data, flags
# nice reverse rolling trick to fully match True entries with the full length plateau intervals:
window_periods = int(offset2periods(plateau_window_min, data_rate))
plateaus = pd.Series(np.flip(plateaus.values)).rolling(window=window_periods, min_periods=0).\
apply(lambda x: True if True in x else False, raw=True).astype(bool)
# reverse the reversed ts and transform to dataframe, filter for consecutive timestamp values:
plateaus = pd.DataFrame({'date': dataseries.index, 'mask': np.flip(plateaus.values)}, index=dataseries.index)
plateaus = plateaus[plateaus['mask'] == True].drop('mask',axis=1)
seperator_stair = plateaus['date'].diff() != pd.Timedelta(data_rate)
plateaus['interval_nr'] = seperator_stair.cumsum()
plateaus = plateaus['interval_nr']
invalids = pd.Series([])
# loop through the intervals to be checked:
for interval_2_check in range(1, plateaus.max()):
# how big is the interval?
interval_delimeter = plateaus[plateaus==interval_2_check].index[-1] - \
plateaus[plateaus==interval_2_check].index[0]
# slices of the area for the rainfallsearch
check_start = plateaus[plateaus==interval_2_check].index[0] - interval_delimeter - pd.Timedelta(rainfall_window)
check_end = plateaus[plateaus==interval_2_check].index[-1] - interval_delimeter + pd.Timedelta(rainfall_window)
# slices to be smoothed and derivated
smooth_start = check_start - pd.Timedelta(filter_window_size)
smooth_end = check_end + pd.Timedelta(filter_window_size)
data_slice = dataseries[smooth_start:smooth_end]
# calculate first derivative of testing slice:
smoothing_periods = int(np.ceil(offset2periods(filter_window_size, data_rate)))
if smoothing_periods % 2 == 0:
smoothing_periods += 1
# check if the data slice to be checked is sufficiently big for smoothing options:
if data_slice.size < smoothing_periods:
smoothing_periods = data_slice.size
if smoothing_periods % 2 == 0:
smoothing_periods -= 1
# calculate the derivative
first_deri_series = pd.Series(data=savgol_filter(data_slice,
window_length=smoothing_periods,
polyorder=2,
deriv=1),
index=data_slice.index)
# get test slice
first_deri_series = first_deri_series[check_start:check_end]
if first_deri_series.empty:
continue
# check some explicit and implicit conditions:
rainfall_periods = int(offset2periods(rainfall_window, data_rate)*2)
if rainfall_periods % 2 == 0:
rainfall_periods += 1
maximums = first_deri_series.rolling(window=rainfall_periods, center=True, closed='left').max()
minimums = first_deri_series.rolling(window=rainfall_periods, center=True, closed='left').min()
maximums=maximums[maximums > i_start_infimum]
minimums=minimums[minimums < i_end_supremum]
if maximums.empty | minimums.empty:
continue
i_start_index = maximums.index[0]
i_end_index = minimums.index[-1]
if i_start_index > i_end_index:
continue
potential_invalid = data_slice[i_start_index:i_end_index]
# test if the plateau is a high level plateau:
if potential_invalid.mean() > data_max*data_max_tolerance:
invalids = pd.concat([invalids, potential_invalid])
......@@ -159,6 +159,7 @@ def offset2seconds(offset):
return pd.Timedelta.total_seconds(pd.Timedelta(offset))
def offset2periods(input_offset, period_offset):
"""Function returns the number of periods of length "periods_offset" that sum up to length "input offset".
(Namely their fraction.)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment