#! /usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
import numpy as np
import pandas as pd
import dios
from saqc.funcs.functions import *
from test.common import initData, TESTFLAGGER
@pytest.fixture
def data():
    return initData(cols=1, start_date="2016-01-01", end_date="2018-12-31", freq="1D")
@pytest.fixture
def field(data):
return data.columns[0]
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagRange(data, field, flagger):
    min, max = 10, 90  # assumed bounds; any values inside the generated data's range work
    flagger = flagger.initFlags(data)
    data, flagger = flagRange(data, field, flagger, min=min, max=max)
    flagged = flagger.isFlagged(field)
    expected = (data[field] < min) | (data[field] > max)
    assert (flagged == expected).all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("method", ['wavelet', 'dtw'])
@pytest.mark.parametrize("pattern", [pytest.lazy_fixture("course_pattern_1"),
pytest.lazy_fixture("course_pattern_2"),] ,)
def test_flagPattern(course_test, flagger, method, pattern):
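    # flagPattern should mark occurrences of the given reference pattern in the data;
    # both the 'wavelet' and the 'dtw' matching methods are exercised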
pattern_data, dict_pattern = pattern()
# testing the same pattern sampled at different frequencies
if pattern_data.columns == "pattern1":
test_data, *_ = course_test(freq="10 min")
test_data['pattern_data'] = pattern_data.to_df()
flagger = flagger.initFlags(test_data)
data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="1 H", method=method)
assert flagger.isFlagged("data")[dict_pattern["pattern_1"]].all()
if pattern_data.columns == "pattern2":
test_data, *_ = course_test(freq="1 H")
test_data['pattern_data'] = pattern_data.to_df()
flagger = flagger.initFlags(test_data)
data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="days", method=method)
assert flagger.isFlagged("data")[dict_pattern["pattern_2"]].all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagSesonalRange(data, field, flagger):
    # prepare: alternate out-of-range and in-range values, so that half of each
    # seasonal window is expected to get flagged (assumed preparation values)
    data.iloc[::2] = 0
    data.iloc[1::2] = 50
    nyears = len(data[field].index.year.unique())
    tests = [
        ({"min": 1, "max": 100, "startmonth": 7, "startday": 1, "endmonth": 8, "endday": 31,}, 31 * 2 * nyears // 2,),
        ({"min": 1, "max": 100, "startmonth": 12, "startday": 16, "endmonth": 1, "endday": 15,}, 31 * nyears // 2 + 1,),
    ]
for test, expected in tests:
flagger = flagger.initFlags(data)
data, flagger = flagSesonalRange(data, field, flagger, **test)
flagged = flagger.isFlagged(field)
assert flagged.sum() == expected
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_clearFlags(data, field, flagger):
    # setting flags and clearing them afterwards should restore the initial flags
    flagger = flagger.initFlags(data)
flags_orig = flagger.getFlags()
flags_set = flagger.setFlags(field, flag=flagger.BAD).getFlags()
_, flagger = clearFlags(data, field, flagger)
flags_cleared = flagger.getFlags()
assert (flags_orig != flags_set).all(None)
assert (flags_orig == flags_cleared).all(None)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_forceFlags(data, field, flagger):
    # forcing a flag value should overwrite flags that were already set
    flagger = flagger.initFlags(data)
flags_orig = flagger.setFlags(field).getFlags(field)
_, flagger = forceFlags(data, field, flagger, flag=flagger.GOOD)
flags_forced = flagger.getFlags(field)
assert np.all(flags_orig != flags_forced)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagIsolated(data, flagger):
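    # carve an isolated single value (index 3) and an isolated pair (indices 13, 14)
    # out of the data with NaN gaps; index 5 is flagged manually beforehand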
field = data.columns[0]
data.iloc[1:3, 0] = np.nan
data.iloc[4:5, 0] = np.nan
data.iloc[11:13, 0] = np.nan
data.iloc[15:17, 0] = np.nan
s = data[field].iloc[5:6]
flagger = flagger.setFlags(field, loc=s)
_, flagger_result = flagIsolated(data, field, flagger, group_window="1D", gap_window="2.1D")
assert flagger_result.isFlagged(field)[slice(3, 6, 2)].all()
data, flagger_result = flagIsolated(
        data, field, flagger_result, group_window="2D", gap_window="2.1D", continuation_range="1.1D",
    )
    assert flagger_result.isFlagged(field)[[3, 5, 13, 14]].all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_2")])
def test_flagCrossScoring(dat, flagger):
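    # two otherwise identical courses, the second one containing an outlier; cross
    # scoring over both variables should flag the outlier timestamps in both fields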
data1, characteristics = dat(initial_level=0, final_level=0, out_val=0)
data2, characteristics = dat(initial_level=0, final_level=0, out_val=10)
field = "dummy"
fields = ["data1", "data2"]
s1, s2 = data1.squeeze(), data2.squeeze()
s1 = pd.Series(data=s1.values, index=s1.index)
s2 = pd.Series(data=s2.values, index=s1.index)
data = dios.DictOfSeries([s1, s2], columns=["data1", "data2"])
flagger = flagger.initFlags(data)
_, flagger_result = flagCrossScoring(
data, field, flagger, fields=fields, thresh=3, cross_stat=np.mean
)
for field in fields:
isflagged = flagger_result.isFlagged(field)
assert isflagged[characteristics['raise']].all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagManual(data, flagger):
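    # for the 'plain' and 'ontime' methods, exactly the timestamps marked with mflag
    # in mdata are expected to be flagged; the 'right-open' and 'left-open' methods
    # are checked further below with an interval-like mdata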
field = data.columns[0]
flagger = flagger.initFlags(data)
args = data, field, flagger
dat = data[field]
mdata = pd.Series('lala', index=dat.index)
index_exp = mdata.iloc[[10, 33, 200, 500]].index
mdata.iloc[[101, 133, 220, 506]] = 'b'
mdata.loc[index_exp] = 'a'
shrinked = mdata.loc[index_exp.union(mdata.iloc[[1, 2, 3, 4, 600, 601]].index)]
kwargs_list = [
dict(mdata=mdata, mflag='a', method='plain'),
dict(mdata=mdata.to_list(), mflag='a', method='plain'),
dict(mdata=mdata, mflag='a', method='ontime'),
dict(mdata=shrinked, mflag='a', method='ontime'),
]
for kw in kwargs_list:
_, fl = flagManual(*args, **kw)
isflagged = fl.isFlagged(field)
assert isflagged[isflagged].index.equals(index_exp)
# flag not exist in mdata
_, fl = flagManual(*args, mdata=mdata, mflag="i do not exist", method='ontime')
isflagged = fl.isFlagged(field)
assert isflagged[isflagged].index.equals(pd.DatetimeIndex([]))
# check right-open / ffill
index = pd.date_range(start="2016-01-01", end="2018-12-31", periods=11)
mdata = pd.Series(0, index=index)
mdata.loc[index[[1, 5, 6, 7, 9, 10]]] = 1
# >>> mdata
# 2016-01-01 00:00:00 0
# 2016-04-19 12:00:00 1
# 2016-08-07 00:00:00 0
# 2016-11-24 12:00:00 0
# 2017-03-14 00:00:00 0
# 2017-07-01 12:00:00 1
# 2017-10-19 00:00:00 1
# 2018-02-05 12:00:00 1
# 2018-05-26 00:00:00 0
# 2018-09-12 12:00:00 1
# 2018-12-31 00:00:00 1
# dtype: int64
# add first and last index from data
expected = mdata.copy()
expected.loc[dat.index[0]] = 0
expected.loc[dat.index[-1]] = 1
expected = expected.astype(bool)
_, fl = flagManual(*args, mdata=mdata, mflag=1, method='right-open')
isflagged = fl.isFlagged(field)
last = expected.index[0]
for curr in expected.index[1:]:
expected_value = mdata[last]
# datetime slicing is inclusive !
i = isflagged[last:curr].index[:-1]
chunk = isflagged.loc[i]
assert (chunk == expected_value).all()
last = curr
# check last value
assert isflagged[curr] == expected[curr]
# check left-open / bfill
expected.loc[dat.index[-1]] = 0 # this time the last is False
_, fl = flagManual(*args, mdata=mdata, mflag=1, method='left-open')
isflagged = fl.isFlagged(field)
last = expected.index[0]
assert isflagged[last] == expected[last]
for curr in expected.index[1:]:
expected_value = mdata[curr]
# datetime slicing is inclusive !
i = isflagged[last:curr].index[1:]
chunk = isflagged.loc[i]
assert (chunk == expected_value).all()
last = curr