Skip to content
Snippets Groups Projects
Commit 24845912 authored by Bert Palm's avatar Bert Palm 🎇
Browse files

fixed some bugs, improved setFlags, added tests for setFlags

parent 8044ded7
No related branches found
No related tags found
No related merge requests found
......@@ -66,15 +66,30 @@ class BaseFlagger:
check_ispdlike(flags, 'flags', allow_multiindex=False)
return flags
def setFlags(self, flags: pd.DataFrame, field, loc=None, iloc=None, flag=None, force=False, **kwargs) -> pd.DataFrame:
    """Set `flag` in column `field` for the rows selected by `loc`/`iloc`.

    Without `force`, only rows whose current flag compares lower than the new
    flag are overwritten; with `force=True` every selected row is overwritten.

    Parameters:
        flags: flags frame (must not carry a multi-index).
        field: column of `flags` to modify.
        loc, iloc: row selectors, handed to `self._getIndexer`.
        flag: a single flag or a `pd.Series` of flags with the same length as
            `flags`; defaults to `self.BAD` when None.
        force: overwrite the selected rows regardless of their current flag.
        **kwargs: not used by this method.

    Returns:
        The modified copy of `flags`, passed through `self._assureDtype`.

    Raises:
        ValueError: if `flag` is a Series whose length differs from `flags`.
    """
    check_isdf(flags, 'flags', allow_multiindex=False)

    # prepare
    flags = self._assureDtype(flags, field).copy()
    flag = self.BAD if flag is None else self._checkFlag(flag)
    flags_loc, rows, col = self._getIndexer(flags, field, loc, iloc)

    if isinstance(flag, pd.Series):
        # Compare the passed series against the frame; the previous check
        # compared the frame with itself and therefore could never fail.
        if len(flag) != len(flags):
            raise ValueError('Length of flags and flag must match')
        # Narrow the flag series with the same row selection as the frame.
        i, r, _ = self._getIndexer(flag, field, loc, iloc)
        flag = i[r].squeeze()

    # set
    if force:
        mask = [True] * len(rows)
        idx = flags_loc[rows, col].index
    else:
        mask = flags_loc[rows, col] < flag
        idx = mask[mask].index

    if isinstance(flag, pd.Series):
        flag = flag[mask]

    flags.loc[idx, field] = flag
    return self._assureDtype(flags, field)
......@@ -86,9 +101,9 @@ class BaseFlagger:
def _checkFlag(self, flag):
if isinstance(flag, pd.Series):
if flag.dtype != self.flags:
raise TypeError(f"Passed flags series is of invalid '{flag.dtype}' dtype. "
f"Expected {self.flags} type with ordered categories {list(self.flags.categories)}")
if not self._isFlagsDtype(flag.dtype):
raise TypeError(f"flag(-series) is not of expected '{self.flags}'-dtype with ordered categories "
f"{list(self.flags.categories)}, '{flag.dtype}'-dtype was passed.")
else:
if flag not in self.flags:
raise ValueError(f"Invalid flag '{flag}'. Possible choices are {list(self.flags.categories)[1:]}")
......@@ -108,10 +123,13 @@ class BaseFlagger:
def _assureDtype(self, flags, field=None):
if field is None: # we got a df
flags = flags.astype(self.flags)
elif not isinstance(flags[field].dtype, pd.Categorical):
elif not self._isFlagsDtype(flags[field].dtype):
flags[field] = flags[field].astype(self.flags)
return flags
def _isFlagsDtype(self, dtype):
return isinstance(dtype, pd.CategoricalDtype) and dtype == self.flags
def nextTest(self):
    """Hook with an empty body in this class; currently does nothing."""
    return None
