diff --git a/saqc/__init__.py b/saqc/__init__.py
index 295297f3684e14ab02404f0e5f82fa75b01e8266..4cdea6565e403738206c7be3e6f39349c19f8d4b 100644
--- a/saqc/__init__.py
+++ b/saqc/__init__.py
@@ -6,7 +6,7 @@ __version__ = "1.4"
 # import order: from small to big
 from saqc.constants import *
 from saqc.core import (
-    register,
+    flagging,
     initFlagsLike,
     Flags,
     SaQC,
diff --git a/saqc/core/__init__.py b/saqc/core/__init__.py
index 6f1838ff287bd30f7317d3c9f39e240f742f7391..c76d19296d937779c8601a6eeee3823b898e9554 100644
--- a/saqc/core/__init__.py
+++ b/saqc/core/__init__.py
@@ -1,7 +1,7 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
 
-from saqc.core.register import register
+from saqc.core.register import flagging, processing
 from saqc.core.flags import Flags, initFlagsLike
 from saqc.core.core import SaQC, logger
 from saqc.core.translator import FloatTranslator, DmpTranslator, PositionalTranslator
diff --git a/saqc/core/core.py b/saqc/core/core.py
index 12a3ce059512d4f200f879b2ac5ffde1438ab542..7bd235e5c1bb90ca83a2238671e75abbeacbb89c 100644
--- a/saqc/core/core.py
+++ b/saqc/core/core.py
@@ -23,7 +23,6 @@ from saqc.lib.tools import toSequence
 from saqc.lib.types import (
     ExternalFlag,
     CallGraph,
-    MaterializedGraph,
     PandasLike,
 )
 
@@ -162,9 +161,6 @@ class SaQC(FuncModules):
         # with regular expressions, we can't just reuse the original execution
         # plan to infer all translation related information.
         self._planned: CallGraph = []  # will be filled by calls to `_wrap`
-        self._computed: MaterializedGraph = self._translator.buildGraph(
-            self._flags
-        )  # will be filled in `evaluate`
 
     @staticmethod
     def _initFlags(data: DictOfSeries, flags: Optional[Flags]) -> Flags:
@@ -240,10 +236,7 @@
             An updated SaQC Object incorporating the requested computations
         """
 
-        # NOTE: It would be nicer to separate the plotting into an own
-        # method instead of intermingling it with the computation
         data, flags = self._data, self._flags
-        computed: MaterializedGraph = []
 
         for selector, control, function in self._planned:
             logger.debug(
                 f"processing: {selector.field}, {function.name}, {function.keywords}"
             )
@@ -257,7 +250,6 @@
                 # (eg. `TypeError: got multiple values for argument 'data'`,
                 # when the user pass data=...)
                 _warnForUnusedKwargs(function, self._translator)
-                computed.append((selector, function))
             except Exception as e:
                 _handleErrors(e, selector.field, control, function, self._error_policy)
                 continue
@@ -265,9 +257,7 @@
             data = data_result
             flags = flags_result
 
-        return self._construct(
-            _flags=flags, _data=data, _computed=self._computed + computed
-        )
+        return self._construct(_flags=flags, _data=data)
 
     def getResult(
         self, raw=False
@@ -286,7 +276,7 @@
         if raw:
             return data, flags
 
-        return data.to_df(), self._translator.backward(flags, realization._computed)
+        return data.to_df(), self._translator.backward(flags)
 
     def _wrap(self, func: SaQCFunction):
         def inner(
diff --git a/saqc/core/flags.py b/saqc/core/flags.py
index 76bdd93180b6e79801d1113fc34b92f21d3598fe..730ec1e987cc2815d338e37786ead8c33fe1948c 100644
--- a/saqc/core/flags.py
+++ b/saqc/core/flags.py
@@ -24,7 +24,7 @@ SelectT = Union[
     Tuple[pd.Index, _Field],
     Tuple[slice, _Field],
 ]
-ValueT = Union[pd.Series, "Flags", Iterable, float]
+ValueT = Union[pd.Series, Iterable, float]
 
 
 class _HistAccess:
@@ -34,16 +34,11 @@ class _HistAccess:
     def __getitem__(self, key: str) -> History:
         return self.obj._data[key].copy()
 
-    def __setitem__(self, key: str, value: Union[History, pd.DataFrame]):
-        if not isinstance(value, History):
-            value = History(value)
-
+    def __setitem__(self, key: str, value: History):
         if not isinstance(value, History):
             raise TypeError("Not a History")
 
-        History._validateHistWithMask(value.hist, value.mask)
         self.obj._validateHistForFlags(value)
-
         self.obj._data[key] = value
 
@@ -204,8 +199,7 @@ class Flags:
                     f"cannot init from '{type(data).__name__}' of '{type(item).__name__}'"
                 )
 
-            # make a UNFLAGGED-column and then append the actual item
-            result[k] = _simpleHist(item.index).append(item, force=True)
+            result[k] = History(item.index).append(item, force=True)
 
         return result
 
@@ -218,9 +212,6 @@ class Flags:
         if colname:
             errm += f"of column {colname} "
 
-        if (history.hist[0] != UNFLAGGED).any():
-            raise ValueError(errm + "missing an UNFLAGGED-column at first position")
-
         # this ensures that the mask does not shadow UNFLAGGED with a NaN.
         if history.max().hasnans:
             raise ValueError(errm + "is not valid (result of max() contains NaNs)")
@@ -337,9 +328,9 @@ class Flags:
             return
 
         if key not in self._data:
-            self._data[key] = _simpleHist(value.index)
+            self._data[key] = History(value.index)
 
-        self._data[key].append(value, force=True)
+        self._data[key].append(value, force=True, meta=None)
 
     def __delitem__(self, key):
         self._data.pop(key)
@@ -506,17 +497,6 @@ def initFlagsLike(
         if not isinstance(item, (pd.Series, History)):
             raise TypeError("items in reference must be of type pd.Series")
 
-        result[k] = _simpleHist(item.index)
+        result[k] = History(item.index)
 
     return Flags(result)
-
-
-def _simpleHist(index) -> History:
-    """
-    Make a single columned History from an index and an initial value.
-
-    Notes
-    -----
-    For internal use only.
-    """
-    return History(pd.DataFrame(UNFLAGGED, index=index, columns=[0], dtype=float))
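A quick sanity check of the behavioural change above (a sketch, not part of the diff, assuming the `Flags`/`History` API as modified here): without the implicit `UNFLAGGED` dummy column, the series passed at construction time becomes history column 0 itself.

```python
import pandas as pd
from saqc.core.flags import Flags
from saqc.constants import UNFLAGGED, BAD

idx = pd.date_range("2021-01-01", periods=4, freq="D")
flags = Flags({"x": pd.Series(UNFLAGGED, index=idx)})

# formerly two columns (dummy + item), now a single history column
assert len(flags.history["x"].columns) == 1

flags["x"] = pd.Series(BAD, index=idx)  # appends history column 1
```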
- """ - return History(pd.DataFrame(UNFLAGGED, index=index, columns=[0], dtype=float)) diff --git a/saqc/core/history.py b/saqc/core/history.py index 5be12bd61dadc2259a5f0b8807884c08bface08f..d30395c3cb623c6e7440681250b100f573d49ef4 100644 --- a/saqc/core/history.py +++ b/saqc/core/history.py @@ -1,10 +1,14 @@ #!/usr/bin/env python from __future__ import annotations -from typing import Tuple, Type, Union +from copy import deepcopy, copy +import itertools + +from typing import Dict, Tuple, Type, Union, List, Any from typing_extensions import Literal import pandas as pd import numpy as np + from saqc.constants import * @@ -15,9 +19,7 @@ class History: The flag-history (FH) stores the history of a flags column. Each time ``append`` is called a new column is appended to the FH. The column names are increasing integers starting with 0. After initialisation - the FH is empty and has no columns at all. If an initial `UNFLAGGED`- - column is desired, it must be created manually, or passed via the ``hist`` - parameter. The same way a new FH can be created from an existing one. + the FH is empty and has no columns at all. To get the worst flags (highest value) that are currently stored in the FH, we provide a ``max()`` method. It returns a pd.Series indicating @@ -34,52 +36,19 @@ class History: Parameters ---------- - hist : pd.Dataframe, default None - if None a empty FH is created, otherwise the existing dataframe - is taken as the initial history. - - mask : pd.Dataframe, default None - a mask holding the boolean force values. It must match the passed - ``hist``. If None an matching mask is created, assuming force never - was passed to any test. + index: pd.Index + A index that fit the flags to be insert. - copy : bool, default False - If True, the input data is copied, otherwise not. + See Also + -------- + createHistoryFromData: function to create History from existing data """ - def __init__( - self, hist: pd.DataFrame = None, mask: pd.DataFrame = None, copy: bool = False - ): + def __init__(self, index: pd.Index): - # this is a hidden _feature_ and not exposed by the type - # of the hist parameter and serve as a fastpath for internal - # fast creation of a new FH, where no checks are needed. - if isinstance(hist, History): - # keep this order, otherwise hist.mask - # will refer to pd.Dataframe.mask - mask = hist.mask - hist = hist.hist - - elif hist is None and mask is None: - hist = pd.DataFrame() - mask = pd.DataFrame() - - elif hist is None and mask is not None: - raise ValueError("Cannot take 'mask' without 'hist'") - - elif hist is not None and mask is None: - hist = self._validateHist(hist) - mask = pd.DataFrame(True, index=hist.index, columns=hist.columns) - - else: - hist, mask = self._validateHistWithMask(hist, mask) - - if copy: - hist = hist.copy() - mask = mask.copy() - - self.hist = hist.astype("category", copy=copy) - self.mask = mask + self.hist = pd.DataFrame(index=index) + self.mask = pd.DataFrame(index=index) + self.meta = [] @property def index(self) -> pd.Index: @@ -171,7 +140,7 @@ class History: return self def append( - self, value: Union[pd.Series, pd.DataFrame, History], force=False + self, value: Union[pd.Series, History], force: bool = False, meta: dict = None ) -> History: """ Create a new FH column and insert given pd.Series to it. @@ -200,6 +169,10 @@ class History: the first ``N`` columns of the passed history are discarded, where ``N`` is the number of existing columns in the current history. 
+ meta : dict, default None + metadata dictionary to store with the series. Ignored if ``value`` is of + type History. None defaults to a empty dictionary. + Returns ------- history with appended series @@ -212,15 +185,21 @@ class History: if isinstance(value, History): return self._appendHistory(value, force=force) - if isinstance(value, pd.Series): - value = value.to_frame() + if not isinstance(value, pd.Series): + raise TypeError("'value' is not a pd.Series") + + if meta is None: + meta = {} - for _, val in value.items(): - val = self._validateValue(val) - if len(self) > 0 and not val.index.equals(self.index): - raise ValueError("Index does not match") + if not isinstance(meta, dict): + raise TypeError("'meta' must be of type None or dict") + + val = self._validateValue(value) + if not val.index.equals(self.index): + raise ValueError("Index does not match") - self._insert(val, pos=len(self), force=force) + self._insert(val, pos=len(self), force=force) + self.meta.append(deepcopy(meta)) return self def _appendHistory(self, value: History, force: bool = False): @@ -248,18 +227,20 @@ class History: ----- This ignores the column names of the passed History. """ - self._validateHistWithMask(value.hist, value.mask) - if len(self) > 0 and not value.index.equals(self.index): + self._validate(value.hist, value.mask, value.meta) + if not value.index.equals(self.index): raise ValueError("Index does not match") n = len(self.columns) # don't overwrite the `.columns` of the input down the line value_hist = value.hist.copy(deep=False) value_mask = value.mask.copy(deep=False) + value_meta = deepcopy(value.meta) if not force: value_hist = value_hist.iloc[:, n:] value_mask = value_mask.iloc[:, n:] + value_meta = value_meta[n:] # rename columns, to avoid ``pd.DataFrame.loc`` become confused columns = pd.Index(range(n, n + len(value_hist.columns))) @@ -269,56 +250,11 @@ class History: # clear the current mask self.mask.loc[(~value_mask & value_hist.notna()).any(axis="columns")] = False - self.hist.loc[:, columns] = value_hist.astype("category") + hist = self.hist.astype(float) + hist.loc[:, columns] = value_hist.astype(float) + self.hist = hist.astype("category", copy=True) self.mask.loc[:, columns] = value_mask.copy() - return self - - def squeeze(self, n: int) -> History: - """ - Squeeze last `n` columns to a single column. - - This **not** changes the result of ``History.max()``. - - Parameters - ---------- - n : int - last n columns to squeeze - - Notes - ----- - The new column number (column name) will be the lowest of - the squeezed. This ensure that the column numbers are always - monotonically increasing. - - Bear in mind, this works inplace, if a copy is needed, call ``copy`` before. - - Returns - ------- - History - squeezed history - """ - if n <= 1: - return self - - if n > len(self): - raise ValueError(f"'n={n}' cannot be greater than columns in the FH") - - # calc the squeezed series. 
- # we dont have to care about any forced series - # because anytime force was given, the False's in - # the mask were propagated back over the whole FH - mask = self.mask.iloc[:, -n:] - hist = self.hist.iloc[:, -n:] - s = hist[mask].max(axis=1) - - # slice self down - # this may leave us in an unstable state, because - # the last column maybe is not entirely True, but - # the following append, will fix this - self.hist = self.hist.iloc[:, :-n].astype("category") - self.mask = self.mask.iloc[:, :-n] - - self.append(s) + self.meta += value_meta return self def idxmax(self) -> pd.Series: @@ -329,9 +265,11 @@ class History: ------- pd.Series: maximum values """ + if self.mask.empty: + return pd.Series(np.nan, index=self.index) return self.hist[self.mask].astype(float).idxmax(axis=1) - def max(self) -> pd.Series: + def max(self, raw=False) -> pd.Series: """ Get the maximum value per row of the FH. @@ -339,34 +277,11 @@ class History: ------- pd.Series: maximum values """ - return self.hist[self.mask].max(axis=1) - - @property - def _constructor(self) -> Type["History"]: - return History - - def copy(self, deep=True) -> History: - """ - Make a copy of the FH. - - Parameters - ---------- - deep : bool, default True - - ``True``: make a deep copy - - ``False``: make a shallow copy - - Returns - ------- - copy : History - the copied FH - """ - new = self._constructor() - new.hist = self.hist - new.mask = self.mask - if deep: - new.hist = new.hist.copy() - new.mask = new.mask.copy() - return new + result = self.hist[self.mask].max(axis=1) + if raw: + return result + else: + return result.fillna(UNFLAGGED) def reindex(self, index: pd.Index, fill_value_last: float = UNFLAGGED) -> History: """ @@ -377,7 +292,8 @@ class History: index : pd.Index the index to reindex to. fill_value_last : float, default UNFLAGGED - value to fill nan's (UNTOUCHED) in the last column. Defaults to 0 (UNFLAGGED). + value to fill nan's (UNTOUCHED) in the last column. + Defaults to 0 (UNFLAGGED). Returns ------- @@ -393,21 +309,122 @@ class History: mask.iloc[:, -1:] = True self.mask = mask.astype(bool) - self.hist.hist = hist.astype("category") + self.hist = hist.astype("category") return self - def __copy__(self, deep: bool = True): - return self.copy(deep=deep) + def apply( + self, + index: pd.Index, + hist_func: callable, + hist_kws: dict, + mask_func: callable, + mask_kws: dict, + func_handle_df: bool = False, + copy: bool = True, + ): + """ + Apply a function on each column in history. + + Two functions must be given. Both are called for each column in the History + unless ``func_handle_df=True`` is given. One is called on ``History.hist``, + the other on ``History.mask``. Both function must take a `pd.Series` as first + arg, which is a column from `hist` respectively `mask`. If + ``func_handle_df=True`` each functions take a ``pd.DataFrame`` as first + argument, holding all columns at once. + Bear in mind: + - the functions mustn't alter the passed objects + - the functions are not allowed to add or remove columns + - the function must return same type as first argument + - the returned object must have same index as the passed ``index`` to ``apply`` + as first argument + + Parameters + ---------- + index: pd.Index + Index the new history should have. This is used to ensure the passed + functions worked correct and also used if the function does not apply, + because the initial history is empty. Then the altered empty history is + reindexed to this index. 
+ + hist_func : callable + function to apply on `History.hist` (flags DataFrame) + + hist_kws : dict + hist-function keywords dict + + mask_func : callable + function to apply on `History.mask` (force mask DataFrame) + + mask_kws : dict + mask-function keywords dict + + func_handle_df : bool, default False + If `True`, the Dataframe under `History`.hist, respectively `History.mask` + is passed to the given functions, thus both(!) function must handle + `pd.Dataframes` as first input. If `False`, each column is passed separately, + thus the functions must handle those. + + copy : bool, default True + If False, alter the underlying history, otherwise return a copy. + + Notes + ----- + After the functions are called, all `NaN`'s in `History.mask` are replaced by + `False`, and the `.mask` is casted to bool, to ensure a consistent History. - def __deepcopy__(self, memo=None): + Returns + ------- + history with altered columns """ + hist = pd.DataFrame(index=index) + mask = pd.DataFrame(index=index) + + if func_handle_df: + # we need to pass the data as floats as functions may fail with Categorical + hist = hist_func(self.hist.astype(float), **hist_kws) + mask = mask_func(self.mask, **mask_kws) + + else: + for pos in self.columns: + hist[pos] = hist_func(self.hist[pos].astype(float), **hist_kws) + mask[pos] = mask_func(self.mask[pos], **mask_kws) + + # handle unstable state + mask.iloc[:, -1:] = True + + History._validate(hist, mask, self.meta) + + if copy: + history = History(index=None) # noqa + history.meta = deepcopy(self.meta) + else: + history = self + + history.hist = hist.astype("category") + history.mask = mask.fillna(True).astype(bool) + + return history + + def copy(self, deep=True) -> History: + """ + Make a copy of the FH. + Parameters ---------- - memo, default None - Standard signature. Unused + deep : bool, default True + - ``True``: make a deep copy + - ``False``: make a shallow copy + + Returns + ------- + copy : History + the copied FH """ - return self.copy(deep=True) + if deep: + return deepcopy(self) + else: + return copy(self) def __len__(self) -> int: return len(self.hist.columns) @@ -417,77 +434,85 @@ class History: if self.empty: return str(self.hist).replace("DataFrame", "History") - repr = self.hist.astype(str) + r = self.hist.astype(str) m = self.mask - repr[m] = " " + repr[m] + " " - repr[~m] = "(" + repr[~m] + ")" + r[m] = " " + r[m] + " " + r[~m] = "(" + r[~m] + ")" - return str(repr)[1:] + return str(r)[1:] # -------------------------------------------------------------------------------- # validation # @staticmethod - def _validateHistWithMask( - obj: pd.DataFrame, mask: pd.DataFrame - ) -> Tuple[pd.DataFrame, pd.DataFrame]: + def _validate( + hist: pd.DataFrame, mask: pd.DataFrame, meta: List[Any] + ) -> Tuple[pd.DataFrame, pd.DataFrame, List]: """ - check type, columns, index, dtype and if the mask fits the obj. + check type, columns, index, dtype of hist and mask and if the meta fits also. """ # check hist - History._validateHist(obj) + if not isinstance(hist, pd.DataFrame): + raise TypeError( + f"'hist' must be of type pd.DataFrame, but is of type {type(hist).__name__}" + ) + # isin([float, ..]) does not work ! 
+ if not ( + (hist.dtypes == float) + | (hist.dtypes == np.float32) + | (hist.dtypes == np.float64) + | (hist.dtypes == "category") + ).all(): + raise ValueError( + "dtype of all columns in hist must be float or categorical" + ) + + if not hist.empty and ( + not hist.columns.equals(pd.Index(range(len(hist.columns)))) + or hist.columns.dtype != int + ): + raise ValueError( + "column names must be continuous increasing int's, starting with 0." + ) # check mask if not isinstance(mask, pd.DataFrame): raise TypeError( - f"'mask' must be of type pd.DataFrame, but {type(mask).__name__} was given" + f"'mask' must be of type pd.DataFrame, but is of type {type(mask).__name__}" ) - if (mask.dtypes != bool).any(): - raise ValueError("dtype of all columns in 'mask' must be bool") + if not (mask.dtypes == bool).all(): + raise ValueError("dtype of every columns in 'mask' must be bool") if not mask.empty and not mask.iloc[:, -1].all(): raise ValueError( "the values in the last column in mask must be 'True' everywhere." ) - # check combination of hist and mask - if not obj.columns.equals(mask.columns): - raise ValueError("'hist' and 'mask' must have same columns") - - if not obj.index.equals(mask.index): - raise ValueError("'hist' and 'mask' must have same index") - - return obj, mask - - @staticmethod - def _validateHist(obj: pd.DataFrame) -> pd.DataFrame: - """ - check type, columns, dtype of obj. - """ - - if not isinstance(obj, pd.DataFrame): + # check meta + if not isinstance(meta, list): raise TypeError( - f"'hist' must be of type pd.DataFrame, but {type(obj).__name__} was given" + f"'meta' must be of type list, but is of type {type(meta).__name__}" ) + if not all([isinstance(e, dict) for e in meta]): + raise TypeError("All elements in meta must be of type 'dict'") - if obj.dtypes.isin([float, pd.Categorical]).any() is False: - raise ValueError( - "dtype of all columns in hist must be float or categorical" - ) + # check combinations of hist and mask and meta + if not hist.columns.equals(mask.columns): + raise ValueError("'hist' and 'mask' must have the same columns") - if not obj.empty and ( - not obj.columns.equals(pd.Index(range(len(obj.columns)))) - or obj.columns.dtype != int - ): + if not hist.index.equals(mask.index): + raise ValueError("'hist' and 'mask' must have the same index") + + if not len(hist.columns) == len(meta): raise ValueError( - "column names must be continuous increasing int's, starting with 0." + "'meta' must have as many entries as columns exist in hist" ) - return obj + return hist, mask, meta @staticmethod def _validateValue(obj: pd.Series) -> pd.Series: @@ -505,79 +530,56 @@ class History: return obj -def applyFunctionOnHistory( - history: History, - hist_func: callable, - hist_kws: dict, - mask_func: callable, - mask_kws: dict, - last_column: Union[pd.Series, Literal["dummy"], None] = None, - func_handle_df: bool = False, +def createHistoryFromData( + hist: pd.DataFrame, + mask: pd.DataFrame, + meta: List[Dict], + copy: bool = False, ): """ - Apply function on each column in history. - - Two functions must be given. Both are called for each column in the History unless ``func_handle_df=True`` is - given. One is called on ``History.hist``, the other on ``History.mask``. - Both function must take a pd.Series as first arg, which is the column from hist or mask respectively. If - ``func_handle_df=True`` each functions must take a ``pd.DataFrame`` as first argument, holding all columns - at once. The function must return same type as first argument. 
+ Create a History from existing data. Parameters ---------- - history : History - History object to alter - hist_func : callable - function to apply on `History.hist` (flags DataFrame) - hist_kws : dict - hist-function keywords dict - mask_func : callable - function to apply on `History.mask` (force mask DataFrame) - mask_kws : dict - mask-function keywords dict - last_column : pd.Series or None, default None - The last column to apply. If None, no extra column is appended. - func_handle_df : bool - If `True`, the whole History{.hist, .mask} are passed to the given functions, thus the - function must handle `pd.Dataframes` as first input. If `False`, each column is passed - separately, thus the functions must handle those. - - Notes - ----- - After the functions are called, all `NaN`'s in `History.mask` are replaced with `False`, - and the `.mask` is casted to bool, to ensure a consistent History. + hist : pd.Dataframe + Data that define the flags of the history. - Returns - ------- - history with altered columns - """ - new_history = History() + mask : pd.Dataframe + The mask holding the boolean force values. The following + points must hold: - if func_handle_df: - # we need to pass the data as floats as functions may fail with Categorical - history.hist = hist_func(history.hist.astype(float), **hist_kws).astype( - "category" - ) - history.mask = hist_func(history.mask, **mask_kws) + * columns must be equal to the columns of `hist` + * the last column must be entirely `True` + * at most one change from False to True is allowed per row - else: - for pos in history.columns: - new_history.hist[pos] = hist_func( - history.hist[pos].astype(float), **hist_kws - ).astype("category") - new_history.mask[pos] = mask_func(history.mask[pos], **mask_kws) + meta : List of dict + A list holding meta information for each column, therefore it must + have the same number of entries as columns exist in `hist`. - # handle unstable state - if last_column is None: - new_history.mask.iloc[:, -1:] = True - else: - if isinstance(last_column, str) and last_column == "dummy": - last_column = pd.Series(UNTOUCHED, index=new_history.index, dtype=float) + copy : bool, default False + If `True`, the input data is copied, otherwise not. - new_history.append(last_column.astype("category"), force=True) - # assure a boolean mask and UNFLAGGED column - new_history.mask = new_history.mask.fillna(True).astype(bool) - new_history.hist.loc[:, :0] = UNFLAGGED + Notes + ----- + To create a very simple History from a flags dataframe ``f`` use + ``mask = pd.DataFrame(True, index=f.index, columns=f.columns`` + and + ``meta = [{}] * len(f.columns)``. 
- return new_history + Returns + ------- + History + """ + History._validate(hist, mask, meta) + + if copy: + hist = hist.copy() + mask = mask.copy() + meta = deepcopy(meta) + + history = History(index=None) # noqa + history.hist = hist.astype("category", copy=False) + history.mask = mask + history.meta = meta + return history diff --git a/saqc/core/register.py b/saqc/core/register.py index 6654d68d060b5e35f08a390562cd2361683887aa..072c68b6e6cd1cdeba677dadfac8a819cd09b10e 100644 --- a/saqc/core/register.py +++ b/saqc/core/register.py @@ -1,17 +1,16 @@ #!/usr/bin/env python -from typing import Dict, Optional, Union, Tuple +from typing import Dict, Optional, Union, Tuple, Callable from typing_extensions import Literal from functools import wraps import dataclasses import numpy as np import pandas as pd import dios -import warnings +import saqc.core.history from saqc.constants import * from saqc.core.lib import SaQCFunction -from saqc.core.flags import initFlagsLike, Flags - +from saqc.core.flags import initFlagsLike, Flags, History # NOTE: # the global SaQC function store, @@ -24,7 +23,8 @@ FuncReturnT = Tuple[dios.DictOfSeries, Flags] @dataclasses.dataclass class CallState: - func: callable + func: Callable + func_name: str data: dios.DictOfSeries flags: Flags @@ -38,7 +38,26 @@ class CallState: mask: dios.DictOfSeries -def register(masking: MaskingStrT = "all", module: Optional[str] = None): +def processing(module: Optional[str] = None): + + # executed on module import + def inner(func): + func_name = func.__name__ + if module: + func_name = f"{module}.{func_name}" + + @wraps(func) + def callWrapper(data, field, flags, *args, **kwargs): + kwargs["to_mask"] = _getMaskingThresh(kwargs) + return func(data, field, flags, *args, **kwargs) + + FUNC_MAP[func_name] = SaQCFunction(func_name, callWrapper) + return callWrapper + + return inner + + +def flagging(masking: MaskingStrT = "all", module: Optional[str] = None): # executed on module import def inner(func): @@ -65,7 +84,7 @@ def register(masking: MaskingStrT = "all", module: Optional[str] = None): def _preCall( - func: callable, args: tuple, kwargs: dict, masking: MaskingStrT, fname: str + func: Callable, args: tuple, kwargs: dict, masking: MaskingStrT, fname: str ): """ Handler that runs before any call to a saqc-function. @@ -101,7 +120,7 @@ def _preCall( control keyword-arguments passed to `_postCall` """ - mthresh = _getMaskingThresh(masking, kwargs, fname) + mthresh = _getMaskingThresh(kwargs) kwargs["to_mask"] = mthresh data, field, flags, *args = args @@ -113,6 +132,7 @@ def _preCall( # store current state state = CallState( func=func, + func_name=fname, data=data, flags=flags, field=field, @@ -157,6 +177,13 @@ def _postCall(result, old_state: CallState) -> FuncReturnT: def _getMaskingColumns(data: dios.DictOfSeries, field: str, masking: MaskingStrT): """ + Return columns to mask, by `masking` (decorator keyword) + + Depending on the `masking` kw, the following s returned: + * 'all' : all columns from data + * 'None' : empty pd.Index + * 'field': single entry Index + Returns ------- columns: pd.Index @@ -176,19 +203,14 @@ def _getMaskingColumns(data: dios.DictOfSeries, field: str, masking: MaskingStrT raise ValueError(f"wrong use of `register(masking={masking})`") -def _getMaskingThresh(masking, kwargs, fname): +def _getMaskingThresh(kwargs): """ Check the correct usage of the `to_mask` keyword, iff passed, otherwise return a default. 
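The new `History` workflow in a nutshell (an illustrative sketch against the API introduced above; the meta contents are made up):

```python
import pandas as pd
from saqc.core.history import History, createHistoryFromData
from saqc.constants import UNFLAGGED, BAD

idx = pd.date_range("2021-01-01", periods=3, freq="D")

h = History(index=idx)
h.append(pd.Series(UNFLAGGED, index=idx), meta={"func": "init"})
h.append(pd.Series(BAD, index=idx), force=True, meta={"func": "flagFoo"})
assert len(h.meta) == len(h.columns) == 2

# rebuild an equivalent History from raw frames
hist = h.hist.astype(float)
mask = pd.DataFrame(True, index=idx, columns=hist.columns)
h2 = createHistoryFromData(hist, mask, meta=[{}, {}], copy=True)
```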
diff --git a/saqc/core/register.py b/saqc/core/register.py
index 6654d68d060b5e35f08a390562cd2361683887aa..072c68b6e6cd1cdeba677dadfac8a819cd09b10e 100644
--- a/saqc/core/register.py
+++ b/saqc/core/register.py
@@ -1,17 +1,16 @@
 #!/usr/bin/env python
 
-from typing import Dict, Optional, Union, Tuple
+from typing import Dict, Optional, Union, Tuple, Callable
 from typing_extensions import Literal
 from functools import wraps
 import dataclasses
 import numpy as np
 import pandas as pd
 import dios
-import warnings
+import saqc.core.history
 
 from saqc.constants import *
 from saqc.core.lib import SaQCFunction
-from saqc.core.flags import initFlagsLike, Flags
-
+from saqc.core.flags import initFlagsLike, Flags, History
 
 # NOTE:
 # the global SaQC function store,
@@ -24,7 +23,8 @@ FuncReturnT = Tuple[dios.DictOfSeries, Flags]
 
 @dataclasses.dataclass
 class CallState:
-    func: callable
+    func: Callable
+    func_name: str
 
     data: dios.DictOfSeries
     flags: Flags
@@ -38,7 +38,26 @@ class CallState:
     mask: dios.DictOfSeries
 
 
-def register(masking: MaskingStrT = "all", module: Optional[str] = None):
+def processing(module: Optional[str] = None):
+
+    # executed on module import
+    def inner(func):
+        func_name = func.__name__
+        if module:
+            func_name = f"{module}.{func_name}"
+
+        @wraps(func)
+        def callWrapper(data, field, flags, *args, **kwargs):
+            kwargs["to_mask"] = _getMaskingThresh(kwargs)
+            return func(data, field, flags, *args, **kwargs)
+
+        FUNC_MAP[func_name] = SaQCFunction(func_name, callWrapper)
+        return callWrapper
+
+    return inner
+
+
+def flagging(masking: MaskingStrT = "all", module: Optional[str] = None):
 
     # executed on module import
     def inner(func):
@@ -65,7 +84,7 @@
 
 
 def _preCall(
-    func: callable, args: tuple, kwargs: dict, masking: MaskingStrT, fname: str
+    func: Callable, args: tuple, kwargs: dict, masking: MaskingStrT, fname: str
 ):
     """
     Handler that runs before any call to a saqc-function.
@@ -101,7 +120,7 @@
     control keyword-arguments passed to `_postCall`
 
     """
-    mthresh = _getMaskingThresh(masking, kwargs, fname)
+    mthresh = _getMaskingThresh(kwargs)
     kwargs["to_mask"] = mthresh
 
     data, field, flags, *args = args
@@ -113,6 +132,7 @@
     # store current state
     state = CallState(
         func=func,
+        func_name=fname,
        data=data,
        flags=flags,
        field=field,
@@ -157,6 +177,13 @@
 
 def _getMaskingColumns(data: dios.DictOfSeries, field: str, masking: MaskingStrT):
     """
+    Return columns to mask, by `masking` (decorator keyword)
+
+    Depending on the `masking` kw, the following is returned:
+    * 'all' : all columns from data
+    * 'none' : empty pd.Index
+    * 'field': single entry Index
+
     Returns
     -------
     columns: pd.Index
@@ -176,19 +203,14 @@
         raise ValueError(f"wrong use of `register(masking={masking})`")
 
 
-def _getMaskingThresh(masking, kwargs, fname):
+def _getMaskingThresh(kwargs):
     """
     Check the correct usage of the `to_mask` keyword, iff passed, otherwise return a default.
 
     Parameters
     ----------
-    masking : str
-        The function-scope masking keyword a saqc-function is decorated with.
     kwargs : dict
        The kwargs that will be passed to the saqc-function, possibly contain ``to_mask``.
-    fname : str
-        The name of the saqc-function to be called later (not here), to use in meaningful
-        error messages
 
     Returns
     -------
@@ -228,8 +250,10 @@ def _maskData(
     data, flags, columns, thresh
 ) -> Tuple[dios.DictOfSeries, dios.DictOfSeries]:
     """
-    Mask data with Nans by flags worse that a threshold and according to ``masking`` keyword
-    from the functions decorator.
+    Mask data with NaNs, if the flags are worse than a threshold.
+
+    - mask only passed `columns` (preselected by `masking`-kw from decorator)
+    - copies data in any case
 
     Returns
     -------
@@ -300,37 +324,65 @@ def _restoreFlags(flags: Flags, old_state: CallState):
     -------
     Flags
     """
-    if old_state.masking == "none":
-        return flags
-
     columns = flags.columns
+
+    if old_state.masking == "all":
+        pass
+
+    # The function processed a copy of the original flags and may or may not have
+    # added some columns. So we take only the new history columns and define new
+    # flags with them, which are enriched with meta later
+    elif old_state.masking == "none":
+        flags = flags.copy(deep=False)
+
+        for c in flags.columns:
+            # if a new field (aka. variable) was inserted, we take the full history
+            # and no slicing is needed, which is the hidden else-case.
+            if c in old_state.flags.columns:
+                l = len(old_state.flags.history[c].columns)
+                flags.history[c] = _sliceHistory(flags.history[c], slice(l, None))
+
     # take field column and all possibly newly added columns
-    if old_state.masking == "field":
+    elif old_state.masking == "field":
         columns = columns.difference(old_state.flags.columns)
         columns = columns.append(pd.Index([old_state.field]))
 
+    else:
+        raise RuntimeError(old_state.masking)
+
     out = old_state.flags.copy()
 
-    # this implicitly squash the new flags history (RHS)
-    # to a single column, which than is appended to the
-    # old history (LHS). The new flags history possibly
-    # consists of multiple columns, one for each time a
-    # series or scalar was passed to the flags.
+    # This implicitly squashes the new flags history (RHS) to a single column, which
+    # is then appended to the old history (LHS). This is needed because the new flags
+    # history possibly consists of multiple columns, one for each time a series or
+    # scalar was passed to the flags.
     for c in columns:
-        if c not in out:
-            out[c] = flags[c]
-
-        # Guard to avoid adding the dummy column only (`UNFLAGGED`-column).
-        if len(flags.history[c].columns) <= 1:
-            continue
-
-        # We reset the dummy column, which make the initial
-        # UNFLAGGED column completely transparent, so we can 'see'
-        # which positions the current test really touched.
         h = flags.history[c]
-        h.hist[0] = UNTOUCHED
-        out[c] = h.max()
+        hmax = h.max(raw=True)
+
+        # # handle empty case (i.e. test didn't set any flags, can happen on early returns),
+        # # to prevent a missing (empty) flags column
+        # if h.empty:
+        #     out.history[c] = h.copy()
+        #     continue
+
+        # # if nothing was touched we have no need to clutter the history
+        # if (hmax == UNTOUCHED).all():
+        #     continue
+
+        out[c] = hmax
+
+        # we enrich the (already existing!) empty meta with some info
+        history = out.history[c]
+        history.meta[-1].update(
+            {
+                "func": old_state.func_name,
+                "args": old_state.args,
+                "keywords": old_state.kwargs,
+            }
+        )
+        out.history[c] = history
 
     return out
@@ -378,10 +430,6 @@ def _unmaskData(data: dios.DictOfSeries, old_state: CallState) -> dios.DictOfSer
         if old_state.data[c].empty or data[c].empty or old_state.mask[c].empty:
             continue
 
-        # on index changed, we simply ignore the old data
-        if not old_state.data[c].index.equals(data[c].index):
-            continue
-
         restore_old_mask = old_state.mask[c].to_numpy() & data[c].isna().to_numpy()
 
         # we have nothing to restore
@@ -393,3 +441,10 @@
         data.loc[:, c] = np.where(restore_old_mask, old, new)
 
     return data
+
+
+def _sliceHistory(history: History, sl: slice) -> History:
+    history.mask = history.mask.iloc[:, sl]
+    history.hist = history.hist.iloc[:, sl]
+    history.meta = history.meta[sl]
+    return history
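How the two decorators are meant to be used from here on (a sketch; `flagAboveThreshold` and `squareData` are made-up demo functions, and the flag-setting pattern mirrors `generic.flag` further down in this diff):

```python
from saqc.constants import UNFLAGGED, BAD
from saqc.core.register import flagging, processing

@flagging(masking="field", module="demo")
def flagAboveThreshold(data, field, flags, threshold=0.0, flag=BAD, **kwargs):
    # flagging functions receive masked data and append one history column
    mask = data[field] > threshold
    flags[field] = mask.replace({False: UNFLAGGED, True: flag})
    return data, flags

@processing(module="demo")
def squareData(data, field, flags, **kwargs):
    # processing functions skip the masking/unmasking machinery entirely
    data[field] = data[field] ** 2
    return data, flags
```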
diff --git a/saqc/core/translator/basetranslator.py b/saqc/core/translator/basetranslator.py
index 033849ddf550d2fae8f151bd9d9ae63b1301ba69..1cba5019e29dc9e5ce6c170fa3e483ec7f5186fe 100644
--- a/saqc/core/translator/basetranslator.py
+++ b/saqc/core/translator/basetranslator.py
@@ -3,8 +3,7 @@
 
 from __future__ import annotations
 
-from saqc.core.lib import SaQCFunction, ColumnSelector
-from typing import Dict, Optional, Union, Any, Tuple, Callable
+from typing import Dict, Union, Any
 
 import numpy as np
 import pandas as pd
@@ -16,7 +15,7 @@ from saqc.core.flags import (
     UNFLAGGED,
     BAD,
 )
-from saqc.lib.types import ExternalFlag, MaterializedGraph, DiosLikeT
+from saqc.lib.types import ExternalFlag
 
 
 ForwardMap = Dict[ExternalFlag, float]
@@ -123,65 +122,10 @@ class Translator:
         if flag not in self._forward:
             if flag not in self._backward:
                 raise ValueError(f"invalid flag: {flag}")
-            return (
-                flag
-            )  # type: # ignore -> if flag is in `self._backward` it is of type float
+            return float(flag)
         return self._forward[flag]
 
-    @staticmethod
-    def _generateInitFunction(
-        flag_name: str, history: pd.Series
-    ) -> Callable[[DictOfSeries, str, Flags, Any], Tuple[DictOfSeries, Flags]]:
-        # NOTE:
-        # Close over `flags_column` and `history_column`
-        # to immitate the original function application,
-        # that we cannot replicate directly because of
-        # lacking information.
-        # I am not entirely sure, if closing over
-        # `flag_column` is really necessary or if we
-        # even should close over `flags`
-        def mapFlags(data: DictOfSeries, field: str, flags: Flags, **kwargs):
-            flags[history.index, flag_name] = history
-            return data, flags
-
-        return mapFlags
-
-    @staticmethod
-    def buildGraph(flags: Flags) -> MaterializedGraph:
-        """
-        build a call graph from the external flags
-
-        Build an `MaterializedGraph`, that can be used
-        in replays of the original `SaQC` run yielding the
-        same result for the same input data set.
-
-        As we usually don't have enough information (i.e. SaQC
-        function name and all used parameters) we generate dummy
-        functions here. These dummy functions unconditionally set
-        the `field` to the provided flags.
-
-        Parameters
-        ----------
-        flags : flags to generate a call graph for
-        """
-        out = []
-        for flag_name in flags.columns:
-            # skip the default column
-            for _, hist_column in tuple(flags.history[flag_name].hist.items())[1:]:
-                out.append(
-                    (
-                        ColumnSelector(flag_name),
-                        SaQCFunction(
-                            name="initFlags",
-                            function=Translator._generateInitFunction(
-                                flag_name, hist_column
-                            ),
-                        ),
-                    )
-                )
-        return out
-
-    def forward(self, flags: pd.DataFrame) -> Tuple[Flags, MaterializedGraph]:
+    def forward(self, flags: pd.DataFrame) -> Flags:
         """
         Translate from 'external flags' to 'internal flags'
 
@@ -194,10 +138,9 @@
         -------
         Flags object
         """
-        tflags = Flags(self._translate(flags, self._forward))
-        return tflags, self.buildGraph(tflags)
+        return Flags(self._translate(flags, self._forward))
 
-    def backward(self, flags: Flags, call_graph: MaterializedGraph) -> pd.DataFrame:
+    def backward(self, flags: Flags) -> pd.DataFrame:
         """
         Translate from 'internal flags' to 'external flags'
 
@@ -205,10 +148,6 @@
         ----------
         flags : pd.DataFrame
             The external flags to translate
-        call_stack : List
-            The saqc functions called to generate the given `flags` (i.e. `SaQC._computed`)
-            `call_stack` is not evaluated here, it's presence only ensures, that subclasses
-            have access to it.
 
         Returns
         -------
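With the call-graph plumbing gone, a translator round trip reduces to two calls (a sketch; it assumes `FloatTranslator` accepts the internal float flag values as its external representation):

```python
import pandas as pd
from saqc.core import FloatTranslator
from saqc.constants import UNFLAGGED, BAD

trans = FloatTranslator()
external = pd.DataFrame({"x": [UNFLAGGED, BAD, UNFLAGGED]})

internal = trans.forward(external)    # -> Flags, no MaterializedGraph anymore
roundtrip = trans.backward(internal)  # -> plain pd.DataFrame
```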
diff --git a/saqc/core/translator/dmptranslator.py b/saqc/core/translator/dmptranslator.py
index fd37cc0ce158764a91baf7d0f3b38e78d87be3ee..bc6a674de0cdda374f5692a4fc9f8e42751b615f 100644
--- a/saqc/core/translator/dmptranslator.py
+++ b/saqc/core/translator/dmptranslator.py
@@ -2,24 +2,44 @@
 # -*- coding: utf-8 -*-
 from __future__ import annotations
 
-from dios import DictOfSeries
 import json
-from typing import List, Tuple
+from saqc.core.history import History
+from typing import Any
 
 import numpy as np
 import pandas as pd
 
-from saqc.core.lib import SaQCFunction, ColumnSelector
 from saqc.core.flags import (
     Flags,
     UNFLAGGED,
+    UNTOUCHED,
     GOOD,
     DOUBTFUL,
     BAD,
 )
-from saqc.lib.types import MaterializedGraph
-from saqc.core.translator.basetranslator import Translator, ForwardMap
+from saqc.core.translator.basetranslator import BackwardMap, Translator, ForwardMap
+
+
+_QUALITY_CAUSES = [
+    "",
+    "BATTERY_LOW",
+    "BELOW_MINIMUM",
+    "ABOVE_MAXIMUM",
+    "BELOW_OR_ABOVE_MIN_MAX",
+    "ISOLATED_SPIKE",
+    "DEFECTIVE_SENSOR",
+    "LEFT_CENSORED_DATA",
+    "RIGHT_CENSORED_DATA",
+    "OTHER",
+    "AUTOFLAGGED",
+]
+
+_QUALITY_LABELS = [
+    "quality_flag",
+    "quality_cause",
+    "quality_comment",
+]
 
 
 class DmpTranslator(Translator):
@@ -29,7 +49,7 @@ class DmpTranslator(Translator):
     the UFZ - Datamanagementportal
     """
 
-    ARGUMENTS = {"comment": "", "cause": ""}
+    ARGUMENTS = {"comment": "", "cause": "AUTOFLAGGED"}
 
     _FORWARD: ForwardMap = {
         "NIL": UNFLAGGED,
         "OK": GOOD,
         "DOUBTFUL": DOUBTFUL,
         "BAD": BAD,
     }
 
-    _QUALITY_CAUSES = {
-        "BATTERY_LOW",
-        "BELOW_MINIMUM",
-        "ABOVE_MAXIMUM",
-        "BELOW_OR_ABOVE_MIN_MAX",
-        "ISOLATED_SPIKE",
-        "DEFECTIVE_SENSOR",
-        "LEFT_CENSORED_DATA",
-        "RIGHT_CENSORED_DATA",
-        "OTHER",
-        "AUTO_FLAGGED",
+    _BACKWARD: BackwardMap = {
+        UNFLAGGED: "NIL",
+        UNTOUCHED: "NIL",
+        GOOD: "OK",
+        DOUBTFUL: "DOUBTFUL",
+        BAD: "BAD",
     }
 
     def __init__(self):
-        raise NotImplementedError
-        super().__init__(
-            forward=self._FORWARD, backward={v: k for k, v in self._FORWARD.items()}
-        )
+        super().__init__(forward=self._FORWARD, backward=self._BACKWARD)
 
-    @staticmethod
-    def _getFieldFunctions(
-        field: str, call_stack: MaterializedGraph
-    ) -> List[SaQCFunction]:
-        """
-        Return the names of all functions called on `field`
-
-        Parameters
-        ----------
-        field: str
-            variable/column name
-
-        call_stack : List
-            The saqc functions called to generate the given `flags` (i.e. `SaQC._computed`)
-
-        Note
-        ----
-        Could (and maybe should) be implemented as a method of `CallGraph`
-
-        Currently we work around the issue, that we keep track of the
-        computations we do on a variable using the variable name, but also
-        allow mutations of that name (i.e. our key) through `tools.rename`
-        in a somewhat hacky way. There are better ideas, to solve this (i.e.
-        global function pointers), but for the moment this has to do the trick
-        """
-        # backtrack name changes and let's look, if our field
-        # originally had another name
-        for sel, func in call_stack[::-1]:
-            if func.name == "tools.rename":
-                new_name = func.keywords.get("new_name") or func.args[3]
-                if new_name == field:
-                    field = sel.field
-
-        out = [SaQCFunction(name="")]
-        for sel, func in call_stack:
-            if sel.field == field:
-                out.append(func)
-                # forward track name changes
-                if func.name == "tools.rename":
-                    field = func.keywords.get("new_name") or func.args[3]
-
-        return out
-
-    def forward(self, flags: pd.DataFrame) -> Tuple[Flags, MaterializedGraph]:
+    def forward(self, df: pd.DataFrame) -> Flags:
         """
         Translate from 'extrnal flags' to 'internal flags'
 
         Parameters
         ----------
-        flags : pd.DataFrame
+        df : pd.DataFrame
             The external flags to translate
 
         Returns
         -------
         Flags object
         """
-        cols = flags.columns
-        if not isinstance(cols, pd.MultiIndex):
-            raise TypeError("DMP-Flags need mult-index columns")
-
-        col_labels = {"quality_flag", "quality_comment", "quality_cause"}
-        if set(cols.get_level_values(1)) != col_labels:
-            raise TypeError(
-                f"DMP-Flags expect the labels '{list(col_labels)}' in the secondary level"
-            )
-
-        qflags = flags.xs(key="quality_flag", axis="columns", level=1)
-
-        # We want to build a call graph from the given flags and as the DMP flags
-        # contain the name of last function that set a certain flag, we want to
-        # leverage this information
-        graph: MaterializedGraph = []
+        self.validityCheck(df)
 
-        for field in qflags.columns:
+        data = {}
 
-            # extract relevant information from the comments
-            data = pd.DataFrame(
-                flags.loc[:, (field, "quality_comment")].apply(json.loads).to_list(),
-                index=flags.index,
-            )
-            data["causes"] = flags.loc[:, (field, "quality_cause")]
-
-            loc = ColumnSelector(field=field, target="field", regex=False)
-
-            # we can't infer information about the ordering of function calls,
-            # so we order the history by appearance
-            # for _, group in data.fillna("").groupby(["test", "comment", "causes"]):
-            for _, group in data.loc[data["test"].replace("", np.nan).notna()].groupby(
-                ["test", "comment", "causes"]
-            ):
-                fname, comment, cause = group.iloc[0]
-                func = SaQCFunction(
-                    name=fname,
-                    function=Translator._generateInitFunction(
-                        field, qflags.loc[group.index]
-                    ),
-                    comment=comment,
-                    cause=cause,
-                )
-                graph.append((loc, func))
+        for field in df.columns.get_level_values(0):
+
+            field_flags = df[field]
+            field_history = History(field_flags.index)
+
+            for (flag, cause, comment), values in field_flags.groupby(_QUALITY_LABELS):
+                try:
+                    comment = json.loads(comment)
+                except json.decoder.JSONDecodeError:
+                    comment = {"test": "unknown", "comment": ""}
+
+                histcol = pd.Series(UNTOUCHED, index=field_flags.index)
+                histcol.loc[values.index] = self(flag)
+
+                meta = {
+                    "func": comment["test"],
+                    "keywords": {"comment": comment["comment"], "cause": cause},
+                }
+                field_history.append(histcol, force=True, meta=meta)
 
-        tflags, _ = super().forward(qflags)
-        return tflags, graph
+            data[str(field)] = field_history
 
-    def backward(self, flags: Flags, call_graph: MaterializedGraph) -> pd.DataFrame:
+        return Flags(data)
+
+    def backward(self, flags: Flags) -> pd.DataFrame:
         """
         Translate from 'internal flags' to 'external flags'
 
         Parameters
         ----------
-        flags : pd.DataFrame
-            The external flags to translate
-        call_stack : List
-            The saqc functions called to generate the given `flags` (i.e. `SaQC._computed`)
+        flags : The external flags to translate
 
         Returns
         -------
-        pd.DataFrame
+        translated flags
         """
-        tflags = super().backward(flags, call_graph)
+        tflags = super().backward(flags)
+
+        out = pd.DataFrame(
+            index=tflags.index,
+            columns=pd.MultiIndex.from_product([tflags.columns, _QUALITY_LABELS]),
+        )
 
-        out = {}
         for field in tflags.columns:
-            flag_call_history = self._getFieldFunctions(field, call_graph)
-            flag_pos = flags.history[field].idxmax()
-            comments, causes = [], []
-            # NOTE:
-            # Strangely enough, this loop withstood all my efforts
-            # to speed it up through vectorization - the simple
-            # loop always outperformed even careful `pd.DataFrame.apply`
-            # versions. The latest try is left as a comment below.
-            for p in flag_pos:
-                func = flag_call_history[p]
-                cause = func.keywords.get("cause", self.ARGUMENTS["cause"])
+            df = pd.DataFrame(
+                {
+                    "quality_flag": tflags[field],
+                    "quality_cause": self.ARGUMENTS["cause"],
+                    "quality_comment": self.ARGUMENTS["comment"],
+                }
+            )
+
+            history = flags.history[field]
+
+            for col in history.columns:
+
+                # NOTE:
+                # I really dislike the fact that the implementation detail
+                # `mask` leaks into the translator. For the limited time
+                # available it is a pragmatic solution, however...
+                h, m = history.hist[col], history.mask[col]
+                h[~m | (h == UNFLAGGED)] = np.nan
+                valid = h.notna()
+
+                # extract from meta
+                meta = history.meta[col]
+                keywords = meta.get("keywords", {})
                 comment = json.dumps(
                     {
-                        "test": func.name,
-                        "comment": func.keywords.get(
-                            "comment", self.ARGUMENTS["comment"]
-                        ),
+                        "test": meta.get("func", "unknown"),
+                        "comment": keywords.get("comment", self.ARGUMENTS["comment"]),
                     }
                 )
-                causes.append(cause)
-                comments.append(comment)
-
-            # DMP quality_cause needs some special care as only certain values
-            # and combinations are allowed.
-            # See: https://wiki.intranet.ufz.de/wiki/dmp/index.php/Qualit%C3%A4tsflags
-            causes = pd.Series(causes, index=flags[field].index)
-            causes[
-                (causes == self.ARGUMENTS["cause"]) & (flags[field] > GOOD)
-            ] = "OTHER"
-            if not ((causes == "") | causes.isin(self._QUALITY_CAUSES)).all():
-                raise ValueError(
-                    f"quality causes needs to be one of {self._QUALITY_CAUSES}"
-                )
+                cause = keywords.get("cause", self.ARGUMENTS["cause"])
+                df.loc[valid, "quality_comment"] = comment
+                df.loc[valid, "quality_cause"] = cause
+
+            out[field] = df
+
+        self.validityCheck(out)
+        return out
+
+    @classmethod
+    def validityCheck(cls, df: pd.DataFrame) -> None:
+        """
+        Check whether the given causes and comments are valid.
+
+        Parameters
+        ----------
+        df : external flags
+        """
-            var_flags = {
-                "quality_flag": tflags[field],
-                "quality_comment": pd.Series(comments, index=flags[field].index),
-                "quality_cause": causes,
-            }
-            out[field] = pd.DataFrame(var_flags)
-        return pd.concat(out, axis="columns")
-
-        # for field in tflags.columns:
-        #     call_history = []
-        #     for func in self._getFieldFunctions(field, call_graph):
-        #         func_info = {
-        #             "cause": func.keywords.get("cause", self.ARGUMENTS["comment"]),
-        #             "comment": json.dumps({
-        #                 "test": func.name,
-        #                 "comment": func.keywords.get("comment", self.ARGUMENTS["comment"]),
-        #             })
-        #         }
-        #         call_history.append(func_info)
-
-        #     functions = pd.DataFrame(call_history)
-        #     flag_pos = flags.history[field].idxmax()
-
-        #     var_flags = {
-        #         "quality_flag": tflags[field].reset_index(drop=True),
-        #         "quality_comment": functions.loc[flag_pos, "comment"].reset_index(drop=True),
-        #         "quality_cause": functions.loc[flag_pos, "cause"].reset_index(drop=True),
-        #     }
-        #     out[field] = pd.DataFrame(var_flags, index=flag_pos.index)
-        # return pd.concat(out, axis="columns")
+        cols = df.columns
+        if not isinstance(cols, pd.MultiIndex):
+            raise TypeError("DMP-Flags need multi-index columns")
+
+        if not cols.get_level_values(1).isin(_QUALITY_LABELS).all(axis=None):
+            raise TypeError(
+                f"DMP-Flags expect the labels {list(_QUALITY_LABELS)} in the secondary level"
+            )
+
+        flags = df.xs(axis="columns", level=1, key="quality_flag")
+        causes = df.xs(axis="columns", level=1, key="quality_cause")
+        comments = df.xs(axis="columns", level=1, key="quality_comment")
+
+        if not flags.isin(cls._FORWARD.keys()).all(axis=None):
+            raise ValueError(
+                f"invalid quality flag(s) found, only the following values are supported: {set(cls._FORWARD.keys())}"
+            )
+
+        if not causes.isin(_QUALITY_CAUSES).all(axis=None):
+            raise ValueError(
+                f"invalid quality cause(s) found, only the following values are supported: {_QUALITY_CAUSES}"
+            )
+
+        if (~flags.isin(("OK", "NIL")) & (causes == "")).any(axis=None):
+            raise ValueError(
+                "quality flags other than 'OK' and 'NIL' need a non-empty quality cause"
+            )
+
+        if ((causes == "OTHER") & (comments == "")).any(axis=None):
+            raise ValueError(
+                "quality cause 'OTHER' needs a non-empty quality comment"
+            )
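For reference, a sketch of the MultiIndex layout `validityCheck` accepts (values chosen purely for illustration):

```python
import pandas as pd
from saqc.core import DmpTranslator

cols = pd.MultiIndex.from_product(
    [["sensor_a"], ["quality_flag", "quality_cause", "quality_comment"]]
)
df = pd.DataFrame(
    [
        ["BAD", "ISOLATED_SPIKE", '{"test": "flagRange", "comment": ""}'],
        ["OK", "", ""],
    ],
    index=pd.date_range("2021-01-01", periods=2, freq="D"),
    columns=cols,
)
DmpTranslator.validityCheck(df)  # passes: flags, causes and comments line up
```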
diff --git a/saqc/core/translator/positionaltranslator.py b/saqc/core/translator/positionaltranslator.py
index e51871f2b23ded41a5b5f1c8cc8fe726bd1aae6e..fab5f00c774e66bb1272d7af54421602a71fe24a 100644
--- a/saqc/core/translator/positionaltranslator.py
+++ b/saqc/core/translator/positionaltranslator.py
@@ -9,7 +9,7 @@ import pandas as pd
 
 from saqc.core.flags import (
     Flags,
-    _simpleHist,
+    History,
     UNTOUCHED,
     UNFLAGGED,
     GOOD,
@@ -38,7 +38,7 @@ class PositionalTranslator(Translator):
     def __init__(self):
         super().__init__(forward=self._FORWARD, backward=self._BACKWARD)
 
-    def forward(self, flags: pd.DataFrame) -> Tuple[Flags, MaterializedGraph]:
+    def forward(self, flags: pd.DataFrame) -> Flags:
         """
         Translate from 'external flags' to 'internal flags'
 
@@ -61,16 +61,16 @@
                 index=field_flags.index,
             ).astype(int)
 
-            # the exploded values + the an initial column are the History of `field`
+            # the exploded values form the History of `field`
             fflags = super()._translate(df, self._FORWARD)
-            field_history = _simpleHist(field_flags.index).append(fflags.to_df())
-            data[field] = field_history
+            field_history = History(field_flags.index)
+            for _, s in fflags.items():
+                field_history.append(s, force=True)
+            data[str(field)] = field_history
 
-        tflags = Flags(data)
-        graph = self.buildGraph(tflags)
-        return tflags, graph
+        return Flags(data)
 
-    def backward(self, flags: Flags, call_stack: MaterializedGraph) -> pd.DataFrame:
+    def backward(self, flags: Flags) -> pd.DataFrame:
         """
         Translate from 'internal flags' to 'external flags'
 
@@ -78,9 +78,6 @@
         ----------
         flags : pd.DataFrame
             The external flags to translate
-        call_stack : List
-            The saqc functions called to generate the given `flags` (i.e. `SaQC._computed`)
-            `call_stack` is not evaluated here.
 
         Returns
         -------
@@ -92,9 +89,9 @@
         # Concatenate the single flag values. There are faster and more
         # complicated approaches (see former `PositionalFlagger`), but
         # this method shouldn't be called that often
-        ncols = thist.shape[-1] - 1
+        ncols = thist.shape[-1]
         init = 9 * 10 ** ncols
-        bases = 10 ** np.arange(ncols, -1, -1)
+        bases = 10 ** np.arange(ncols - 1, -1, -1)
 
         tflags = init + (thist * bases).sum(axis=1)
         out[field] = tflags
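The off-by-one fix in `backward` is easiest to verify with numbers: histories no longer carry the initial dummy column, so `ncols` is simply the number of history columns and the encoding becomes a leading `9` followed by exactly `ncols` digits:

```python
import numpy as np

ncols = 2
init = 9 * 10 ** ncols                       # 900
bases = 10 ** np.arange(ncols - 1, -1, -1)   # [10, 1]

digits = np.array([2, 1])                    # two history columns
assert init + (digits * bases).sum() == 921  # encodes as "921"
```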
diff --git a/saqc/funcs/__init__.py b/saqc/funcs/__init__.py
index d4fabccfc98765f48221838d54f21f9db73f5301..006f061ab0cd5bbd2110d69906a5f05ef59c7c99 100644
--- a/saqc/funcs/__init__.py
+++ b/saqc/funcs/__init__.py
@@ -2,7 +2,7 @@
 # -*- coding: utf-8 -*-
 
 # imports needed to make the functions register themself
-from saqc.core.register import register
+from saqc.core.register import flagging
 from saqc.funcs.breaks import *
 from saqc.funcs.changepoints import *
 from saqc.funcs.constants import *
diff --git a/saqc/funcs/breaks.py b/saqc/funcs/breaks.py
index e14d6826f6589e71c1c1b34c60c43dc1d15888a6..ae6791afd1e4ea7a3da60e907f6749daa090e9cc 100644
--- a/saqc/funcs/breaks.py
+++ b/saqc/funcs/breaks.py
@@ -20,10 +20,10 @@ from saqc.constants import *
 from saqc.lib.tools import groupConsecutives
 from saqc.lib.types import FreqString, ColumnName, IntegerWindow
 from saqc.funcs.changepoints import assignChangePointCluster
-from saqc.core import register, Flags
+from saqc.core import flagging, Flags
 
 
-@register(masking="field", module="breaks")
+@flagging(masking="field", module="breaks")
 def flagMissing(
     data: DictOfSeries,
     field: ColumnName,
@@ -65,7 +65,7 @@
     return data, flags
 
 
-@register(masking="field", module="breaks")
+@flagging(masking="field", module="breaks")
 def flagIsolated(
     data: DictOfSeries,
     field: ColumnName,
@@ -144,7 +144,7 @@
     return data, flags
 
 
-@register(masking="field", module="breaks")
+@flagging(masking="field", module="breaks")
 def flagJumps(
     data: DictOfSeries,
     field: ColumnName,
diff --git a/saqc/funcs/changepoints.py b/saqc/funcs/changepoints.py
index 7325b646751ef798c8556bd63db3953dd9f4be9c..a9c5daf637579c66104b71baaa4bc2f0481d9093 100644
--- a/saqc/funcs/changepoints.py
+++ b/saqc/funcs/changepoints.py
@@ -13,13 +13,13 @@ from dios import DictOfSeries
 
 from saqc.constants import *
 from saqc.lib.tools import customRoller
-from saqc.core import register, Flags
+from saqc.core import flagging, Flags
 from saqc.lib.types import ColumnName, FreqString, IntegerWindow
 
 logger = logging.getLogger("SaQC")
 
 
-@register(masking="field", module="changepoints")
+@flagging(masking="field", module="changepoints")
 def flagChangePoints(
     data: DictOfSeries,
     field: str,
@@ -109,7 +109,7 @@
     )
 
 
-@register(masking="field", module="changepoints")
+@flagging(masking="field", module="changepoints")
 def assignChangePointCluster(
     data: DictOfSeries,
     field: str,
diff --git a/saqc/funcs/constants.py b/saqc/funcs/constants.py
index c406b40276ceb827627789612887d734b3d6d1c7..1abb3f73f74537f2188f5e995ec68bec652bbcd0 100644
--- a/saqc/funcs/constants.py
+++ b/saqc/funcs/constants.py
@@ -11,13 +11,13 @@ import operator
 from dios import DictOfSeries
 
 from saqc.constants import *
-from saqc.core import register, Flags
+from saqc.core import flagging, Flags
 from saqc.lib.ts_operators import varQC
 from saqc.lib.tools import customRoller, getFreqDelta, statPass
 from saqc.lib.types import FreqString, ColumnName
 
 
-@register(masking="field", module="constants")
+@flagging(masking="field", module="constants")
 def flagConstants(
     data: DictOfSeries,
     field: ColumnName,
@@ -81,7 +81,7 @@
     return data, flags
 
 
-@register(masking="field", module="constants")
+@flagging(masking="field", module="constants")
 def flagByVariance(
     data: DictOfSeries,
     field: ColumnName,
diff --git a/saqc/funcs/curvefit.py b/saqc/funcs/curvefit.py
index 627d9c9530a2fde5bbfe81a0bd9f0b40aa53f4f4..406728dba32e397451d3dd0e792d163b8bd8cc5a 100644
--- a/saqc/funcs/curvefit.py
+++ b/saqc/funcs/curvefit.py
@@ -9,7 +9,7 @@ import pandas as pd
 from dios import DictOfSeries
 
 from saqc.constants import *
-from saqc.core import register, Flags
+from saqc.core import flagging, Flags
 from saqc.lib.tools import getFreqDelta
 from saqc.lib.ts_operators import (
     polyRollerIrregular,
@@ -20,7 +20,7 @@
 )
 
 
-@register(masking="field", module="curvefit")
+@flagging(masking="field", module="curvefit")
 def fitPolynomial(
     data: DictOfSeries,
     field: str,
diff --git a/saqc/funcs/drift.py b/saqc/funcs/drift.py
index d078c80717a67992e7f1fc1f71e3a554fdae2816..4431ab8321aae1f79477e420e0f24eb65965fc42 100644
--- a/saqc/funcs/drift.py
+++ b/saqc/funcs/drift.py
@@ -15,7 +15,7 @@ from scipy.optimize import curve_fit
 from scipy.spatial.distance import pdist
 
 from saqc.constants import *
-from saqc.core.register import register
+from saqc.core.register import flagging
 from saqc.core import Flags
 from saqc.funcs.resampling import shift
 from saqc.funcs.changepoints import assignChangePointCluster
@@ -34,7 +34,7 @@ LinkageString = Literal[
 ]
 
 
-@register(masking="all", module="drift")
+@flagging(masking="all", module="drift")
 def flagDriftFromNorm(
     data: DictOfSeries,
     field: ColumnName,
@@ -156,7 +156,7 @@
     return data, flags
 
 
-@register(masking="all", module="drift")
+@flagging(masking="all", module="drift")
 def flagDriftFromReference(
     data: DictOfSeries,
     field: ColumnName,
@@ -237,7 +237,7 @@
     return data, flags
 
 
-@register(masking="all", module="drift")
+@flagging(masking="all", module="drift")
 def flagDriftFromScaledNorm(
     data: DictOfSeries,
     field: ColumnName,
@@ -362,7 +362,7 @@
     return data, flags
 
 
-@register(masking="all", module="drift")
+@flagging(masking="all", module="drift")
 def correctDrift(
     data: DictOfSeries,
     field: ColumnName,
@@ -502,7 +502,7 @@
     return data, flags
 
 
-@register(masking="all", module="drift")
+@flagging(masking="all", module="drift")
 def correctRegimeAnomaly(
     data: DictOfSeries,
     field: ColumnName,
@@ -621,7 +621,7 @@
     return data, flags
 
 
-@register(masking="all", module="drift")
+@flagging(masking="all", module="drift")
 def correctOffset(
     data: DictOfSeries,
     field: ColumnName,
@@ -721,7 +721,7 @@
     return data_fit, data_shift
 
 
-@register(masking="all", module="drift")
+@flagging(masking="all", module="drift")
 def flagRegimeAnomaly(
     data: DictOfSeries,
     field: ColumnName,
@@ -798,7 +798,7 @@
     )
 
 
-@register(masking="all", module="drift")
+@flagging(masking="all", module="drift")
 def assignRegimeAnomaly(
     data: DictOfSeries,
     field: ColumnName,
diff --git a/saqc/funcs/flagtools.py b/saqc/funcs/flagtools.py
index c675d1830972800d8eb850f4bd78dec19d9529b4..c5553dde9bfea49d499d2e3a4e934043c444a359 100644
--- a/saqc/funcs/flagtools.py
+++ b/saqc/funcs/flagtools.py
@@ -7,11 +7,11 @@ from dios import DictOfSeries
 
 from saqc.constants import BAD, UNFLAGGED
 from saqc.lib.types import ColumnName
-from saqc.core import register, Flags
+from saqc.core import flagging, processing, Flags
 import warnings
 
 
-@register(masking="field", module="flagtools")
+@flagging(masking="field", module="flagtools")
 def forceFlags(
     data: DictOfSeries, field: ColumnName, flags: Flags, flag: float = BAD, **kwargs
 ) -> Tuple[DictOfSeries, Flags]:
@@ -46,7 +46,7 @@
 
 
 # masking='none' is sufficient because call is redirected
-@register(masking="none", module="flagtools")
+@flagging(masking="none", module="flagtools")
 def clearFlags(
     data: DictOfSeries, field: ColumnName, flags: Flags, **kwargs
 ) -> Tuple[DictOfSeries, Flags]:
@@ -69,6 +69,14 @@
     data : DictOfSeries
     flags : saqc.Flags
 
+    Notes
+    -----
+    This function ignores the ``to_mask`` keyword, because the data is not relevant
+    for processing.
+    A warning is triggered if the ``flag`` keyword is given, because the flags are
+    always set to `UNFLAGGED`.
+
+
     See Also
     --------
     forceFlags : set whole column to a flag value
@@ -82,7 +90,7 @@
     return forceFlags(data, field, flags, flag=UNFLAGGED, **kwargs)
 
 
-@register(masking="none", module="flagtools")
+@flagging(masking="none", module="flagtools")
 def flagUnflagged(
     data: DictOfSeries, field: ColumnName, flags: Flags, flag: float = BAD, **kwargs
 ) -> Tuple[DictOfSeries, Flags]:
@@ -109,6 +117,11 @@
     flags : saqc.Flags
         The quality flags of data
 
+    Notes
+    -----
+    This function ignores the ``to_mask`` keyword, because the data is not relevant
+    for processing.
+
     See Also
     --------
     clearFlags : set whole column to UNFLAGGED
@@ -119,7 +132,7 @@
     return data, flags
 
 
-@register(masking="field", module="flagtools")
+@flagging(masking="field", module="flagtools")
 def flagManual(
     data: DictOfSeries,
     field: ColumnName,
@@ -263,7 +276,7 @@
     return data, flags
 
 
-@register(masking="none", module="flagtools")
+@flagging(module="flagtools")
 def flagDummy(
     data: DictOfSeries, field: ColumnName, flags: Flags, **kwargs
 ) -> Tuple[DictOfSeries, Flags]:
diff --git a/saqc/funcs/generic.py b/saqc/funcs/generic.py
index 06cc8dfe9889d0e6dd3a26703ff012f3517cd386..6beef3c53091b5f270002fc2898bb54d3c155203 100644
--- a/saqc/funcs/generic.py
+++ b/saqc/funcs/generic.py
@@ -12,7 +12,7 @@ from dios import DictOfSeries
 
 from saqc.constants import GOOD, BAD, UNFLAGGED
 from saqc.core.flags import initFlagsLike, Flags
-from saqc.core.register import register, _maskData
+from saqc.core.register import flagging, processing, _maskData
 from saqc.core.visitor import ENVIRONMENT
 
 import operator as op
@@ -85,7 +85,7 @@
     return func(*args)
 
 
-@register(masking="none", module="generic")
+@processing(module="generic")
 def process(
     data: DictOfSeries,
     field: str,
@@ -157,7 +157,7 @@
     return data, flags
 
 
-@register(masking="none", module="generic")
+@flagging(masking="none", module="generic")
 def flag(
     data: DictOfSeries,
     field: str,
@@ -255,6 +255,6 @@
     if not np.issubdtype(mask.dtype, np.bool_):
         raise TypeError(f"generic expression does not return a boolean array")
 
-    flags[field] = mask.replace({False: UNFLAGGED, True: BAD})
+    flags[field] = mask.replace({False: UNFLAGGED, True: flag})
 
     return data, flags
diff --git a/saqc/funcs/interpolation.py b/saqc/funcs/interpolation.py
index 368954a226246f23a58921189af3374bb77863d1..0f2487616d7f46e7582723926f9a8af0e79f78e0 100644
--- a/saqc/funcs/interpolation.py
+++ b/saqc/funcs/interpolation.py
@@ -7,9 +7,8 @@ import pandas as pd
 from dios import DictOfSeries
 
 from saqc.constants import *
-from saqc.core import register, Flags
+from saqc.core import flagging, processing, Flags
 from saqc.core.register import _isflagged
-from saqc.core.history import applyFunctionOnHistory
 from saqc.lib.ts_operators import interpolateNANs
 
 _SUPPORTED_METHODS = Literal[
@@ -31,7 +30,7 @@
 ]
 
 
-@register(masking="field", module="interpolation")
+@flagging(masking="field", module="interpolation")
 def interpolateByRolling(
     data: DictOfSeries,
     field: str,
@@ -106,7 +105,7 @@
     return data, flags
 
 
-@register(masking="field", module="interpolation")
+@flagging(masking="field", module="interpolation")
 def interpolateInvalid(
     data: DictOfSeries,
     field: str,
@@ -190,7 +189,7 @@
     return data.fillna(fill_value).astype(dtype)
 
 
-@register(masking="none", module="interpolation")
+@processing(module="interpolation")
 def interpolateIndex(
     data: DictOfSeries,
     field: str,
@@ -287,14 +286,14 @@
     # store interpolated grid
     data[field] = inter_data[grid_index]
 
-    # do the reshaping on the history
-    flags.history[field] = applyFunctionOnHistory(
-        flags.history[field],
+    history = flags.history[field].apply(
+        index=data[field].index,
         hist_func=_resampleOverlapping,
-        hist_kws=dict(freq=freq, fill_value=UNFLAGGED),
         mask_func=_resampleOverlapping,
+        hist_kws=dict(freq=freq, fill_value=UNFLAGGED),
         mask_kws=dict(freq=freq, fill_value=True),
-        last_column="dummy",
+        copy=False,
     )
+    flags.history[field] = history
 
     return data, flags
flags.history[field] = history return data, flags diff --git a/saqc/funcs/noise.py b/saqc/funcs/noise.py index c96f0f49ca7f2aa1ac7700f57d94083e8955c242..aedc4c2a6504d56a8280bc29afbb491841451896 100644 --- a/saqc/funcs/noise.py +++ b/saqc/funcs/noise.py @@ -6,12 +6,12 @@ import operator from dios import DictOfSeries from typing import Callable from saqc.constants import * -from saqc.core import register, Flags +from saqc.core import flagging, Flags from saqc.lib.types import ColumnName, FreqString, PositiveInt, PositiveFloat, Literal from saqc.lib.tools import statPass -@register(masking="field", module="noise") +@flagging(masking="field", module="noise") def flagByStatLowPass( data: DictOfSeries, field: ColumnName, diff --git a/saqc/funcs/outliers.py b/saqc/funcs/outliers.py index 4fc57a6a7402eaab1120c895f38f8abd878a36c4..cb3f71178539ad9f1d9059e479c992153067441e 100644 --- a/saqc/funcs/outliers.py +++ b/saqc/funcs/outliers.py @@ -12,14 +12,14 @@ from outliers import smirnov_grubbs from scipy.optimize import curve_fit from saqc.constants import * -from saqc.core import register, Flags +from saqc.core import flagging, Flags from saqc.lib.types import ColumnName, FreqString, IntegerWindow from saqc.lib.tools import customRoller, findIndex, getFreqDelta from saqc.funcs.scores import assignKNNScore import saqc.lib.ts_operators as ts_ops -@register(masking="field", module="outliers") +@flagging(masking="field", module="outliers") def flagByStray( data: DictOfSeries, field: ColumnName, @@ -383,7 +383,7 @@ def _expFit( return val_frame.index[sorted_i[iter_index:]] -@register(masking="all", module="outliers") +@flagging(masking="all", module="outliers") def flagMVScores( data: DictOfSeries, field: ColumnName, @@ -552,7 +552,7 @@ def flagMVScores( return data, flags -@register(masking="field", module="outliers") +@flagging(masking="field", module="outliers") def flagRaise( data: DictOfSeries, field: ColumnName, @@ -737,7 +737,7 @@ def flagRaise( return data, flags -@register(masking="field", module="outliers") +@flagging(masking="field", module="outliers") def flagMAD( data: DictOfSeries, field: ColumnName, @@ -806,7 +806,7 @@ def flagMAD( return data, flags -@register(masking="field", module="outliers") +@flagging(masking="field", module="outliers") def flagOffset( data: DictOfSeries, field: ColumnName, @@ -976,7 +976,7 @@ def flagOffset( return data, flags -@register(masking="field", module="outliers") +@flagging(masking="field", module="outliers") def flagByGrubbs( data: DictOfSeries, field: ColumnName, @@ -1091,7 +1091,7 @@ def flagByGrubbs( return data, flags -@register(masking="field", module="outliers") +@flagging(masking="field", module="outliers") def flagRange( data: DictOfSeries, field: ColumnName, @@ -1134,7 +1134,7 @@ def flagRange( return data, flags -@register(masking="all", module="outliers") +@flagging(masking="all", module="outliers") def flagCrossStatistic( data: DictOfSeries, field: ColumnName, diff --git a/saqc/funcs/pattern.py b/saqc/funcs/pattern.py index be633b9af98ff18c3e8c55ad40361325d671bce2..89ac4b3542e62e4f2fa1d176c4f25890ef9aaa63 100644 --- a/saqc/funcs/pattern.py +++ b/saqc/funcs/pattern.py @@ -8,11 +8,11 @@ import pywt from mlxtend.evaluate import permutation_test from saqc.constants import * -from saqc.core.register import register +from saqc.core.register import flagging from saqc.lib.tools import customRoller -@register(masking="field", module="pattern") +@flagging(masking="field", module="pattern") def flagPatternByWavelet( data, field, @@ -180,7 +180,7 @@ def 
calculateDistanceByDTW( return distances.reindex(index=data.index) # reinsert NaNs -@register(masking="field", module="pattern") +@flagging(masking="field", module="pattern") def flagPatternByDTW( data, field, flags, ref_field, max_distance=0.0, normalize=True, flag=BAD, **kwargs ): diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py index ddd162219037a0493197dacd9a49dd170764d0d2..e6f7ced618c7d027deafaedaf3fa8535261ca1a5 100644 --- a/saqc/funcs/resampling.py +++ b/saqc/funcs/resampling.py @@ -9,12 +9,11 @@ import pandas as pd from dios import DictOfSeries from saqc.constants import * -from saqc.core import register, Flags +from saqc.core import processing, Flags from saqc.core.register import _isflagged -from saqc.core.history import applyFunctionOnHistory from saqc.lib.tools import evalFreqStr, getFreqDelta from saqc.lib.ts_operators import shift2Freq, aggregate2Freq -from saqc.funcs.tools import copy, drop, rename +from saqc.funcs.tools import copy from saqc.funcs.interpolation import interpolateIndex, _SUPPORTED_METHODS @@ -31,7 +30,7 @@ METHOD2ARGS = { } -@register(masking="none", module="resampling") +@processing(module="resampling") def linear( data: DictOfSeries, field: str, flags: Flags, freq: str, **kwargs ) -> Tuple[DictOfSeries, Flags]: @@ -74,7 +73,7 @@ def linear( return interpolateIndex(data, field, flags, freq, "time", **kwargs) -@register(masking="none", module="resampling") +@processing(module="resampling") def interpolate( data: DictOfSeries, field: str, @@ -139,7 +138,7 @@ def interpolate( ) -@register(masking="none", module="resampling") +@processing(module="resampling") def shift( data: DictOfSeries, field: str, @@ -205,21 +204,24 @@ def shift( # do the shift on the history history = flags.history[field] - history.hist = shift2Freq(history.hist, method, freq, fill_value=UNTOUCHED) - history.mask = shift2Freq(history.mask, method, freq, fill_value=True) - # The last 2 lines left the history in an unstable state, Also we want to - # append a dummy column, that represent the 'shift' in the history. - history.hist.loc[:, :0] = UNFLAGGED - dummy = pd.Series(UNTOUCHED, index=datcol.index, dtype=float) - history.append(dummy, force=True) + kws = dict(method=method, freq=freq) + history = history.apply( + index=datcol.index, + func_handle_df=True, + copy=False, + hist_func=shift2Freq, + hist_kws={**kws, "fill_value": UNTOUCHED}, + mask_func=shift2Freq, + mask_kws={**kws, "fill_value": True}, + ) flags.history[field] = history data[field] = datcol return data, flags -@register(masking="none", module="resampling") +@processing(module="resampling") def resample( data: DictOfSeries, field: str, @@ -236,25 +238,30 @@ def resample( **kwargs, ) -> Tuple[DictOfSeries, Flags]: """ - Function to resample the data. Afterwards the data will be sampled at regular (equidistant) timestamps - (or Grid points). Sampling intervals therefor get aggregated with a function, specifyed by 'agg_func' parameter and - the result gets projected onto the new timestamps with a method, specified by "method". The following method - (keywords) are available: - - * ``'nagg'``: all values in the range (+/- `freq`/2) of a grid point get aggregated with agg_func and assigned to it. - * ``'bagg'``: all values in a sampling interval get aggregated with agg_func and the result gets assigned to the last - grid point. - * ``'fagg'``: all values in a sampling interval get aggregated with agg_func and the result gets assigned to the next - grid point. - - - Note, that. 
if possible, functions passed to agg_func will get projected internally onto pandas.resample methods, - wich results in some reasonable performance boost - however, for this to work, you should pass functions that have - the __name__ attribute initialised and the according methods name assigned to it. - Furthermore, you shouldnt pass numpys nan-functions - (``nansum``, ``nanmean``,...) because those for example, have ``__name__ == 'nansum'`` and they will thus not - trigger ``resample.func()``, but the slower ``resample.apply(nanfunc)``. Also, internally, no nans get passed to - the functions anyway, so that there is no point in passing the nan functions. + Function to resample the data. + + The data will be sampled at regular (equidistant) timestamps (grid points). + Each sampling interval gets aggregated with a function specified by the + 'agg_func' parameter, and the result gets projected onto the new timestamps with a + method specified by "method". The following methods (keywords) are available: + + * ``'nagg'``: all values in the range (+/- `freq`/2) of a grid point get + aggregated with agg_func and assigned to it. + * ``'bagg'``: all values in a sampling interval get aggregated with agg_func and + the result gets assigned to the last grid point. + * ``'fagg'``: all values in a sampling interval get aggregated with agg_func and + the result gets assigned to the next grid point. + + + Note that, if possible, functions passed to agg_func will get projected + internally onto pandas.resample methods, which results in a reasonable + performance boost - however, for this to work, you should pass functions that + have the ``__name__`` attribute initialised with the name of the according + method. Furthermore, you should not pass numpy's nan-functions (``nansum``, + ``nanmean``, ...) because those, for example, have ``__name__ == 'nansum'`` and + will thus not trigger ``resample.func()``, but the slower + ``resample.apply(nanfunc)``. Also, internally, no nans get passed to the + functions anyway, so there is no point in passing the nan functions. Parameters ---------- @@ -268,45 +275,50 @@ def resample( data : DictOfSeries The container holding the data-to-be-flagged. field : str The fieldname of the column, holding the data-to-be-regularized. flags : saqc.Flags Container to store flags of the data. freq : str - An Offset String, that will be interpreted as the frequency you want to resample your data with. + An offset string that will be interpreted as the frequency you want to + resample your data with. agg_func : Callable The function you want to use for aggregation. method: {'fagg', 'bagg', 'nagg'}, default 'bagg' - Specifies which intervals to be aggregated for a certain timestamp. (preceding, succeeding or - "surrounding" interval). See description above for more details. + Specifies which intervals get aggregated for a certain timestamp (preceding, + succeeding or "surrounding" interval). See the description above for more details. max_invalid_total_d : {None, int}, default None - Maximum number of invalid (nan) datapoints, allowed per resampling interval. If max_invalid_total_d is - exceeded, the interval gets resampled to nan. By default (``np.inf``), there is no bound to the number of nan - values in an interval and only intervals containing ONLY nan values or those, containing no values at all, - get projected onto nan + Maximum number of invalid (nan) datapoints allowed per resampling interval. + If max_invalid_total_d is exceeded, the interval gets resampled to nan.
By + default (``np.inf``), there is no bound to the number of nan values in an + interval, and only intervals containing ONLY nan values, or those containing + no values at all, get projected onto nan. max_invalid_consec_d : {None, int}, default None - Maximum number of consecutive invalid (nan) data points, allowed per resampling interval. - If max_invalid_consec_d is exceeded, the interval gets resampled to nan. By default (np.inf), - there is no bound to the number of consecutive nan values in an interval and only intervals - containing ONLY nan values, or those containing no values at all, get projected onto nan. + Maximum number of consecutive invalid (nan) data points allowed per + resampling interval. If max_invalid_consec_d is exceeded, the interval gets + resampled to nan. By default (np.inf), there is no bound to the number of + consecutive nan values in an interval, and only intervals containing ONLY nan + values, or those containing no values at all, get projected onto nan. max_invalid_total_f : {None, int}, default None - Same as `max_invalid_total_d`, only applying for the flags. The flag regarded as "invalid" value, - is the one passed to empty_intervals_flag (default=``BAD``). - Also this is the flag assigned to invalid/empty intervals. + Same as `max_invalid_total_d`, only applying to the flags. The flag regarded + as "invalid" is the one passed to empty_intervals_flag + (default ``BAD``). This is also the flag assigned to invalid/empty intervals. max_invalid_consec_f : {None, int}, default None - Same as `max_invalid_total_f`, only applying onto flags. The flag regarded as "invalid" value, is the one passed - to empty_intervals_flag. Also this is the flag assigned to invalid/empty intervals. + Same as `max_invalid_total_f`, only applying to the flags. The flag regarded as + "invalid" is the one passed to empty_intervals_flag. This is also the + flag assigned to invalid/empty intervals. flag_agg_func : Callable, default: max - The function you want to aggregate the flags with. It should be capable of operating on the flags dtype - (usually ordered categorical). + The function you want to aggregate the flags with. It should be capable of + operating on the flags dtype (usually ordered categorical). freq_check : {None, 'check', 'auto'}, default None * ``None``: do not validate frequency-string passed to `freq` - * ``'check'``: estimate frequency and log a warning if estimate miss matchs frequency string passed to 'freq', or - if no uniform sampling rate could be estimated + * ``'check'``: estimate the frequency and log a warning if the estimate + mismatches the frequency string passed to 'freq', or if no uniform sampling + rate could be estimated * ``'auto'``: estimate frequency and use estimate. (Ignores `freq` parameter.)
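The three projection keywords above are easiest to see on a toy series. Below is a minimal plain-pandas sketch of their semantics, not part of the patch: the timestamps, values and the mean aggregator are illustrative assumptions, saqc's aggregate2Freq additionally handles fill values and the invalid-value bounds, and the offset keyword requires pandas >= 1.1 (the patched code itself still uses the older base keyword).

import pandas as pd

# three measurements around the hourly grid points 00:00 and 01:00
s = pd.Series(
    [1.0, 2.0, 4.0],
    index=pd.to_datetime(
        ["2021-01-01 00:10", "2021-01-01 00:40", "2021-01-01 01:10"]
    ),
)

# 'bagg'-like: values in [t, t + freq) aggregate onto the last grid point t
bagg = s.resample("1h", closed="left", label="left").mean()

# 'fagg'-like: values in (t - freq, t] aggregate onto the next grid point t
fagg = s.resample("1h", closed="right", label="right").mean()

# 'nagg'-like: values within (t - freq/2, t + freq/2] aggregate onto the
# nearest grid point t; emulated by offsetting the bins by freq/2 and
# shifting the labels back afterwards
nagg = s.resample("1h", closed="right", label="right", offset="30min").mean()
nagg.index = nagg.index - pd.Timedelta("30min")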
Returns @@ -342,16 +354,17 @@ def resample( max_invalid_consec=max_invalid_consec_f, ) - flags.history[field] = applyFunctionOnHistory( - flags.history[field], + history = flags.history[field].apply( + index=datcol.index, hist_func=aggregate2Freq, - hist_kws=kws, mask_func=aggregate2Freq, - mask_kws=kws, - last_column="dummy", + hist_kws=kws, + mask_kws={**kws, "agg_func": any, "fill_value": True}, + copy=False, ) data[field] = datcol + flags.history[field] = history return data, flags @@ -413,7 +426,7 @@ def _inverseShift( return source.fillna(fill_value).astype(dtype, copy=False) -@register(masking="none", module="resampling") +@processing(module="resampling") def reindexFlags( data: DictOfSeries, field: str, @@ -504,8 +517,8 @@ def reindexFlags( freq = getFreqDelta(flagscol.index) if freq is None and not method == "match": raise ValueError( - 'To project irregularly sampled data, either use method="match", or pass custom ' - "projection range to freq parameter" + 'To project irregularly sampled data, either use method="match", or ' + "pass a custom projection range to the freq parameter." ) if method[-13:] == "interpolation": @@ -537,8 +550,8 @@ else: raise ValueError(f"unknown method {method}") - history = applyFunctionOnHistory( - flags.history[source], func, func_kws, func, mask_kws, last_column=dummy + history = flags.history[source].apply( + dummy.index, func, func_kws, func, mask_kws, copy=False ) flags.history[field] = flags.history[field].append(history, force=False) return data, flags diff --git a/saqc/funcs/residues.py b/saqc/funcs/residues.py index 79a5cf80cdc6c4fd43a8f939709406cef3cee2fb..11141542d4b6e8653217317edddd339cffbb0b7f 100644 --- a/saqc/funcs/residues.py +++ b/saqc/funcs/residues.py @@ -7,12 +7,12 @@ import numpy as np from dios import DictOfSeries from saqc.constants import * -from saqc.core import register, Flags +from saqc.core import flagging, Flags from saqc.funcs.rolling import roll from saqc.funcs.curvefit import fitPolynomial -@register(masking="field", module="residues") +@flagging(masking="field", module="residues") def calculatePolynomialResidues( data: DictOfSeries, field: str, @@ -115,7 +115,7 @@ def calculatePolynomialResidues( ) -@register(masking="field", module="residues") +@flagging(masking="field", module="residues") def calculateRollingResidues( data: DictOfSeries, field: str, diff --git a/saqc/funcs/rolling.py b/saqc/funcs/rolling.py index 918d77be5fef8ea4dc4c0dc341804eea77f65026..3e23ee7e1ee9b71bf9e88053b96b955f441b9431 100644 --- a/saqc/funcs/rolling.py +++ b/saqc/funcs/rolling.py @@ -7,11 +7,11 @@ import pandas as pd from dios import DictOfSeries from saqc.constants import * -from saqc.core import register, Flags +from saqc.core import flagging, Flags from saqc.lib.tools import getFreqDelta -@register(masking="field", module="rolling") +@flagging(masking="field", module="rolling") def roll( data: DictOfSeries, field: str, diff --git a/saqc/funcs/scores.py b/saqc/funcs/scores.py index 346ba9a67f5b71f5f49e34014534cd2b3a9b0099..fc1d5748abed5bc32dabddcbaae44876141f7feb 100644 --- a/saqc/funcs/scores.py +++ b/saqc/funcs/scores.py @@ -7,12 +7,12 @@ import pandas as pd from dios import DictOfSeries from saqc.constants import * -from saqc.core import register, Flags +from saqc.core import flagging, Flags from saqc.lib.tools import toSequence import saqc.lib.ts_operators as ts_ops -@register(masking="all", module="scores") +@flagging(masking="all", module="scores") def assignKNNScore( data: DictOfSeries, field: str, diff --git
a/saqc/funcs/tools.py b/saqc/funcs/tools.py index daa79f2f18df2df1c93cd86d81089563bbc6d222..55e3b8b72f00f83c0b8d00d98fbad419b745e069 100644 --- a/saqc/funcs/tools.py +++ b/saqc/funcs/tools.py @@ -13,14 +13,14 @@ import pickle from saqc.lib.types import FreqString from saqc.constants import * -from saqc.core import register, Flags +from saqc.core import flagging, processing, Flags from saqc.lib.tools import periodicMask from saqc.lib.plotting import makeFig _MPL_DEFAULT_BACKEND = mpl.get_backend() -@register(masking="none", module="tools") +@processing(module="tools") def copy( data: DictOfSeries, field: str, flags: Flags, new_field: str, **kwargs ) -> Tuple[DictOfSeries, Flags]: @@ -57,7 +57,7 @@ def copy( return data, flags -@register(masking="none", module="tools") +@processing(module="tools") def drop( data: DictOfSeries, field: str, flags: Flags, **kwargs ) -> Tuple[DictOfSeries, Flags]: @@ -87,7 +87,7 @@ def drop( return data, flags -@register(masking="none", module="tools") +@processing(module="tools") def rename( data: DictOfSeries, field: str, flags: Flags, new_name: str, **kwargs ) -> Tuple[DictOfSeries, Flags]: @@ -119,7 +119,7 @@ def rename( return data, flags -@register(masking="none", module="tools") +@processing(module="tools") def mask( data: DictOfSeries, field: str, @@ -244,7 +244,7 @@ def mask( return data, flags -@register(masking="none", module="tools") +@processing(module="tools") def plot( data: DictOfSeries, field: str, diff --git a/saqc/funcs/transformation.py b/saqc/funcs/transformation.py index 7d4a5d75e31772df8566112073770f6645cfd9b4..032579467a82c969a2c24fb6befc51d501063079 100644 --- a/saqc/funcs/transformation.py +++ b/saqc/funcs/transformation.py @@ -6,10 +6,10 @@ import numpy as np import pandas as pd from dios import DictOfSeries -from saqc.core import register, Flags +from saqc.core import flagging, Flags -@register(masking="field", module="transformation") +@flagging(masking="field", module="transformation") def transform( data: DictOfSeries, field: str, diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py index e3a393d5ae67c5223cf29eefd047d55282d726bb..37d775ce33dfc464daa5a9cd527b0f14d074b00b 100644 --- a/saqc/lib/ts_operators.py +++ b/saqc/lib/ts_operators.py @@ -24,22 +24,22 @@ def identity(ts): def count(ts): - # count is a dummy to trigger according built in count method of - # resamplers when passed to aggregate2freq. For consistency reasons, it works accordingly when + # count is a dummy to trigger the according built-in count method of resamplers + # when passed to aggregate2freq. For consistency reasons, it works accordingly when # applied directly: return ts.count() def first(ts): - # first is a dummy to trigger according built in count method of - # resamplers when passed to aggregate2freq. For consistency reasons, it works accordingly when + # first is a dummy to trigger the according built-in first method of resamplers + # when passed to aggregate2freq. For consistency reasons, it works accordingly when # applied directly: return ts.first() def last(ts): - # last is a dummy to trigger according built in count method of - # resamplers when passed to aggregate2freq. For consistency reasons, it works accordingly when + # last is a dummy to trigger the according built-in last method of resamplers + # when passed to aggregate2freq.
For consistency reasons, it works accordingly when # applied directly: return ts.last() @@ -126,14 +126,16 @@ def kNN(in_arr, n_neighbors, algorithm="ball_tree", metric="minkowski", p=2): def maxGap(in_arr): """ - Search for the maximum gap in an array of sorted distances (func for scoring kNN distance matrice) + Search for the maximum gap in an array of sorted distances (func for scoring the + kNN distance matrix) """ return max(in_arr[0], max(np.diff(in_arr))) @nb.njit def _maxConsecutiveNan(arr, max_consec): - # checks if arr (boolean array) has not more then "max_consec" consecutive True values + # checks if arr (boolean array) has no more than "max_consec" consecutive True + # values current = 0 idx = 0 while idx < arr.size: @@ -148,8 +150,9 @@ def _maxConsecutiveNan(arr, max_consec): def validationTrafo(data, max_nan_total, max_nan_consec): - # data has to be boolean. False=Valid Value, True=invalid Value - # function returns True-array of input array size for invalid input arrays False array for valid ones + # data has to be boolean: False=valid value, True=invalid value. The function + # returns a True-array of the input array's size for invalid input arrays and a + # False-array for valid ones data = data.copy() if (max_nan_total is np.inf) & (max_nan_consec is np.inf): return data @@ -192,10 +195,11 @@ def interpolateNANs( data, method, order=2, inter_limit=2, downgrade_interpolation=False ): """ - The function interpolates nan-values (and nan-grids) in timeseries data. It can be passed all the method keywords - from the pd.Series.interpolate method and will than apply this very methods. Note, that the inter_limit keyword - really restricts the interpolation to chunks, not containing more than "inter_limit" nan entries - (thereby not being identical to the "limit" keyword of pd.Series.interpolate). + The function interpolates nan-values (and nan-grids) in timeseries data. It can + be passed all the method keywords from the pd.Series.interpolate method and will + then apply that very method. Note that the inter_limit keyword restricts the + interpolation to chunks containing no more than "inter_limit" nan entries + (thereby not being identical to the "limit" keyword of pd.Series.interpolate). :param data: pd.Series or np.array. The data series to be interpolated :param method: String. Method keyword designating interpolation method to use. @@ -246,8 +250,8 @@ def interpolateNANs( return x.interpolate(method=wrap_method, order=int(wrap_order)) except (NotImplementedError, ValueError): logger.warning( - f"Interpolation with method {method} is not supported at order {wrap_order}. " - f"and will be performed at order {wrap_order-1}" + f"Interpolation with method {method} is not supported at order " + f"{wrap_order}
and will be performed at order {wrap_order-1}" ) return _interpolWrapper(x, int(wrap_order - 1), wrap_method) elif x.size < 3: @@ -259,7 +263,8 @@ return x data = data.groupby(data.columns[0]).transform(_interpolWrapper) - # squeezing the 1-dimensional frame resulting from groupby for consistency reasons + # squeezing the 1-dimensional frame resulting from groupby for consistency + # reasons data = data.squeeze(axis=1) data.name = dat_name data = data.reindex(pre_index) @@ -287,7 +292,8 @@ def aggregate2Freq( "fagg": lambda _: (0, "right", "right"), } - # filter data for invalid patterns (since filtering is expensive we pre-check if it is demanded) + # filter data for invalid patterns (since filtering is expensive we pre-check if + # it is demanded) if max_invalid_total is not None or max_invalid_consec is not None: if pd.isna(fill_value): temp_mask = data.isna() @@ -305,29 +311,32 @@ base, label, closed = methods[method](seconds_total) # In the following, we check for empty intervals outside resample.apply, because: - # - resample AND groupBy do insert value zero for empty intervals if resampling with any kind of "sum" application - - # we want "fill_value" to be inserted - # - we are aggregating data and flags with this function and empty intervals usually would get assigned BAD - # flag (where resample inserts np.nan or 0) + # - resample AND groupBy do insert value zero for empty intervals if resampling + #   with any kind of "sum" application - we want "fill_value" to be inserted + # - we are aggregating data and flags with this function and empty intervals + #   usually would get assigned BAD flag (where resample inserts np.nan or 0) data_resampler = data.resample( f"{seconds_total:.0f}s", base=base, closed=closed, label=label ) empty_intervals = data_resampler.count() == 0 - # great performance gain can be achieved, when avoiding .apply and using pd.resampler - # methods instead. (this covers all the basic func aggregations, such as median, mean, sum, count, ...) + # a great performance gain can be achieved when avoiding .apply and using + # pd.resampler methods instead (this covers all the basic func aggregations, + # such as median, mean, sum, count, ...) try: check_name = re.sub("^nan", "", agg_func.__name__) - # a nasty special case: if function "count" was passed, we not want empty intervals to be replaced by nan: + # a nasty special case: if function "count" was passed, we do not want empty + # intervals to be replaced by nan: if check_name == "count": empty_intervals[:] = False data = getattr(data_resampler, check_name)() except AttributeError: data = data_resampler.apply(agg_func) - # since loffset keyword of pandas.resample "discharges" after one use of the resampler (pandas logic), - # we correct the resampled labels offset manually, if necessary. + # since the loffset keyword of pandas.resample "discharges" after one use of the + # resampler (pandas logic), we correct the resampled labels' offset manually, + # if necessary. if method == "nagg": data.index = data.index.shift(freq=pd.Timedelta(freq) / 2) empty_intervals.index = empty_intervals.index.shift(freq=pd.Timedelta(freq) / 2) @@ -372,14 +381,17 @@ def butterFilter( ---------- x: pd.Series input timeseries + cutoff: float The cutoff-frequency, expressed in multiples of the sampling rate. + nyq: float The niquist-frequency. expressed in multiples if the sampling rate.
+ fill_method: Literal[‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘spline’, ‘barycentric’, ‘polynomial’] - Fill method to be applied on the data before filtering (butterfilter cant handle ''np.nan''). See - documentation of pandas.Series.interpolate method for details on the methods associated with the different - keywords. + Fill method to be applied on the data before filtering (butterfilter cannot + handle ``np.nan``). See the documentation of the pandas.Series.interpolate + method for details on the methods associated with the different keywords. Returns @@ -465,8 +477,9 @@ def polyRollerNoMissing(in_slice, val_range, center_index, poly_deg): def polyRollerIrregular(in_slice, center_index_ser, poly_deg): - # a function to roll with, for polynomial fitting of data not having an equidistant frequency grid. - # (expects to get passed pandas timeseries), so raw parameter of rolling.apply should be set to False. + # a function to roll with, for polynomial fitting of data not having an + # equidistant frequency grid. It expects to get passed a pandas timeseries, + # so the raw parameter of rolling.apply should be set to False. x_data = ((in_slice.index - in_slice.index[0]).total_seconds()) / 60 fitted = poly.polyfit(x_data, in_slice.values, poly_deg) center_pos = int(len(in_slice) - center_index_ser[in_slice.index[-1]]) diff --git a/tests/common.py b/tests/common.py index 162da2bb1f2d0a7d5660667d653cd04e32a69eb2..cab0892a247b38bfb0e3ce98850fa7a5602d7497 100644 --- a/tests/common.py +++ b/tests/common.py @@ -8,7 +8,7 @@ import dios from saqc.constants import * from saqc.core import initFlagsLike, Flags - +from saqc.core.history import History, createHistoryFromData TESTNODATA = (np.nan, -9999) @@ -36,6 +36,22 @@ def initData( return di +def dummyHistory( + hist: pd.DataFrame = None, mask: pd.DataFrame = None, meta: list = None +): + if hist is None: + return History() + + if mask is None: + mask = hist.copy() + mask[:] = True + + if meta is None: + meta = [{}] * len(hist.columns) + + return createHistoryFromData(hist, mask, meta, copy=True) + + def writeIO(content): f = io.StringIO() f.write(content) diff --git a/tests/core/test_core.py b/tests/core/test_core.py index 62790d1d142ae1ff61235f21467022b39ab2575b..e99e94e4d6395811b993c60ee18ce65ab6856e4f 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -5,14 +5,11 @@ import logging import pytest import numpy as np import pandas as pd -import dios from saqc.constants import * from saqc.core import initFlagsLike -from saqc.funcs import flagRange -from saqc.lib import plotting as splot -from saqc import SaQC, register +from saqc import SaQC, flagging from tests.common import initData, flagAll @@ -24,7 +21,7 @@ logging.disable(logging.CRITICAL) OPTIONAL = [False, True] -register(masking="field")(flagAll) +flagging(masking="field")(flagAll) @pytest.fixture @@ -39,7 +36,7 @@ def flags(data, optional): def test_errorHandling(data): - @register(masking="field") + @flagging(masking="field") def raisingFunc(data, field, flags, **kwargs): raise TypeError diff --git a/tests/core/test_history.py b/tests/core/test_history.py index 1dd94b71ef5829acc6c28565838990153503330f..af19090b10ee04b584a86fef32b9e1971168d61e 100644 --- a/tests/core/test_history.py +++ b/tests/core/test_history.py @@ -4,7 +4,8 @@ import pytest import numpy as np import pandas as pd -from saqc.core.history import History +from saqc.core.history import History, createHistoryFromData +from tests.common import dummyHistory # see #GH143 combined backtrack # (adjusted to current
implementation) @@ -87,14 +88,16 @@ def check_invariants(hist): assert isinstance(hist, History) assert isinstance(hist.hist, pd.DataFrame) assert isinstance(hist.mask, pd.DataFrame) + assert isinstance(hist.meta, list) assert all( [isinstance(dtype, (float, pd.CategoricalDtype)) for dtype in hist.hist.dtypes] ) assert all(hist.mask.dtypes == bool) + assert all([isinstance(e, dict) for e in hist.meta]) assert hist.hist.columns.equals(hist.mask.columns) assert hist.columns is hist.hist.columns assert hist.index is hist.hist.index - assert len(hist) == len(hist.columns) + assert len(hist) == len(hist.columns) == len(hist.meta) # advanced assert hist.columns.equals(pd.Index(range(len(hist)))) @@ -128,31 +131,18 @@ def is_equal(hist1: History, hist2: History): def test_init(data: np.array): # init df = pd.DataFrame(data, dtype=float) - hist = History(hist=df) - + hist = History(df.index) check_invariants(hist) - # shape would fail - if data is not None: - assert len(hist.index) == data.shape[0] - assert len(hist.columns) == data.shape[1] - assert hist.mask.all(axis=None) - - # check fastpath - fast = History(hist=hist) - check_invariants(fast) - - assert is_equal(hist, fast) - @pytest.mark.parametrize("data", data + [None]) -def test_init_with_mask(data: np.array): +def test_createHistory(data: np.array): # init df = pd.DataFrame(data, dtype=float) mask = pd.DataFrame(data, dtype=bool) - if not mask.empty: - mask.iloc[:, -1] = True - hist = History(hist=df, mask=mask) + mask[:] = True + meta = [{}] * len(df.columns) + hist = createHistoryFromData(df, mask, meta) check_invariants(hist) @@ -161,18 +151,14 @@ def test_init_with_mask(data: np.array): assert len(hist.index) == data.shape[0] assert len(hist.columns) == data.shape[1] - # check fastpath - fast = History(hist=hist) - check_invariants(fast) - - assert is_equal(hist, fast) - @pytest.mark.parametrize("data", data + [None]) def test_copy(data): # init df = pd.DataFrame(data, dtype=float) - hist = History(hist=df) + hist = History(df.index) + for _, s in df.items(): + hist.append(s) shallow = hist.copy(deep=False) deep = hist.copy(deep=True) @@ -188,17 +174,17 @@ def test_copy(data): assert deep.hist is not hist.hist assert deep.mask is not hist.mask - # we need to convert to and from categoricals in order - # to allow all operations on `History`, that way we loose - # the identity - # assert shallow.hist is hist.hist + assert deep.meta is not hist.meta + + assert shallow.hist is hist.hist assert shallow.mask is hist.mask + assert shallow.meta is hist.meta @pytest.mark.parametrize("data", data + [None]) def test_reindex_trivial_cases(data): df = pd.DataFrame(data, dtype=float) - orig = History(hist=df) + orig = dummyHistory(hist=df) # checks for index in [df.index, pd.Index([])]: @@ -211,7 +197,7 @@ def test_reindex_trivial_cases(data): @pytest.mark.parametrize("data", data + [None]) def test_reindex_missing_indicees(data): df = pd.DataFrame(data, dtype=float) - hist = History(hist=df) + hist = dummyHistory(hist=df) index = df.index[1:-1] # checks ref = hist.reindex(index) @@ -222,7 +208,7 @@ def test_reindex_missing_indicees(data): @pytest.mark.parametrize("data", data + [None]) def test_reindex_extra_indicees(data): df = pd.DataFrame(data, dtype=float) - hist = History(hist=df) + hist = dummyHistory(hist=df) index = df.index.append(pd.Index(range(len(df.index), len(df.index) + 5))) # checks ref = hist.reindex(index) @@ -230,12 +216,36 @@ def test_reindex_extra_indicees(data): check_invariants(hist) +@pytest.mark.parametrize( + "s, meta", 
+ [ + (pd.Series(0, index=range(6), dtype=float), None), + (pd.Series(0, index=range(6), dtype=float), {}), + (pd.Series(1, index=range(6), dtype=float), {"foo": "bar"}), + ], +) +def test_append_with_meta(s, meta): + hist = History(s.index) + hist.append(s, meta=meta) + check_invariants(hist) + + if meta is None: + meta = {} + + assert hist.meta[0] is not meta + assert hist.meta == [meta] + + hist.append(s, meta=meta) + check_invariants(hist) + assert hist.meta == [meta, meta] + + @pytest.fixture(scope="module") def __hist(): # this FH is filled by # - test_append # - test_append_force - return History() + return History(index=pd.Index(range(6))) @pytest.mark.parametrize( @@ -271,31 +281,3 @@ def test_append_force(__hist, s, max_val): hist.append(s, force=True) check_invariants(hist) assert all(hist.max() == max_val) - - -def test_squeeze(): - # init - d, m, exp = example2 - d = pd.DataFrame(d, dtype=float) - m = pd.DataFrame(m, dtype=bool) - orig = History(hist=d, mask=m) - - check_invariants(orig) - assert all(orig.max() == exp) - - # checks - - for n in range(len(orig) + 1): - hist = orig.copy() - hist.squeeze(n) - - check_invariants(hist) - - # squeeze for less then 2 rows does nothing - if n < 2: - assert is_equal(hist, orig) - else: - assert len(hist) == len(orig) - n + 1 - - # result does not change - assert all(hist.max() == exp) diff --git a/tests/core/test_reader.py b/tests/core/test_reader.py index 819da65cdd7001ecb3365947cf33ee3b33055322..10c4c048ee04d778907fe9521014c0e774683bc9 100644 --- a/tests/core/test_reader.py +++ b/tests/core/test_reader.py @@ -9,7 +9,7 @@ from pathlib import Path from saqc.core.config import Fields as F from saqc.core.core import SaQC -from saqc.core.register import FUNC_MAP, register +from saqc.core.register import FUNC_MAP, flagging from tests.common import initData, writeIO @@ -108,7 +108,7 @@ def test_configChecks(data): var1, _, var3, *_ = data.columns - @register(masking="none") + @flagging(masking="none") def flagFunc(data, field, flags, arg, opt_arg=None, **kwargs): return data, flags @@ -133,7 +133,7 @@ def test_supportedArguments(data): # TODO: necessary? 
- @register(masking="field") + @flagging(masking="field") def func(data, field, flags, kwarg, **kwargs): return data, flags diff --git a/tests/core/test_translator.py b/tests/core/test_translator.py index cd002c115e71b9bf4cbef883ca85c5b253b9adb7..1ed154fdfdc6c84e086921f428367ae1f279a41a 100644 --- a/tests/core/test_translator.py +++ b/tests/core/test_translator.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import json +from saqc.core.register import FUNC_MAP from typing import Dict, Union, Sequence import numpy as np @@ -67,7 +68,7 @@ def test_backwardTranslation(): for _, translator in _genTranslators(): keys = tuple(translator._backward.keys()) flags = _genFlags({field: np.array(keys)}) - translated = translator.backward(flags, None) + translated = translator.backward(flags) expected = set(translator._backward.values()) assert not (set(translated[field]) - expected) @@ -79,35 +80,32 @@ def test_backwardTranslationFail(): # add an scheme invalid value to the flags flags = _genFlags({field: np.array(keys + (max(keys) + 1,))}) with pytest.raises(ValueError): - translator.backward(flags, None) + translator.backward(flags) -@pytest.mark.skip(reason="dmp translator implementation is currently blocked") def test_dmpTranslator(): translator = DmpTranslator() + # generate a bunch of dummy flags keys = np.array(tuple(translator._backward.keys()) * 50) flags = _genFlags({"var1": keys, "var2": keys, "var3": keys}) flags[:, "var1"] = BAD flags[:, "var1"] = DOUBTFUL flags[:, "var2"] = BAD - to_call = [ - # the initial columns - (ColumnSelector("var1"), SaQCFunction("flagInit", flagDummy)), - (ColumnSelector("var2"), SaQCFunction("flagInit", flagDummy)), - ( - ColumnSelector("var3"), - SaQCFunction("flagInit", flagDummy, comment="initial flags"), - ), - (ColumnSelector("var1"), SaQCFunction("flagFoo", flagDummy)), - ( - ColumnSelector("var1"), - SaQCFunction("flagBar", flagDummy, comment="I did it"), - ), - (ColumnSelector("var2"), SaQCFunction("flagFoo", flagDummy)), - ] - tflags = translator.backward(flags, to_call) + history1 = flags.history["var1"] + history1.meta[1].update({"func": "flagFoo", "keywords": {"cause": "AUTOFLAGGED"}}) + history1.meta[2].update({"func": "flagBar", "keywords": {"comment": "I did it"}}) + flags.history["var1"] = history1 + + history2 = flags.history["var2"] + history2.meta[-1].update( + {"func": "flagFoo", "keywords": {"cause": "BELOW_OR_ABOVE_MIN_MAX"}} + ) + flags.history["var2"] = history2 + + tflags = translator.backward(flags) + assert set(tflags.columns.get_level_values(1)) == { "quality_flag", "quality_comment", @@ -119,25 +117,29 @@ def test_dmpTranslator(): tflags.loc[:, ("var1", "quality_comment")] == '{"test": "flagBar", "comment": "I did it"}' ).all(axis=None) - assert (tflags.loc[:, ("var1", "quality_cause")] == "OTHER").all(axis=None) + + assert (tflags.loc[:, ("var1", "quality_cause")] == "AUTOFLAGGED").all(axis=None) assert (tflags.loc[:, ("var2", "quality_flag")] == "BAD").all(axis=None) assert ( tflags.loc[:, ("var2", "quality_comment")] == '{"test": "flagFoo", "comment": ""}' ).all(axis=None) - assert (tflags.loc[:, ("var2", "quality_cause")] == "OTHER").all(axis=None) + assert (tflags.loc[:, ("var2", "quality_cause")] == "BELOW_OR_ABOVE_MIN_MAX").all( + axis=None + ) assert ( tflags.loc[flags["var3"] == BAD, ("var3", "quality_comment")] - == '{"test": "flagInit", "comment": "initial flags"}' + == '{"test": "unknown", "comment": ""}' + ).all(axis=None) + assert ( + tflags.loc[flags["var3"] == BAD, ("var3", "quality_cause")] == "AUTOFLAGGED" + 
).all(axis=None) + assert ( + tflags.loc[flags["var3"] == UNFLAGGED, ("var3", "quality_cause")] + == "AUTOFLAGGED" ).all(axis=None) - assert (tflags.loc[flags["var3"] == BAD, ("var3", "quality_cause")] == "OTHER").all( - axis=None - ) - assert (tflags.loc[flags["var3"] < DOUBTFUL, ("var3", "quality_cause")] == "").all( - axis=None - ) def test_positionalTranslator(): @@ -147,7 +149,7 @@ def test_positionalTranslator(): flags[1::3, "var1"] = DOUBTFUL flags[2::3, "var1"] = BAD - tflags = translator.backward(flags, None) # type: ignore + tflags = translator.backward(flags) assert (tflags["var2"].replace(-9999, np.nan).dropna() == 90).all(axis=None) assert (tflags["var1"].iloc[1::3] == 90210).all(axis=None) assert (tflags["var1"].iloc[2::3] == 90002).all(axis=None) @@ -168,14 +170,13 @@ def test_positionalTranslatorIntegration(): for field in flags.columns: assert flags[field].astype(str).str.match("^9[012]*$").all() - round_trip = translator.backward(*translator.forward(flags)) + round_trip = translator.backward(translator.forward(flags)) assert (flags.values == round_trip.values).all() assert (flags.index == round_trip.index).all() assert (flags.columns == round_trip.columns).all() -@pytest.mark.skip(reason="dmp translator implementation is currently blocked") def test_dmpTranslatorIntegration(): data = initData(1) @@ -188,15 +189,15 @@ def test_dmpTranslatorIntegration(): qflags = flags.xs("quality_flag", axis="columns", level=1) qfunc = flags.xs("quality_comment", axis="columns", level=1).applymap( - lambda v: json.loads(v)["test"] + lambda v: json.loads(v)["test"] if v else "" ) qcause = flags.xs("quality_cause", axis="columns", level=1) assert qflags.isin(translator._forward.keys()).all(axis=None) assert qfunc.isin({"", "breaks.flagMissing", "outliers.flagRange"}).all(axis=None) - assert (qcause[qflags[col] == "BAD"] == "OTHER").all(axis=None) + assert (qcause[qflags[col] == "BAD"] == "AUTOFLAGGED").all(axis=None) - round_trip = translator.backward(*translator.forward(flags)) + round_trip = translator.backward(translator.forward(flags)) assert round_trip.xs("quality_flag", axis="columns", level=1).equals(qflags) @@ -209,23 +210,20 @@ def test_dmpTranslatorIntegration(): ) -@pytest.mark.skip(reason="dmp translator implementation is currently blocked") -def test_dmpValidCause(): +def test_dmpValidCombinations(): data = initData(1) col = data.columns[0] translator = DmpTranslator() saqc = SaQC(data=data, scheme=translator) - saqc = saqc.outliers.flagRange(col, min=3, max=10, cause="SOMETHING_STUPID") + with pytest.raises(ValueError): - data, flags = saqc.getResult() + saqc.outliers.flagRange( + col, min=3, max=10, cause="SOMETHING_STUPID" + ).getResult() - saqc = saqc.outliers.flagRange(col, min=3, max=10, cause="BELOW_OR_ABOVE_MIN_MAX") - data, flags = saqc.getResult() - qflags = flags.xs("quality_flag", axis="columns", level=1) - qcause = flags.xs("quality_cause", axis="columns", level=1) - assert (qcause[qflags[col] == "BAD"] == "BELOW_OR_ABOVE_MIN_MAX").all(axis=None) - assert (qcause[qflags[col] != "BAD"] == "").all(axis=None) + with pytest.raises(ValueError): + saqc.outliers.flagRange(col, min=3, max=10, cause="").getResult() def _buildupSaQCObjects(): @@ -258,37 +256,39 @@ def test_translationPreservesFlags(): _, flags2 = saqc2.getResult(raw=True) for k in flags2.columns: - got = flags2.history[k].hist.iloc[:, 1:] + got = flags2.history[k].hist - f1hist = flags1.history[k].hist.iloc[:, 1:] + f1hist = flags1.history[k].hist expected = pd.concat([f1hist, f1hist], axis="columns") 
expected.columns = got.columns assert expected.equals(got) -def test_callHistoryYieldsSameResults(): +def test_reproducibleMetadata(): # a simple SaQC run data = initData(3) col = data.columns[0] - saqc1 = SaQC(data=data) + saqc1 = SaQC(data=data, lazy=True) saqc1 = saqc1.breaks.flagMissing(col, to_mask=False).outliers.flagRange( col, min=3, max=10, to_mask=False ) _, flags1 = saqc1.getResult(raw=True) - # generate a dummy call history from flags - translator = FloatTranslator() - graph = translator.buildGraph(flags1) - saqc2 = SaQC(data=data) + saqc2 = SaQC(data=data, lazy=True) - # convert the call history into an excution plan and inject into a blank SaQC object - saqc2._planned = [(s, APIController(), f) for s, f in graph] - # replay the functions - _, flags2 = saqc2.getResult() + # convert the history metadata into an execution plan... + graph = [] + for m in flags1.history[col].meta: + func = FUNC_MAP[m["func"]].bind(*m["args"], **m["keywords"]) + graph.append((ColumnSelector(col), APIController(), func)) + # ... and inject into a blank SaQC object + saqc2._planned = graph + # ... replay the functions + _, flags2 = saqc2.getResult(raw=True) - assert flags2.equals(flags1.toFrame()) + assert flags2.toFrame().equals(flags1.toFrame()) def test_multicallsPreserveHistory(): @@ -298,8 +298,8 @@ # check, that the `History` is duplicated for col in flags2.columns: - hist1 = flags1.history[col].hist.loc[:, 1:] - hist2 = flags2.history[col].hist.loc[:, 1:] + hist1 = flags1.history[col].hist + hist2 = flags2.history[col].hist hist21 = hist2.iloc[:, : len(hist1.columns)] hist22 = hist2.iloc[:, len(hist1.columns) :] @@ -311,8 +311,6 @@ assert hist1.equals(hist22) assert hist21.equals(hist22) - assert len(saqc2._computed) == len(saqc1._computed) * 2 def test_positionalMulitcallsPreserveState(): @@ -321,41 +319,10 @@ saqc1, saqc2 = _buildupSaQCObjects() translator = PositionalTranslator() _, flags1 = saqc1.getResult(raw=True) _, flags2 = saqc2.getResult(raw=True) - tflags1 = translator.backward(flags1, saqc1._computed).astype(str) - tflags2 = translator.backward(flags2, saqc2._computed).astype(str) + tflags1 = translator.backward(flags1).astype(str) + tflags2 = translator.backward(flags2).astype(str) for k in flags2.columns: expected = tflags1[k].str.slice(start=1) * 2 got = tflags2[k].str.slice(start=1) assert expected.equals(got) - - -@pytest.mark.skip(reason="dmp translator implementation is currently blocked") -def test_smpTranslatorHandlesRenames(): - - data = initData(3) - - this: str = data.columns[0] - other: str = this + "_new" - - saqc = ( - SaQC(data=data) - .outliers.flagRange(this, min=1, max=10) - .tools.rename(this, other) - .breaks.flagMissing(other, min=4, max=6) - ) - saqc = saqc.evaluate() - - this_funcs = DmpTranslator._getFieldFunctions(this, saqc._computed) - other_funcs = DmpTranslator._getFieldFunctions(other, saqc._computed) - - assert [f.name for f in this_funcs] == [ - "", - "outliers.flagRange", - "tools.rename", - "breaks.flagMissing", - ] - - # we skip the first function in both lists, as they are dummy functions - # inserted to allow a proper replay of all function calls - assert this_funcs[1:] == other_funcs[1:] diff --git a/tests/funcs/test_constants_detection.py b/tests/funcs/test_constants_detection.py index 302672611e88854a465630512ab7b5aa77e0d76b..7f3916856b369f336b7490a0bbdf81e72eb8fc2c 100644 --- a/tests/funcs/test_constants_detection.py +++
b/tests/funcs/test_constants_detection.py @@ -32,6 +32,7 @@ def test_constants_flagBasic(data): assert np.all(flagscol[25 + 1 :] == UNFLAGGED) +@pytest.mark.skip(reason="see Issue: https://git.ufz.de/rdm-software/saqc/-/issues/220") def test_constants_flagVarianceBased(data): expected = np.arange(5, 25) field, *_ = data.columns diff --git a/tests/funcs/test_generic_api_functions.py b/tests/funcs/test_generic_api_functions.py index 710fbecea3dea8a1faea2302972e0a2d03ea0476..f5c10b90cb8b9fed61ae0acd6b77f30f74f094e3 100644 --- a/tests/funcs/test_generic_api_functions.py +++ b/tests/funcs/test_generic_api_functions.py @@ -5,14 +5,14 @@ import pytest import pandas as pd from saqc.constants import * -from saqc.core.register import register +from saqc.core.register import flagging from saqc.funcs.tools import mask from saqc import SaQC from tests.common import initData, flagAll -register(masking="field")(flagAll) +flagging(masking="field")(flagAll) @pytest.fixture diff --git a/tests/funcs/test_generic_config_functions.py b/tests/funcs/test_generic_config_functions.py index 1c560db44cf0f44f9cb64cb620b943676d0c0f25..b5f5aa0e18db842d44e28895f1d2f7a0dd7bb6b2 100644 --- a/tests/funcs/test_generic_config_functions.py +++ b/tests/funcs/test_generic_config_functions.py @@ -11,7 +11,7 @@ from saqc.constants import * from saqc.core import initFlagsLike, Flags from saqc.core.visitor import ConfigFunctionParser from saqc.core.config import Fields as F -from saqc.core.register import register +from saqc.core.register import flagging from saqc.funcs.generic import _execGeneric from saqc import SaQC @@ -281,7 +281,7 @@ def test_callableArgumentsUnary(data): window = 5 - @register(masking="field") + @flagging(masking="field") def testFuncUnary(data, field, flags, func, **kwargs): data[field] = data[field].rolling(window=window).apply(func) return data, initFlagsLike(data) @@ -310,7 +310,7 @@ def test_callableArgumentsUnary(data): def test_callableArgumentsBinary(data): var1, var2 = data.columns[:2] - @register(masking="field") + @flagging(masking="field") def testFuncBinary(data, field, flags, func, **kwargs): data[field] = func(data[var1], data[var2]) return data, initFlagsLike(data) diff --git a/tests/funcs/test_harm_funcs.py b/tests/funcs/test_harm_funcs.py index fc229d261fcc80d80b63ca6a4e72a393e094e264..958671b8f33e1cf3a42492a0a5b930c25b941880 100644 --- a/tests/funcs/test_harm_funcs.py +++ b/tests/funcs/test_harm_funcs.py @@ -64,7 +64,8 @@ def test_gridInterpolation(data, method): data = dios.DictOfSeries(data) flags = initFlagsLike(data) - # we are just testing if the interpolation gets passed to the series without causing an error: + # we are just testing if the interpolation gets passed to the series without + # causing an error: res = interpolate( data, field, flags, freq, method=method, downcast_interpolation=True ) @@ -320,10 +321,13 @@ def test_harmSingleVarInterpolationShift(data, params, expected): data_harm, flags_harm = copy(data, "data", flags, "data_harm") data_harm, flags_harm = shift(data_harm, h_field, flags_harm, freq, method=method) assert data_harm[h_field].equals(expected) + checkDataFlagsInvariants(data_harm, flags_harm, field, identical=True) data_deharm, flags_deharm = reindexFlags( data_harm, field, flags_harm, source=h_field, method="inverse_" + method ) + checkDataFlagsInvariants(data_deharm, flags_deharm, field, identical=True) + data_deharm, flags_deharm = drop(data_deharm, h_field, flags_deharm) assert data_deharm[field].equals(pre_data[field]) assert 
flags_deharm[field].equals(pre_flags[field]) diff --git a/tests/fuzzy/test_masking.py b/tests/fuzzy/test_masking.py index d0875906fa6436d00115c01ae2d027e1321d6f21..c066997679e9f37c7a85ca59cd4cfdd91a74616a 100644 --- a/tests/fuzzy/test_masking.py +++ b/tests/fuzzy/test_masking.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import logging +from saqc.core.lib import SaQCFunction import pandas as pd @@ -42,12 +43,13 @@ def test_dataMutationPreventsUnmasking(data_field_flags): data_in, field, flags = data_field_flags data_masked, mask = _maskData(data_in, flags, columns=[field], thresh=UNFLAGGED) state = CallState( - func=None, + func=lambda x: x, + func_name="", data=data_in, flags=flags, field=field, - args=None, - kwargs=None, + args=(), + kwargs={}, masking="field", mthresh=UNFLAGGED, mask=mask, @@ -68,12 +70,13 @@ def test_flagsMutationPreventsUnmasking(data_field_flags): data_in, field, flags = data_field_flags data_masked, mask = _maskData(data_in, flags, columns=[field], thresh=UNFLAGGED) state = CallState( - func=None, + func=lambda x: x, + func_name="", data=data_in, flags=flags, field=field, - args=None, - kwargs=None, + args=(), + kwargs={}, masking="field", mthresh=UNFLAGGED, mask=mask, @@ -97,12 +100,13 @@ def test_reshapingPreventsUnmasking(data_field_flags): data_in, field, flags = data_field_flags data_masked, mask = _maskData(data_in, flags, columns=[field], thresh=UNFLAGGED) state = CallState( - func=None, + func=lambda x: x, + func_name="", data=data_in, flags=flags, field=field, - args=None, - kwargs=None, + args=(), + kwargs={}, masking="field", mthresh=UNFLAGGED, mask=mask, @@ -129,12 +133,13 @@ def test_unmaskingInvertsMasking(data_field_flags): data_in, field, flags = data_field_flags data_masked, mask = _maskData(data_in, flags, columns=[field], thresh=UNFLAGGED) state = CallState( - func=None, + func=lambda x: x, + func_name="", data=data_in, flags=flags, field=field, - args=None, - kwargs=None, + args=(), + kwargs={}, masking="field", mthresh=UNFLAGGED, mask=mask,
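To close with the registration change that runs through this whole patch: `register` is gone, and functions now declare their role at registration time, `flagging` for flag-producing functions and `processing` for functions that rework data and index. A minimal sketch of a custom function under the new decorator, following the pattern of `flagAll`, `raisingFunc` and `flagFunc` in the tests above; the threshold logic and all names are hypothetical, only the decorator, the signature and the return convention are taken from the patch:

from typing import Tuple

from dios import DictOfSeries

from saqc.constants import BAD
from saqc.core import Flags
from saqc.core.register import flagging


@flagging(masking="field")
def flagAboveThresh(
    data: DictOfSeries,
    field: str,
    flags: Flags,
    thresh: float,
    flag: float = BAD,
    **kwargs,
) -> Tuple[DictOfSeries, Flags]:
    # set the passed flag wherever the column exceeds the (hypothetical)
    # threshold; boolean-mask assignment on Flags follows the SelectT
    # signatures of saqc.core.flags
    flags[data[field] > thresh, field] = flag
    return data, flags

Passing `module="..."` to the decorator would additionally expose the function on the SaQC object under that namespace, the same way the built-in functions (e.g. `saqc.outliers.flagRange(...)`) are called throughout the tests above.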