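# Tests for saqc flagging functions (range, flag tools, isolated values,
# cross statistics, manual flags and drift detection).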
import dios
import numpy as np
import pandas as pd
import pytest

from saqc.funcs.breaks import flagIsolated
from saqc.funcs.drift import flagDriftFromNorm, flagDriftFromReference, flagDriftFromScaledNorm
from saqc.funcs.flagtools import flagManual, forceFlags, clearFlags
from saqc.funcs.outliers import flagCrossStatistic, flagRange
from saqc.funcs.resampling import reindexFlags
from saqc.funcs.tools import drop, copy, mask
from test.common import initData, TESTFLAGGER
@pytest.fixture
def data():
    """Provide the dataset shared by the tests in this module.

    The data is built by ``initData`` with ``cols=1`` and ``freq="1D"``
    between 2016-01-01 and 2018-12-31.
    """
    # NOTE(review): the fixture header was missing from the file; the name
    # ``data`` is what the ``field`` fixture and all tests request.
    return initData(cols=1, start_date="2016-01-01", end_date="2018-12-31", freq="1D")
@pytest.fixture
def field(data):
    """Return the label of the first column of the ``data`` fixture."""
    columns = data.columns
    return columns[0]
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagRange(data, field, flagger):
    """Check that ``flagRange`` flags the values outside ``[lower, upper]``.

    The expected mask is computed directly from the data with the same
    bounds and compared element-wise with the flagger's result.
    """
    # NOTE(review): the bounds were never bound in the original (the builtins
    # ``min``/``max`` were passed instead); 10 and 20 are assumed values -
    # confirm against the intended test setup.
    lower, upper = 10, 20

    flagger = flagger.initFlags(data)
    data, flagger = flagRange(data, field, flagger, min=lower, max=upper)

    flagged = flagger.isFlagged(field)
    expected = (data[field] < lower) | (data[field] > upper)
    # The original computed ``expected`` but never compared it to the result.
    assert (flagged == expected).all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagSesonalRange(data, field, flagger):
    """Check range flagging restricted to a periodic (seasonal) window.

    For each case the field is copied, the copy is masked with
    ``mode='periodic'`` between the given start and end day, range-flagged,
    and the resulting flags are projected back onto the original field
    before the temporary copy is dropped. The number of flagged values is
    compared with the expected count.
    """
    # prepare
    # NOTE(review): the setup lines and the opening of the ``tests`` list were
    # missing. The expected counts (half of the days inside each window) imply
    # that every second value violates the [1, 100] range, so alternating
    # values 0 / 50 are assumed here - confirm against the intended setup.
    data.iloc[::2] = 0
    data.iloc[1::2] = 50
    nyears = len(data[field].index.year.unique())

    tests = [
        ({"min": 1, "max": 100, "startmonth": 7, "startday": 1, "endmonth": 8, "endday": 31,}, 31 * 2 * nyears // 2,),
        ({"min": 1, "max": 100, "startmonth": 12, "startday": 16, "endmonth": 1, "endday": 15,}, 31 * nyears // 2 + 1,),
    ]

    for test, expected in tests:
        flagger = flagger.initFlags(data)
        newfield = f"{field}_masked"
        start = f"{test['startmonth']:02}-{test['startday']:02}T00:00:00"
        end = f"{test['endmonth']:02}-{test['endday']:02}T00:00:00"

        data, flagger = copy(data, field, flagger, newfield)
        data, flagger = mask(data, newfield, flagger, mode='periodic', period_start=start, period_end=end,
                             include_bounds=True)
        data, flagger = flagRange(data, newfield, flagger, min=test['min'], max=test['max'])
        data, flagger = reindexFlags(data, field, flagger, method='match', source=newfield)
        data, flagger = drop(data, newfield, flagger)

        flagged = flagger.isFlagged(field)
        assert flagged.sum() == expected
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_clearFlags(data, field, flagger):
    """Check that ``clearFlags`` restores the flags to their initial state.

    Flags are captured right after initialisation, after setting ``BAD`` on
    the field, and after clearing; the set flags must differ everywhere from
    the initial ones and the cleared flags must equal them.
    """
    # NOTE(review): the ``def`` line was missing from the file; the name is
    # inferred from the function under test and the parameters from the body.
    flagger = flagger.initFlags(data)
    flags_orig = flagger.getFlags()
    flags_set = flagger.setFlags(field, flag=flagger.BAD).getFlags()

    _, flagger = clearFlags(data, field, flagger)
    flags_cleared = flagger.getFlags()

    assert (flags_orig != flags_set).all(None)
    assert (flags_orig == flags_cleared).all(None)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_forceFlags(data, field, flagger):
    """Check that ``forceFlags`` overwrites flags that were already set.

    The field is flagged with the flagger's default flag, then forced to
    ``GOOD``; every flag must differ from its previous value afterwards.
    """
    # NOTE(review): the ``def`` line was missing from the file; the name is
    # inferred from the function under test and the parameters from the body.
    flagger = flagger.initFlags(data)
    flags_orig = flagger.setFlags(field).getFlags(field)

    _, flagger = forceFlags(data, field, flagger, flag=flagger.GOOD)
    flags_forced = flagger.getFlags(field)

    assert np.all(flags_orig != flags_forced)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagIsolated(data, flagger):
    """Check ``flagIsolated`` on data containing NaN gaps.

    Several short stretches of the first column are set to NaN and the value
    at position 5 is flagged up front. The first call must leave positions 3
    and 5 flagged; the second call, with a wider group window and a
    continuation range, must leave positions 3, 5, 13 and 14 flagged.
    """
    field = data.columns[0]
    data.iloc[1:3, 0] = np.nan
    data.iloc[4:5, 0] = np.nan
    data.iloc[11:13, 0] = np.nan
    data.iloc[15:17, 0] = np.nan

    # The original used the flagger without initialising it; every other test
    # in this module calls ``initFlags`` first.
    flagger = flagger.initFlags(data)
    s = data[field].iloc[5:6]
    flagger = flagger.setFlags(field, loc=s)

    _, flagger_result = flagIsolated(data, field, flagger, group_window="1D", gap_window="2.1D")

    # slice(3, 6, 2) selects positions 3 and 5
    assert flagger_result.isFlagged(field)[slice(3, 6, 2)].all()

    # The closing parenthesis of this call was missing in the original.
    data, flagger_result = flagIsolated(
        data, field, flagger_result, group_window="2D", gap_window="2.1D", continuation_range="1.1D",
    )
    assert flagger_result.isFlagged(field)[[3, 5, 13, 14]].all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_2")])
def test_flagCrossScoring(dat, flagger):
    """Run ``flagCrossStatistic`` over two series that share one index.

    Both series come from the ``dat`` factory and differ only in ``out_val``
    (0 versus 10). The values of the second series are re-indexed onto the
    index of the first before both are combined into a ``DictOfSeries``.
    """
    data1, characteristics = dat(initial_level=0, final_level=0, out_val=0)
    data2, characteristics = dat(initial_level=0, final_level=0, out_val=10)
    field = "dummy"
    fields = ["data1", "data2"]

    first, second = data1.squeeze(), data2.squeeze()
    shared_index = first.index
    first = pd.Series(data=first.values, index=shared_index)
    second = pd.Series(data=second.values, index=shared_index)
    data = dios.DictOfSeries([first, second], columns=["data1", "data2"])

    flagger = flagger.initFlags(data)
    _, flagger_result = flagCrossStatistic(data, field, flagger, fields=fields, thresh=3, cross_stat=np.mean)

    # NOTE(review): the flags are queried per field but nothing is asserted on
    # them - an assertion (presumably based on ``characteristics``) looks to
    # be missing here; confirm and restore it.
    for field in fields:
        isflagged = flagger_result.isFlagged(field)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagManual(data, flagger):
    """Check ``flagManual`` for the plain, ontime, right-open and left-open methods.

    The first part marks four positions with the manual flag ``"a"`` and
    expects exactly those timestamps to be flagged, whether ``mdata`` is
    passed as a full series, a list, or a shrunken series. The second part
    uses a sparse 0/1 series and checks, interval by interval, that flags are
    carried forward (right-open) or backward (left-open).
    """
    field = data.columns[0]
    flagger = flagger.initFlags(data)
    args = data, field, flagger
    dat = data[field]

    # NOTE(review): the definition of ``mdata`` was missing from the file. It
    # must share the index of ``dat`` (it is also passed as a plain list) and
    # hold a value different from every ``mflag`` used below; the placeholder
    # string is an assumption - confirm.
    mdata = pd.Series("none", index=dat.index)
    index_exp = mdata.iloc[[10, 33, 200, 500]].index
    mdata.iloc[[101, 133, 220, 506]] = "b"
    mdata.loc[index_exp] = "a"
    shrinked = mdata.loc[index_exp.union(mdata.iloc[[1, 2, 3, 4, 600, 601]].index)]

    kwargs_list = [
        dict(mdata=mdata, mflag="a", method="plain"),
        dict(mdata=mdata.to_list(), mflag="a", method="plain"),
        dict(mdata=mdata, mflag="a", method="ontime"),
        dict(mdata=shrinked, mflag="a", method="ontime"),
    ]

    for kw in kwargs_list:
        _, fl = flagManual(*args, **kw)
        isflagged = fl.isFlagged(field)
        assert isflagged[isflagged].index.equals(index_exp)

    # flag not exist in mdata
    _, fl = flagManual(*args, mdata=mdata, mflag="i do not exist", method="ontime")
    isflagged = fl.isFlagged(field)
    assert isflagged[isflagged].index.equals(pd.DatetimeIndex([]))

    # check right-open / ffill
    index = pd.date_range(start="2016-01-01", end="2018-12-31", periods=11)
    mdata = pd.Series(0, index=index)
    mdata.loc[index[[1, 5, 6, 7, 9, 10]]] = 1
    # >>> mdata
    # 2016-01-01 00:00:00    0
    # 2016-04-19 12:00:00    1
    # 2016-08-07 00:00:00    0
    # 2016-11-24 12:00:00    0
    # 2017-03-14 00:00:00    0
    # 2017-07-01 12:00:00    1
    # 2017-10-19 00:00:00    1
    # 2018-02-05 12:00:00    1
    # 2018-05-26 00:00:00    0
    # 2018-09-12 12:00:00    1
    # 2018-12-31 00:00:00    1
    # dtype: int64

    # add first and last index from data
    expected = mdata.copy()
    expected.loc[dat.index[0]] = 0
    expected.loc[dat.index[-1]] = 1
    expected = expected.astype(bool)

    _, fl = flagManual(*args, mdata=mdata, mflag=1, method="right-open")
    isflagged = fl.isFlagged(field)
    last = expected.index[0]
    for curr in expected.index[1:]:
        expected_value = mdata[last]
        # datetime slicing is inclusive !
        i = isflagged[last:curr].index[:-1]
        chunk = isflagged.loc[i]
        assert (chunk == expected_value).all()
        last = curr
    # check last value
    assert isflagged[curr] == expected[curr]

    # check left-open / bfill
    expected.loc[dat.index[-1]] = 0  # this time the last is False
    _, fl = flagManual(*args, mdata=mdata, mflag=1, method="left-open")
    isflagged = fl.isFlagged(field)
    last = expected.index[0]
    assert isflagged[last] == expected[last]
    for curr in expected.index[1:]:
        expected_value = mdata[curr]
        # datetime slicing is inclusive !
        i = isflagged[last:curr].index[1:]
        chunk = isflagged.loc[i]
        assert (chunk == expected_value).all()
        last = curr
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_1")])
def test_flagDriftFromNormal(dat, flagger):
    """Check that the three drift tests flag the deviating series ``d3``.

    ``d1``, ``d2`` and ``d3`` are generated with peak levels 5, 10 and 100;
    ``d4`` and ``d5`` are the same linear transform of ``d1``. Each of
    ``flagDriftFromNorm``, ``flagDriftFromReference`` and
    ``flagDriftFromScaledNorm`` must flag all of ``d3``.
    """
    data = dat(periods=200, peak_level=5, name='d1')[0]
    data['d2'] = dat(periods=200, peak_level=10, name='d2')[0]['d2']
    data['d3'] = dat(periods=200, peak_level=100, name='d3')[0]['d3']
    data['d4'] = 3 + 4 * data['d1']
    data['d5'] = 3 + 4 * data['d1']

    # The original passed the flagger without initialising it; every other
    # test in this module calls ``initFlags`` first.
    flagger = flagger.initFlags(data)

    # NOTE(review): the tail of this call was missing from the file.
    # ``norm_spread=5`` is assumed by analogy with the scaled-norm call
    # below - confirm.
    _, flagger_norm = flagDriftFromNorm(data, 'dummy', flagger, ['d1', 'd2', 'd3'], segment_freq="200min",
                                        norm_spread=5)

    _, flagger_ref = flagDriftFromReference(data, 'd1', flagger, ['d1', 'd2', 'd3'], segment_freq="3D",
                                            thresh=20)

    _, flagger_scale = flagDriftFromScaledNorm(data, 'dummy', flagger, ['d1', 'd3'], ['d4', 'd5'], segment_freq="3D",
                                               thresh=20, norm_spread=5)

    assert flagger_norm.isFlagged()['d3'].all()
    assert flagger_ref.isFlagged()['d3'].all()
    assert flagger_scale.isFlagged()['d3'].all()