#! /usr/bin/env python

# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later

import itertools
import operator

import numpy as np
import pandas as pd
import pytest

import saqc
from saqc import BAD as B
from saqc import UNFLAGGED as U
from saqc import SaQC
from saqc.funcs.flagtools import _groupOperation
from saqc.lib.tools import toSequence
from tests.fixtures import data

N = np.nan


@pytest.mark.parametrize(
    "got, expected, kwargs",
    [
        ([N, N, B, N, N], [N, N, N, B, N], {"window": 1, "method": "ffill"}),
        ([N, N, B, N, N], [N, B, N, N, N], {"window": 1, "method": "bfill"}),
        ([B, N, N, N, B], [N, B, N, N, N], {"window": 1, "method": "ffill"}),
        ([B, N, N, N, B], [N, N, N, B, N], {"window": 1, "method": "bfill"}),
        ([N, N, B, N, N], [N, N, N, B, N], {"window": "1D", "method": "ffill"}),
        ([N, N, B, N, N], [N, B, N, N, N], {"window": "1D", "method": "bfill"}),
        ([B, N, N, N, B], [N, B, N, N, N], {"window": "1D", "method": "ffill"}),
        ([B, N, N, N, B], [N, N, N, B, N], {"window": "1D", "method": "bfill"}),
        ([N, N, B, N, N], [N, N, N, B, B], {"window": 2, "method": "ffill"}),
        ([N, N, B, N, N], [B, B, N, N, N], {"window": 2, "method": "bfill"}),
        ([B, N, N, N, B], [N, B, B, N, N], {"window": 2, "method": "ffill"}),
        ([B, N, N, N, B], [N, N, B, B, N], {"window": 2, "method": "bfill"}),
        ([N, N, B, N, N], [N, N, N, B, B], {"window": "2D", "method": "ffill"}),
        ([N, N, B, N, N], [B, B, N, N, N], {"window": "2D", "method": "bfill"}),
        ([B, N, N, N, B], [N, B, B, N, N], {"window": "2D", "method": "ffill"}),
        ([B, N, N, N, B], [N, N, B, B, N], {"window": "2D", "method": "bfill"}),
        # window larger than data
        ([U, U, B, U, U], [N, N, N, B, B], {"window": 10, "method": "ffill"}),
        ([U, U, B, U, U], [B, B, N, N, N], {"window": 10, "method": "bfill"}),
        ([B, U, U, U, U], [N, B, B, B, B], {"window": "10D", "method": "ffill"}),
        ([B, U, U, U, U], [N, N, N, N, N], {"window": "10D", "method": "bfill"}),
        # playing with dfilter
        # NOTE: each case must be its own 3-tuple (got, expected, kwargs);
        # a single fused tuple does not match the three argnames.
        (
            [1, B, -1, -1, -1],
            [N, N, B, B, N],
            {"window": 2, "method": "ffill", "dfilter": 0},
        ),
        (
            [-1, -1, -1, B, 1],
            [N, B, B, N, N],
            {"window": 2, "method": "bfill", "dfilter": 0},
        ),
        (
            [B, 1, -1, 1, 1],
            [N, N, B, N, N],
            {"window": "2D", "method": "ffill", "dfilter": 0},
        ),
        (
            [B, 1, 1, -1, 1],
            [N, N, N, B, N],
            {"window": "2D", "method": "bfill", "dfilter": 0},
        ),
    ],
)
def test_propagateFlagsRegularIndex(got, expected, kwargs):
    """Check ``propagateFlags`` on a regular, daily sampled index.

    Parameters
    ----------
    got : list
        Initial flags of the variable ``"x"``, one value per day.
    expected : list
        Expected content of history column ``1`` after the call
        (presumably the column written by ``propagateFlags`` -- confirm
        against the history implementation).
    kwargs : dict
        Keyword arguments forwarded to ``propagateFlags``.
    """
    index = pd.date_range("2000-01-01", periods=len(got))
    flags = pd.DataFrame({"x": got}, index=index)
    expected = pd.Series(expected, index=index)
    # the data values are irrelevant here, only the flags are inspected
    data = pd.DataFrame({"x": np.nan}, index=index)
    # named `qc` so the imported `saqc` module is not shadowed
    qc = SaQC(data=data, flags=flags).propagateFlags(field="x", **kwargs)
    result = qc._history["x"].hist[1].astype(float)
    assert result.equals(expected)


@pytest.mark.parametrize(
    "got, expected, kwargs",
    [
        ([N, N, B, N, N], [N, N, N, N, N], {"window": "1D", "method": "ffill"}),
        ([N, N, B, N, N], [N, N, N, N, N], {"window": "1D", "method": "bfill"}),
        ([B, N, N, N, B], [N, B, N, N, N], {"window": "1D", "method": "ffill"}),
        ([B, N, N, N, B], [N, N, N, N, N], {"window": "1D", "method": "bfill"}),
        ([N, N, B, N, N], [N, N, N, B, N], {"window": "3D", "method": "ffill"}),
        ([N, N, B, N, N], [B, B, N, N, N], {"window": "3D", "method": "bfill"}),
        ([B, N, N, N, B], [N, B, N, N, N], {"window": "2D", "method": "ffill"}),
        ([B, N, N, N, B], [N, N, N, N, N], {"window": "2D", "method": "bfill"}),
        ([B, U, U, U, U], [N, B, B, B, N], {"window": "10D", "method": "ffill"}),
    ],
)
def test_propagateFlagsIrregularIndex(got, expected, kwargs):
    """Check ``propagateFlags`` with offset windows on an irregular index.

    The timestamps are spaced 1, 2, 3 and 11 days apart, so only time based
    windows (``"1D"``, ``"2D"``, ...) are exercised here. ``got`` holds the
    initial flags of ``"x"``, ``expected`` the wanted content of history
    column ``1`` and ``kwargs`` is passed on to ``propagateFlags``.
    """
    timestamps = pd.to_datetime(
        ["2000-01-01", "2000-01-02", "2000-01-04", "2000-01-07", "2000-01-18"]
    )
    initial_flags = pd.DataFrame({"x": got}, index=timestamps)
    wanted = pd.Series(expected, index=timestamps)
    # data values do not matter, the test only looks at the flags
    empty_data = pd.DataFrame({"x": np.nan}, index=timestamps)

    qc = SaQC(data=empty_data, flags=initial_flags)
    qc = qc.propagateFlags(field="x", **kwargs)

    propagated = qc._flags.history["x"].hist[1].astype(float)
    assert propagated.equals(wanted)


@pytest.mark.parametrize(
    "left,right,expected",
    [
        ([B, U, U, B], [B, B, U, U], [B, U, U, U]),
        ([B, B, B, B], [B, B, B, B], [B, B, B, B]),
        ([U, U, U, U], [U, U, U, U], [U, U, U, U]),
    ],
)
def test_andGroup(left, right, expected):
    """Check that ``andGroup`` combines the flags of two ``SaQC`` objects.

    ``left`` and ``right`` are the flags of the two group members for the
    variable ``"data"``; ``expected`` are the flags the unflagged base object
    should carry afterwards. In the parametrization a position is only ``B``
    in ``expected`` when it is ``B`` in both inputs.
    """
    frame = pd.DataFrame({"data": [1, 2, 3, 4]})

    unflagged = SaQC(data=frame)
    members = [
        SaQC(data=frame, flags=pd.DataFrame({"data": pd.Series(member_flags)}))
        for member_flags in (left, right)
    ]
    combined = unflagged.andGroup(field="data", group=members)

    wanted = pd.Series(expected)
    assert wanted.equals(combined.flags["data"])


@pytest.mark.parametrize(
    "left,right,expected",
    [
        ([B, U, U, B], [B, B, U, U], [B, B, U, B]),
        ([B, B, B, B], [B, B, B, B], [B, B, B, B]),
        ([U, U, U, U], [U, U, U, U], [U, U, U, U]),
    ],
)
def test_orGroup(left, right, expected):
    """Check that ``orGroup`` combines the flags of two ``SaQC`` objects.

    ``left`` and ``right`` are the flags of the two group members for the
    variable ``"data"``; ``expected`` are the flags the unflagged base object
    should carry afterwards. In the parametrization a position is ``B`` in
    ``expected`` as soon as it is ``B`` in at least one input.
    """
    frame = pd.DataFrame({"data": [1, 2, 3, 4]})

    unflagged = SaQC(data=frame)
    members = [
        SaQC(data=frame, flags=pd.DataFrame({"data": pd.Series(member_flags)}))
        for member_flags in (left, right)
    ]
    combined = unflagged.orGroup(field="data", group=members)

    wanted = pd.Series(expected)
    assert wanted.equals(combined.flags["data"])


@pytest.mark.parametrize(
    "field, target, expected, copy",
    [
        ("x", "a", [B, B, U, B], True),
        (["y", "x"], "a", [B, B, U, B], False),
        (["y", "x"], ["a", "b"], [B, B, U, B], True),
        (["y", ["x", "y"]], "a", [B, B, B, B], False),
        (["y", ["x", "y"]], ["c", ["a", "b"]], [B, B, B, B], True),
    ],
)
def test__groupOperation(field, target, expected, copy):
    """Check ``_groupOperation`` with ``operator.or_`` for several
    field/target layouts.

    Parameters
    ----------
    field : str or list
        Source variable(s), possibly nested one level.
    target : str or list
        Variable(s) the combined flags are written to, possibly nested one
        level.
    expected : list
        Flags every target variable should carry afterwards.
    copy : bool
        If true, additionally assert that the data of each target equals the
        data of the source it is paired with.
    """
    base = SaQC(
        data=pd.DataFrame(
            {"x": [0, 1, 2, 3], "y": [0, 11, 22, 33], "z": [0, 111, 222, 333]}
        ),
        flags=pd.DataFrame({"x": [B, U, U, B], "y": [B, B, U, U], "z": [B, B, U, B]}),
    )
    that = SaQC(
        data=pd.DataFrame({"x": [0, 1, 2, 3], "y": [0, 11, 22, 33]}),
        flags=pd.DataFrame({"x": [U, B, U, B], "y": [U, U, B, U]}),
    )
    result = _groupOperation(
        saqc=base, field=field, target=target, func=operator.or_, group=[base, that]
    )
    # flatten one nesting level, e.g. ["c", ["a", "b"]] -> "c", "a", "b"
    targets = toSequence(itertools.chain.from_iterable(target))
    for t in targets:
        assert pd.Series(expected).equals(result.flags[t])

    # check source-target behavior
    if copy:
        fields = toSequence(itertools.chain.from_iterable(field))
        for f, t in zip(fields, targets):
            assert (result._data[f] == result._data[t]).all(axis=None)
def test_transferFlags():
    """Check ``transferFlags`` for 1:1 and n:1 transfers, with and without
    ``squeeze``.

    Without ``squeeze`` the target history is expected to hold the source
    history column(s) followed by one additional all-NaN column; with
    ``squeeze=True`` the target history is compared against the source history
    directly.
    """
    frame = pd.DataFrame(
        {"x": [0, 1, 2, 3], "y": [0, 11, 22, 33], "z": [0, 111, 222, 333]}
    )
    flag_frame = pd.DataFrame(
        {"x": [B, U, U, B], "y": [B, B, U, U], "z": [B, B, U, B]}
    )
    qc = SaQC(data=frame, flags=flag_frame)

    # ---- no squeeze ----
    single = qc.transferFlags("x", target="a")
    hist_a = single._history["a"].hist
    assert hist_a.iloc[:, :-1].equals(single._history["x"].hist)
    assert hist_a.iloc[:, -1].isna().all()

    paired = qc.transferFlags(["x", "y"], target=["a", "b"])
    for source, target in (("x", "a"), ("y", "b")):
        target_hist = paired._history[target].hist
        assert target_hist.iloc[:, :-1].equals(paired._history[source].hist)
        assert target_hist.iloc[:, -1].isna().all()

    # the overwrite option allows an easy comparison against the original
    # flags, otherwise the inserted nan values would have to be respected
    merged = qc.transferFlags(["x", "y", "z"], target="a", overwrite=True)
    hist_a = merged._history["a"].hist
    for column, source in enumerate(("x", "y", "z")):
        assert hist_a.iloc[:, column].equals(merged._history[source].hist.squeeze())
    assert hist_a.iloc[:, -1].isna().all()

    # ---- squeeze ----
    single = qc.transferFlags("x", target="a", squeeze=True)
    assert single._history["a"].hist.equals(single._history["x"].hist)

    paired = qc.transferFlags(["x", "y"], target=["a", "b"], squeeze=True)
    for source, target in (("x", "a"), ("y", "b")):
        assert paired._history[target].hist.equals(paired._history[source].hist)

    # the overwrite option allows an easy comparison against the original
    # flags, otherwise the inserted nan values would have to be respected
    merged = qc.transferFlags(
        ["x", "y", "z"], target="a", overwrite=True, squeeze=True
    )
    hist_a = merged._history["a"].hist
    for column, source in enumerate(("x", "y", "z")):
        assert hist_a.iloc[:, column].equals(merged._history[source].hist.squeeze())


