Skip to content
Snippets Groups Projects
Commit edc1c22b authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

basic plateau flagger added to QC functions arsenal

parent b7046dfe
No related branches found
No related tags found
No related merge requests found
......@@ -128,6 +128,55 @@ def flagMad(data, flags, field, flagger, length, z, freq=None, **kwargs):
return data, flags
def flagConstants_VarianceBased(data, flags, field, flagger, plateau_window_min='12h',
plateau_var_limit=0.0005, **kwargs):
"""Function flags plateaus/series of constant values. An interval of values y(t),..y(t+n) is flagged, if:
(1) n > "plateau_interval_min"
(2) variance(y(t),...,y(t+n) < plateau_var_limit
:param data: The pandas dataframe holding the data-to-be flagged.
Data must be indexed by a datetime series and be harmonized onto a
time raster with seconds precision (skips allowed).
:param flags: A dataframe holding the flags/flag-entries associated with "data".
:param field: Fieldname of the Soil moisture measurements field in data.
:param flagger: A flagger - object. (saqc.flagger.X)
:param plateau_window_min: Offset String. Only intervals of minimum size "plateau_window_min" have the
chance to get flagged as constant intervals
:param plateau_var_limit: Float. The upper barrier, the variance of an interval mus not exceed, if the
interval wants to be flagged a plateau.
"""
if isinstance(data, pd.Series):
dataseries, data_rate = retrieveTrustworthyOriginal(data, flags, flagger)
else:
dataseries, data_rate = retrieveTrustworthyOriginal(data[field], flags[field], flagger)
# abort processing if original series has no valid entries!
if data_rate is np.nan:
return data, flags
min_periods = int(offset2periods(plateau_window_min, data_rate))
# identify minimal plateaus:
plateaus = dataseries.rolling(window=plateau_window_min).apply(lambda x: (x.var() > plateau_var_limit) | (x.size < min_periods), raw=False)
plateaus = (~plateaus.astype(bool))
# are there any candidates for beeing flagged plateau-ish
if plateaus.sum() == 0:
return data, flags
plateaus_reverse = pd.Series(np.flip(plateaus.values), index=plateaus.index)
reverse_check = plateaus_reverse.rolling(window=plateau_window_min).apply(lambda x: True if True in x.values else False, raw=False).astype(bool)
plateaus = pd.Series(np.flip(reverse_check.values), index=plateaus.index)
if isinstance(flags, pd.Series):
flags.loc[spikes.index, field] = flagger.setFlag(flags.loc[spikes.index, field], **kwargs)
else:
flags.loc[spikes.index] = flagger.setFlag(flags.loc[spikes.index], **kwargs)
return data, flags
@register("Spikes_SpektrumBased")
def flagSpikes_SpektrumBased(data, flags, field, flagger, diff_method='raw', filter_window_size='3h',
raise_factor=0.15, dev_cont_factor=0.2, noise_barrier=1, noise_window_size='12h',
......@@ -273,6 +322,7 @@ def flagSpikes_SpektrumBased(data, flags, field, flagger, diff_method='raw', fil
spikes[spike] = False
spikes = spikes[spikes == True]
if isinstance(flags, pd.Series):
flags.loc[spikes.index, field] = flagger.setFlag(flags.loc[spikes.index, field], **kwargs)
else:
......@@ -646,18 +696,20 @@ def flagSoilMoistureByConstantsDetection(data, flags, field, flagger, plateau_wi
# get data max
data_max = dataseries.max()
min_periods = int(offset2periods(plateau_window_min, data_rate))
# identify minimal plateaus:
plateaus = dataseries.rolling(window=plateau_window_min).apply(lambda x: x.var() > plateau_var_limit, raw=False).astype(bool)
plateaus = ~plateaus
plateaus = dataseries.rolling(window=plateau_window_min).apply(lambda x: (x.var() > plateau_var_limit) | (x.size < min_periods), raw=False)
plateaus = (~plateaus.astype(bool))
# are there any candidates for beeing flagged plateau-ish
if plateaus.sum() == 0:
return data, flags
# nice reverse rolling trick to fully match True entries with the full length plateau intervals:
window_periods = int(offset2periods(plateau_window_min, data_rate))
plateaus = pd.Series(np.flip(plateaus.values)).rolling(window=window_periods, min_periods=0).\
apply(lambda x: True if True in x else False, raw=True).astype(bool)
plateaus_reverse = pd.Series(np.flip(plateaus.values), index=plateaus.index)
reverse_check = plateaus_reverse.rolling(window=plateau_window_min).apply(lambda x: True if True in x.values else False, raw=False).astype(bool)
plateaus = pd.Series(np.flip(reverse_check.values), index=plateaus.index)
# reverse the reversed ts and transform to dataframe, filter for consecutive timestamp values:
plateaus = pd.DataFrame({'date': dataseries.index, 'mask': np.flip(plateaus.values)}, index=dataseries.index)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment