#! /usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import pandas as pd
import pytest

from saqc.funcs.breaks import flagIsolated
from saqc.funcs.drift import flagDriftFromNorm, flagDriftFromReference, flagDriftFromScaledNorm
from saqc.funcs.flagtools import flagManual, forceFlags, clearFlags
from saqc.funcs.outliers import flagCrossStatistic, flagRange
from saqc.funcs.resampling import reindexFlags
from saqc.funcs.tools import drop, copy, mask
from test.common import initData, TESTFLAGGER


@pytest.fixture
def data():
    """Provide the single-column, daily-frequency data set used by the tests in this module."""
    return initData(cols=1, start_date="2016-01-01", end_date="2018-12-31", freq="1D")


@pytest.fixture
def field(data):
    """Return the name of the first column of the ``data`` fixture."""
    return data.columns[0]

@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagRange(data, field, flagger):
    """Check that ``flagRange`` flags exactly the values below ``min`` or above ``max``."""
    # Named `lower`/`upper` so the builtins `min`/`max` are not shadowed.
    lower, upper = 10, 90
    flagger = flagger.initFlags(data)
    data, flagger = flagRange(data, field, flagger, min=lower, max=upper)
    flagged = flagger.isFlagged(field)
    expected = (data[field] < lower) | (data[field] > upper)
    assert (flagged == expected).all()


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagSesonalRange(data, field, flagger):
    """Check range flagging that is restricted to a yearly recurring period.

    For every test case the field is copied, the copy is masked with
    ``mode='periodic'`` between the given start and end, ``flagRange`` is run
    on the copy, the flags are transferred back to the original field with
    ``reindexFlags`` and the copy is dropped again. The number of flagged
    values in the original field is then compared to the expected count.
    """
    # prepare: the data alternates between 0 (below every tested `min`)
    # and 50 (inside every tested range)
    data.iloc[::2] = 0
    data.iloc[1::2] = 50
    nyears = len(data[field].index.year.unique())

    # (flagRange / period parameters, expected number of flagged values)
    tests = [
        (
            {"min": 1, "max": 100, "startmonth": 7, "startday": 1, "endmonth": 8, "endday": 31},
            31 * 2 * nyears // 2,
        ),
        (
            {"min": 1, "max": 100, "startmonth": 12, "startday": 16, "endmonth": 1, "endday": 15},
            31 * nyears // 2 + 1,
        ),
    ]

    # name of the temporary, masked copy of `field`
    newfield = f"{field}_masked"

    for test, expected in tests:
        flagger = flagger.initFlags(data)
        start = f"{test['startmonth']:02}-{test['startday']:02}T00:00:00"
        end = f"{test['endmonth']:02}-{test['endday']:02}T00:00:00"

        data, flagger = copy(data, field, flagger, newfield)
        data, flagger = mask(
            data, newfield, flagger, mode='periodic', period_start=start, period_end=end, include_bounds=True
        )
        data, flagger = flagRange(data, newfield, flagger, min=test['min'], max=test['max'])
        data, flagger = reindexFlags(data, field, flagger, method='match', source=newfield)
        data, flagger = drop(data, newfield, flagger)
        flagged = flagger.isFlagged(field)
        assert flagged.sum() == expected


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_clearFlags(data, field, flagger):
    """Check that flags after ``clearFlags`` equal the freshly initialised flags.

    Also asserts that setting ``flagger.BAD`` on the field yields flags that
    differ from the initial ones, so the comparison is not trivially true.
    """
    flagger = flagger.initFlags(data)
    flags_orig = flagger.getFlags()
    flags_set = flagger.setFlags(field, flag=flagger.BAD).getFlags()
    # NOTE(review): the result of `setFlags` is not assigned back to `flagger`;
    # whether `clearFlags` sees the BAD flags depends on `setFlags` mutating
    # in place - confirm against the flagger implementation.
    _, flagger = clearFlags(data, field, flagger)
    flags_cleared = flagger.getFlags()
    assert (flags_orig != flags_set).all(None)
    assert (flags_orig == flags_cleared).all(None)


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_forceFlags(data, flagger):
    """Check that ``forceFlags`` with ``flagger.GOOD`` changes every flag obtained via ``setFlags``."""
    flagger = flagger.initFlags(data)
    field, *_ = data.columns
    flags_orig = flagger.setFlags(field).getFlags(field)
    _, flagger = forceFlags(data, field, flagger, flag=flagger.GOOD)
    flags_forced = flagger.getFlags(field)
    assert np.all(flags_orig != flags_forced)


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagIsolated(data, flagger):
    """Check ``flagIsolated`` on data with several NaN gaps.

    NaN gaps are written at positions 1-2, 4, 11-12 and 15-16 and position 5
    is flagged up front. The first call is expected to flag positions 3 and 5,
    the second call (wider group window plus ``continuation_range``) positions
    3, 5, 13 and 14.
    """
    field = data.columns[0]
    data.iloc[1:3, 0] = np.nan
    data.iloc[4:5, 0] = np.nan
    data.iloc[11:13, 0] = np.nan
    data.iloc[15:17, 0] = np.nan
    flagger = flagger.initFlags(data)
    # flag the single value at position 5 before running the test function
    s = data[field].iloc[5:6]
    flagger = flagger.setFlags(field, loc=s)

    _, flagger_result = flagIsolated(data, field, flagger, group_window="1D", gap_window="2.1D")

    assert flagger_result.isFlagged(field)[slice(3, 6, 2)].all()

    data, flagger_result = flagIsolated(
        data, field, flagger_result, group_window="2D", gap_window="2.1D", continuation_range="1.1D",
    )
    assert flagger_result.isFlagged(field)[[3, 5, 13, 14]].all()


