Skip to content
Snippets Groups Projects
Commit 0437d4e4 authored by Bert Palm's avatar Bert Palm 🎇
Browse files

fixed DmpFlagger

parent cec77ccc
No related branches found
No related tags found
4 merge requests: !193 Release 1.4, !188 Release 1.4, !32 WIP: Plot rework, !24 Dios integration
Pipeline #3204 passed with stage
in 7 minutes
Subproject commit d6f3ab174632016726f49fca715ded29cea57765
Subproject commit 20321d9d8ce7c9a0eaedcd581c87773b1e59db70
......@@ -9,6 +9,7 @@ import pandas as pd
from saqc.core import run
from saqc.flagger import CategoricalFlagger
from saqc.flagger.dmpflagger import DmpFlagger, FlagFields
import dios
FLAGGERS = {
......@@ -34,6 +35,7 @@ FLAGGERS = {
def main(config, data, flagger, outfile, nodata, fail):
data = pd.read_csv(data, index_col=0, parse_dates=True,)
data = dios.DictOfSeries(data)
data_result, flagger_result = run(
config_file=config,
......
......@@ -22,13 +22,13 @@ COMPARATOR_MAP = {
"<": op.lt,
}
BaseFlaggerT = TypeVar("BaseFlaggerT")
# fixme: is DictOfSeries pd-like?
PandasT = Union[pd.Series, dios.DictOfSeries]
# TODO: get some real types here (could be tricky...)
LocT = Any
IlocT = Any
FlagT = Any
diosT = dios.DictOfSeries
BaseFlaggerT = TypeVar("BaseFlaggerT")
# fixme: is DictOfSeries pd-like?
PandasT = Union[pd.Series, diosT]
class BaseFlagger(ABC):
......@@ -39,9 +39,13 @@ class BaseFlagger(ABC):
# NOTE: the arguments of setFlags that are supported by
# the configuration functions
self.signature = ("flag",)
self._flags: dios.DictOfSeries = dios.DictOfSeries()
self._flags: diosT = dios.DictOfSeries()
def initFlags(self, data: dios.DictOfSeries = None, flags: dios.DictOfSeries = None) -> BaseFlaggerT:
@property
def flags(self):
return self._flags
def initFlags(self, data: diosT = None, flags: diosT = None) -> BaseFlaggerT:
"""
initialize a flagger based on the given 'data' or 'flags'
if 'data' is not None: return a flagger with flagger.UNFLAGGED values
......@@ -52,14 +56,15 @@ class BaseFlagger(ABC):
raise TypeError("either 'data' or 'flags' are required")
if data is not None:
assert isinstance(data, dios.DictOfSeries)
assert isinstance(data, diosT)
flags = data.copy()
flags[:] = self.UNFLAGGED
else:
assert isinstance(flags, dios.DictOfSeries)
assert isinstance(flags, diosT)
# self._flags is set implicitly by copy()
return self.copy(flags.astype(self.dtype))
newflagger = self.copy()
newflagger._flags = flags.astype(self.dtype)
return newflagger
def setFlagger(self, other: BaseFlaggerT):
"""
......@@ -69,8 +74,8 @@ class BaseFlagger(ABC):
if not isinstance(other, self.__class__):
raise TypeError(f"flagger of type '{self.__class__}' needed")
this = self._flags
other = other._flags
this = self.flags
other = other.flags
# use dios.merge() as soon as it is implemented
# see https://git.ufz.de/rdm/dios/issues/15
......@@ -86,13 +91,17 @@ class BaseFlagger(ABC):
for c in newcols:
new[c] = other[c].copy()
return self.copy(new)
newflagger = self.copy()
newflagger._flags = new
return newflagger
def getFlagger(self, field: str = None, loc: LocT = None) -> BaseFlaggerT:
""" Return a potentially trimmed down copy of self. """
flags = self.getFlags(field=field, loc=loc)
flags = dios.to_dios(flags)
return self.copy(flags)
newflagger = self.copy()
newflagger._flags = flags
return newflagger
def getFlags(self, field: str = None, loc: LocT = None) -> PandasT:
""" Return a version of flags, potentially trimmed down to `loc`.
......@@ -111,7 +120,7 @@ class BaseFlagger(ABC):
# loc should be a valid 2D-indexer and
# then field must be None. Otherwise aloc
# will fail and throw the correct Error.
if isinstance(loc, dios.DictOfSeries) and field is None:
if isinstance(loc, diosT) and field is None:
indexer = loc
else:
......@@ -119,7 +128,7 @@ class BaseFlagger(ABC):
field = slice(None) if field is None else self._check_field(field)
indexer = (loc, field)
return self._flags.aloc[indexer]
return self.flags.aloc[indexer]
def setFlags(self, field: str, loc: LocT = None, flag: FlagT = None, force: bool = False, **kwargs) -> BaseFlaggerT:
"""Overwrite existing flags at loc.
......@@ -162,12 +171,8 @@ class BaseFlagger(ABC):
flagged = flags.notna() & cp(flags, flag)
return flagged
def copy(self, flags: dios.DictOfSeries = None) -> BaseFlaggerT:
assert isinstance(flags, dios.DictOfSeries)
out = deepcopy(self)
if flags is not None:
out._flags = flags
return out
def copy(self) -> BaseFlaggerT:
return deepcopy(self)
def _check_field(self, field):
""" Check if (all) field(s) are in self._flags. """
......@@ -176,12 +181,12 @@ class BaseFlagger(ABC):
# https://git.ufz.de/rdm-software/saqc/issues/46
failed = []
if isinstance(field, str):
if field not in self._flags:
if field not in self.flags:
failed += [field]
else:
try:
for f in field:
if f not in self._flags:
if f not in self.flags:
failed += [f]
# not iterable, probably a slice or
# any indexer we don't have to check
......
......@@ -25,14 +25,6 @@ class CategoricalFlagger(BaseFlagger):
super().__init__(dtype=Flags(flags))
self._categories = self.dtype.categories
def _isDtype(self, flag) -> bool:
"""
not needed here, move out
"""
if isinstance(flag, pd.Series):
return isinstance(flag.dtype, pd.CategoricalDtype) and flag.dtype == self.dtype
return flag in self.dtype.categories
@property
def UNFLAGGED(self):
return self._categories[0]
......
......@@ -10,6 +10,7 @@ import pandas as pd
import dios.dios as dios
from saqc.flagger.categoricalflagger import CategoricalFlagger
from saqc.flagger.baseflagger import diosT
from saqc.lib.tools import assertDictOfSeries, toSequence, assertScalar
......@@ -33,9 +34,6 @@ FLAGS = ["NIL", "OK", "DOUBTFUL", "BAD"]
class DmpFlagger(CategoricalFlagger):
def __init__(self):
# fixme: DmpFlagger
raise NotImplementedError
super().__init__(FLAGS)
self.flags_fields = [FlagFields.FLAG, FlagFields.CAUSE, FlagFields.COMMENT]
version = subprocess.run(
......@@ -44,6 +42,21 @@ class DmpFlagger(CategoricalFlagger):
self.project_version = version.decode().strip()
self.signature = ("flag", "comment", "cause", "force")
self._flags = None
self._causes = None
self._comments = None
# defined in BaseFlagger
# @property
# def flags(self):
# return self._flags
@property
def causes(self):
return self._causes
@property
def comments(self):
return self._comments
def initFlags(self, data: dios.DictOfSeries = None, flags: dios.DictOfSeries = None):
"""
......@@ -52,69 +65,40 @@ class DmpFlagger(CategoricalFlagger):
if 'flags' is not None: return a flagger with the given flags
"""
if data is not None:
flags = dios.DictOfSeries(data="", columns=self._getColumnIndex(data.columns), index=data.index,)
flags.loc[:, self._getColumnIndex(data.columns, [FlagFields.FLAG])] = self.UNFLAGGED
elif flags is not None:
if not isinstance(flags.columns, pd.MultiIndex):
cols = flags.columns
flags = flags.copy()
flags.columns = self._getColumnIndex(cols, [FlagFields.FLAG])
flags = flags.reindex(columns=self._getColumnIndex(cols), fill_value="")
else:
raise TypeError("either 'data' or 'flags' are required")
# implicitly sets self._flags and makes a deepcopy of self (i.e. of the DmpFlagger)
newflagger = super().initFlags(data=data, flags=flags)
newflagger._causes = newflagger.flags.astype(str)
newflagger._comments = newflagger.flags.astype(str)
newflagger.causes[:], newflagger.comments[:] = "", ""
return newflagger
return self.copy(self._assureDtype(flags))
def getFlagger(self, field=None, loc=None):
newflagger = super().getFlagger(field=field, loc=loc)
flags = newflagger.flags
newflagger._causes = self._causes.aloc[flags, ...]
newflagger._comments = self._comments.aloc[flags, ...]
return newflagger
def getFlagger(self, field=None, loc=None, iloc=None):
# NOTE: we need to preserve all indexing levels
assertScalar("field", field, optional=True)
variables = self._flags.columns.get_level_values(ColumnLevels.VARIABLES).drop_duplicates()
cols = toSequence(field, variables)
out = super().getFlagger(field, loc, iloc)
out._flags.columns = self._getColumnIndex(cols)
return out
def getFlags(self, field=None, loc=None, iloc=None):
assertScalar("field", field, optional=True)
field = field or slice(None)
mask = self._locatorMask(field, loc, iloc)
flags = self._flags.xs(FlagFields.FLAG, level=ColumnLevels.FLAGS, axis=1).copy()
return super()._assureDtype(flags.loc[mask, field])
def getFlags(self, field=None, loc=None):
return super().getFlags(field=field, loc=loc)
def setFlags(self, field, loc=None, iloc=None, flag=None, force=False, comment="", cause="", **kwargs):
assertScalar("field", field, optional=True)
def setFlags(self, field, loc=None, flag=None, force=False, comment="", cause="", **kwargs):
assert "iloc" not in kwargs, "deprecated keyword, iloc"
assertScalar("field", field, optional=False)
flag = self.BAD if flag is None else self._checkFlag(flag)
flag = self.BAD if flag is None else flag
comment = json.dumps(dict(comment=comment, commit=self.project_version, test=kwargs.get("func_name", "")))
comment = json.dumps({"comment": comment, "commit": self.project_version, "test": kwargs.get("func_name", ""),})
this = self.getFlags(field=field)
other = self._broadcastFlags(field=field, flag=flag)
mask = self._locatorMask(field, loc, iloc)
if not force:
mask &= (this < other).values
if force:
row_indexer = loc
else:
# trim flags to loc, we always get a pd.Series returned
this = self.getFlags(field=field, loc=loc)
row_indexer = this < flag
out = deepcopy(self)
out._flags.loc[mask, field] = other[mask], cause, comment
out._flags.aloc[row_indexer, field] = flag
out._causes.aloc[row_indexer, field] = cause
out._comments.aloc[row_indexer, field] = comment
return out
def _getColumnIndex(
self, cols: Union[str, Sequence[str]], fields: Union[str, Sequence[str]] = None
) -> pd.MultiIndex:
cols = toSequence(cols)
fields = toSequence(fields, self.flags_fields)
return pd.MultiIndex.from_product([cols, fields], names=[ColumnLevels.VARIABLES, ColumnLevels.FLAGS])
def _assureDtype(self, flags):
# NOTE: building up new DataFrames is significantly
# faster than assigning into existing ones
tmp = OrderedDict()
for (var, flag_field) in flags.columns:
col_data = flags[(var, flag_field)]
if flag_field == FlagFields.FLAG:
col_data = col_data.astype(self.dtype)
else:
col_data = col_data.astype(str)
tmp[(var, flag_field)] = col_data
return dios.DictOfSeries(tmp, columns=flags.columns, index=flags.index)
......@@ -23,7 +23,7 @@ TESTNODATA = (np.nan, -9999)
TESTFLAGGER = (
CategoricalFlagger(["NIL", "GOOD", "BAD"]),
SimpleFlagger(),
# DmpFlagger(),
DmpFlagger(),
# ContinuousFlagger(),
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment