#! /usr/bin/env python # -*- coding: utf-8 -*- import ast import pytest import numpy as np import pandas as pd from dios import DictOfSeries from test.common import TESTFLAGGER, TESTNODATA, initData, writeIO from saqc.core.visitor import ConfigFunctionParser from saqc.core.config import Fields as F from saqc.core.register import register from saqc import SaQC, SimpleFlagger from saqc.funcs.generic import _execGeneric @pytest.fixture def data(): return initData() @pytest.fixture def data_diff(): data = initData(cols=3) col0 = data[data.columns[0]] col1 = data[data.columns[1]] mid = len(col0) // 2 offset = len(col0) // 8 return DictOfSeries(data={col0.name: col0.iloc[: mid + offset], col1.name: col1.iloc[mid - offset :],}) def _compileGeneric(expr, flagger): tree = ast.parse(expr, mode="eval") _, kwargs = ConfigFunctionParser(flagger).parse(tree.body) return kwargs["func"] @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_missingIdentifier(data, flagger): # NOTE: # - the error is only raised at runtime during parsing would be better tests = [ "fff(var2) < 5", "var3 != NODATA", ] for test in tests: func = _compileGeneric(f"generic.flag(func={test})", flagger) with pytest.raises(NameError): _execGeneric(flagger, data, func, field="", nodata=np.nan) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_syntaxError(flagger): tests = [ "range(x=5", "rangex=5)", "range[x=5]" "range{x=5}" "int->float(x=4)" "int*float(x=4)", ] for test in tests: with pytest.raises(SyntaxError): _compileGeneric(f"flag(func={test})", flagger) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_typeError(flagger): """ test that forbidden constructs actually throw an error TODO: find a few more cases or get rid of the test """ # : think about cases that should be forbidden tests = ("lambda x: x * 2",) for test in tests: with pytest.raises(TypeError): _compileGeneric(f"generic.flag(func={test})", flagger) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_comparisonOperators(data, flagger): flagger = flagger.initFlags(data) var1, var2, *_ = data.columns this = var1 tests = [ ("this > 100", data[this] > 100), (f"10 >= {var2}", 10 >= data[var2]), (f"{var2} < 100", data[var2] < 100), (f"this <= {var2}", data[this] <= data[var2]), (f"{var1} == {var2}", data[this] == data[var2]), (f"{var1} != {var2}", data[this] != data[var2]), ] for test, expected in tests: func = _compileGeneric(f"generic.flag(func={test})", flagger) result = _execGeneric(flagger, data, func, field=var1, nodata=np.nan) assert np.all(result == expected) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_arithmeticOperators(data, flagger): flagger = flagger.initFlags(data) var1, *_ = data.columns this = data[var1] tests = [ ("var1 + 100 > 110", this + 100 > 110), ("var1 - 100 > 0", this - 100 > 0), ("var1 * 100 > 200", this * 100 > 200), ("var1 / 100 > .1", this / 100 > 0.1), ("var1 % 2 == 1", this % 2 == 1), ("var1 ** 2 == 0", this ** 2 == 0), ] for test, expected in tests: func = _compileGeneric(f"generic.process(func={test})", flagger) result = _execGeneric(flagger, data, func, field=var1, nodata=np.nan) assert np.all(result == expected) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_nonReduncingBuiltins(data, flagger): flagger = flagger.initFlags(data) var1, *_ = data.columns this = var1 mean = data[var1].mean() tests = [ (f"abs({this})", np.abs(data[this])), (f"log({this})", np.log(data[this])), (f"exp({this})", np.exp(data[this])), (f"ismissing(mask({this} < {mean}))", data[this].mask(data[this] < mean).isna()), ] for test, expected in tests: func = _compileGeneric(f"generic.process(func={test})", flagger) result = _execGeneric(flagger, data, func, field=this, nodata=np.nan) assert (result == expected).all() @pytest.mark.parametrize("flagger", TESTFLAGGER) @pytest.mark.parametrize("nodata", TESTNODATA) def test_reduncingBuiltins(data, flagger, nodata): data.loc[::4] = nodata flagger = flagger.initFlags(data) var1 = data.columns[0] this = data.iloc[:, 0] tests = [ ("min(this)", np.nanmin(this)), (f"max({var1})", np.nanmax(this)), (f"sum({var1})", np.nansum(this)), ("mean(this)", np.nanmean(this)), (f"std({this.name})", np.std(this)), (f"len({this.name})", len(this)), ] for test, expected in tests: func = _compileGeneric(f"generic.process(func={test})", flagger) result = _execGeneric(flagger, data, func, field=this.name, nodata=nodata) assert result == expected @pytest.mark.parametrize("flagger", TESTFLAGGER) @pytest.mark.parametrize("nodata", TESTNODATA) def test_ismissing(data, flagger, nodata): data.iloc[: len(data) // 2, 0] = np.nan data.iloc[(len(data) // 2) + 1 :, 0] = -9999 this = data.iloc[:, 0] tests = [ (f"ismissing({this.name})", (pd.isnull(this) | (this == nodata))), (f"~ismissing({this.name})", (pd.notnull(this) & (this != nodata))), ] for test, expected in tests: func = _compileGeneric(f"generic.flag(func={test})", flagger) result = _execGeneric(flagger, data, func, this.name, nodata) assert np.all(result == expected) @pytest.mark.parametrize("flagger", TESTFLAGGER) @pytest.mark.parametrize("nodata", TESTNODATA) def test_bitOps(data, flagger, nodata): var1, var2, *_ = data.columns this = var1 flagger = flagger.initFlags(data) tests = [ ("~(this > mean(this))", ~(data[this] > np.nanmean(data[this]))), (f"(this <= 0) | (0 < {var1})", (data[this] <= 0) | (0 < data[var1])), (f"({var2} >= 0) & (0 > this)", (data[var2] >= 0) & (0 > data[this])), ] for test, expected in tests: func = _compileGeneric(f"generic.flag(func={test})", flagger) result = _execGeneric(flagger, data, func, this, nodata) assert np.all(result == expected) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_isflagged(data, flagger): var1, var2, *_ = data.columns flagger = flagger.initFlags(data).setFlags(var1, loc=data[var1].index[::2], flag=flagger.BAD) tests = [ (f"isflagged({var1})", flagger.isFlagged(var1)), (f"isflagged({var1}, flag=BAD)", flagger.isFlagged(var1, flag=flagger.BAD, comparator=">=")), (f"isflagged({var1}, UNFLAGGED, '==')", flagger.isFlagged(var1, flag=flagger.UNFLAGGED, comparator="==")), (f"~isflagged({var2})", ~flagger.isFlagged(var2)), (f"~({var2}>999) & (~isflagged({var2}))", ~(data[var2] > 999) & (~flagger.isFlagged(var2))), ] for test, expected in tests: func = _compileGeneric(f"generic.flag(func={test}, flag=BAD)", flagger) result = _execGeneric(flagger, data, func, field=None, nodata=np.nan) assert np.all(result == expected) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_variableAssignments(data, flagger): var1, var2, *_ = data.columns config = f""" {F.VARNAME} ; {F.TEST} dummy1 ; generic.process(func=var1 + var2) dummy2 ; generic.flag(func=var1 + var2 > 0) """ fobj = writeIO(config) saqc = SaQC(flagger, data).readConfig(fobj) result_data, result_flagger = saqc.getResult(raw=True) assert set(result_data.columns) == set(data.columns) | { "dummy1", } assert set(result_flagger.getFlags().columns) == set(data.columns) | {"dummy1", "dummy2"} @pytest.mark.xfail(strict=True) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_processMultiple(data_diff, flagger): var1, var2, *_ = data_diff.columns config = f""" {F.VARNAME} ; {F.TEST} dummy ; generic.process(func=var1 + 1) dummy ; generic.process(func=var2 - 1) """ fobj = writeIO(config) saqc = SaQC(flagger, data_diff).readConfig(fobj) result_data, result_flagger = saqc.getResult() assert len(result_data["dummy"]) == len(result_flagger.getFlags("dummy")) def test_callableArgumentsUnary(data): window = 5 @register(masking='field') def testFuncUnary(data, field, flagger, func, **kwargs): data[field] = data[field].rolling(window=window).apply(func) return data, flagger.initFlags(data=data) flagger = SimpleFlagger() var = data.columns[0] config = f""" {F.VARNAME} ; {F.TEST} {var} ; testFuncUnary(func={{0}}) """ tests = [ ("sum", np.nansum), ("std(exp(x))", lambda x: np.std(np.exp(x))), ] for (name, func) in tests: fobj = writeIO(config.format(name)) result_config, _ = SaQC(flagger, data).readConfig(fobj).getResult() result_api, _ = SaQC(flagger, data).testFuncUnary(var, func=func).getResult() expected = data[var].rolling(window=window).apply(func) assert (result_config[var].dropna() == expected.dropna()).all(axis=None) assert (result_api[var].dropna() == expected.dropna()).all(axis=None) def test_callableArgumentsBinary(data): flagger = SimpleFlagger() var1, var2 = data.columns[:2] @register(masking='field') def testFuncBinary(data, field, flagger, func, **kwargs): data[field] = func(data[var1], data[var2]) return data, flagger.initFlags(data=data) config = f""" {F.VARNAME} ; {F.TEST} {var1} ; testFuncBinary(func={{0}}) """ tests = [ ("x + y", lambda x, y: x + y), ("y - (x * 2)", lambda y, x: y - (x * 2)), ] for (name, func) in tests: fobj = writeIO(config.format(name)) result_config, _ = SaQC(flagger, data).readConfig(fobj).getResult() result_api, _ = SaQC(flagger, data).testFuncBinary(var1, func=func).getResult() expected = func(data[var1], data[var2]) assert (result_config[var1].dropna() == expected.dropna()).all(axis=None) assert (result_api[var1].dropna() == expected.dropna()).all(axis=None)