@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_2")])
def test_flagCrossScoring(dat, flagger):
    """Check that ``flagCrossStatistic`` flags the ``"raise"`` positions in both input series.

    Two series are generated with the ``course_2`` fixture (``out_val=0`` and
    ``out_val=10``), put on a common index and passed to ``flagCrossStatistic``
    with ``thresh=3`` and ``np.mean`` as cross statistic.
    """
    data1, characteristics = dat(initial_level=0, final_level=0, out_val=0)
    # `characteristics` of the second call is the one used in the assertion below
    data2, characteristics = dat(initial_level=0, final_level=0, out_val=10)
    field = "dummy"
    fields = ["data1", "data2"]
    s1, s2 = data1.squeeze(), data2.squeeze()
    s1 = pd.Series(data=s1.values, index=s1.index)
    # both series are put on the index of s1
    s2 = pd.Series(data=s2.values, index=s1.index)
    # NOTE(review): `dios` is not among the imports visible at the top of this
    # file - confirm that it is imported at module level.
    data = dios.DictOfSeries([s1, s2], columns=["data1", "data2"])
    flagger = flagger.initFlags(data)
    _, flagger_result = flagCrossStatistic(data, field, flagger, fields=fields, thresh=3, cross_stat=np.mean)
    for name in fields:
        isflagged = flagger_result.isFlagged(name)
        assert isflagged[characteristics["raise"]].all()


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagManual(data, flagger):
    """Check ``flagManual`` with the methods ``plain``, ``ontime``, ``right-open`` and ``left-open``.

    The first part marks four positions with the manual flag ``"a"`` and
    expects exactly these positions to be flagged, regardless of whether
    ``mdata`` is passed as a series, a list or a reduced series. The second
    part uses a coarse 0/1 series and verifies interval-wise that the flags
    follow the left (``right-open``) or right (``left-open``) interval bound.
    """
    field = data.columns[0]
    flagger = flagger.initFlags(data)
    args = data, field, flagger
    dat = data[field]

    mdata = pd.Series("lala", index=dat.index)
    index_exp = mdata.iloc[[10, 33, 200, 500]].index
    mdata.iloc[[101, 133, 220, 506]] = "b"
    mdata.loc[index_exp] = "a"
    # reduced manual data: the expected timestamps plus a few unrelated ones
    shrinked = mdata.loc[index_exp.union(mdata.iloc[[1, 2, 3, 4, 600, 601]].index)]

    kwargs_list = [
        dict(mdata=mdata, mflag="a", method="plain"),
        dict(mdata=mdata.to_list(), mflag="a", method="plain"),
        dict(mdata=mdata, mflag="a", method="ontime"),
        dict(mdata=shrinked, mflag="a", method="ontime"),
    ]

    for kw in kwargs_list:
        _, fl = flagManual(*args, **kw)
        isflagged = fl.isFlagged(field)
        assert isflagged[isflagged].index.equals(index_exp)

    # flag not exist in mdata
    _, fl = flagManual(*args, mdata=mdata, mflag="i do not exist", method="ontime")
    isflagged = fl.isFlagged(field)
    assert isflagged[isflagged].index.equals(pd.DatetimeIndex([]))

    # check right-open / ffill
    index = pd.date_range(start="2016-01-01", end="2018-12-31", periods=11)
    mdata = pd.Series(0, index=index)
    mdata.loc[index[[1, 5, 6, 7, 9, 10]]] = 1
    # >>> mdata
    # 2016-01-01 00:00:00    0
    # 2016-04-19 12:00:00    1
    # 2016-08-07 00:00:00    0
    # 2016-11-24 12:00:00    0
    # 2017-03-14 00:00:00    0
    # 2017-07-01 12:00:00    1
    # 2017-10-19 00:00:00    1
    # 2018-02-05 12:00:00    1
    # 2018-05-26 00:00:00    0
    # 2018-09-12 12:00:00    1
    # 2018-12-31 00:00:00    1
    # dtype: int64

    # add first and last index from data
    expected = mdata.copy()
    expected.loc[dat.index[0]] = 0
    expected.loc[dat.index[-1]] = 1
    expected = expected.astype(bool)

    _, fl = flagManual(*args, mdata=mdata, mflag=1, method="right-open")
    isflagged = fl.isFlagged(field)
    last = expected.index[0]
    for curr in expected.index[1:]:
        # within [last, curr) the value at the left bound is expected
        expected_value = mdata[last]
        # datetime slicing is inclusive !
        i = isflagged[last:curr].index[:-1]
        chunk = isflagged.loc[i]
        assert (chunk == expected_value).all()
        last = curr
    # check last value (`curr` still holds the final timestamp of the loop)
    assert isflagged[curr] == expected[curr]

    # check left-open / bfill
    expected.loc[dat.index[-1]] = 0  # this time the last is False
    _, fl = flagManual(*args, mdata=mdata, mflag=1, method="left-open")
    isflagged = fl.isFlagged(field)
    last = expected.index[0]
    assert isflagged[last] == expected[last]
    for curr in expected.index[1:]:
        # within (last, curr] the value at the right bound is expected
        expected_value = mdata[curr]
        # datetime slicing is inclusive !
        i = isflagged[last:curr].index[1:]
        chunk = isflagged.loc[i]
        assert (chunk == expected_value).all()
        last = curr


@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_1")])
def test_flagDriftFromNormal(dat, flagger):
    """Check that all three drift flagging functions flag column ``d3`` completely.

    ``d1``, ``d2`` and ``d3`` are generated with the ``course_1`` fixture using
    peak levels 5, 10 and 100; ``d4`` and ``d5`` are both ``3 + 4 * d1``.
    """
    data = dat(periods=200, peak_level=5, name='d1')[0]
    data['d2'] = dat(periods=200, peak_level=10, name='d2')[0]['d2']
    data['d3'] = dat(periods=200, peak_level=100, name='d3')[0]['d3']
    data['d4'] = 3 + 4 * data['d1']
    data['d5'] = 3 + 4 * data['d1']

    flagger = flagger.initFlags(data)
    # only the returned flaggers are inspected, the returned data is discarded
    _, flagger_norm = flagDriftFromNorm(
        data, 'dummy', flagger, ['d1', 'd2', 'd3'], segment_freq="200min", norm_spread=5
    )
    _, flagger_ref = flagDriftFromReference(
        data, 'd1', flagger, ['d1', 'd2', 'd3'], segment_freq="3D", thresh=20
    )
    _, flagger_scale = flagDriftFromScaledNorm(
        data, 'dummy', flagger, ['d1', 'd3'], ['d4', 'd5'], segment_freq="3D", thresh=20, norm_spread=5
    )
    assert flagger_norm.isFlagged()['d3'].all()
    assert flagger_ref.isFlagged()['d3'].all()
    assert flagger_scale.isFlagged()['d3'].all()