Skip to content
Snippets Groups Projects
test_generic_functions.py 6.59 KiB
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import pytest
import numpy as np
import pandas as pd

from test.common import initData, TESTFLAGGER, TESTNODATA

from saqc.core.evaluator import (
    DslTransformer,
    initLocalEnv,
    parseExpression,
    evalExpression,
    compileTree,
    evalCode,
)


def _evalDslExpression(expr, data, field, flagger, nodata=np.nan):
    env = initLocalEnv(data, field, flagger, nodata)
    tree = parseExpression(expr)
    dsl_transformer = DslTransformer(env, data.columns)
    transformed_tree = dsl_transformer.transform(tree)
    code = compileTree(transformed_tree)
    return evalCode(code, local_env=env)


@pytest.fixture
def data():
    return initData()


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagPropagation(data, flagger):
    var1, var2, *_ = data.columns
    this = var1

    flagger = flagger.initFlags(data).setFlags(var2, iloc=slice(None, None, 5))

    var2_flags = flagger.isFlagged(var2)
    var2_data = data[var2].mask(var2_flags)
    data, flagger_result = evalExpression(
        "flagGeneric(func=var2 < mean(var2))", data, this, flagger, np.nan
    )

    expected = var2_flags | (var2_data < var2_data.mean())
    result = flagger_result.isFlagged(this)
    assert (result == expected).all()


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_missingIdentifier(data, flagger):

    flagger = flagger.initFlags(data)
    tests = [
        "flagGeneric(func=fff(var2) < 5)",
        "flagGeneric(func=var3 != NODATA)"
    ]
    for expr in tests:
        with pytest.raises(NameError):
            evalExpression(expr, data, data.columns[0], flagger, np.nan)


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_comparisonOperators(data, flagger):
    flagger = flagger.initFlags(data)
    var1, var2, *_ = data.columns
    this = var1

    tests = [
        ("this > 100", data[this] > 100),
        (f"10 >= {var2}", 10 >= data[var2]),
        (f"{var2} < 100", data[var2] < 100),
        (f"this <= {var2}", data[this] <= data[var2]),
        (f"{var1} == {var2}", data[this] == data[var2]),
        (f"{var1} != {var2}", data[this] != data[var2]),
    ]

    # check within the usually enclosing scope
    for expr, mask in tests:
        _, result_flagger = evalExpression(
            f"flagGeneric(func={expr})", data, this, flagger, np.nan
        )
        expected_flagger = flagger.setFlags(this, loc=mask, test="generic")
        assert np.all(result_flagger.isFlagged() == expected_flagger.isFlagged())


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_arithmeticOperators(data, flagger):
    flagger = flagger.initFlags(data)
    var1, *_ = data.columns
    this = data[var1]

    tests = [
        ("this + 100 > 110", this + 100 > 110),
        ("this - 100 > 0", this - 100 > 0),
        ("this * 100 > 200", this * 100 > 200),
        ("this / 100 > .1", this / 100 > .1),
        ("this % 2 == 1", this % 2 == 1),
        ("this ** 2 == 0", this ** 2 == 0),
    ]

    # check within the usually enclosing scope
    for expr, mask in tests:
        _, result_flagger = evalExpression(
            f"flagGeneric(func={expr})", data, var1, flagger, np.nan
        )
        expected_flagger = flagger.setFlags(var1, loc=mask, test="generic")
        assert np.all(result_flagger.isFlagged() == expected_flagger.isFlagged())


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_nonReduncingBuiltins(data, flagger):
    flagger = flagger.initFlags(data)
    var1, *_ = data.columns
    this = var1

    tests = [
        ("abs(this)", np.abs(data[this])),
    ]

    for expr, expected in tests:
        result = _evalDslExpression(expr, data, this, flagger)
        assert (result == expected).all()


@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("nodata", TESTNODATA)
def test_reduncingBuiltins(data, flagger, nodata):
    data.loc[::4] = nodata
    flagger = flagger.initFlags(data)
    var1, var2, *_ = data.columns
    this = var1

    tests = [
        ("min(this)", np.min(data[this])),
        (f"max({var1})", np.max(data[var1])),
        (f"sum({var2})", np.sum(data[var2])),
        ("mean(this)", np.mean(data[this])),
        (f"std({var1})", np.std(data[var1])),
        (f"len({var2})", len(data[var2])),
    ]

    for expr, expected in tests:
        result = _evalDslExpression(expr, data, this, flagger, nodata)
        assert result == expected


@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("nodata", TESTNODATA)
def test_ismissing(data, flagger, nodata):

    data.iloc[: len(data) // 2, 0] = np.nan
    data.iloc[(len(data) // 2) + 1 :, 0] = -9999
    var1, *_ = data.columns

    flagger = flagger.initFlags(data)

    tests = [
        (f"ismissing({var1})", lambda data: (pd.isnull(data) | (data == nodata)).all()),
        (
            f"~ismissing({var1})",
            lambda data: (pd.notnull(data) & (data != nodata)).all(),
        ),
    ]

    for expr, checkFunc in tests:
        idx = _evalDslExpression(expr, data, var1, flagger, nodata)
        assert checkFunc(data.loc[idx, var1])


@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("nodata", TESTNODATA)
def test_bitOps(data, flagger, nodata):
    var1, var2, *_ = data.columns
    this = var1

    flagger = flagger.initFlags(data)

    tests = [
        (f"flagGeneric(func=~(this > mean(this)))", ~(data[this] > np.nanmean(data[this]))),
        (
            f"flagGeneric(func=(this <= 0) | (0 < {var1}))",
            (data[this] <= 0) | (0 < data[var1]),
        ),
        (
            f"flagGeneric(func=({var2} >= 0) & (0 > this))",
            (data[var2] >= 0) & (0 > data[this]),
        ),
    ]

    for expr, expected in tests:
        _, flagger_result = evalExpression(expr, data, this, flagger, nodata)
        assert (flagger_result.isFlagged(this) == expected).all()


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_isflagged(data, flagger):

    flagger = flagger.initFlags(data)
    var1, var2, *_ = data.columns

    flagger = flagger.setFlags(var1, iloc=slice(None, None, 2))
    flagger = flagger.setFlags(var2, iloc=slice(None, None, 2))

    idx = _evalDslExpression(f"isflagged({var1})", data, var2, flagger)

    flagged = flagger.isFlagged(var1)
    assert (flagged == idx).all


@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_isflaggedArgument(data, flagger):

    var1, var2, *_ = data.columns

    flagger = flagger.initFlags(data).setFlags(
        var1, iloc=slice(None, None, 2), flag=flagger.BAD
    )

    idx = _evalDslExpression(f"isflagged({var1}, {flagger.BAD})", data, var2, flagger)

    flagged = flagger.isFlagged(var1, flag=flagger.BAD)
    assert (flagged == idx).all()