......
......@@ -66,19 +66,26 @@ class DmpFlagger(BaseFlagger):
check_isdfmi(flags, 'flags')
# prepare
comment = json.dumps(dict(comment=comment, commit=self.project_version, test=kwargs.get("func_name", "")))
flags = self._assureDtype(flags, field)
flags = self._assureDtype(flags, field).copy()
flag = self.BAD if flag is None else self._checkFlag(flag)
# set
flags = flags.copy()
indexer, rows, col = self._getIndexer(self.getFlags(flags), field, loc, iloc)
flags_loc, rows, col = self._getIndexer(self.getFlags(flags), field, loc, iloc)
if isinstance(flag, pd.Series):
if len(flags.index) != len(flags):
raise ValueError('Length of flags and flag must match')
i, r, _ = self._getIndexer(flag, field, loc, iloc)
flag = i[r]
flag = i[r].squeeze()
if force:
idx = indexer[rows, col].index
mask = [True] * len(rows)
idx = flags_loc[rows, col].index
else:
mask = indexer[rows, col] < flag
mask = flags_loc[rows, col] < flag
idx = mask[mask].index
if isinstance(flag, pd.Series):
flag = flag[mask]
flags.loc[idx, field] = flag, cause, comment
return self._assureDtype(flags, field)
......@@ -95,6 +102,6 @@ class DmpFlagger(BaseFlagger):
flags = super()._assureDtype(flags, None)
else: # we got a df with a multi-index
flags = flags.astype({c: self.flags for c in flags.columns if FlagFields.FLAG in c})
elif not isinstance(flags[(field, FlagFields.FLAG)].dtype, pd.Categorical):
elif not isinstance(flags[(field, FlagFields.FLAG)].dtype, pd.CategoricalDtype):
flags[(field, FlagFields.FLAG)] = flags[(field, FlagFields.FLAG)].astype(self.flags)
return flags
#!/usr/bin/env python
"""
docstring: TODO
"""
__author__ = "Bert Palm"
__email__ = "bert.palm@ufz.de"
__copyright__ = "Copyright 2018, Helmholtz-Zentrum für Umweltforschung GmbH - UFZ"
\ No newline at end of file
__copyright__ = "Copyright 2018, Helmholtz-Zentrum für Umweltforschung GmbH - UFZ"
import pytest
import numpy as np
import pandas as pd
from saqc.flagger.baseflagger import BaseFlagger
from saqc.flagger.dmpflagger import DmpFlagger
from saqc.flagger.simpleflagger import SimpleFlagger
from pandas.core.indexing import IndexingError
from saqc.funcs.functions import flagRange, flagSesonalRange, forceFlags, clearFlags
# Flagger instances that each parametrized test in this module is run against.
TESTFLAGGERS = [
    BaseFlagger(['NIL', 'GOOD', 'BAD']),
    DmpFlagger(),
    SimpleFlagger(),
]
@pytest.mark.parametrize('flagger', TESTFLAGGERS)
def test_initFlags(flagger):
    """initFlags must return a DataFrame with one row per row of the data."""
    column = 'testdata'
    timestamps = pd.date_range(start='2011-01-01', end='2011-01-02', periods=100)
    count = timestamps.size
    values = np.linspace(0, count - 1, count)
    data = pd.DataFrame(data={column: values}, index=timestamps)

    result = flagger.initFlags(data)

    assert len(result) == 100
    assert isinstance(result, pd.DataFrame)
@pytest.mark.parametrize('flagger', TESTFLAGGERS)
def test_getsetFlags(flagger):
    """Flags can be raised GOOD -> BAD, but a later GOOD does not replace BAD."""
    column = 'testdata'
    timestamps = pd.date_range(start='2011-01-01', end='2011-01-02', periods=100)
    count = timestamps.size
    data = pd.DataFrame(data={column: np.linspace(0, count - 1, count)}, index=timestamps)
    flags = flagger.initFlags(data)

    # First step: everything becomes GOOD and the column is categorical.
    flags = flagger.setFlags(flags, column, flag=flagger.GOOD)
    current = flagger.getFlags(flags)[column]
    assert isinstance(current.dtype, pd.CategoricalDtype)
    assert (current == flagger.GOOD).all()

    # Setting BAD takes effect; setting GOOD afterwards leaves BAD in place.
    for requested in (flagger.BAD, flagger.GOOD):
        flags = flagger.setFlags(flags, column, flag=requested)
        current = flagger.getFlags(flags)[column]
        assert (current == flagger.BAD).all()
