Commit 31bb9864 authored by Peter Lünenschloß


slope criterion added to raiseCheck -> raise checker now fully equipped to face all the data out there
parent 0751d31b
2 merge requests: !21 Gcef testfuncs, !20 Gcef testfuncs
Pipeline #2918 passed in 7 minutes and 5 seconds
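
In short, the commit extends the raise check by two parameters, min_slope and min_slope_weight: candidate raises whose value change relative to their predecessor stays below min_slope are dropped again, provided the time gap to that predecessor exceeds min_slope_weight times the intended sampling frequency. A minimal standalone sketch of that masking logic, with a toy index and values invented for illustration (only min_slope, min_slope_weight and intended_freq correspond to parameters in the diff below):

import pandas as pd

# toy series at a 10 minute target frequency, with one genuine jump at 00:30
idx = pd.date_range("2020-01-01", periods=6, freq="10min")
values = pd.Series([0.0, 0.1, 0.2, 5.0, 5.1, 5.2], index=idx)

intended_freq = pd.Timedelta("10min")
min_slope = 1.0          # minimum absolute change to the predecessor
min_slope_weight = 0.8   # gap factor above which the slope criterion applies

# gap to the predecessor, expressed in multiples of the intended frequency
gaps = pd.Series(values.index).diff().dt.total_seconds() / intended_freq.total_seconds()
w_mask = gaps > min_slope_weight              # gap is large enough for the check to apply
slope_mask = values.diff().abs() < min_slope  # change to the predecessor is too small

# points fulfilling both conditions would be removed from the raise candidates again
to_unflag = w_mask.values & slope_mask.values
print(to_unflag)  # [False  True  True False  True  True]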
@@ -22,7 +22,8 @@ from saqc.lib.tools import (
@register("spikes_limitRaise")
def flagSpikes_limitRaise(
    data, field, flagger, thresh, raise_window, intended_freq, average_window=None, mean_raise_factor=2,
    min_slope=None, min_slope_weight=0.8, **kwargs
):
    """ flag a value if it deviates from any of its preceding values within "window" range, by a margin higher than
    thresh """
@@ -30,16 +31,16 @@ def flagSpikes_limitRaise(
    # NOTE1: this implementation accounts for the case of "pseudo" spikes that result from checking against outliers
    # NOTE2: the test is designed to work on raw data as well as on regularized data
    # prepare input args
    dataseries = data[field].dropna()
    raise_window = pd.Timedelta(raise_window)
    intended_freq = pd.Timedelta(intended_freq)
    if min_slope is not None:
        # guard: min_slope defaults to None (criterion disabled), np.abs(None) would raise a TypeError
        min_slope = np.abs(min_slope)
    if average_window is None:
        average_window = 1.5 * pd.Timedelta(raise_window)

    # decide if flagging raises or drops?
    if thresh > 0:
        comp = op.ge
        mi_ma = np.max
@@ -47,6 +48,7 @@ def flagSpikes_limitRaise(
        comp = op.le
        mi_ma = np.min
    # jit funcs to roll with:
    @numba.jit(nopython=True)
    def raise_check(x, thresh):
@@ -61,21 +63,44 @@ def flagSpikes_limitRaise(
    def custom_rolling_mean(x):
        return np.mean(x[:-1])
    # get invalid-raise/drop mask:
    raise_series = dataseries.rolling(raise_window, min_periods=2).apply(
        raise_check, args=(thresh,), raw=True, engine="numba"
    )
    if raise_series.isna().all():
        return data, flagger
# "unflag" values of unsifficient deviation to there predecessors
if min_slope is not None:
w_mask = (pd.Series(dataseries.index).diff().dt.total_seconds() / intended_freq.total_seconds()) > \
min_slope_weight
slope_mask = np.abs(dataseries.diff()) < min_slope
to_unflag = raise_series.notna() & w_mask.values & slope_mask
raise_series[to_unflag] = np.nan
    # calculate and apply the weighted mean weights (pseudo-harmonization):
    weights = (
        pd.Series(dataseries.index).diff(periods=2).shift(-1).dt.total_seconds()
        / intended_freq.total_seconds()
        / 2
    )
    weights.iloc[0] = 0.5 + (dataseries.index[1] - dataseries.index[0]).total_seconds() / (
        intended_freq.total_seconds() * 2
    )
    weights.iloc[-1] = 0.5 + (dataseries.index[-1] - dataseries.index[-2]).total_seconds() / (
        intended_freq.total_seconds() * 2
    )
    weights[weights > 1.5] = 1.5
    weighted_data = dataseries.mul(weights.values)
    # rolling weighted mean calculation
    weighted_rolling_mean = weighted_data.rolling(average_window, min_periods=2, closed='both').apply(
        custom_rolling_mean, raw=True, engine="numba"
    )
    # check means against critical raise value:
    to_flag = comp(dataseries, weighted_rolling_mean + (raise_series / mean_raise_factor))
    to_flag &= raise_series.notna()
    flagger = flagger.setFlags(field, to_flag.index, **kwargs)

    return data, flagger
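
To make the "pseudo-harmonization" weights above concrete, a small worked example with invented timestamps: for data sampled exactly at intended_freq every interior weight is 1, while the two samples bordering a gap receive proportionally more weight, later capped at 1.5.

import pandas as pd

idx = pd.DatetimeIndex(["2020-01-01 00:00", "2020-01-01 00:10",
                        "2020-01-01 00:30", "2020-01-01 00:40"])
intended_freq = pd.Timedelta("10min")

# interior weights: half the two-sided gap, in multiples of the intended frequency
weights = (pd.Series(idx).diff(periods=2).shift(-1).dt.total_seconds()
           / intended_freq.total_seconds() / 2)
# -> [NaN, 1.5, 1.5, NaN]: both neighbours of the 20 minute gap weigh 1.5

# boundary weights: 0.5 plus half the one-sided gap
weights.iloc[0] = 0.5 + (idx[1] - idx[0]).total_seconds() / (intended_freq.total_seconds() * 2)
weights.iloc[-1] = 0.5 + (idx[-1] - idx[-2]).total_seconds() / (intended_freq.total_seconds() * 2)
# -> [1.0, 1.5, 1.5, 1.0]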
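
Finally, a hedged sketch of how the extended test might be invoked once this commit is in place. The field name and all threshold values below are made up; only the parameter names follow the signature in the diff:

# `data` and `flagger` are whatever container/flagger pair the saqc run provides
data, flagger = flagSpikes_limitRaise(
    data,
    "level",                 # hypothetical field to check
    flagger,
    thresh=10,               # flag raises of more than 10 units ...
    raise_window="1h",       # ... occurring within one hour
    intended_freq="10min",   # sampling rate the weights are normalized against
    min_slope=0.5,           # new: minimum change to the predecessor
    min_slope_weight=0.8,    # new: gap factor above which the slope check applies
)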