From c3e4ec4249e477f06107be61ba8f7b028f56004b Mon Sep 17 00:00:00 2001 From: Bert Palm <bert.palm@ufz.de> Date: Mon, 19 Dec 2022 09:09:52 +0100 Subject: [PATCH] harden and improve History --- saqc/core/flags.py | 2 +- saqc/core/history.py | 335 +++++++++++++++++++++++++------------ saqc/core/register.py | 10 +- saqc/funcs/resampling.py | 2 +- tests/core/test_history.py | 6 +- 5 files changed, 239 insertions(+), 116 deletions(-) diff --git a/saqc/core/flags.py b/saqc/core/flags.py index 48482a703..a303c05f8 100644 --- a/saqc/core/flags.py +++ b/saqc/core/flags.py @@ -147,7 +147,7 @@ class Flags: 0 True 1 False 2 True - Name: 2, dtype: bool + dtype: bool .. doctest:: exampleFlags diff --git a/saqc/core/history.py b/saqc/core/history.py index 37285b5dc..e9cf472bf 100644 --- a/saqc/core/history.py +++ b/saqc/core/history.py @@ -8,10 +8,11 @@ from __future__ import annotations from copy import copy as shallowcopy from copy import deepcopy -from typing import Any, Callable, Dict, List, Tuple, Union +from typing import Any, Callable, Dict, List, Tuple import numpy as np import pandas as pd +from pandas.api.types import is_categorical_dtype, is_float_dtype from saqc.constants import UNFLAGGED @@ -45,8 +46,32 @@ class History: def __init__(self, index: pd.Index | None): - self.hist = pd.DataFrame(index=index) - self.meta = [] + self._hist = pd.DataFrame(index=index) + self._meta = [] + + @property + def hist(self): + return self._hist.astype(float, copy=True) + + @hist.setter + def hist(self, value: pd.DataFrame) -> None: + self._validateHist(value) + if len(value.columns) != len(self._meta): + raise ValueError( + "passed history does not match existing meta. " + "To use a new `hist` with new `meta` use " + "'History.createFromData(new_hist, new_meta)'" + ) + self._hist = value.astype("category", copy=True) + + @property + def meta(self) -> list[dict[str, Any]]: + return list(self._meta) + + @meta.setter + def meta(self, value: list[dict[str, Any]]) -> None: + self._validateMetaList(value, self._hist) + self._meta = deepcopy(value) @property def index(self) -> pd.Index: @@ -66,7 +91,7 @@ class History: ------- index : pd.Index """ - return self.hist.index + return self._hist.index @property def columns(self) -> pd.Index: @@ -80,7 +105,7 @@ class History: ------- columns : pd.Index """ - return self.hist.columns + return self._hist.columns @property def empty(self) -> bool: @@ -118,15 +143,11 @@ class History: # all following code must handle a passed empty series # ensure continuous increasing columns - assert 0 <= pos <= len(self) - - self.hist[pos] = s.astype("category") - + assert 0 <= pos <= len(self.columns) + self._hist[pos] = s.astype("category") return self - def append( - self, value: Union[pd.Series, History], meta: dict | None = None - ) -> History: + def append(self, value: pd.Series | History, meta: dict | None = None) -> History: """ Create a new FH column and insert given pd.Series to it. @@ -157,8 +178,7 @@ class History: if meta is None: meta = {} - - if not isinstance(meta, dict): + elif not isinstance(meta, dict): raise TypeError("'meta' must be of type None or dict") val = self._validateValue(value) @@ -166,10 +186,10 @@ class History: raise ValueError("Index does not match") self._insert(val, pos=len(self)) - self.meta.append(meta.copy()) + self._meta.append(meta.copy()) return self - def _appendHistory(self, value: History): + def _appendHistory(self, value: History) -> History: """ Append multiple columns of a history to self. @@ -190,44 +210,107 @@ class History: ----- This ignores the column names of the passed History. """ - self._validate(value.hist, value.meta) + self._validate(value._hist, value._meta) if not value.index.equals(self.index): raise ValueError("Index does not match") # we copy shallow because we only want to set new columns # the actual data copy happens in calls to astype - value_hist = value.hist.copy(deep=False) - value_meta = value.meta.copy() + value_hist = value._hist.copy(deep=False) + value_meta = value._meta.copy() # rename columns, to avoid ``pd.DataFrame.loc`` become confused n = len(self.columns) columns = pd.Index(range(n, n + len(value_hist.columns))) value_hist.columns = columns - hist = self.hist.astype(float) + hist = self._hist.astype(float) hist.loc[:, columns] = value_hist.astype(float) - self.hist = hist.astype("category") - self.meta += value_meta + self._hist = hist.astype("category") + self._meta += value_meta return self - def squeeze(self, raw=False) -> pd.Series: + def squeeze( + self, raw: bool = False, start: int | None = None, end: int | None = None + ) -> pd.Series: """ - Get the last flag value per row of the FH. + Reduce history to a series, by taking the last set value per row. + + By passing `start` and/or `end` only a slice of the history is used. + This can be used to get the values of an earlier test. See the + Examples. + + Parameters + ---------- + raw : bool, default False + If True, 'unset' values are represented by `nan`, + otherwise, 'unset' values are represented by the + `UNFLAGGED` (`-inf`) constant + + start : int, default None + The first history column to use (inclusive). + + end : int, default None + The last history column to use (exclusive). Returns ------- - pd.Series + pandas.Series + + Examples + -------- + >>> from saqc.core.history import History + >>> s0 = pd.Series([np.nan, np.nan, 99.]) + >>> s1 = pd.Series([1., 1., np.nan]) + >>> s2 = pd.Series([2., np.nan, 2.]) + >>> h = History(pd.Index([0,1,2])).append(s0).append(s1).append(s2) + >>> h + 0 1 2 + 0 nan 1.0 2.0 + 1 nan 1.0 nan + 2 99.0 nan 2.0 + + Get current flags. + + >>> h.squeeze() + 0 2.0 + 1 1.0 + 2 2.0 + dtype: float64 + + Get only the flags that the last function had set: + + >>> h.squeeze(start=-1) + 0 2.0 + 1 -inf + 2 2.0 + dtype: float64 + + Get the flags before the last function run: + + >>> h.squeeze(end=-1) + 0 1.0 + 1 1.0 + 2 99.0 + dtype: float64 + + Get only the flags that the 2nd function had set: + + >>> h.squeeze(start=1, end=2) + 0 1.0 + 1 1.0 + 2 -inf + dtype: float64 """ - result = self.hist.astype(float) - if result.empty: - result = pd.DataFrame(data=np.nan, index=self.hist.index, columns=[0]) - - result = result.ffill(axis=1).iloc[:, -1] - - if raw: - return result + hist = self._hist.iloc[:, slice(start, end)].astype(float) + if hist.empty: + result = pd.Series(data=np.nan, index=self._hist.index, dtype=float) else: - return result.fillna(UNFLAGGED) + result = hist.ffill(axis=1).iloc[:, -1] + if not raw: + result = result.fillna(UNFLAGGED) + result.name = None + return result def reindex( self, index: pd.Index, fill_value_last: float = UNFLAGGED, copy: bool = True @@ -251,17 +334,11 @@ class History: ------- History """ + # Note: code must handle empty frames out = self.copy() if copy else self - - hist = out.hist.astype(float).reindex( - index=index, copy=False, fill_value=np.nan - ) - - # Note: all following code must handle empty frames + hist = out._hist.astype(float).reindex(index=index, copy=False) hist.iloc[:, -1:] = hist.iloc[:, -1:].fillna(fill_value_last) - - out.hist = hist.astype("category") - + out._hist = hist.astype("category") return out def apply( @@ -271,7 +348,7 @@ class History: func_kws: dict, func_handle_df: bool = False, copy: bool = True, - ): + ) -> History: """ Apply a function on each column in history. @@ -309,27 +386,31 @@ class History: Returns ------- - history with altered columns + History with altered columns """ hist = pd.DataFrame(index=index) - # implicit copy by astype - # convert data to floats as functions may fail with categoricals + # convert data to floats as functions may fail with categorical dtype if func_handle_df: - hist = func(self.hist.astype(float), **func_kws) + hist = func(self._hist.astype(float, copy=True), **func_kws) else: for pos in self.columns: - hist[pos] = func(self.hist[pos].astype(float), **func_kws) + hist[pos] = func(self._hist[pos].astype(float, copy=True), **func_kws) - History._validate(hist, self.meta) + try: + self._validate(hist, self._meta) + except Exception as e: + raise ValueError( + f"result from applied function is not a valid History, because {e}" + ) from e if copy: history = History(index=None) # noqa - history.meta = self.meta.copy() + history._meta = self._meta.copy() else: history = self - history.hist = hist.astype("category") + history._hist = hist.astype("category") return history @@ -350,8 +431,8 @@ class History: """ copyfunc = deepcopy if deep else shallowcopy new = History(self.index) - new.hist = self.hist.copy(deep) - new.meta = copyfunc(self.meta) + new._hist = self._hist.copy(deep) + new._meta = copyfunc(self._meta) return new def __copy__(self): @@ -367,14 +448,14 @@ class History: return self.copy(deep=True) def __len__(self) -> int: - return len(self.hist.columns) + return len(self._hist.columns) def __repr__(self): if self.empty: - return str(self.hist).replace("DataFrame", "History") + return str(self._hist).replace("DataFrame", "History") - r = self.hist.astype(str) + r = self._hist.astype(str) return str(r)[1:] @@ -382,51 +463,62 @@ class History: # validation # - @staticmethod - def _validate(hist: pd.DataFrame, meta: List[Any]) -> Tuple[pd.DataFrame, List]: + @classmethod + def _validate( + cls, hist: pd.DataFrame, meta: List[Any] + ) -> Tuple[pd.DataFrame, List]: """ check type, columns, index, dtype of hist and if the meta fits also """ + cls._validateHist(hist) + cls._validateMetaList(meta, hist) + return hist, meta - # check hist - if not isinstance(hist, pd.DataFrame): + @classmethod + def _validateHist(cls, obj): + if not isinstance(obj, pd.DataFrame): raise TypeError( - f"'hist' must be of type pd.DataFrame, but is of type {type(hist).__name__}" + f"'hist' must be of type pd.DataFrame, " + f"but is of type {type(obj).__name__}" ) - # isin([float, ..]) does not work ! - if not ( - (hist.dtypes == float) - | (hist.dtypes == np.float32) - | (hist.dtypes == np.float64) - | (hist.dtypes == "category") - ).all(): + if not obj.columns.equals(pd.RangeIndex(len(obj.columns))): raise ValueError( - "dtype of all columns in hist must be float or categorical" - ) - - if not hist.empty and ( - not hist.columns.equals(pd.Index(range(len(hist.columns)))) - or not np.issubdtype(hist.columns.dtype, np.integer) - ): - raise ValueError( - "column names must be continuous increasing int's, starting with 0." + "Columns of 'hist' must consist of " + "continuous increasing integers, " + "starting with 0." ) + for c in obj.columns: + try: + cls._validateValue(obj[c]) + except Exception as e: + raise ValueError(f"Bad column in hist. column '{c}': {e}") from None + return obj - # check meta - if not isinstance(meta, list): + @classmethod + def _validateMetaList(cls, obj, hist=None): + if not isinstance(obj, list): raise TypeError( - f"'meta' must be of type list, but is of type {type(meta).__name__}" - ) - if not all([isinstance(e, dict) for e in meta]): - raise TypeError("All elements in meta must be of type 'dict'") - - # check combinations of hist and meta - if not len(hist.columns) == len(meta): - raise ValueError( - "'meta' must have as many entries as columns exist in hist" + f"'meta' must be of type list, got type {type(obj).__name__}" ) + if hist is not None: + if not len(obj) == len(hist.columns): + raise ValueError( + "'meta' must have as many entries as columns in 'hist'" + ) + for i, item in enumerate(obj): + try: + cls._validateMetaDict(item) + except Exception as e: + raise ValueError(f"Bad meta. item {i}: {e}") from None + return obj - return hist, meta + @staticmethod + def _validateMetaDict(obj): + if not isinstance(obj, dict): + raise TypeError("obj must be dict") + if not all(isinstance(k, str) for k in obj.keys()): + raise ValueError("all keys in dict must be strings") + return obj @staticmethod def _validateValue(obj: pd.Series) -> pd.Series: @@ -435,14 +527,52 @@ class History: """ if not isinstance(obj, pd.Series): raise TypeError( - f"value must be of type pd.Series, but {type(obj).__name__} was given" + f"value must be of type pd.Series, got type {type(obj).__name__}" ) - - if not ((obj.dtype == float) or isinstance(obj.dtype, pd.CategoricalDtype)): + if not is_float_dtype(obj.dtype) and not is_categorical_dtype(obj.dtype): raise ValueError("dtype must be float or categorical") - return obj + @classmethod + def createFromData(cls, hist: pd.DataFrame, meta: List[Dict], copy: bool = False): + """ + Create a History from existing data. + + Parameters + ---------- + hist : pd.Dataframe + Data that define the flags of the history. + + meta : List of dict + A list holding meta information for each column, therefore it must + have the same number of entries as columns exist in `hist`. + + copy : bool, default False + If `True`, the input data is copied, otherwise not. + + + Notes + ----- + To create a very simple History from a flags dataframe ``f`` use + ``mask = pd.DataFrame(True, index=f.index, columns=f.columns`` + and + ``meta = [{}] * len(f.columns)``. + + Returns + ------- + History + """ + cls._validate(hist, meta) + + if copy: + hist = hist.copy() + meta = deepcopy(meta) + + history = cls(index=None) # noqa + history._hist = hist.astype("category", copy=False) + history._meta = meta + return history + def createHistoryFromData( hist: pd.DataFrame, @@ -476,13 +606,10 @@ def createHistoryFromData( ------- History """ - History._validate(hist, meta) - - if copy: - hist = hist.copy() - meta = deepcopy(meta) - - history = History(index=None) # noqa - history.hist = hist.astype("category", copy=False) - history.meta = meta - return history + # todo: expose History, enable this warning + # warnings.warn( + # "saqc.createHistoryFromData() will be deprecated soon. " + # "Please use saqc.History.createFromData() instead.", + # category=FutureWarning, + # ) + return History.createFromData(hist, meta, copy) diff --git a/saqc/core/register.py b/saqc/core/register.py index 2fc17c19f..cdbcc48a3 100644 --- a/saqc/core/register.py +++ b/saqc/core/register.py @@ -147,20 +147,12 @@ def _squeezeFlags(old_flags, new_flags: Flags, columns: pd.Index, meta) -> Flags # function call. If no such columns exist, we end up with an empty # new_history. start = len(old_history.columns) - new_history = _sliceHistory(new_history, slice(start, None)) - - squeezed = new_history.squeeze(raw=True) + squeezed = new_history.squeeze(raw=True, start=start) out.history[col] = out.history[col].append(squeezed, meta=meta) return out -def _sliceHistory(history: History, sl: slice) -> History: - history.hist = history.hist.iloc[:, sl] - history.meta = history.meta[sl] - return history - - def _maskData( data: dios.DictOfSeries, flags: Flags, columns: Sequence[str], thresh: float ) -> Tuple[dios.DictOfSeries, dios.DictOfSeries]: diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py index 86a581214..5dce6b009 100644 --- a/saqc/funcs/resampling.py +++ b/saqc/funcs/resampling.py @@ -457,7 +457,7 @@ class ResamplingMixin: if overwrite is False: mask = _isflagged(self._flags[target], thresh=kwargs["dfilter"]) - history.hist[mask] = np.nan + history._hist[mask] = np.nan if squeeze: history = history.squeeze(raw=True) diff --git a/tests/core/test_history.py b/tests/core/test_history.py index 3481b6ce1..0965032dc 100644 --- a/tests/core/test_history.py +++ b/tests/core/test_history.py @@ -7,6 +7,7 @@ import numpy as np import pandas as pd import pytest +from pandas.api.types import is_categorical_dtype, is_float_dtype from saqc.core.history import History, createHistoryFromData from tests.common import dummyHistory @@ -75,7 +76,10 @@ def check_invariants(hist): assert isinstance(hist.hist, pd.DataFrame) assert isinstance(hist.meta, list) assert all( - [isinstance(dtype, (float, pd.CategoricalDtype)) for dtype in hist.hist.dtypes] + [ + is_float_dtype(dtype) or is_categorical_dtype(dtype) + for dtype in hist.hist.dtypes + ] ) assert all([isinstance(e, dict) for e in hist.meta]) assert hist.columns is hist.hist.columns -- GitLab