Skip to content
Snippets Groups Projects
Commit ba1c656d authored by David Schäfer's avatar David Schäfer
Browse files

bringing flagNext and flagPeriod back

parent 84aaff39
No related branches found
No related tags found
No related merge requests found
......@@ -10,43 +10,6 @@ from ..lib.plotting import plot
from ..lib.tools import setup
def flagWindow(old, new, field, flagger, direction='fw', window=0, **kwargs) -> pd.Series:
if window == 0 or window == '':
return new
fw, bw = False, False
mask = flagger.getFlags(old[field]) != flagger.getFlags(new[field])
f = flagger.isFlagged(new[field]) & mask
if not mask.any():
# nothing was flagged, so nothing need to be flagged additional
return new
if isinstance(window, int):
x = f.rolling(window=window + 1).sum()
if direction in ['fw', 'both']:
fw = x.fillna(method='bfill').astype(bool)
if direction in ['bw', 'both']:
bw = x.shift(-window).fillna(method='bfill').astype(bool)
else:
# time-based windows
if direction in ['bw', 'both']:
raise NotImplementedError
fw = f.rolling(window=window, closed='both').sum().astype(bool)
fmask = bw | fw
return flagger.setFlags(new, field, fmask, **kwargs)
def flagPeriod(old, new, field, flagger, flag_period=0, **kwargs) -> pd.Series:
return flagWindow(old, new, field, flagger, direction='fw', window=flag_period, **kwargs)
def flagNext(old, new, field, flagger, flag_values=0, **kwargs) -> pd.Series:
return flagWindow(old, new, field, flagger, direction='fw', window=flag_values, **kwargs)
def assignTypeSafe(df, colname, rhs):
"""
Works around a pandas issue: when assigning a
......@@ -121,29 +84,13 @@ def runner(metafname, flagger, data, flags=None, nodata=np.nan):
fchunk = flags.loc[start_date:end_date]
dchunk, ffchunk = evalExpression(
dchunk, fchunk = evalExpression(
flag_test,
data=dchunk, flags=fchunk.copy(), field=varname,
flagger=flagger, nodata=nodata)
# # flag a timespan after the condition is met
# # should be moved into functions
# if Params.FLAGPERIOD in flag_params:
# periodflags = flagPeriod(fchunk, ffchunk, varname, flagger, func_name=func_name, **flag_params)
# ffchunk = assignTypeSafe(ffchunk, varname, periodflags)
# # flag a certain amount of values after condition is met
# if Params.FLAGVALUES in flag_params:
# valueflags = flagNext(fchunk, ffchunk, varname, flagger, func_name=func_name, **flag_params)
# ffchunk = assignTypeSafe(ffchunk, varname, valueflags)
# if flag_params.get(Params.PLOT, False):
# plotvars.append(varname)
# mask = flagger.getFlags(fchunk[varname]) != flagger.getFlags(ffchunk[varname])
# plot(dchunk, ffchunk, mask, varname, flagger, title=flag_test)
data.loc[start_date:end_date] = dchunk
flags.loc[start_date:end_date] = ffchunk.squeeze()
flags.loc[start_date:end_date] = fchunk.squeeze()
flagger.nextTest()
......
......@@ -184,7 +184,7 @@ def evalCode(code, data, flags, field, flagger, nodata):
return eval(code, global_env, local_env)
def evalExpression(expr, data, flags, field, flagger, nodata):
def evalExpression(expr, data, flags, field, flagger, nodata=np.nan):
tree = parseExpression(expr)
dsl_transformer = DslTransformer(initDslFuncMap(nodata), data.columns)
......
......@@ -3,7 +3,7 @@
import numpy as np
from ..lib.tools import sesonalMask
from ..lib.tools import sesonalMask, flagWindow
from .register import register
......@@ -34,9 +34,26 @@ def flagGeneric(data, flags, field, flagger, func, **kwargs):
flags = flagger.setFlags(flags, field, mask, **kwargs)
return data, flags
# @register("ttt")
# def flagTTT(data, flags, field, flagger, func, **kwargs):
# import ipdb; ipdb.set_trace()
@register("flagWindowAfterFlag")
def flagWindowAfterFlag(data, flags, field, flagger, window, func, **kwargs):
data, new_flags = func
repeated_flags = flagWindow(flags, new_flags,
field, flagger,
direction='fw', window=window,
**kwargs)
return data, repeated_flags
@register("flagNextAfterFlag")
def flagNextAfterFlag(data, flags, field, flagger, n, func, **kwargs):
data, new_flags = func
repeated_flags = flagWindow(flags, new_flags,
field, flagger,
direction='fw', window=n,
**kwargs)
return data, repeated_flags
@register("range")
def flagRange(data, flags, field, flagger, min, max, **kwargs):
......
......@@ -284,6 +284,35 @@ def checkQCParameters(para_dict, called_by):
return global_checker
def flagWindow(old, new, field, flagger, direction='fw', window=0, **kwargs) -> pd.Series:
if window == 0 or window == '':
return new
fw, bw = False, False
mask = flagger.getFlags(old[field]) != flagger.getFlags(new[field])
f = flagger.isFlagged(new[field]) & mask
if not mask.any():
# nothing was flagged, so nothing need to be flagged additional
return new
if isinstance(window, int):
x = f.rolling(window=window + 1).sum()
if direction in ['fw', 'both']:
fw = x.fillna(method='bfill').astype(bool)
if direction in ['bw', 'both']:
bw = x.shift(-window).fillna(method='bfill').astype(bool)
else:
# time-based windows
if direction in ['bw', 'both']:
raise NotImplementedError
fw = f.rolling(window=window, closed='both').sum().astype(bool)
fmask = bw | fw
return flagger.setFlags(new, field, fmask, **kwargs)
def sesonalMask(dtindex, month0=1, day0=1, month1=12, day1=None):
"""
This function provide a mask for a sesonal time range in the given dtindex.
......
......@@ -6,7 +6,7 @@ import pandas as pd
import numpy as np
from saqc.funcs import register, flagRange
from saqc.core.core import runner, flagNext, flagPeriod
from saqc.core.core import runner
from saqc.core.config import Fields as F
from saqc.lib.plotting import plot
from test.common import initData, initMetaDict, TESTFLAGGER
......@@ -156,65 +156,6 @@ def test_dtypes(data, flagger):
assert dict(flags.dtypes) == dict(pflags.dtypes)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagNext(flagger):
"""
Test if the flagNext functionality works as expected
NOTE:
needs to move out of this module
"""
data = initData()
flags = flagger.initFlags(data)
orig = flags.copy()
var1 = 'var1'
idx = [0, 1, 2]
dtidx = data.index[idx]
flags = flagger.setFlags(flags, var1, dtidx)
n = 4
fflags = flagNext(orig, flags, var1, flagger, flag_values=4)
flagged = flagger.isFlagged(fflags[var1])
ffindex = fflags[flagged].index
expected = data.index[min(idx):max(idx)+n+1]
assert (expected == ffindex).all()
o = flagger.getFlags(orig).loc[expected, var1]
f = flagger.getFlags(fflags).loc[flagged, var1]
assert (o != f).all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagPeriod(flagger):
"""
Test if the flagNext functionality works as expected
NOTE:
needs to move out of this module
"""
data = initData()
flags = flagger.initFlags(data)
orig = flags.copy()
var1 = 'var1'
idx = [0, 1, 2]
dtidx = data.index[idx]
flags = flagger.setFlags(flags, var1, dtidx)
period = '4h'
fflags = flagPeriod(orig, flags, var1, flagger, flag_period=period)
flagged = flagger.isFlagged(fflags[var1])
ffindex = fflags[flagged].index
m, M = data.index[min(idx)], data.index[max(idx)] + pd.to_timedelta(period)
expected = data.loc[m:M].index
assert (expected == ffindex).all()
o = flagger.getFlags(orig).loc[expected, var1]
f = flagger.getFlags(fflags).loc[flagged, var1]
assert (o != f).all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_plotting(flagger):
""" Test if the plotting code runs. does not show any plot.
......
......@@ -3,10 +3,12 @@
import pytest
import numpy as np
import pandas as pd
from saqc.core.evaluator import evalExpression
from saqc.funcs.functions import flagRange, flagSesonalRange, forceFlags, clearFlags
from test.common import initData, TESTFLAGGER
from test.common import initData, TESTFLAGGER, initMetaDict
@pytest.fixture
......@@ -22,6 +24,28 @@ def field(data):
return data.columns[0]
@pytest.mark.parametrize('flagger', TESTFLAGGER)
def test_flagAfter(data, field, flagger):
flags = flagger.initFlags(data)
min = data.iloc[int(len(data)*.3), 0]
max = data.iloc[int(len(data)*.6), 0]
_, range_flags = flagRange(data, flags, field, flagger, min, max)
tests = [
(f"flagWindowAfterFlag(window='3D', func=range(min={min}, max={max}))", "3D"),
(f"flagNextAfterFlag(n=4, func=range(min={min}, max={max}))", 4),
]
for expr, window in tests:
_, repeated_flags = evalExpression(expr, data, flags, field, flagger)
flagged = repeated_flags[flagger.isFlagged(flags)].dropna()
flag_groups = (flagged
.rolling(window=window)
.apply(lambda df: flagger.isFlagged(flags).all(), raw=False))
assert np.all(flag_groups)
@pytest.mark.parametrize('flagger', TESTFLAGGER)
def test_range(data, field, flagger):
min, max = 10, 90
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment