From 86e6c765a1c8bd062e3a8ebd0bc68f53759936bc Mon Sep 17 00:00:00 2001
From: Bert Palm <bert.palm@ufz.de>
Date: Tue, 23 Mar 2021 01:15:00 +0100
Subject: [PATCH] refactors and fixes test_modelling.py

---
 saqc/core/register.py         |   3 +-
 saqc/flagger/flags.py         |  30 +++++++-
 saqc/flagger/history.py       |  11 ++-
 saqc/funcs/interpolation.py   | 141 +++++++++++++++++-----------------
 saqc/funcs/resampling.py      |  78 +++++++++++--------
 saqc/lib/ts_operators.py      |  20 +++--
 tests/funcs/test_modelling.py |  41 ++++++----
 7 files changed, 191 insertions(+), 133 deletions(-)

diff --git a/saqc/core/register.py b/saqc/core/register.py
index ce88dc4bc..39c1d4b14 100644
--- a/saqc/core/register.py
+++ b/saqc/core/register.py
@@ -49,7 +49,8 @@ def register(masking: MaskingStrT = "all", module: Optional[str] = None):
         # executed whenever a register-decorated function is called,
         # regardless of whether it is called plain or via `SaQC.func`.
         @wraps(func)
-        def callWrapper(*args, **kwargs):
+        def callWrapper(data, field, flagger, *args, **kwargs):
+            args = data, field, flagger, *args
             args, kwargs, old_state = _preCall(func, args, kwargs, masking, func_name)
             result = func(*args, **kwargs)
             return _postCall(result, old_state)
diff --git a/saqc/flagger/flags.py b/saqc/flagger/flags.py
index c1dcb1ed6..28a0cef59 100644
--- a/saqc/flagger/flags.py
+++ b/saqc/flagger/flags.py
@@ -36,6 +36,12 @@ class _HistAccess:
     def __setitem__(self, key: str, value: Union[History, pd.DataFrame]):
         if not isinstance(value, History):
             value = History(value)
+
+        if not isinstance(value, History):
+            raise TypeError("value must be of type History")
+
+        History._validate_hist_with_mask(value.hist, value.mask)
+
         self.obj._data[key] = value
         self.obj._cache.pop(key, None)
