#! /usr/bin/env python # -*- coding: utf-8 -*- import ast import pytest import numpy as np import pandas as pd import dios from saqc.constants import * from saqc.core import initFlagsLike, Flags from saqc.core.visitor import ConfigFunctionParser from saqc.core.config import Fields as F from saqc.core.register import register from saqc.funcs.generic import _execGeneric from saqc import SaQC from tests.common import TESTNODATA, initData, writeIO @pytest.fixture def data(): return initData() @pytest.fixture def data_diff(): data = initData(cols=3) col0 = data[data.columns[0]] col1 = data[data.columns[1]] mid = len(col0) // 2 offset = len(col0) // 8 return dios.DictOfSeries( data={ col0.name: col0.iloc[: mid + offset], col1.name: col1.iloc[mid - offset :], } ) def _compileGeneric(expr, flags): tree = ast.parse(expr, mode="eval") _, kwargs = ConfigFunctionParser(flags).parse(tree.body) return kwargs["func"] def test_missingIdentifier(data): flags = Flags() # NOTE: # - the error is only raised at runtime during parsing would be better tests = [ "fff(var2) < 5", "var3 != NODATA", ] for test in tests: func = _compileGeneric(f"generic.flag(func={test})", flags) with pytest.raises(NameError): _execGeneric(flags, data, func, field="", nodata=np.nan) def test_syntaxError(): flags = Flags() tests = [ "range(x=5", "rangex=5)", "range[x=5]" "range{x=5}" "int->float(x=4)" "int*float(x=4)", ] for test in tests: with pytest.raises(SyntaxError): _compileGeneric(f"flag(func={test})", flags) def test_typeError(): """ test that forbidden constructs actually throw an error TODO: find a few more cases or get rid of the test """ flags = Flags() # : think about cases that should be forbidden tests = ("lambda x: x * 2",) for test in tests: with pytest.raises(TypeError): _compileGeneric(f"generic.flag(func={test})", flags) def test_comparisonOperators(data): flags = initFlagsLike(data) var1, var2, *_ = data.columns this = var1 tests = [ ("this > 100", data[this] > 100), (f"10 >= {var2}", 10 >= data[var2]), (f"{var2} < 100", data[var2] < 100), (f"this <= {var2}", data[this] <= data[var2]), (f"{var1} == {var2}", data[this] == data[var2]), (f"{var1} != {var2}", data[this] != data[var2]), ] for test, expected in tests: func = _compileGeneric(f"generic.flag(func={test})", flags) result = _execGeneric(flags, data, func, field=var1, nodata=np.nan) assert np.all(result == expected) def test_arithmeticOperators(data): flags = initFlagsLike(data) var1, *_ = data.columns this = data[var1] tests = [ ("var1 + 100 > 110", this + 100 > 110), ("var1 - 100 > 0", this - 100 > 0), ("var1 * 100 > 200", this * 100 > 200), ("var1 / 100 > .1", this / 100 > 0.1), ("var1 % 2 == 1", this % 2 == 1), ("var1 ** 2 == 0", this ** 2 == 0), ] for test, expected in tests: func = _compileGeneric(f"generic.process(func={test})", flags) result = _execGeneric(flags, data, func, field=var1, nodata=np.nan) assert np.all(result == expected) def test_nonReduncingBuiltins(data): flags = initFlagsLike(data) var1, *_ = data.columns this = var1 mean = data[var1].mean() tests = [ (f"abs({this})", np.abs(data[this])), (f"log({this})", np.log(data[this])), (f"exp({this})", np.exp(data[this])), ( f"ismissing(mask({this} < {mean}))", data[this].mask(data[this] < mean).isna(), ), ] for test, expected in tests: func = _compileGeneric(f"generic.process(func={test})", flags) result = _execGeneric(flags, data, func, field=this, nodata=np.nan) assert (result == expected).all() @pytest.mark.parametrize("nodata", TESTNODATA) def test_reduncingBuiltins(data, nodata): data.loc[::4] = nodata flags = initFlagsLike(data) var1 = data.columns[0] this = data.iloc[:, 0] tests = [ ("min(this)", np.nanmin(this)), (f"max({var1})", np.nanmax(this)), (f"sum({var1})", np.nansum(this)), ("mean(this)", np.nanmean(this)), (f"std({this.name})", np.std(this)), (f"len({this.name})", len(this)), ] for test, expected in tests: func = _compileGeneric(f"generic.process(func={test})", flags) result = _execGeneric(flags, data, func, field=this.name, nodata=nodata) assert result == expected @pytest.mark.parametrize("nodata", TESTNODATA) def test_ismissing(data, nodata): flags = initFlagsLike(data) data.iloc[: len(data) // 2, 0] = np.nan data.iloc[(len(data) // 2) + 1 :, 0] = -9999 this = data.iloc[:, 0] tests = [ (f"ismissing({this.name})", (pd.isnull(this) | (this == nodata))), (f"~ismissing({this.name})", (pd.notnull(this) & (this != nodata))), ] for test, expected in tests: func = _compileGeneric(f"generic.flag(func={test})", flags) result = _execGeneric(flags, data, func, this.name, nodata) assert np.all(result == expected) @pytest.mark.parametrize("nodata", TESTNODATA) def test_bitOps(data, nodata): var1, var2, *_ = data.columns this = var1 flags = initFlagsLike(data) tests = [ ("~(this > mean(this))", ~(data[this] > np.nanmean(data[this]))), (f"(this <= 0) | (0 < {var1})", (data[this] <= 0) | (0 < data[var1])), (f"({var2} >= 0) & (0 > this)", (data[var2] >= 0) & (0 > data[this])), ] for test, expected in tests: func = _compileGeneric(f"generic.flag(func={test})", flags) result = _execGeneric(flags, data, func, this, nodata) assert np.all(result == expected) def test_isflagged(data): var1, var2, *_ = data.columns flags = initFlagsLike(data) flags[data[var1].index[::2], var1] = BAD tests = [ (f"isflagged({var1})", flags[var1] > UNFLAGGED), (f"isflagged({var1}, flag=BAD)", flags[var1] >= BAD), (f"isflagged({var1}, UNFLAGGED, '==')", flags[var1] == UNFLAGGED), (f"~isflagged({var2})", flags[var2] == UNFLAGGED), ( f"~({var2}>999) & (~isflagged({var2}))", ~(data[var2] > 999) & (flags[var2] == UNFLAGGED), ), ] for i, (test, expected) in enumerate(tests): try: func = _compileGeneric(f"generic.flag(func={test}, flag=BAD)", flags) result = _execGeneric(flags, data, func, field=None, nodata=np.nan) assert np.all(result == expected) except Exception: print(i, test) raise # test bad combination for comp in [">", ">=", "==", "!=", "<", "<="]: fails = f"isflagged({var1}, comparator='{comp}')" func = _compileGeneric(f"generic.flag(func={fails}, flag=BAD)", flags) with pytest.raises(ValueError): _execGeneric(flags, data, func, field=None, nodata=np.nan) def test_variableAssignments(data): var1, var2, *_ = data.columns config = f""" {F.VARNAME} ; {F.TEST} dummy1 ; generic.process(func=var1 + var2) dummy2 ; generic.flag(func=var1 + var2 > 0) """ fobj = writeIO(config) saqc = SaQC(data).readConfig(fobj) result_data, result_flags = saqc.getResult(raw=True) assert set(result_data.columns) == set(data.columns) | { "dummy1", } assert set(result_flags.columns) == set(data.columns) | {"dummy1", "dummy2"} # TODO: why this must(!) fail ? - a comment would be helpful @pytest.mark.xfail(strict=True) def test_processMultiple(data_diff): var1, var2, *_ = data_diff.columns config = f""" {F.VARNAME} ; {F.TEST} dummy ; generic.process(func=var1 + 1) dummy ; generic.process(func=var2 - 1) """ fobj = writeIO(config) saqc = SaQC(data_diff).readConfig(fobj) result_data, result_flags = saqc.getResult() assert len(result_data["dummy"]) == len(result_flags["dummy"]) def test_callableArgumentsUnary(data): window = 5 @register(masking="field") def testFuncUnary(data, field, flags, func, **kwargs): data[field] = data[field].rolling(window=window).apply(func) return data, initFlagsLike(data) var = data.columns[0] config = f""" {F.VARNAME} ; {F.TEST} {var} ; testFuncUnary(func={{0}}) """ tests = [ ("sum", np.nansum), ("std(exp(x))", lambda x: np.std(np.exp(x))), ] for (name, func) in tests: fobj = writeIO(config.format(name)) result_config, _ = SaQC(data).readConfig(fobj).getResult() result_api, _ = SaQC(data).testFuncUnary(var, func=func).getResult() expected = data[var].rolling(window=window).apply(func) assert (result_config[var].dropna() == expected.dropna()).all(axis=None) assert (result_api[var].dropna() == expected.dropna()).all(axis=None) def test_callableArgumentsBinary(data): var1, var2 = data.columns[:2] @register(masking="field") def testFuncBinary(data, field, flags, func, **kwargs): data[field] = func(data[var1], data[var2]) return data, initFlagsLike(data) config = f""" {F.VARNAME} ; {F.TEST} {var1} ; testFuncBinary(func={{0}}) """ tests = [ ("x + y", lambda x, y: x + y), ("y - (x * 2)", lambda y, x: y - (x * 2)), ] for (name, func) in tests: fobj = writeIO(config.format(name)) result_config, _ = SaQC(data).readConfig(fobj).getResult() result_api, _ = SaQC(data).testFuncBinary(var1, func=func).getResult() expected = func(data[var1], data[var2]) assert (result_config[var1].dropna() == expected.dropna()).all(axis=None) assert (result_api[var1].dropna() == expected.dropna()).all(axis=None)