Skip to content
Snippets Groups Projects
test_generic_config_functions.py 8.49 KiB
#! /usr/bin/env python

# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later

# -*- coding: utf-8 -*-

import ast

import numpy as np
import pandas as pd
import pytest

from saqc import BAD, UNFLAGGED, SaQC
from saqc.core import DictOfSeries, Flags, initFlagsLike, register
from saqc.funcs.generic import _execGeneric
from saqc.parsing.reader import _ConfigReader
from saqc.parsing.visitor import ConfigFunctionParser
from tests.common import initData


@pytest.fixture
def data():
    return initData()


@pytest.fixture
def data_diff():
    data = initData(cols=3)
    col0 = data[data.columns[0]]
    col1 = data[data.columns[1]]
    mid = len(col0) // 2
    offset = len(col0) // 8
    return DictOfSeries(
        {
            data.columns[0]: col0.iloc[: mid + offset],
            data.columns[1]: col1.iloc[mid - offset :],
        }
    )


def _compileGeneric(expr):
    tree = ast.parse(expr, mode="eval")
    _, kwargs = ConfigFunctionParser().parse(tree.body)
    return kwargs["func"]


@pytest.mark.parametrize(
    "expr",
    [
        "range(x=5",
        "rangex=5)",
        "range[x=5]" "range{x=5}" "int->float(x=4)" "int*float(x=4)",
    ],
)
def test_syntaxError(expr):
    with pytest.raises(SyntaxError):
        _compileGeneric(f"flag(func={expr})")


# TODO: think about cases that should be forbidden
@pytest.mark.parametrize("expr", ["lambda x: x * 2"])
def test_typeError(expr):
    """
    test that forbidden constructs actually throw an error
    """
    with pytest.raises(TypeError):
        _compileGeneric(f"flagGeneric(func={expr})")


@pytest.mark.parametrize(
    "fields,expr,expected",
    [
        (["var1"], "x > 100", 'data["var1"] > 100'),
        (["var2"], "10 >= y", '10 >= data["var2"]'),
        (["var2"], f"y < 100", 'data["var2"] < 100'),
        (["var1", "var2"], "x <= y", 'data["var1"] <= data["var2"]'),
        (["var1", "var2"], "x == y", 'data["var1"] == data["var2"]'),
        (["var1", "var2"], "x != y", 'data["var1"] != data["var2"]'),
    ],
)
def test_comparisonOperators(data, fields, expr, expected):
    expected = eval(expected)
    flags = initFlagsLike(data)
    func = _compileGeneric(f"flagGeneric(func={expr})")
    result = _execGeneric(Flags({f: flags[f] for f in fields}), data[fields], func)
    assert (result == expected).all(axis=None)


@pytest.mark.parametrize(
    "expr,expected",
    [
        ("var1 + 100 > 110", 'data["var1"] + 100 > 110'),
        ("var1 - 100 > 0", 'data["var1"] - 100 > 0'),
        ("var1 * 100 > 200", 'data["var1"] * 100 > 200'),
        ("var1 / 100 > .1", 'data["var1"] / 100 > 0.1'),
        ("var1 % 2 == 1", 'data["var1"] % 2 == 1'),
        ("var1 ** 2 == 0", 'data["var1"]**2 == 0'),
    ],
)
def test_arithmeticOperators(data, expr, expected):
    expected = eval(expected)
    flags = Flags({"var1": pd.Series(UNFLAGGED, index=data["var1"].index)})
    func = _compileGeneric(f"processGeneric(func={expr})")
    result = _execGeneric(flags, data["var1"], func)
    assert (result == expected).all(axis=None)


def test_nonReduncingBuiltins(data):
    var1, *_ = data.columns
    data = data[var1].iloc[1:10]
    flags = Flags({var1: pd.Series(UNFLAGGED, index=data.index)})

    tests = [
        ("abs(x)", np.abs(data)),
        ("log(x)", np.log(data)),
        ("exp(x)", np.exp(data)),
    ]

    for test, expected in tests:
        func = _compileGeneric(f"processGeneric(func={test})")
        result = _execGeneric(flags, data, func)
        assert (result == expected).all(axis=None)


def test_bitOps(data):
    var1, var2, *_ = data.columns
    flags = initFlagsLike(data)

    tests = [
        ([var1], "~(x > mean(x))", ~(data[var1] > np.nanmean(data[var1]))),
        ([var1], "(x <= 0) | (0 < x)", (data[var1] <= 0) | (0 < data[var1])),
        ([var1, var2], "(y>= 0) & (0 > x)", (data[var2] >= 0) & (0 > data[var1])),
    ]

    for field, test, expected in tests:
        func = _compileGeneric(f"flagGeneric(func={test})")
        result = _execGeneric(Flags({f: flags[f] for f in field}), data[field], func)
        assert (result == expected).all(axis=None)


def test_variableAssignments(data):
    config = f"""
    varname ; test
    dummy1  ; processGeneric(field=["var1", "var2"], func=x + y)
    dummy2  ; flagGeneric(field=["var1", "var2"], func=x + y > 0)
    """

    cr = _ConfigReader(data)
    saqc = cr.readString(config).run()

    expected_columns = set(data.columns) | {"dummy1", "dummy2"}
    assert set(saqc.data.columns) == expected_columns
    assert set(saqc.flags.columns) == expected_columns


def test_processExistingTarget(data):
    config = f"""
    varname ; test
    var2   ; flagMissing()
    var2   ; processGeneric(func=y - 1)
    """

    cr = _ConfigReader(data)
    saqc = cr.readString(config).run()
    assert (saqc._data["var2"] == data["var2"] - 1).all()
    assert len(saqc._flags.history["var2"]) == 2
    assert saqc._flags.history["var2"].hist[0].isna().all()
    assert saqc._flags.history["var2"].hist[1].isna().all()


def test_flagTargetExisting(data):
    config = f"""
    varname ; test
    dummy   ; processGeneric(field="var1", func=x < 1)
    dummy   ; processGeneric(field="var2", func=y >1)
    """

    cr = _ConfigReader(data)
    saqc = cr.readString(config).run()
    assert len(saqc.data["dummy"]) == len(saqc.flags["dummy"])


def test_processTargetExistingFail(data_diff):
    config = f"""
    varname ; test
    dummy   ; processGeneric(field="var1", func=x + 1)
    dummy   ; processGeneric(field="var2", func=y - 1)
    """

    cr = _ConfigReader(data_diff).readString(config)
    with pytest.raises(ValueError):
        cr.run()


def test_flagTargetExistingFail(data_diff):
    config = f"""
    varname ; test
    dummy   ; flagGeneric(field="var1", func=x < 1)
    dummy   ; flagGeneric(field="var2", func=y > 1)
    """

    cr = _ConfigReader(data_diff).readString(config)
    with pytest.raises(ValueError):
        cr.run()


def test_callableArgumentsUnary(data):
    window = 5

    @register(mask=["field"], demask=["field"], squeeze=["field"])
    def testFuncUnary(saqc, field, func, **kwargs):
        value = saqc._data[field].rolling(window=window).apply(func)
        saqc._data[field] = value
        return saqc

    var = data.columns[0]

    # we slice the data, because the test is very slow otherwise
    data[var] = data[var].iloc[:100]

    config = f"""
    varname ; test
    {var}   ; testFuncUnary(func={{0}})
    """

    tests = [
        # ("sum", np.nansum),
        ("std(exp(x))", lambda x: np.std(np.exp(x))),
    ]

    for name, func in tests:
        cr = _ConfigReader(data).readString(config.format(name))
        result_config = cr.run().data
        result_api = SaQC(data).testFuncUnary(var, func=func).data
        expected = data[var].rolling(window=window).apply(func)
        assert (result_config[var].dropna() == expected.dropna()).all(axis=None)
        assert (result_api[var].dropna() == expected.dropna()).all(axis=None)


def test_callableArgumentsBinary(data):
    var1, var2 = data.columns[:2]

    @register(mask=["field"], demask=["field"], squeeze=["field"])
    def testFuncBinary(saqc, field, func, **kwargs):
        saqc._data[field] = func(data[var1], data[var2])
        return saqc

    config = f"""
    varname ; test
    {var1}  ; testFuncBinary(func={{0}})
    """

    tests = [
        ("x + y", lambda x, y: x + y),
        ("y - (x * 2)", lambda y, x: y - (x * 2)),
    ]

    for name, func in tests:
        cr = _ConfigReader(data).readString(config.format(name))
        result_config = cr.run().data
        result_api = SaQC(data).testFuncBinary(var1, func=func).data
        expected = func(data[var1], data[var2])
        assert (result_config[var1].dropna() == expected.dropna()).all(axis=None)
        assert (result_api[var1].dropna() == expected.dropna()).all(axis=None)


def test_isflagged(data):
    var1, var2, *_ = data.columns
    flags = initFlagsLike(data)
    flags[data[var1].index[::2], var1] = BAD

    tests = [
        ([var1], f"isflagged(x)", flags[var1] > UNFLAGGED),
        ([var1], f"isflagged(x)", flags[var1] >= BAD),
        ([var2], f"~isflagged(x)", flags[var2] == UNFLAGGED),
        (
            [var1, var2],
            f"~(x > 999) & (~isflagged(y))",
            ~(data[var1] > 999) & (flags[var2] == UNFLAGGED),
        ),
    ]

    for field, test, expected in tests:
        func = _compileGeneric(f"flagGeneric(func={test}, flag=BAD)")
        result = _execGeneric(Flags({f: flags[f] for f in field}), data[field], func)
        assert (result == expected).all(axis=None)