@@ -339,7 +345,9 @@ def initFlagsLike(
     return Flags(result)
 
 
-def applyFunctionOnHistory(flags: Flags, column, hist_func, hist_kws, mask_func, mask_kws, last_column=None):
+def applyFunctionOnHistory(
+    flags: Flags, column, hist_func, hist_kws, mask_func, mask_kws, last_column=None, func_handle_df=False
+):
     """
     Apply function on history.
@@ -355,6 +363,7 @@ def applyFunctionOnHistory(flags: Flags, column, hist_func, hist_kws, mask_func,
     mask_func :
     mask_kws :
     last_column :
+    func_handle_df : bool, default False
 
     Returns
     -------
@@ -363,15 +372,28 @@ def applyFunctionOnHistory(flags: Flags, column, hist_func, hist_kws, mask_func,
     flags = flags.copy()
     history = flags.history[column]
     new_history = History()
-    for pos in history.columns:
-        new_history.hist[pos] = hist_func(history.hist[pos], **hist_kws)
-        new_history.mask[pos] = mask_func(history.mask[pos], **mask_kws)
+
+    if func_handle_df:
+        new_history.hist = hist_func(history.hist, **hist_kws)
+        new_history.mask = mask_func(history.mask, **mask_kws)
+
+    else:
+        for pos in history.columns:
+            new_history.hist[pos] = hist_func(history.hist[pos], **hist_kws)
+            new_history.mask[pos] = mask_func(history.mask[pos], **mask_kws)
+
+    # handle unstable state
     if last_column is None:
         new_history.mask.iloc[:, -1:] = True
     else:
+        if isinstance(last_column, str) and last_column == 'dummy':
+            last_column = pd.Series(UNTOUCHED, index=new_history.mask.index, dtype=float)
+
         new_history.append(last_column, force=True)
 
+    # assure a boolean mask
+    new_history.mask = new_history.mask.fillna(False).astype(bool)
+
     flags.history[column] = new_history
     return flags
diff --git a/saqc/flagger/history.py b/saqc/flagger/history.py
index 72a573bd1..2acc8f22e 100644
--- a/saqc/flagger/history.py
+++ b/saqc/flagger/history.py
@@ -333,13 +333,14 @@ class History:
     # validation #
     #
 
-    def _validate_hist_with_mask(self, obj: pd.DataFrame, mask: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
+    @staticmethod
+    def _validate_hist_with_mask(obj: pd.DataFrame, mask: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
         """
         check type, columns, index, dtype and if the mask fits the obj.
         """
 
         # check hist
-        self._validate_hist(obj)
+        History._validate_hist(obj)
 
         # check mask
         if not isinstance(mask, pd.DataFrame):
@@ -360,7 +361,8 @@ class History:
 
         return obj, mask
 
-    def _validate_hist(self, obj: pd.DataFrame) -> pd.DataFrame:
+    @staticmethod
+    def _validate_hist(obj: pd.DataFrame) -> pd.DataFrame:
         """
         check type, columns, dtype of obj.
         """
@@ -379,7 +381,8 @@ class History:
 
         return obj
 
-    def _validate_value(self, obj: pd.Series) -> pd.Series:
+    @staticmethod
+    def _validate_value(obj: pd.Series) -> pd.Series:
         """
         index is not checked !
         """
diff --git a/saqc/funcs/interpolation.py b/saqc/funcs/interpolation.py
index c0b9b8ee0..be93d5492 100644
--- a/saqc/funcs/interpolation.py
+++ b/saqc/funcs/interpolation.py
@@ -34,31 +34,35 @@ def interpolateByRolling(
         **kwargs
 ) -> Tuple[DictOfSeries, Flagger]:
     """
-    Interpolates missing values (nan values present in the data) by assigning them the aggregation result of
-    a window surrounding them.
-
-    Note, that in the current implementation, center=True can only be used with integer window sizes - furthermore
-    note, that integer window sizes can yield screwed aggregation results for not-harmonized or irregular data.
+    Interpolates NaN values in the data by assigning them the aggregation result of the surrounding window.
 
     Parameters
     ----------
     data : dios.DictOfSeries
-        A dictionary of pandas.Series, holding all the data.
+        The data container.
+
     field : str
-        The fieldname of the column, holding the data-to-be-interpolated.
+        Name of the column holding the data to be interpolated.
+
     flagger : saqc.flagger.Flagger
-        A flagger object, holding flags and additional Informations related to `data`.
+        A flagger object, holding flags and additional information related to `data`.
+
     winsz : int, str
-        The size of the window, the aggregation is computed from. Either counted in periods number (Integer passed),
-        or defined by a total temporal extension (offset String passed).
+        The size of the window the aggregation is computed from. An integer defines the number of periods to use,
+        a string is interpreted as an offset (see `pandas.rolling` for more information).
+        Integer windows may result in skewed aggregations if used on non-harmonized or irregular data.
+
     func : Callable
         The function used for aggregation.
+
     center : bool, default True
-        Wheather or not the window, the aggregation is computed of, is centered around the value to be interpolated.
+        Center the window around the value. Can only be used with integer windows, otherwise it is silently ignored.
+
     min_periods : int
         Minimum number of valid (not np.nan) values that have to be available in a window for its aggregation to be
         computed.
-    flag : float, default UNFLAGGED
+
+    flag : float or None, default UNFLAGGED
         Flag that is to be inserted for the interpolated values. If ``None`` no flags are set.
 
     Returns
     -------
@@ -83,7 +87,7 @@ def interpolateByRolling(
         rolled = roller.apply(func)
 
     na_mask = datcol.isna()
-    interpolated = na_mask & ~rolled.isna()
+    interpolated = na_mask & rolled.notna()
     datcol[na_mask] = rolled[na_mask]
     data[field] = datcol
 
@@ -102,7 +106,6 @@ def interpolateInvalid(
         inter_order: int = 2,
         inter_limit: int = 2,
         downgrade_interpolation: bool = False,
-        not_interpol_flags=None,
         flag: float = UNFLAGGED,
         **kwargs
 ) -> Tuple[DictOfSeries, Flagger]:
@@ -112,32 +115,36 @@ def interpolateInvalid(
     All the interpolation methods from ``pd.Series.interpolate`` are available and can be selected with the
     very same keywords you would pass to its ``method`` parameter.
 
-    Note, that the `inter_limit` keyword really restricts the interpolation to chunks, not containing more than
-    `inter_limit` successive nan entries.
-
     Parameters
     ----------
     data : dios.DictOfSeries
-        A dictionary of pandas.Series, holding all the data.
+        The data container.
+
     field : str
-        The fieldname of the column, holding the data-to-be-interpolated.
+        Name of the column holding the data to be interpolated.
+
     flagger : saqc.flagger.Flagger
-        A flagger object, holding flags and additional Informations related to `data`.
+        A flagger object, holding flags and additional information related to `data`.
+
     method : {"linear", "time", "nearest", "zero", "slinear", "quadratic", "cubic", "spline", "barycentric",
-        "polynomial", "krogh", "piecewise_polynomial", "spline", "pchip", "akima"}: string
-        The interpolation method you want to apply.
+        "polynomial", "krogh", "piecewise_polynomial", "spline", "pchip", "akima"}
+        The interpolation method to use.
+
     inter_order : int, default 2
         If your selected interpolation method can be performed at different 'orders', pass the desired order here.
+
     inter_limit : int, default 2
-        Maximum number of consecutive 'nan' values allowed for a gap to be interpolated.
+        Maximum number of consecutive 'nan' values allowed for a gap to be interpolated. This restricts the
+        interpolation to chunks containing not more than `inter_limit` successive nan entries.
+
     flag : float or None, default UNFLAGGED
-        Flag that is to be inserted for the interpolated values. If ``None`` no flags are set.
+        Flag that is set for interpolated values. If ``None``, no flags are set at all.
+
     downgrade_interpolation : bool, default False
-        If interpolation can not be performed at `inter_order`, because not enough values are present or the order
-        is not implemented for the passed method, automatically try to interpolate at ``inter_order-1``.
-    not_interpol_flags : None
-        deprecated
+        If `True` and the interpolation cannot be performed at the current order, retry with a lower order.
+        This can happen because the chosen ``method`` does not support the passed ``inter_order``, or
+        simply because not enough values are present in an interval.
 
     Returns
     -------
@@ -148,8 +155,6 @@ def interpolateInvalid(
         The flagger object, holding flags and additional information related to `data`.
         Flag values may have changed relative to the flagger input.
     """
-
-    data = data.copy()
     inter_data = interpolateNANs(
         data[field],
         method,
@@ -159,10 +164,6 @@ def interpolateInvalid(
     )
     interpolated = data[field].isna() & inter_data.notna()
 
-    # TODO: remove with version 2.0
-    if not_interpol_flags is not None:
-        raise ValueError("'not_interpol_flags' is deprecated")
-
     if flag is not None:
         flagger[interpolated, field] = flag
 
@@ -170,17 +171,14 @@ def interpolateInvalid(
     return data, flagger
 
 
-def _overlap_rs(x, freq='1min', fill_value=UNFLAGGED):
-    end = x.index[-1].ceil(freq)
-    x = x.resample(freq).max()
-    x = x.combine(x.shift(1, fill_value=fill_value), max)
-    # we are appending last regular grid entry (if necessary), to conserve integrity of groups of regularized
-    # timestamps originating all from the same logger.
-    try:
-        x = x.append(pd.Series([fill_value], index=[end]), verify_integrity=True)
-    except ValueError:
-        pass
-    return x
+def _resampleOverlapping(data: pd.Series, freq: str, fill_value):
+    """
+    Resample data to the given frequency, such that every grid timestamp also
+    sees the maximum of the preceding interval. Missing values become ``fill_value``.
+    """
+    dtype = data.dtype
+    end = data.index[-1].ceil(freq)
+    data = data.resample(freq).max()
+    data = data.combine(data.shift(1, fill_value=fill_value), max)
+    if end not in data:
+        data.loc[end] = fill_value
+    return data.fillna(fill_value).astype(dtype)
 
 
 @register(masking='none', module="interpolation")
@@ -191,8 +189,8 @@ def interpolateIndex(
         freq: str,
         method: _SUPPORTED_METHODS,
         inter_order: int = 2,
-        downgrade_interpolation: bool = False,
         inter_limit: int = 2,
+        downgrade_interpolation: bool = False,
         **kwargs
 ) -> Tuple[DictOfSeries, Flagger]:
     """
@@ -201,40 +199,38 @@ def interpolateIndex(
     Note that the interpolation will only be calculated for grid timestamps that have a preceding AND a
     succeeding valid data value within "freq" range.
 
-    Note, that the function differs from proc_interpolateMissing, by returning a whole new data set, only containing
-    samples at the interpolated, equidistant timestamps (of frequency "freq").
-
-    Note, it is possible to interpolate unregular "grids" (with no frequencies). In fact, any date index
-    can be target of the interpolation. Just pass the field name of the variable, holding the index
-    you want to interpolate, to "grid_field". 'freq' is then use to determine the maximum gap size for
-    a grid point to be interpolated.
-
-    Note, that intervals, not having an interpolation value assigned (thus, evaluate to np.nan), get UNFLAGGED assigned.
-
     Parameters
     ----------
     data : dios.DictOfSeries
-        A dictionary of pandas.Series, holding all the data.
+        The data container.
+
     field : str
-        The fieldname of the column, holding the data-to-be-interpolated.
+        Name of the column holding the data to be interpolated.
+
     flagger : saqc.flagger.Flagger
-        A flagger object, holding flags and additional Informations related to `data`.
+        A flagger object, holding flags and additional information related to `data`.
+
     freq : str
         An Offset String, interpreted as the frequency of the grid you want to interpolate your data at.
+
     method : {"linear", "time", "nearest", "zero", "slinear", "quadratic", "cubic", "spline", "barycentric",
         "polynomial", "krogh", "piecewise_polynomial", "spline", "pchip", "akima"}: string
         The interpolation method you want to apply.
-    inter_order : integer, default 2
+
+    inter_order : int, default 2
         If your selected interpolation method can be performed at different 'orders', pass the desired order here.
+
+    inter_limit : int, default 2
+        Maximum number of consecutive 'nan' values allowed for a gap to be interpolated. This restricts the
+        interpolation to chunks containing not more than `inter_limit` successive nan entries.
+
     downgrade_interpolation : bool, default False
-        If interpolation can not be performed at `inter_order` -
-        (not enough values or not implemented at this order) -
-        automatically try to interpolate at order `inter_order` :math:`- 1`.
-    inter_limit : Integer, default 2
-        Maximum number of consecutive Grid values allowed for interpolation. If set
-        to *n*, chunks of *n* and more consecutive grid values, where there is no value in between, wont be
-        interpolated.
+        If `True` and the interpolation cannot be performed at the current order, retry with a lower order.
+        This can happen because the chosen ``method`` does not support the passed ``inter_order``, or
+        simply because not enough values are present in an interval.
 
     Returns
     -------
@@ -254,7 +250,7 @@ def interpolateIndex(
     start, end = datcol.index[0].floor(freq), datcol.index[-1].ceil(freq)
     grid_index = pd.date_range(start=start, end=end, freq=freq, name=datcol.index.name)
 
-    flagged = isflagged(flagscol, kwargs['to_mask'])
+    flagged = isflagged(flagger[field], kwargs['to_mask'])
 
     # drop all points that hold no relevant grid information
     datcol = datcol[~flagged].dropna()
@@ -286,12 +282,15 @@ def interpolateIndex(
 
     # flags reshaping
     flagscol = flagscol[~flagged]
-    flagscol = _overlap_rs(flagscol, freq, UNFLAGGED)
+    flagscol = _resampleOverlapping(flagscol, freq, UNFLAGGED)
 
+    # do the reshaping on the history
     flagger = applyFunctionOnHistory(
         flagger, field,
-        hist_func=_overlap_rs, hist_kws=dict(freq=freq, fill_value=UNFLAGGED),
-        mask_func=_overlap_rs, mask_kws=dict(freq=freq, fill_value=False),
-        last_column=flagscol
+        hist_func=_resampleOverlapping, hist_kws=dict(freq=freq, fill_value=UNFLAGGED),
+        mask_func=_resampleOverlapping, mask_kws=dict(freq=freq, fill_value=False),
+        last_column='dummy'
     )
 
     return data, flagger
diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py
index 40e2bd50f..482dd75c3 100644
--- a/saqc/funcs/resampling.py
+++ b/saqc/funcs/resampling.py
@@ -568,33 +568,43 @@ def resample(
     return data, flagger
 
 
-def _getChunkBounds(target_datcol, flagscol, freq):
-    chunk_end = target_datcol.reindex(flagscol.index, method='bfill', tolerance=freq)
-    chunk_start = target_datcol.reindex(flagscol.index, method='ffill', tolerance=freq)
+def _getChunkBounds(target: pd.Series, flagscol: pd.Series, freq: str):
+    chunk_end = target.reindex(flagscol.index, method='bfill', tolerance=freq)
+    chunk_start = target.reindex(flagscol.index, method='ffill', tolerance=freq)
     ignore_flags = (chunk_end.isna() | chunk_start.isna())
     return ignore_flags
 
 
-def _inverseInterpolation(source_col, freq=None, chunk_bounds=None, target_flagscol=None):
-    source_col = source_col.copy()
+def _inverseInterpolation(source: pd.Series, target: pd.Series, freq: str, chunk_bounds) -> pd.Series:
+    """
+    Do an inverse interpolation.
+    """
+    source = source.copy()
     if len(chunk_bounds) > 0:
-        source_col[chunk_bounds] = np.nan
-    backprojected = source_col.reindex(target_flagscol.index, method="bfill", tolerance=freq)
-    fwrdprojected = source_col.reindex(target_flagscol.index, method="ffill", tolerance=freq)
+        source[chunk_bounds] = np.nan
+    backprojected = source.reindex(target.index, method="bfill", tolerance=freq)
+    fwrdprojected = source.reindex(target.index, method="ffill", tolerance=freq)
     return pd.concat([backprojected, fwrdprojected], axis=1).max(axis=1)
 
 
-def _inverseAggregation(source_col, freq=None, method=None, target_flagscol=None):
-    return source_col.reindex(target_flagscol.index, method=method, tolerance=freq)
+def _inverseAggregation(
+    source: Union[pd.Series, pd.DataFrame],
+    target: Union[pd.Series, pd.DataFrame],
+    freq: str,
+    method: str,
+):
+    return source.reindex(target.index, method=method, tolerance=freq)
 
 
-def _inverseShift(source_col, freq=None, method=None, drop_mask=None, target_flagscol=None):
-    target_flagscol_drops = target_flagscol[drop_mask]
-    target_flagscol = target_flagscol.drop(drop_mask[drop_mask].index)
+def _inverseShift(source: pd.Series, target: pd.Series, drop_mask: pd.Series,
+                  freq: str, method: str, fill_value) -> pd.Series:
+    dtype = source.dtype
+
+    target_drops = target[drop_mask]
+    target = target[~drop_mask]
 
     flags_merged = pd.merge_asof(
-        source_col,
-        pd.Series(target_flagscol.index.values, index=target_flagscol.index, name="pre_index"),
+        source,
+        target.index.to_series(name='pre_index'),
         left_index=True,
         right_index=True,
         tolerance=freq,
@@ -602,13 +612,13 @@ def _inverseShift(source_col, freq=None, method=None, drop_mask=None, target_fla
     )
     flags_merged.dropna(subset=["pre_index"], inplace=True)
     flags_merged = flags_merged.set_index(["pre_index"]).squeeze()
-    target_flagscol[flags_merged.index] = flags_merged.values
+    target[flags_merged.index] = flags_merged.values
 
     # reinsert drops
-    source_col = target_flagscol.reindex(target_flagscol.index.join(target_flagscol_drops.index, how="outer"))
-    source_col.loc[target_flagscol_drops.index] = target_flagscol_drops.values
+    source = target.reindex(target.index.union(target_drops.index))
+    source.loc[target_drops.index] = target_drops.values
 
-    return source_col
+    return source.fillna(fill_value).astype(dtype, copy=False)
 
 
 @register(masking='none', module="resampling")
@@ -689,27 +699,33 @@ def reindexFlags(
     target_datcol = data[field]
     target_flagscol = flagger[field]
 
-    blank_dummy = pd.Series(np.nan, target_flagscol.index)
+    dummy = pd.Series(np.nan, target_flagscol.index)
+
     if method[-13:] == "interpolation":
         ignore = _getChunkBounds(target_datcol, flagscol, freq)
-        merge_func = _inverseInterpolation
-        merge_dict = dict(freq=freq, chunk_bounds=ignore, target_flagscol=blank_dummy)
-        mask_dict = {**merge_dict, 'chunk_bounds':[]}
+        func = _inverseInterpolation
+        func_kws = dict(freq=freq, chunk_bounds=ignore, target=dummy)
+        mask_kws = {**func_kws, 'chunk_bounds': []}
 
-    if method[-3:] == "agg" or method == "match":
+    elif method[-3:] == "agg" or method == "match":
         projection_method = METHOD2ARGS[method][0]
         tolerance = METHOD2ARGS[method][1](freq)
-        merge_func = _inverseAggregation
-        merge_dict = mask_dict = dict(freq=tolerance, method=projection_method, target_flagscol=blank_dummy)
+        func = _inverseAggregation
+        func_kws = dict(freq=tolerance, method=projection_method, target=dummy)
+        mask_kws = func_kws
 
-    if method[-5:] == "shift":
+    elif method[-5:] == "shift":
        drop_mask = (target_datcol.isna() | isflagged(target_flagscol, kwargs['to_mask']))
        projection_method = METHOD2ARGS[method][0]
        tolerance = METHOD2ARGS[method][1](freq)
-        merge_func = _inverseShift
-        merge_dict = mask_dict = dict(freq=tolerance, method=projection_method, drop_mask=drop_mask, target_flagscol=blank_dummy)
+       func = _inverseShift
+       kws = dict(freq=tolerance, method=projection_method, drop_mask=drop_mask, target=dummy)
+       func_kws = {**kws, 'fill_value': UNTOUCHED}
+       mask_kws = {**kws, 'fill_value': False}
+
+    else:
+        raise ValueError(f"unknown method {method}")
 
-    tmp_flagger = applyFunctionOnHistory(flagger, source, merge_func, merge_dict, merge_func, mask_dict,
-                                         last_column=blank_dummy)
+    tmp_flagger = applyFunctionOnHistory(flagger, source, func, func_kws, func, mask_kws, last_column=dummy)
     flagger = appendHistory(flagger, field, tmp_flagger.history[source])
     return data, flagger
diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py
index de9de79d2..37d8253ab 100644
--- a/saqc/lib/ts_operators.py
+++ b/saqc/lib/ts_operators.py
@@ -7,6 +7,7 @@ The module gathers all kinds of timeseries transformations.
 import logging
 
 import re
+from typing import Union
 
 import pandas as pd
 import numpy as np
@@ -252,12 +253,13 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio
 
 
 def aggregate2Freq(
-    data, method, freq, agg_func, fill_value=np.nan, max_invalid_total=None, max_invalid_consec=None
+    data: pd.Series, method, freq, agg_func, fill_value=np.nan, max_invalid_total=None, max_invalid_consec=None
 ):
-    # The function aggregates values to an equidistant frequency grid with agg_func.
-    # Timestamps that have no values projected on them, get "fill_value" assigned. Also,
-    # "fill_value" serves as replacement for "invalid" intervals
-
+    """
+    The function aggregates values to an equidistant frequency grid with agg_func.
+    Timestamps that get no values projected onto them are filled with the fill-value,
+    which also serves as the replacement for "invalid" intervals.
+    """
     methods = {
         "nagg": lambda seconds_total: (seconds_total/2, "left", "left"),
         "bagg": lambda _: (0, "left", "left"),
@@ -309,9 +311,11 @@ def aggregate2Freq(
     return data
 
 
-def shift2Freq(data, method, freq, fill_value=np.nan):
-    # shift timestamps backwards/forwards in order to allign them with an equidistant
-    # frequencie grid.
+def shift2Freq(data: Union[pd.Series, pd.DataFrame], method: str, freq: str, fill_value):
+    """
+    Shift timestamps backwards/forwards in order to align them with an equidistant
+    frequency grid. Resulting NaNs are replaced with the fill-value.
+ """ methods = { "fshift": lambda freq: ("ffill", pd.Timedelta(freq)), diff --git a/tests/funcs/test_modelling.py b/tests/funcs/test_modelling.py index 23cc82ab2..248c12246 100644 --- a/tests/funcs/test_modelling.py +++ b/tests/funcs/test_modelling.py @@ -6,6 +6,7 @@ import dios +from saqc import BAD, UNFLAGGED from saqc.flagger import initFlagsLike from saqc.funcs.tools import mask from saqc.funcs.residues import calculatePolynomialResidues, calculateRollingResidues @@ -46,23 +47,35 @@ def test_modelling_mask(dat): data, _ = dat() data = dios.DictOfSeries(data) flagger = initFlagsLike(data) - data_seasonal, flagger_seasonal = mask(data, "data", flagger, mode='periodic', period_start="20:00", - period_end="40:00", include_bounds=False) - flaggs = flagger_seasonal["data"] - assert flaggs[np.logical_and(20 <= flaggs.index.minute, 40 >= flaggs.index.minute)].isna().all() - data_seasonal, flagger_seasonal = mask(data, "data", flagger, mode='periodic', period_start="15:00:00", - period_end="02:00:00") - flaggs = flagger_seasonal["data"] - assert flaggs[np.logical_and(15 <= flaggs.index.hour, 2 >= flaggs.index.hour)].isna().all() - data_seasonal, flagger_seasonal = mask(data, "data", flagger, mode='periodic', period_start="03T00:00:00", - period_end="10T00:00:00") - flaggs = flagger_seasonal["data"] - assert flaggs[np.logical_and(3 <= flaggs.index.hour, 10 >= flaggs.index.hour)].isna().all() + field = "data" + + # set flags everywhere to test unflagging + flagger[:, field] = BAD + + common = dict(data=data, field=field, flagger=flagger, mode='periodic') + data_seasonal, flagger_seasonal = mask(**common, period_start="20:00", period_end="40:00", include_bounds=False) + flags = flagger_seasonal[field] + m = (20 <= flags.index.minute) & (flags.index.minute <= 40) + assert all(flagger_seasonal[field][m] == UNFLAGGED) + assert all(data_seasonal[field][m].isna()) + + data_seasonal, flagger_seasonal = mask(**common, period_start="15:00:00", period_end="02:00:00") + flags = flagger_seasonal[field] + m = (15 <= flags.index.hour) & (flags.index.hour <= 2) + assert all(flagger_seasonal[field][m] == UNFLAGGED) + assert all(data_seasonal[field][m].isna()) + + data_seasonal, flagger_seasonal = mask(**common, period_start="03T00:00:00", period_end="10T00:00:00") + flags = flagger_seasonal[field] + m = (3 <= flags.index.hour) & (flags.index.hour <= 10) + assert all(flagger_seasonal[field][m] == UNFLAGGED) + assert all(data_seasonal[field][m].isna()) mask_ser = pd.Series(False, index=data["data"].index) mask_ser[::5] = True data["mask_ser"] = mask_ser flagger = initFlagsLike(data) data_masked, flagger_masked = mask(data, "data", flagger, mode='mask_var', mask_var="mask_ser") - flaggs = flagger_masked["data"] - assert flaggs[data_masked['mask_ser']].isna().all() + m = mask_ser + assert all(flagger_masked[field][m] == UNFLAGGED) + assert all(data_masked[field][m].isna()) -- GitLab