-
Peter Lünenschloß authoredd873be8a
test_functions.py 9.12 KiB
#! /usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import pytest
import saqc
from saqc import BAD, DOUBTFUL, UNFLAGGED, SaQC
from saqc.core import DictOfSeries, initFlagsLike
from tests.common import initData
from tests.fixtures import char_dict, course_1 # noqa, todo: fix fixtures
@pytest.fixture
def data():
return initData(cols=1, start_date="2016-01-01", end_date="2018-12-31", freq="1D")
@pytest.fixture
def field(data):
return data.columns[0]
def test_statPass():
data = pd.Series(0, index=pd.date_range("2000", "2001", freq="1D"), name="data")
noise = [-1, 1] * 10
data[100:120] = noise
data[200:210] = noise[:10]
data = DictOfSeries(data=data)
flags = initFlagsLike(data)
qc = SaQC(data, flags).flagByStatLowPass(
"data", np.std, "20D", 0.999, "5D", 0.999, 0, flag=BAD
)
assert (qc.flags["data"].iloc[:100] == UNFLAGGED).all()
assert (qc.flags["data"].iloc[100:120] == BAD).all()
assert (qc.flags["data"].iloc[121:] == UNFLAGGED).all()
def test_flagRange(data, field):
min, max = 10, 90
flags = initFlagsLike(data)
qc = SaQC(data, flags)
qc = qc.flagRange(field, min=min, max=max, flag=BAD)
flagged = qc.flags[field] > UNFLAGGED
expected = (data[field] < min) | (data[field] > max)
assert all(flagged == expected)
def test_flagSesonalRange(data, field):
data[field].iloc[::2] = 0
data[field].iloc[1::2] = 50
nyears = len(data[field].index.year.unique())
tests = [
(
{
"min": 1,
"max": 100,
"startmonth": 7,
"startday": 1,
"endmonth": 8,
"endday": 31,
},
31 * 2 * nyears // 2,
),
(
{
"min": 1,
"max": 100,
"startmonth": 12,
"startday": 16,
"endmonth": 1,
"endday": 15,
},
31 * nyears // 2 + 1,
),
]
flags = initFlagsLike(data)
qc = SaQC(data, flags)
for test, expected in tests:
newfield = f"{field}_masked"
start = f"{test['startmonth']:02}-{test['startday']:02}T00:00:00"
end = f"{test['endmonth']:02}-{test['endday']:02}T00:00:00"
qc = qc.copyField(field, newfield)
qc = qc.selectTime(
newfield,
mode="periodic",
start=start,
end=end,
closed=True,
flag=BAD,
)
qc = qc.flagRange(newfield, min=test["min"], max=test["max"], flag=BAD)
qc = qc.concatFlags(
newfield, method="match", target=field, flag=BAD, overwrite=True
)
qc = qc.dropField(newfield)
flagged = qc._flags[field] > UNFLAGGED
assert flagged.sum() == expected
def test_clearFlags(data, field):
flags = initFlagsLike(data)
flags[:, field] = BAD
assert all(flags[field] == BAD)
qc = SaQC(data, flags)
qc = qc.clearFlags(field)
assert all(qc._flags[field] == UNFLAGGED)
def test_forceFlags(data, field):
flags = initFlagsLike(data)
flags[:, field] = BAD
assert all(flags[field] == BAD)
qc = SaQC(data, flags).forceFlags(field, flag=DOUBTFUL)
assert all(qc._flags[field] == DOUBTFUL)
def test_flagIsolated(data, field):
flags = initFlagsLike(data)
d_len = len(data[field].index)
data[field].iloc[1:3] = np.nan
data[field].iloc[4:5] = np.nan
flags[data[field].index[5:6], field] = BAD
data[field].iloc[11:13] = np.nan
data[field].iloc[15:17] = np.nan
# data flags
# 2016-01-01 0.0 -inf
# 2016-01-02 NaN -inf
# 2016-01-03 NaN -inf
# 2016-01-04 3.0 -inf
# 2016-01-05 NaN -inf
# 2016-01-06 5.0 255.0
# 2016-01-07 6.0 -inf
# 2016-01-08 7.0 -inf
# .. .. ..
qc = SaQC(data, flags).flagIsolated(
field, group_window="1D", gap_window="2.1D", flag=BAD
)
assert (qc._flags[field].iloc[[3, 5]] == BAD).all()
neg_list = [k for k in range(d_len) if k not in [3, 5]]
assert (qc._flags[field].iloc[neg_list] == UNFLAGGED).all()
qc = qc.flagIsolated(
field,
group_window="2D",
gap_window="2.1D",
flag=BAD,
)
assert (qc._flags[field].iloc[[3, 5, 13, 14]] == BAD).all()
neg_list = [k for k in range(d_len) if k not in [3, 5, 13, 14]]
assert (qc._flags[field].iloc[neg_list] == UNFLAGGED).all()
def test_flagManual(data, field):
flags = initFlagsLike(data)
dat = data[field]
mdata = pd.Series("lala", index=dat.index)
index_exp = mdata.iloc[[10, 33, 200, 500]].index
mdata.iloc[[101, 133, 220, 506]] = "b"
mdata.loc[index_exp] = "a"
shrinked = mdata.loc[index_exp.union(mdata.iloc[[1, 2, 3, 4, 600, 601]].index)]
kwargs_list = [
dict(mdata=mdata, mflag="a", method="plain", mformat="mflag", flag=BAD),
dict(mdata=mdata, mflag="a", method="ontime", mformat="mflag", flag=BAD),
dict(mdata=shrinked, mflag="a", method="ontime", mformat="mflag", flag=BAD),
]
for kw in kwargs_list:
qc = SaQC(data, flags).flagManual(field, **kw)
isflagged = qc._flags[field] > UNFLAGGED
assert isflagged[isflagged].index.equals(index_exp)
# flag not exist in mdata
qc = SaQC(data, flags).flagManual(
field,
mdata=mdata,
mflag="i do not exist",
method="ontime",
mformat="mflag",
flag=BAD,
)
isflagged = qc._flags[field] > UNFLAGGED
assert isflagged[isflagged].index.equals(pd.DatetimeIndex([]))
# check closure methods
index = pd.date_range(start="2016-01-01", end="2018-12-31", periods=11)
mdata = pd.Series(0, index=index)
mdata.loc[index[[1, 5, 6, 7, 9, 10]]] = 1
# >>> mdata
# 2016-01-01 00:00:00 0
# 2016-04-19 12:00:00 1
# 2016-08-07 00:00:00 0
# 2016-11-24 12:00:00 0
# 2017-03-14 00:00:00 0
# 2017-07-01 12:00:00 1
# 2017-10-19 00:00:00 1
# 2018-02-05 12:00:00 1
# 2018-05-26 00:00:00 0
# 2018-09-12 12:00:00 1
# 2018-12-31 00:00:00 1
# dtype: int64
m_index = mdata.index
flag_intervals = [
(m_index[1], m_index[2]),
(m_index[5], m_index[8]),
(m_index[9], dat.index.shift(freq="1h")[-1]),
]
bound_drops = {"right-open": [1], "left-open": [0], "closed": []}
for method in ["right-open", "left-open", "closed"]:
qc = qc.flagManual(
field,
mdata=mdata,
mflag=1,
method=method,
mformat="mflag",
flag=BAD,
)
isflagged = qc._flags[field] > UNFLAGGED
for flag_i in flag_intervals:
f_i = isflagged[slice(flag_i[0], flag_i[-1])].index
check_i = f_i.drop(
[flag_i[k] for k in bound_drops[method]], errors="ignore"
)
assert isflagged[check_i].all()
unflagged = isflagged[f_i.difference(check_i)]
if not unflagged.empty:
assert ~unflagged.all()
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_1")])
def test_flagDriftFromNorm(dat):
data = dat(periods=200, peak_level=5, name="field1")[0]
data["field2"] = dat(periods=200, peak_level=10, name="field2")[0]["field2"]
data["field3"] = dat(periods=200, peak_level=100, name="field3")[0]["field3"]
fields = ["field1", "field2", "field3"]
flags = initFlagsLike(data)
qc = SaQC(data, flags).flagDriftFromNorm(
field=fields,
window="200min",
spread=5,
flag=BAD,
)
assert all(qc._flags["field3"] > UNFLAGGED)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_1")])
def test_flagDriftFromReference(dat):
data = dat(periods=200, peak_level=5, name="field1")[0]
data["field2"] = dat(periods=200, peak_level=10, name="field2")[0]["field2"]
data["field3"] = dat(periods=200, peak_level=100, name="field3")[0]["field3"]
fields = ["field1", "field2", "field3"]
flags = initFlagsLike(data)
qc = SaQC(data, flags).flagDriftFromReference(
field=fields,
reference="field1",
freq="3D",
thresh=20,
flag=BAD,
)
assert all(qc._flags["field3"] > UNFLAGGED)
def test_transferFlags():
data = pd.DataFrame({"a": [1, 2], "b": [1, 2], "c": [1, 2]})
qc = saqc.SaQC(data)
qc = qc.flagRange("a", max=1.5)
with pytest.deprecated_call():
qc = qc.transferFlags(["a", "a"], ["b", "c"]) # noqa
assert np.all(qc.flags["b"].values == np.array([UNFLAGGED, BAD]))
assert np.all(qc.flags["c"].values == np.array([UNFLAGGED, BAD]))
def test_flagJumps():
data = pd.DataFrame(
{"a": [1, 1, 1, 1, 1, 6, 6, 6, 6, 6]},
index=pd.date_range(start="2020-01-01", periods=10, freq="D"),
)
qc = SaQC(data=data)
qc = qc.flagJumps(field="a", thresh=1, window="2D")
assert qc.flags["a"][5] == BAD
assert np.all(qc.flags["a"].values[:5] == UNFLAGGED) & np.all(
qc.flags["a"].values[6:] == UNFLAGGED
)