@pytest.mark.parametrize(
    "f_data",
    [
        # intervals as a series: index -> interval start, value -> interval end
        pd.Series(
            ["2000-01-01T00:30:00", "2000-01-01T01:30:00"],
            index=["2000-01-01T00:00:00", "2000-01-01T01:00:00"],
        ),
        # intervals as a 2d array of (start, end) rows
        np.array(
            [
                ("2000-01-01T00:00:00", "2000-01-01T00:30:00"),
                ("2000-01-01T01:00:00", "2000-01-01T01:30:00"),
            ]
        ),
        # intervals as a list of (start, end) tuples
        [
            ("2000-01-01T00:00:00", "2000-01-01T00:30:00"),
            ("2000-01-01T01:00:00", "2000-01-01T01:30:00"),
        ],
        # name of the variable holding the intervals
        "maint",
    ],
)
def test_setFlags_intervals(f_data):
    """Check ``setFlags`` when ``data`` describes time intervals.

    The variable ``"data"`` is sampled every 11 minutes, the two intervals
    span 00:00-00:30 and 01:00-01:30. The samples at positions 0-2 and 6-8
    fall into the intervals and must end up with a flag greater than zero,
    the positions 4-5 and 10-29 lie outside and must keep a flag below zero.
    Positions 3 and 9 are not checked.
    """
    interval_starts = pd.DatetimeIndex(
        ["2000-01-01T00:00:00", "2000-01-01T01:00:00"]
    )
    interval_ends = ["2000-01-01T00:30:00", "2000-01-01T01:30:00"]
    maint_data = pd.Series(data=interval_ends, index=interval_starts, name="maint")

    sample_index = pd.date_range("2000", freq="11min", periods=30)
    data = pd.Series(np.arange(30), index=sample_index, name="data")

    qc = saqc.SaQC([data, maint_data]).setFlags("data", data=f_data)

    inside = np.r_[0:3, 6:9]
    outside = np.r_[4:6, 10:30]
    assert (qc.flags["data"].iloc[inside] > 0).all()
    assert (qc.flags["data"].iloc[outside] < 0).all()


@pytest.mark.parametrize(
    "f_data",
    [
        # timestamps as a 1d array
        np.array(
            [
                "2000-01-01T00:00:00",
                "2000-01-01T00:30:00",
                "2000-01-01T01:00:00",
                "2000-01-01T01:30:00",
            ]
        ),
        # timestamps as a plain list
        [
            "2000-01-01T00:00:00",
            "2000-01-01T00:30:00",
            "2000-01-01T01:00:00",
            "2000-01-01T01:30:00",
        ],
    ],
)
def test_setFlags_ontime(f_data):
    """Check ``setFlags`` when ``data`` is a flat sequence of timestamps.

    The variable ``"data"`` is sampled every 11 minutes starting at 00:00, so
    of the four given timestamps only 00:00 coincides with a sample. The test
    expects exactly that first sample to carry a flag greater than zero and
    all remaining samples to keep a flag below zero.
    """
    interval_starts = pd.DatetimeIndex(
        ["2000-01-01T00:00:00", "2000-01-01T01:00:00"]
    )
    interval_ends = ["2000-01-01T00:30:00", "2000-01-01T01:30:00"]
    maint_data = pd.Series(data=interval_ends, index=interval_starts, name="maint")

    sample_index = pd.date_range("2000", freq="11min", periods=30)
    data = pd.Series(np.arange(30), index=sample_index, name="data")

    qc = saqc.SaQC([data, maint_data]).setFlags("data", data=f_data)

    data_flags = qc.flags["data"]
    assert data_flags.iloc[0] > 0
    assert (data_flags.iloc[1:] < 0).all()