#! /usr/bin/env python
# -*- coding: utf-8 -*-
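# tests for the harmonization functions in saqc.funcs.harm_functions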

import pytest

import numpy as np
import pandas as pd

from dios import dios
from test.common import TESTFLAGGER, initData

from saqc.funcs.harm_functions import (
    harm_harmonize,
    harm_deharmonize,
    _interpolateGrid,
    _insertGrid,
    _outsortCrap,
    harm_linear2Grid,
    harm_interpolate2Grid,
    harm_shift2Grid,
    harm_aggregate2Grid
)

RESHAPERS = ["nshift", "fshift", "bshift"]

COFLAGGING = [False, True]

SETSHIFTCOMMENT = [False, True]

INTERPOLATIONS = ["fshift", "bshift", "nshift", "nagg", "bagg"]

INTERPOLATIONS2 = ["fagg", "time", "polynomial"]

FREQS = ["15min", "30min"]


@pytest.fixture
def data():
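    # an irregular test series: a 15min grid from 00:00 to 01:00 with some
    # off-grid timestamps inserted, the 00:30 grid point dropped and one NaN value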
    index = pd.date_range(start="1.1.2011 00:00:00", end="1.1.2011 01:00:00", freq="15min")
    index = index.insert(2, pd.Timestamp(2011, 1, 1, 0, 29, 0))
    index = index.insert(2, pd.Timestamp(2011, 1, 1, 0, 28, 0))
    index = index.insert(5, pd.Timestamp(2011, 1, 1, 0, 32, 0))
    index = index.insert(5, pd.Timestamp(2011, 1, 1, 0, 31, 0))
    index = index.insert(0, pd.Timestamp(2010, 12, 31, 23, 57, 0))
    index = index.drop(pd.Timestamp("2011-01-01 00:30:00"))
    dat = pd.Series(np.linspace(-50, 50, index.size), index=index, name="data")
    # include a NaN value in the test data
    dat.iloc[-3] = np.nan
    data = dios.DictOfSeries(dat)
    return data


@pytest.fixture
def multi_data():
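    # like the `data` fixture, but with two additional, differently indexed
    # columns ("data2", "data3") merged into one frame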
    index = pd.date_range(start="1.1.2011 00:00:00", end="1.1.2011 01:00:00", freq="15min")
    index = index.insert(2, pd.Timestamp(2011, 1, 1, 0, 29, 0))
    index = index.insert(2, pd.Timestamp(2011, 1, 1, 0, 28, 0))
    index = index.insert(5, pd.Timestamp(2011, 1, 1, 0, 32, 0))
    index = index.insert(5, pd.Timestamp(2011, 1, 1, 0, 31, 0))
    index = index.insert(0, pd.Timestamp(2010, 12, 31, 23, 57, 0))
    index = index.drop(pd.Timestamp("2011-01-01 00:30:00"))
    dat = pd.Series(np.linspace(-50, 50, index.size), index=index, name="data")
    # include a NaN value in the test data
    dat.iloc[-3] = np.nan
    data = dat.to_frame()
    data.index = data.index.shift(1, "2min")
    dat2 = data.copy()
    dat2.index = dat2.index.shift(1, "17min")
    dat2.rename(columns={"data": "data2"}, inplace=True)
    dat3 = data.copy()
    dat3.index = dat3.index.shift(1, "1h")
    dat3.rename(columns={"data": "data3"}, inplace=True)
    dat3.drop(dat3.index[2:-2], inplace=True)
    # merge
    data = pd.merge(data, dat2, how="outer", left_index=True, right_index=True)
    data = pd.merge(data, dat3, how="outer", left_index=True, right_index=True)
    return dios.DictOfSeries(data)


@pytest.mark.parametrize("method", INTERPOLATIONS2)
def test_gridInterpolation(data, method):
    freq = "15min"
    data = data.squeeze()
    data = (data * np.sin(data)).append(data.shift(1, "2h")).shift(1, "3s")
    kwds = dict(agg_method="sum", downcast_interpolation=True)

    # we are just testing if the interpolation gets passed to the series without causing an error:
    _interpolateGrid(data, freq, method, order=1, **kwds)
    if method == "polynomial":
        _interpolateGrid(data, freq, method, order=2, **kwds)
        _interpolateGrid(data, freq, method, order=10, **kwds)
        data = _insertGrid(data, freq)


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_outsortCrap(data, flagger):
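    # check that values carrying the given drop flags are sorted out of the series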
    field = data.columns[0]
    s = data[field]
    flagger = flagger.initFlags(data)

    drop_index = s.index[5:7]
    flagger = flagger.setFlags(field, loc=drop_index)
    res, *_ = _outsortCrap(s, field, flagger, drop_flags=flagger.BAD)
    assert drop_index.difference(res.index).equals(drop_index)

    flagger = flagger.setFlags(field, loc=s.index[0:1], flag=flagger.GOOD)
    drop_index = drop_index.insert(-1, s.index[0])
    to_drop = [flagger.BAD, flagger.GOOD]
    res, *_ = _outsortCrap(s, field, flagger, drop_flags=to_drop)
    assert drop_index.sort_values().difference(res.index).equals(drop_index.sort_values())


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_heapConsistency(data, flagger):

    freq = "15Min"

    # harmonize `other_data` and prefill the HEAP
    other_data = initData(3)
    other_flagger = flagger.initFlags(other_data)
    harm_harmonize(other_data, other_data.columns[0], other_flagger, freq, "time", "nshift")

    # harmonize and deharmonize `data`
    # -> we want both harmonizations (`data` and `other_data`) to not interfere
    flagger = flagger.initFlags(data)
    data_harm, flagger_harm = harm_harmonize(data, "data", flagger, freq, "time", "nshift")
    data_deharm, flagger_deharm = harm_deharmonize(data_harm, "data", flagger_harm)
    assert np.all(data.to_df().dropna() == data_deharm.to_df().dropna())


@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("reshaper", RESHAPERS)
@pytest.mark.parametrize("co_flagging", COFLAGGING)
def test_harmSingleVarIntermediateFlagging(data, flagger, reshaper, co_flagging):
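    # harmonize, flag one harmonized value, deharmonize and check that the flag is
    # projected back onto the original timestamps according to the used reshaper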
    flagger = flagger.initFlags(data)
    # make pre harm copies:
    pre_data = data.copy()
    pre_flags = flagger.getFlags()
    freq = "15min"

    assert len(data.columns) == 1
    field = data.columns[0]

    # harmonize data:
    data, flagger = harm_harmonize(data, "data", flagger, freq, "time", reshaper)

    # flag something bad
    flagger = flagger.setFlags("data", loc=data[field].index[3:4])
    data, flagger = harm_deharmonize(data, "data", flagger, co_flagging=co_flagging)
    d = data[field]

    if reshaper == "nshift":
        if co_flagging is True:
            assert flagger.isFlagged(loc=d.index[3:7]).squeeze().all()
            assert (~flagger.isFlagged(loc=d.index[0:3]).squeeze()).all()
            assert (~flagger.isFlagged(loc=d.index[7:]).squeeze()).all()
        if co_flagging is False:
            assert (
                flagger.isFlagged().squeeze() == [False, False, False, False, True, False, False, False, False]
            ).all()
    if reshaper == "bshift":
        if co_flagging is True:
            assert flagger.isFlagged(loc=d.index[5:7]).squeeze().all()
            assert (~flagger.isFlagged(loc=d.index[0:5]).squeeze()).all()
            assert (~flagger.isFlagged(loc=d.index[7:]).squeeze()).all()
        if co_flagging is False:
            assert (
                flagger.isFlagged().squeeze() == [False, False, False, False, False, True, False, False, False]
            ).all()
    if reshaper == "fshift":
        if co_flagging is True:
            assert flagger.isFlagged(loc=d.index[3:5]).squeeze().all()
            assert (~flagger.isFlagged(loc=d.index[0:3]).squeeze()).all()
            assert (~flagger.isFlagged(loc=d.index[5:]).squeeze()).all()
        if co_flagging is False:
            assert (
                flagger.isFlagged().squeeze() == [False, False, False, False, True, False, False, False, False]
            ).all()

    flags = flagger.getFlags()
    assert pre_data[field].equals(data[field])
    assert len(data[field]) == len(flags[field])
    assert (pre_flags[field].index == flags[field].index).all()


@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("interpolation", INTERPOLATIONS)
@pytest.mark.parametrize("freq", FREQS)
def test_harmSingleVarInterpolations(data, flagger, interpolation, freq):
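    # harmonize with every interpolation/aggregation method, compare the result to the
    # expected regular series and check that deharmonization restores data and flags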
    flagger = flagger.initFlags(data)
    flags = flagger.getFlags()
    # make pre harm copies:
    pre_data = data.copy()
    pre_flags = flags.copy()

    assert len(data.columns) == 1
    field = data.columns[0]

    harm_start = data[field].index[0].floor(freq=freq)
    harm_end = data[field].index[-1].ceil(freq=freq)
    test_index = pd.date_range(start=harm_start, end=harm_end, freq=freq)
    data, flagger = harm_harmonize(
        data, "data", flagger, freq, interpolation, "fshift", reshape_shift_comment=False, inter_agg="sum",
    )
    if interpolation == "fshift":
        if freq == "15min":
            exp = pd.Series([np.nan, -37.5, -25.0, 0.0, 37.5, 50.0], index=test_index)
            assert data[field].equals(exp)
        if freq == "30min":
            exp = pd.Series([np.nan, -37.5, 0.0, 50.0], index=test_index)
            assert data[field].equals(exp)
    if interpolation == "bshift":
        if freq == "15min":
            exp = pd.Series([-50.0, -37.5, -25.0, 12.5, 37.5, 50.0], index=test_index)
            assert data[field].equals(exp)
        if freq == "30min":
            exp = pd.Series([-50.0, -37.5, 12.5, 50.0], index=test_index)
            assert data[field].equals(exp)
    if interpolation == "nshift":
        if freq == "15min":
            exp = pd.Series([np.nan, -37.5, -25.0, 12.5, 37.5, 50.0], index=test_index)
            assert data[field].equals(exp)
        if freq == "30min":
            exp = pd.Series([np.nan, -37.5, 12.5, 50.0], index=test_index)
            assert data[field].equals(exp)
    if interpolation == "nagg":
        if freq == "15min":
            exp = pd.Series([-87.5, -25.0, 0.0, 37.5, 50.0], index=test_index[1:])
            assert data[field].equals(exp)
        if freq == "30min":
            exp = pd.Series([-87.5, -25.0, 87.5], index=test_index[1:])
            assert data[field].equals(exp)
    if interpolation == "bagg":
        if freq == "15min":
            exp = pd.Series([-50.0, -37.5, -37.5, 12.5, 37.5, 50.0], index=test_index)
            assert data[field].equals(exp)
        if freq == "30min":
            exp = pd.Series([-50.0, -75.0, 50.0, 50.0], index=test_index)
            assert data[field].equals(exp)

    data, flagger = harm_deharmonize(data, "data", flagger, co_flagging=True)

    flags = flagger.getFlags()

    assert pre_data[field].equals(data[field])
    assert len(data[field]) == len(flags[field])
    assert (pre_flags[field].index == flags[field].index).all()


@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("shift_comment", SETSHIFTCOMMENT)
def test_multivariatHarmonization(multi_data, flagger, shift_comment):
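    # harmonize three differently indexed variables onto a common 15min grid, then
    # deharmonize them again and check that data and flags are restored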
    flagger = flagger.initFlags(multi_data)
    flags = flagger.getFlags()
    # for comparison
    pre_data = multi_data.copy()
    pre_flags = flags.copy()
    freq = "15min"

    # harm:
    multi_data, flagger = harm_harmonize(
        multi_data, "data", flagger, freq, "time", "nshift", reshape_shift_comment=shift_comment,
    )

    multi_data, flagger = harm_harmonize(
        multi_data,
        "data2",
        flagger,
        freq,
        "bagg",
        "bshift",
        inter_agg="sum",
        reshape_agg="max",
        reshape_shift_comment=shift_comment,
    )

    multi_data, flagger = harm_harmonize(
        multi_data, "data3", flagger, freq, "fshift", "fshift", reshape_shift_comment=shift_comment,
    )

    for c in multi_data.columns:
        harm_start = multi_data[c].index[0].floor(freq=freq)
        harm_end = multi_data[c].index[-1].ceil(freq=freq)
        harm_index = pd.date_range(start=harm_start, end=harm_end, freq=freq)
        assert multi_data[c].index.equals(harm_index)
        assert pd.Timedelta(pd.infer_freq(multi_data[c].index)) == pd.Timedelta(freq)

    multi_data, flagger = harm_deharmonize(multi_data, "data3", flagger, co_flagging=False)
    multi_data, flagger = harm_deharmonize(multi_data, "data2", flagger, co_flagging=True)
    multi_data, flagger = harm_deharmonize(multi_data, "data", flagger, co_flagging=True)

    for c in multi_data.columns:
        flags = flagger.getFlags()
        assert pre_data[c].equals(multi_data[pre_data.columns.to_list()][c])
        assert len(multi_data[c]) == len(flags[c])
        assert (pre_flags[c].index == flags[c].index).all()


@pytest.mark.parametrize("method", INTERPOLATIONS2)
def test_gridInterpolation(data, method):
    freq = "15min"
    data = data.squeeze()
    data = (data * np.sin(data)).append(data.shift(1, "2h")).shift(1, "3s")
    # we are just testing if the interpolation gets passed to the series without causing an error:
    _interpolateGrid(data, freq, method, order=1, agg_method="sum", downcast_interpolation=True)
    if method == "polynomial":
        _interpolateGrid(data, freq, method, order=2, agg_method="sum", downcast_interpolation=True)
        _interpolateGrid(data, freq, method, order=10, agg_method="sum", downcast_interpolation=True)
        data = _insertGrid(data, freq)


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_outsortCrap(data, flagger):

    field = data.columns[0]
    flagger = flagger.initFlags(data)
    flagger = flagger.setFlags(field, loc=data[field].index[5:7])

    drop_index = data[field].index[5:7]
    d, *_ = _outsortCrap(data[field], field, flagger, drop_flags=flagger.BAD)
    assert drop_index.difference(d.index).equals(drop_index)

    flagger = flagger.setFlags(field, loc=data[field].index[0:1], flag=flagger.GOOD)
    drop_index = drop_index.insert(-1, data[field].index[0])
    d, *_ = _outsortCrap(data[field], field, flagger, drop_flags=[flagger.BAD, flagger.GOOD],)
    assert drop_index.sort_values().difference(d.index).equals(drop_index.sort_values())


@pytest.mark.parametrize("flagger", TESTFLAGGER)


def test_wrapper(data, flagger):
    # we are only testing, whether the wrappers do pass processing:
    field = data.columns[0]
    freq = "15min"
    flagger = flagger.initFlags(data)
    harm_linear2Grid(data, field, flagger, freq, method="nagg", func="max", drop_flags=None)
    harm_aggregate2Grid(data, field, flagger, freq, value_func="sum", flag_func="max", method="nagg", drop_flags=None)
    harm_shift2Grid(data, field, flagger, freq, method="nshift", drop_flags=None)
    harm_interpolate2Grid(data, field, flagger, freq, method="spline")