#! /usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
import numpy as np
import pandas as pd
import dios
from saqc.funcs.functions import *
from test.common import initData, TESTFLAGGER
@pytest.fixture
def data():
    return initData(cols=1, start_date="2016-01-01", end_date="2018-12-31", freq="1D")
@pytest.fixture
def field(data):
return data.columns[0]
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagRange(data, field, flagger):
    min, max = 10, 90  # any bounds work here: everything outside [min, max] must end up flagged
    flagger = flagger.initFlags(data)
    data, flagger = flagRange(data, field, flagger, min=min, max=max)
    flagged = flagger.isFlagged(field)
    expected = (data[field] < min) | (data[field] > max)
    assert all(flagged[expected])
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("method", ['wavelet', 'dtw'])
@pytest.mark.parametrize("pattern", [pytest.lazy_fixture("course_pattern_1"),
pytest.lazy_fixture("course_pattern_2"),] ,)
def test_flagPattern(course_test, flagger, method, pattern):
pattern_data, dict_pattern = pattern()
# testing the same pattern sampled at different frequencies
    if "pattern1" in pattern_data.columns:
test_data, *_ = course_test(freq="10 min")
test_data['pattern_data'] = pattern_data.to_df()
flagger = flagger.initFlags(test_data)
data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="1 H", method=method)
assert flagger.isFlagged("data")[dict_pattern["pattern_1"]].all()
    if "pattern2" in pattern_data.columns:
test_data, *_ = course_test(freq="1 H")
test_data['pattern_data'] = pattern_data.to_df()
flagger = flagger.initFlags(test_data)
data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="days", method=method)
assert flagger.isFlagged("data")[dict_pattern["pattern_2"]].all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagSesonalRange(data, field, flagger):
    # prepare: every second value is pushed out of range, so exactly half of the
    # values inside each seasonal window should get flagged (assumed setup, the
    # expected counts below rely on it)
    data.iloc[::2] = 0
    data.iloc[1::2] = 50
    nyears = len(data[field].index.year.unique())
    tests = [
        ({"min": 1, "max": 100, "startmonth": 7, "startday": 1, "endmonth": 8, "endday": 31,}, 31 * 2 * nyears // 2,),
        ({"min": 1, "max": 100, "startmonth": 12, "startday": 16, "endmonth": 1, "endday": 15,}, 31 * nyears // 2 + 1,),
    ]
for test, expected in tests:
flagger = flagger.initFlags(data)
data, flagger = flagSesonalRange(data, field, flagger, **test)
flagged = flagger.isFlagged(field)
assert flagged.sum() == expected
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_clearFlags(data, field, flagger):
flagger = flagger.initFlags(data)
flags_orig = flagger.getFlags()
flags_set = flagger.setFlags(field, flag=flagger.BAD).getFlags()
_, flagger = clearFlags(data, field, flagger)
flags_cleared = flagger.getFlags()
assert (flags_orig != flags_set).all(None)
assert (flags_orig == flags_cleared).all(None)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_forceFlags(data, field, flagger):
flagger = flagger.initFlags(data)
flags_orig = flagger.setFlags(field).getFlags(field)
_, flagger = forceFlags(data, field, flagger, flag=flagger.GOOD)
flags_forced = flagger.getFlags(field)
assert np.all(flags_orig != flags_forced)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagIsolated(data, flagger):
field = data.columns[0]
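    # punch NaN gaps into the data so that the remaining single values (e.g. at
    # position 3) form isolated groups for flagIsolated to pick up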
data.iloc[1:3, 0] = np.nan
data.iloc[4:5, 0] = np.nan
data.iloc[11:13, 0] = np.nan
data.iloc[15:17, 0] = np.nan
s = data[field].iloc[5:6]
flagger = flagger.setFlags(field, loc=s)
_, flagger_result = flagIsolated(data, field, flagger, group_window="1D", gap_window="2.1D")
assert flagger_result.isFlagged(field)[slice(3, 6, 2)].all()
    data, flagger_result = flagIsolated(
        data, field, flagger_result, group_window="2D", gap_window="2.1D", continuation_range="1.1D",
    )
    assert flagger_result.isFlagged(field)[[3, 5, 13, 14]].all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_2")])
def test_flagCrossScoring(dat, flagger):
data1, characteristics = dat(initial_level=0, final_level=0, out_val=0)
data2, characteristics = dat(initial_level=0, final_level=0, out_val=10)
field = "dummy"
fields = ["data1", "data2"]
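    # rebuild both series on a shared index so the cross scoring compares the two
    # fields timestamp by timestamp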
s1, s2 = data1.squeeze(), data2.squeeze()
s1 = pd.Series(data=s1.values, index=s1.index)
s2 = pd.Series(data=s2.values, index=s1.index)
data = dios.DictOfSeries([s1, s2], columns=["data1", "data2"])
flagger = flagger.initFlags(data)
_, flagger_result = flagCrossScoring(data, field, flagger, fields=fields, thresh=3, cross_stat=np.mean)
    for field in fields:
        isflagged = flagger_result.isFlagged(field)
        # the injected outlier should be flagged in both fields (assuming the course
        # fixture reports its position under the "raise" key)
        assert isflagged[characteristics["raise"]].all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagManual(data, flagger):
field = data.columns[0]
flagger = flagger.initFlags(data)
args = data, field, flagger
dat = data[field]
    mdata = pd.Series("lala", index=dat.index)  # manual-flag series; the filler value just has to differ from mflag
index_exp = mdata.iloc[[10, 33, 200, 500]].index
mdata.iloc[[101, 133, 220, 506]] = "b"
mdata.loc[index_exp] = "a"
shrinked = mdata.loc[index_exp.union(mdata.iloc[[1, 2, 3, 4, 600, 601]].index)]
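    # "shrinked" keeps only a few timestamps; with method="ontime" flags are still
    # expected at index_exp only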
kwargs_list = [
dict(mdata=mdata, mflag="a", method="plain"),
dict(mdata=mdata.to_list(), mflag="a", method="plain"),
dict(mdata=mdata, mflag="a", method="ontime"),
dict(mdata=shrinked, mflag="a", method="ontime"),
]
for kw in kwargs_list:
_, fl = flagManual(*args, **kw)
isflagged = fl.isFlagged(field)
assert isflagged[isflagged].index.equals(index_exp)
# flag not exist in mdata
_, fl = flagManual(*args, mdata=mdata, mflag="i do not exist", method="ontime")
isflagged = fl.isFlagged(field)
assert isflagged[isflagged].index.equals(pd.DatetimeIndex([]))
# check right-open / ffill
index = pd.date_range(start="2016-01-01", end="2018-12-31", periods=11)
mdata = pd.Series(0, index=index)
mdata.loc[index[[1, 5, 6, 7, 9, 10]]] = 1
# >>> mdata
# 2016-01-01 00:00:00 0
# 2016-04-19 12:00:00 1
# 2016-08-07 00:00:00 0
# 2016-11-24 12:00:00 0
# 2017-03-14 00:00:00 0
# 2017-07-01 12:00:00 1
# 2017-10-19 00:00:00 1
# 2018-02-05 12:00:00 1
# 2018-05-26 00:00:00 0
# 2018-09-12 12:00:00 1
# 2018-12-31 00:00:00 1
# dtype: int64
# add first and last index from data
expected = mdata.copy()
expected.loc[dat.index[0]] = 0
expected.loc[dat.index[-1]] = 1
expected = expected.astype(bool)
_, fl = flagManual(*args, mdata=mdata, mflag=1, method="right-open")
isflagged = fl.isFlagged(field)
last = expected.index[0]
for curr in expected.index[1:]:
expected_value = mdata[last]
# datetime slicing is inclusive !
i = isflagged[last:curr].index[:-1]
chunk = isflagged.loc[i]
assert (chunk == expected_value).all()
last = curr
# check last value
assert isflagged[curr] == expected[curr]
# check left-open / bfill
expected.loc[dat.index[-1]] = 0 # this time the last is False
_, fl = flagManual(*args, mdata=mdata, mflag=1, method="left-open")
isflagged = fl.isFlagged(field)
last = expected.index[0]
assert isflagged[last] == expected[last]
for curr in expected.index[1:]:
expected_value = mdata[curr]
# datetime slicing is inclusive !
i = isflagged[last:curr].index[1:]
chunk = isflagged.loc[i]
assert (chunk == expected_value).all()
last = curr