@pytest.mark.parametrize('flagger', TESTFLAGGERS)
def test_setFlags_isFlagged(flagger, **kwargs):
    """Exercise setFlags with several row selectors and verify via isFlagged.

    Covered: boolean array through `loc` and through `iloc`, an index through
    `loc` and through `iloc`, a Series of flags as `flag`, and a Series of
    flags combined with `force=True`. A boolean Series that covers only part
    of the index is expected to raise `IndexingError`.
    """
    field = 'testdata'
    index = pd.date_range(start='2011-01-01', end='2011-01-02', periods=100)
    data = pd.DataFrame(data={field: np.linspace(0, index.size - 1, index.size)}, index=index)
    flags = flagger.initFlags(data)

    d = data[field]
    mask = d < (d.max() - d.min()) // 2
    assert len(mask) == len(flags.index)

    f = flagger.setFlags(flags, field, loc=mask.values, flag=flagger.BAD)
    # test isFlagged
    isflagged = flagger.isFlagged(f[field])
    assert (isflagged == mask).all()

    # test setFlag with mask
    flagged = flagger.getFlags(f[field])
    isflagged = flagged == flagger.BAD
    assert (isflagged == mask).all()

    # ok we can use isFlagged now :D

    # test with mask and iloc
    f = flagger.setFlags(flags, field, iloc=mask.values, flag=flagger.BAD)
    isflagged = flagger.isFlagged(f[field])
    assert (isflagged == mask).all()

    # A boolean Series restricted to a subset of the index must be rejected.
    # The fixture is built outside the guarded call so that only setFlags
    # itself can satisfy the expected exception.
    m = mask[mask]
    m.iloc[0:10] = False
    m = m[m]
    with pytest.raises(IndexingError):
        flagger.setFlags(flags, field, loc=m, flag=flagger.BAD)

    # test setFlags with loc and index
    idx = mask[mask].index
    assert len(idx) < len(flags.index)
    f = flagger.setFlags(flags, field, loc=idx, flag=flagger.BAD)
    isflagged = flagger.isFlagged(f[field])
    assert (isflagged == mask).all()

    # test setFlags with iloc and index
    idx = mask[mask].reset_index(drop=True).index
    assert len(idx) < len(flags.index)
    f = flagger.setFlags(flags, field, iloc=idx, flag=flagger.BAD)
    isflagged = flagger.isFlagged(f[field])
    assert (isflagged == mask).all()

    # test passing a series of flags as flag-arg
    every = 5
    flagseries = pd.Series(data=flagger.GOOD, index=flags.index)
    flagseries.iloc[::every] = flagger.BAD
    flagseries = flagseries.astype(flagger.flags)
    idx = mask[mask].index
    assert len(flags) == len(flagseries)
    assert len(flags) != len(idx)
    f = flagger.setFlags(flags, field, loc=idx, flag=flagseries)
    bads = flagger.isFlagged(f[field], flag=flagger.BAD, comparator='==')
    bads = bads[bads]
    valid = mask[mask].iloc[::every]
    assert len(valid) == len(bads) and (valid == bads).all()

    # test passing a series of flags as flag-arg and force
    f = flagger.setFlags(flags, field, flag=flagger.BAD)
    every = 5
    flagseries = pd.Series(data=flagger.GOOD, index=flags.index)
    flagseries.iloc[::every] = flagger.UNFLAGGED
    flagseries = flagseries.astype(flagger.flags)
    idx = mask[mask].index
    assert len(flags) == len(flagseries)
    assert len(flags) != len(idx)
    f = flagger.setFlags(f, field, loc=idx, flag=flagseries, force=True)
    unflagged = flagger.isFlagged(f[field], flag=flagger.UNFLAGGED, comparator='==')
    unflagged = unflagged[unflagged]
    valid = mask[mask].iloc[::every]
    assert len(valid) == len(unflagged) and (valid == unflagged).all()
if __name__ == '__main__':
    # Manual smoke run outside pytest, using only the first configured flagger.
    flagger = next(iter(TESTFLAGGERS))
    test_setFlags_isFlagged(flagger)
    print('done')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment