Skip to content
Snippets Groups Projects
test_spikes_detection.py 5.10 KiB
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import pytest
import numpy as np
import pandas as pd
import dios

from saqc.funcs.spikes_detection import (
    spikes_flagSpektrumBased,
    spikes_flagMad,
    spikes_flagSlidingZscore,
    spikes_flagBasic,
    spikes_flagRaise,
    spikes_flagMultivarScores,
    spikes_flagGrubbs
)

from test.common import TESTFLAGGER


@pytest.fixture(scope="module")
def spiky_data():
    """Provide a slowly rising 5-minute series with two injected spikes.

    Returns a tuple ``(data, spike_positions)``: ``data`` is a
    ``dios.DictOfSeries`` wrapping the series named ``"spiky_data"`` and
    ``spike_positions`` lists the integer positions of the injected spikes.
    """
    spike_positions = [100, 1000]
    timestamps = pd.date_range(start="2011-01-01", end="2011-01-05", freq="5min")
    # linear ramp from 1 to 2, so the injected +/-100 values stand out clearly
    values = np.linspace(1, 2, timestamps.size)
    values[spike_positions] = [100, -100]
    series = pd.Series(values, index=timestamps, name="spiky_data")
    return dios.DictOfSeries(series), spike_positions


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagSpikesSpektrumBased(spiky_data, flagger):
    """Check that `spikes_flagSpektrumBased` flags both injected spikes as BAD."""
    data, spike_positions = spiky_data
    field, *_ = data.columns
    flagger = flagger.initFlags(data)
    _, flagger_result = spikes_flagSpektrumBased(data, field, flagger)
    flag_result = flagger_result.getFlags(field)
    # `spike_positions` are integer positions, while the fixture's series is
    # indexed by timestamps: select positionally with `.iloc` instead of
    # relying on `[]` falling back to positional lookup for integer keys.
    test_sum = (flag_result.iloc[spike_positions] == flagger.BAD).sum()
    assert test_sum == len(spike_positions)


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagMad(spiky_data, flagger):
    """Check that `spikes_flagMad` (called with "1H") flags both injected spikes as BAD."""
    data, spike_positions = spiky_data
    field, *_ = data.columns
    flagger = flagger.initFlags(data)
    _, flagger_result = spikes_flagMad(data, field, flagger, "1H")
    flag_result = flagger_result.getFlags(field)
    # `spike_positions` are integer positions, while the fixture's series is
    # indexed by timestamps: select positionally with `.iloc` instead of
    # relying on `[]` falling back to positional lookup for integer keys.
    test_sum = (flag_result.iloc[spike_positions] == flagger.BAD).sum()
    assert test_sum == len(spike_positions)


@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("method", ["modZ", "zscore"])
def test_slidingOutlier(spiky_data, flagger, method):
    """Check that `spikes_flagSlidingZscore` flags both injected spikes as BAD.

    The function is run twice per `method`: once with numeric `window` /
    `offset` arguments and once with offset strings.
    """
    data, spike_positions = spiky_data
    field, *_ = data.columns
    flagger = flagger.initFlags(data)

    window_specs = (
        {"window": 300, "offset": 50},  # numeric input
        {"window": "1500min", "offset": "250min"},  # offset-string input
    )
    # run every variant first, then verify each result
    outcomes = [
        spikes_flagSlidingZscore(data, field, flagger, method=method, **spec)
        for spec in window_specs
    ]

    expected_hits = len(spike_positions)
    for _, flagger_result in outcomes:
        flags_at_spikes = flagger_result.getFlags(field).iloc[spike_positions]
        bad_count = (flags_at_spikes == flagger.BAD).sum()
        assert int(bad_count) == expected_hits


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagSpikesBasic(spiky_data, flagger):
    """Check that `spikes_flagBasic` flags both injected spikes as BAD."""
    data, spike_positions = spiky_data
    field, *_ = data.columns
    flagger = flagger.initFlags(data)
    _, flagger_result = spikes_flagBasic(
        data, field, flagger, thresh=60, tolerance=10, window_size="20min"
    )
    flag_result = flagger_result.getFlags(field)
    # `spike_positions` are integer positions, while the fixture's series is
    # indexed by timestamps: select positionally with `.iloc` instead of
    # relying on `[]` falling back to positional lookup for integer keys.
    test_sum = (flag_result.iloc[spike_positions] == flagger.BAD).sum()
    assert test_sum == len(spike_positions)


# see test/funcs/conftest.py for the 'course_N' fixtures
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize(
    "dat",
    [
        pytest.lazy_fixture("course_1"),
        pytest.lazy_fixture("course_2"),
        pytest.lazy_fixture("course_3"),
        pytest.lazy_fixture("course_4"),
    ],
)
def test_flagSpikesLimitRaise(dat, flagger):
    """Check which parts of each generated course `spikes_flagRaise` flags.

    Everything selected by ``characteristics["raise"]`` must be flagged, while
    nothing selected by ``characteristics["return"]`` or
    ``characteristics["drop"]`` may be flagged.
    """
    data, characteristics = dat()
    field, *_ = data.columns
    flagger = flagger.initFlags(data)
    _, flagger_result = spikes_flagRaise(
        data,
        field,
        flagger,
        thresh=2,
        intended_freq="10min",
        raise_window="20min",
        numba_boost=False,
    )
    flagged = flagger_result.isFlagged(field)
    assert flagged[characteristics["raise"]].all()
    for must_stay_clean in ("return", "drop"):
        assert not flagged[characteristics[must_stay_clean]].any()


# see test/funcs/conftest.py for the 'course_N' fixtures
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_3")])
def test_flagMultivarScores(dat, flagger):
    """Check `spikes_flagMultivarScores` on two generated courses sharing one index.

    For both variables, everything selected by ``characteristics["raise"]``
    must be flagged and nothing selected by ``"return"`` or ``"drop"`` may be.
    """
    fields = ["data1", "data2"]
    data1, _ = dat(periods=1000, initial_level=5, final_level=15, out_val=50)
    # the characteristics of the second course are the ones checked below
    data2, characteristics = dat(periods=1000, initial_level=20, final_level=1, out_val=30)

    first, second = data1.squeeze(), data2.squeeze()
    # both series are put on the index of the first course
    shared_index = first.index
    columns = [pd.Series(data=raw.values, index=shared_index) for raw in (first, second)]
    data = dios.DictOfSeries(columns, columns=["data1", "data2"])

    flagger = flagger.initFlags(data)
    _, flagger_result = spikes_flagMultivarScores(
        data,
        "dummy",
        flagger,
        fields=fields,
        binning=50,
        trafo=np.log,
        iter_start=0.95,
        n_neighbors=10,
    )
    for variable in fields:
        flagged = flagger_result.isFlagged(variable)
        assert flagged[characteristics["raise"]].all()
        for must_stay_clean in ("return", "drop"):
            assert not flagged[characteristics[must_stay_clean]].any()

@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_3")])
def test_grubbs(dat, flagger):
    """Check that `spikes_flagGrubbs` flags everything selected by ``char_dict["drop"]``."""
    course_kwargs = dict(
        freq="10min",
        periods=45,
        initial_level=0,
        final_level=0,
        crowd_size=1,
        crowd_spacing=3,
        out_val=-10,
    )
    data, char_dict = dat(**course_kwargs)
    flagger = flagger.initFlags(data)
    _, result_flagger = spikes_flagGrubbs(data, "data", flagger, winsz=20, min_periods=15)
    flagged = result_flagger.isFlagged("data")
    assert flagged[char_dict["drop"]].all()