Commit 0d9d51f1 authored by Lennart Schmidt

Merge branch 'master' of https://git.ufz.de/rdm/saqc

updating function reference
parents bd8fd267 e8e5a0a6
@@ -17,17 +17,15 @@ def _collectVariables(meta, data):
find every relevant variable
"""
# NOTE: get to know every variable from meta
flags = [] # data.columns.tolist()
variables = []
for idx, configrow in meta.iterrows():
varname = configrow[Fields.VARNAME]
assign = configrow[Fields.ASSIGN]
if varname in flags:
if varname in variables:
continue
if varname in data:
flags.append(varname)
elif varname not in flags and assign is True:
flags.append(varname)
return flags
if (varname in data) or (varname not in variables and assign is True):
variables.append(varname)
return variables
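For readers skimming the diff: the reworked loop collects a variable if it already appears in the data or if a config row explicitly assigns it, and duplicated config rows are only collected once. A minimal, standalone sketch of that selection logic (using a hypothetical meta frame with plain "varname"/"assign" columns instead of the Fields constants):

import pandas as pd

# hypothetical config: 'temp' exists in the data, 'derived' does not but is assigned
meta = pd.DataFrame({"varname": ["temp", "derived", "temp"],
                     "assign":  [False, True, False]})
data = pd.DataFrame({"temp": [1.0, 2.0]})

variables = []
for _, row in meta.iterrows():
    varname = row["varname"]
    assign = bool(row["assign"])   # cast, since iterrows may hand back numpy bools
    if varname in variables:
        continue                   # duplicated config rows are collected only once
    # equivalent to the new combined condition above
    if (varname in data) or assign:
        variables.append(varname)

print(variables)  # ['temp', 'derived']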
def _checkInput(data, flags, flagger):
@@ -75,14 +73,14 @@ def runner(metafname, flagger, data, flags=None, nodata=np.nan, error_policy="ra
meta = config[config.columns.difference(tests.columns)]
# # prepare the flags
varnames = _collectVariables(meta, data)
fresh = flagger.initFlags(pd.DataFrame(index=data.index, columns=varnames))
flagger = fresh if flags is None else flags._flags.join(fresh._flags)
# if flags is None:
# flag_cols = _collectVariables(meta, data)
# flagger = flagger.initFlags(pd.DataFrame(index=data.index, columns=flag_cols))
# else:
# flagger = flagger.initFlags(flags=flags)
# varnames = _collectVariables(meta, data)
# fresh = flagger.initFlags(pd.DataFrame(index=data.index, columns=varnames))
# flagger = fresh if flags is None else flags.join(fresh._flags)
flag_cols = _collectVariables(meta, data)
flagger = flagger.initFlags(data=pd.DataFrame(index=data.index, columns=flag_cols))
if flags is not None:
flagger = flagger.setFlagger(flagger.initFlags(flags=flags))
# this check comes late, but compiling the user tests needs fully prepared flags
checkConfig(config, data, flagger, nodata)
@@ -132,7 +130,14 @@ def runner(metafname, flagger, data, flags=None, nodata=np.nan, error_policy="ra
flagger = flagger.setFlagger(flagger_chunk_result)
plotHook(dchunk, flagger_chunk, flagger_chunk_result, varname, configrow[Fields.PLOT], flag_test)
plotHook(
dchunk,
flagger_chunk,
flagger_chunk_result,
varname,
configrow[Fields.PLOT],
flag_test,
)
plotAllHook(data, flagger)
@@ -49,15 +49,13 @@ class BaseFlagger(ABC):
if 'data' is not None: return a flagger with flagger.UNFLAGGED values
if 'flags' is not None: return a flagger with the given flags
"""
if data is None and flags is None:
raise TypeError("either 'data' or 'flags' are required")
if data is not None:
assertDataFrame(data, "data", allow_multiindex=False)
flags = pd.DataFrame(
data=self.UNFLAGGED, index=data.index, columns=data.columns
)
elif flags is not None:
assertDataFrame(flags, "flags", allow_multiindex=False)
else:
raise TypeError("either 'data' or 'flags' are required")
return self._copy(self._assureDtype(flags))
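The docstring above describes the two entry points of initFlags: passing data yields an all-UNFLAGGED frame over the data's index and columns, while passing flags adopts the given frame. A pandas-only sketch of that contract (UNFLAGGED is a hypothetical stand-in for the flagger constant, and the helper name is illustrative, not part of the API):

import pandas as pd

UNFLAGGED = "UNFLAGGED"  # hypothetical stand-in for self.UNFLAGGED

def init_flags(data=None, flags=None):
    # mirrors the two documented branches of BaseFlagger.initFlags
    if data is not None:
        return pd.DataFrame(UNFLAGGED, index=data.index, columns=data.columns)
    if flags is not None:
        return flags.copy()
    raise TypeError("either 'data' or 'flags' are required")

data = pd.DataFrame({"var1": [1.0, 2.0]}, index=pd.date_range("2020-01-01", periods=2))
print(init_flags(data=data))                     # every cell starts as UNFLAGGED
print(init_flags(flags=init_flags(data=data)))   # adopts an existing flags frame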
def setFlagger(self, other: BaseFlaggerT):
@@ -67,11 +65,20 @@ class BaseFlagger(ABC):
# NOTE: add more checks !?
if not isinstance(other, self.__class__):
raise TypeError(f"flagger of type '{self.__class__}' needed")
out = deepcopy(self)
# NOTE: for a weird reason, this only works with the loop
for v in other._flags.columns:
out._flags.loc[other._flags.index, v] = other._flags[v]
return out
this = self._flags
other = other._flags
flags = this.reindex(
index=this.index.union(other.index),
columns=this.columns.union(other.columns, sort=False),
fill_value=self.UNFLAGGED,
)
for key, values in other.iteritems():
flags.loc[other.index, key] = values
return self._copy(self._assureDtype(flags))
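The loop-based merge removed above is replaced by a reindex onto the union of both indexes and columns, with new cells filled as UNFLAGGED before the other flagger's values are written in. A minimal pandas-only sketch of that merge strategy (the frames and the UNFLAGGED constant are made up for illustration):

import pandas as pd

UNFLAGGED = "UNFLAGGED"  # hypothetical stand-in for self.UNFLAGGED

this = pd.DataFrame(UNFLAGGED, index=range(4), columns=["a"])
other = pd.DataFrame("BAD", index=[0, 2, 4], columns=["a", "b"])

# expand 'this' so it covers every index/column present in either frame
merged = this.reindex(
    index=this.index.union(other.index),
    columns=this.columns.union(other.columns, sort=False),
    fill_value=UNFLAGGED,
)

# column-wise assignment (items() is the non-deprecated spelling of iteritems())
for key, values in other.items():
    merged.loc[other.index, key] = values

print(merged)  # rows 0, 2, 4 carry 'BAD', everything else stays UNFLAGGED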
def getFlagger(
self, field: str = None, loc: LocT = None, iloc: IlocT = None
@@ -50,15 +50,14 @@ class DmpFlagger(CategoricalBaseFlagger):
if 'data' is not None: return a flagger with flagger.UNFLAGGED values
if 'flags' is not None: return a flagger with the given flags
"""
if data is not None:
assertDataFrame(data, "data", allow_multiindex=False)
flags = pd.DataFrame(
data=self.UNFLAGGED,
columns=self._getColumnIndex(data.columns),
index=data.index,
)
elif flags is not None:
assertDataFrame(flags, "flags", allow_multiindex=False)
if not isinstance(flags.columns, pd.MultiIndex):
flags = flags.T.set_index(
keys=self._getColumnIndex(flags.columns, [FlagFields.FLAG])
@@ -133,5 +132,7 @@ class DmpFlagger(CategoricalBaseFlagger):
col_data = flags[(var, flag_field)]
if flag_field == FlagFields.FLAG:
col_data = col_data.astype(self.dtype)
else:
col_data = col_data.astype(str)
tmp[(var, flag_field)] = col_data
return pd.DataFrame(tmp, columns=flags.columns, index=flags.index)
@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import pytest
import numpy as np
import pandas as pd
from saqc.funcs import register, flagRange
@@ -11,9 +12,7 @@ from saqc.lib.plotting import _plot
from test.common import initData, initMetaDict, TESTFLAGGER
@pytest.fixture
def data():
return initData(3)
OPTIONAL = [False, True]
@register("flagAll")
@@ -22,8 +21,31 @@ def flagAll(data, field, flagger, **kwargs):
return data, flagger.setFlags(field=field, flag=flagger.BAD)
@pytest.fixture
def data():
return initData(3)
def _initFlags(flagger, data, optional):
return None
if optional:
return flagger.initFlags(data[data.columns[::2]])._flags
@pytest.fixture
def flags(flagger, data, optional):
if not optional:
return flagger.initFlags(data[data.columns[::2]])._flags
# NOTE: there is a lot of pytest magic involved:
# the parametrize parameters are implicitly available
# within the used fixtures, that is why we need the optional
# parametrization without actually using it in the
# function
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_temporalPartitioning(data, flagger):
@pytest.mark.parametrize("optional", OPTIONAL)
def test_temporalPartitioning(data, flagger, flags):
"""
Check if the time span in meta is respected
"""
@@ -36,7 +58,7 @@ def test_temporalPartitioning(data, flagger):
{F.VARNAME: var3, F.TESTS: "flagAll()", F.START: split_date},
]
meta_file, meta_frame = initMetaDict(metadict, data)
pdata, pflagger = runner(meta_file, flagger, data)
pdata, pflagger = runner(meta_file, flagger, data, flags=flags)
fields = [F.VARNAME, F.START, F.END]
for _, row in meta_frame.iterrows():
@@ -47,8 +69,11 @@ def test_temporalPartitioning(data, flagger):
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_positionalPartitioning(data, flagger):
@pytest.mark.parametrize("optional", OPTIONAL)
def test_positionalPartitioning(data, flagger, flags):
data = data.reset_index(drop=True)
if flags is not None:
flags = flags.reset_index(drop=True)
var1, var2, var3, *_ = data.columns
split_index = int(len(data.index) // 2)
@@ -59,7 +84,7 @@ def test_positionalPartitioning(data, flagger):
]
meta_file, meta_frame = initMetaDict(metadict, data)
pdata, pflagger = runner(meta_file, flagger, data)
pdata, pflagger = runner(meta_file, flagger, data, flags=flags)
fields = [F.VARNAME, F.START, F.END]
for _, row in meta_frame.iterrows():
@@ -72,7 +97,8 @@ def test_positionalPartitioning(data, flagger):
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_missingConfig(data, flagger):
@pytest.mark.parametrize("optional", OPTIONAL)
def test_missingConfig(data, flagger, flags):
"""
Test if variables available in the dataset but not the config
are handled correctly, i.e. are ignored
@@ -82,7 +108,7 @@ def test_missingConfig(data, flagger):
metadict = [{F.VARNAME: var1, F.TESTS: "flagAll()"}]
metafobj, meta = initMetaDict(metadict, data)
pdata, pflagger = runner(metafobj, flagger, data)
pdata, pflagger = runner(metafobj, flagger, data, flags=flags)
assert var1 in pdata and var2 not in pflagger.getFlags()
@@ -105,6 +131,28 @@ def test_missingVariable(flagger):
runner(metafobj, flagger, data)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_duplicatedVariable(flagger):
data = initData(1)
var1, *_ = data.columns
metadict = [
{F.VARNAME: var1, F.ASSIGN: False, F.TESTS: "flagAll()"},
{F.VARNAME: var1, F.ASSIGN: True, F.TESTS: "flagAll()"},
]
metafobj, meta = initMetaDict(metadict, data)
pdata, pflagger = runner(metafobj, flagger, data)
pflags = pflagger.getFlags()
if isinstance(pflags.columns, pd.MultiIndex):
cols = pflags.columns.get_level_values(0).drop_duplicates()
assert np.all(cols == [var1])
else:
assert (pflags.columns == [var1]).all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_assignVariable(flagger):
"""
@@ -134,7 +182,8 @@ def test_assignVariable(flagger):
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_dtypes(data, flagger):
@pytest.mark.parametrize("optional", OPTIONAL)
def test_dtypes(data, flagger, flags):
"""
Test if the categorical dtype is preserved through the core functionality
"""
@@ -147,12 +196,11 @@ def test_dtypes(data, flagger):
{F.VARNAME: var2, F.TESTS: "flagAll()"},
]
metafobj, meta = initMetaDict(metadict, data)
pdata, pflagger = runner(metafobj, flagger, data, flags)
pdata, pflagger = runner(metafobj, flagger, data, flags=flags)
pflags = pflagger.getFlags()
assert dict(flags.dtypes) == dict(pflags.dtypes)
@pytest.mark.skip(reason="not ported yet")
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_plotting(data, flagger):
"""
@@ -169,4 +217,4 @@ def test_plotting(data, flagger):
data, field, flagger_range, min=40, max=60, flag=flagger.GOOD
)
mask = flagger.getFlags(field) != flagger_range.getFlags(field)
plot(data, mask, field, flagger, interactive_backend=False)
_plot(data, mask, field, flagger, interactive_backend=False)
@@ -8,9 +8,7 @@ import pytest
import numpy as np
import pandas as pd
from pandas.api.types import is_bool_dtype
from pandas.core.indexing import IndexingError
from saqc.funcs.functions import flagRange, flagSesonalRange, forceFlags, clearFlags
from test.common import TESTFLAGGER
@@ -37,6 +35,75 @@ DATASETS = [
]
@pytest.mark.parametrize("data", DATASETS)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_setFlagger(data, flagger):
field, *_ = data.columns
this_flagger = flagger.initFlags(data)
other_flagger = this_flagger.getFlagger(iloc=slice(None, None, 3)).setFlags(field)
result_flagger = this_flagger.setFlagger(other_flagger)
other_flags = other_flagger.getFlags()
result_flags = result_flagger.getFlags(field)
assert np.all(
result_flagger.getFlags(loc=other_flagger.getFlags().index) == other_flags
)
assert np.all(
result_flags[~result_flags.index.isin(other_flags.index)] == flagger.UNFLAGGED
)
@pytest.mark.parametrize("data", DATASETS)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_setFlaggerColumnsDiff(data, flagger):
field, *_ = data.columns
new_field = field + "_new"
iloc = slice(None, None, 2)
other_data = data.iloc[iloc]
other_data.columns = [new_field] + data.columns[1:].to_list()
this_flagger = flagger.initFlags(data).setFlags(field, flag=flagger.BAD)
other_flagger = flagger.initFlags(other_data)
result_flagger = this_flagger.setFlagger(other_flagger)
assert np.all(
result_flagger.getFlags(new_field, loc=other_data.index)
== other_flagger.getFlags(new_field)
)
assert np.all(
result_flagger.getFlags(new_field, loc=data.index) == flagger.UNFLAGGED
)
@pytest.mark.parametrize("data", DATASETS)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_setFlaggerIndexDiff(data, flagger):
field, *_ = data.columns
iloc = slice(None, None, 2)
other_data = data.iloc[iloc]
other_data.index = other_data.index + pd.Timedelta(minutes=2, seconds=25)
this_flagger = flagger.initFlags(data).setFlags(field, flag=flagger.BAD)
other_flagger = flagger.initFlags(other_data)
result_flagger = this_flagger.setFlagger(other_flagger)
assert np.all(
result_flagger.getFlags(field, loc=other_data.index)
== other_flagger.getFlags(field)
)
assert np.all(
result_flagger.getFlags(field, loc=data.index) == this_flagger.getFlags(field)
)
@pytest.mark.parametrize("data", DATASETS)
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_initFlags(data, flagger):
@@ -81,9 +148,24 @@ def test_isFlaggedDataFrame(data, flagger):
df_tests = [
(flagger.isFlagged(), mask),
(flagger.setFlags(field).isFlagged(), ~mask),
(flagger.setFlags(field, flag=flagger.GOOD).isFlagged(flag=flagger.GOOD, comparator=">"), mask),
(flagger.setFlags(field, flag=flagger.GOOD).isFlagged(flag=flagger.GOOD, comparator="<"), mask),
(flagger.setFlags(field, flag=flagger.GOOD).isFlagged(flag=flagger.GOOD, comparator="=="), ~mask),
(
flagger.setFlags(field, flag=flagger.GOOD).isFlagged(
flag=flagger.GOOD, comparator=">"
),
mask,
),
(
flagger.setFlags(field, flag=flagger.GOOD).isFlagged(
flag=flagger.GOOD, comparator="<"
),
mask,
),
(
flagger.setFlags(field, flag=flagger.GOOD).isFlagged(
flag=flagger.GOOD, comparator="=="
),
~mask,
),
]
for flags, expected in df_tests:
assert np.all(flags[field] == expected)
@@ -106,9 +188,24 @@ def test_isFlaggedSeries(data, flagger):
series_tests = [
(flagger.isFlagged(field), mask),
(flagger.setFlags(field).isFlagged(field), ~mask),
(flagger.setFlags(field, flag=flagger.GOOD).isFlagged(field, flag=flagger.GOOD, comparator=">"), mask),
(flagger.setFlags(field, flag=flagger.GOOD).isFlagged(field, flag=flagger.GOOD, comparator="<"), mask),
(flagger.setFlags(field, flag=flagger.GOOD).isFlagged(field, flag=flagger.GOOD, comparator="=="), ~mask),
(
flagger.setFlags(field, flag=flagger.GOOD).isFlagged(
field, flag=flagger.GOOD, comparator=">"
),
mask,
),
(
flagger.setFlags(field, flag=flagger.GOOD).isFlagged(
field, flag=flagger.GOOD, comparator="<"
),
mask,
),
(
flagger.setFlags(field, flag=flagger.GOOD).isFlagged(
field, flag=flagger.GOOD, comparator="=="
),
~mask,
),
]
for flags, expected in series_tests:
assert np.all(flags == expected)