diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 944d88f8c99c9c02a27ff244b198a9c7d0213472..db9bfa28c5688356eb3891571286919b3705148b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -20,7 +20,7 @@ test:python37: - pyenv shell 3.7.5 - pip install --upgrade pip - pip install -r requirements.txt - - python -m pytest test + - python -m pytest --ignore test/lib test - python -m saqc --config ressources/data/config_ci.csv --data ressources/data/data.csv --outfile /tmp/test.csv @@ -30,7 +30,7 @@ test:python38: - pyenv shell 3.8.0 - pip install --upgrade pip - pip install -r requirements.txt - - python -m pytest test + - python -m pytest --ignore test/lib test - python -m saqc --config ressources/data/config_ci.csv --data ressources/data/data.csv --outfile /tmp/test.csv # Make html docu with sphinx diff --git a/saqc/flagger/baseflagger.py b/saqc/flagger/baseflagger.py index 51448d577b63c0e3c267d670999da52048798d4c..edb275bbac2fdcfeb9faf199b741ee2b560f940a 100644 --- a/saqc/flagger/baseflagger.py +++ b/saqc/flagger/baseflagger.py @@ -11,7 +11,7 @@ import pandas as pd import numpy as np import dios -from saqc.lib.tools import assertScalar, mergeDios, toSequence, mutateIndex, customRolling +from saqc.lib.tools import assertScalar, mergeDios, toSequence, customRoller COMPARATOR_MAP = { "!=": op.ne, @@ -260,13 +260,19 @@ class BaseFlagger(ABC): base = mask.reindex_like(self._flags[field]).fillna(False) before, after = False, False + if flag_before is not None: + closed = 'both' + if isinstance(flag_before, int): + flag_before, closed = flag_before + 1, None + r = customRoller(base, window=flag_before, min_periods=1, closed=closed, expand=True, forward=True) + before = r.sum().astype(bool) + if flag_after is not None: + closed = 'both' if isinstance(flag_after, int): - flag_after += 1 - after = base.rolling(window=flag_after, min_periods=1, closed='both').sum().astype(bool) - - if flag_before is not None: - raise NotImplementedError("flag_before is not implemented") + flag_after, closed = flag_after + 1, None + r = customRoller(base, window=flag_after, min_periods=1, closed=closed, expand=True) + after = r.sum().astype(bool) # does not include base, to avoid overriding flags that just was set # by the test, because flag and win_flag may differ. diff --git a/saqc/funcs/constants_detection.py b/saqc/funcs/constants_detection.py index 92a027cc1feee102a8b919a7735f48c79aa8624a..c8382ab5c0ace7e5c04acf4cb7258eaff8fec232 100644 --- a/saqc/funcs/constants_detection.py +++ b/saqc/funcs/constants_detection.py @@ -6,7 +6,7 @@ import pandas as pd from saqc.core.register import register from saqc.lib.ts_operators import varQC -from saqc.lib.tools import retrieveTrustworthyOriginal +from saqc.lib.tools import retrieveTrustworthyOriginal, customRoller @register(masking='field') @@ -45,19 +45,20 @@ def constants_flagBasic(data, field, flagger, thresh, window, **kwargs): """ d = data[field] - - # find all constant values in a row with a forward search - r = d.rolling(window=window) - mask = (r.max() - r.min() <= thresh) & (r.count() > 1) - - # backward rolling for offset windows hack - bw = mask[::-1].copy() - bw.index = bw.index.max() - bw.index - - # propagate the mask(!), backwards - bwmask = bw.rolling(window=window).sum() > 0 - - mask |= bwmask[::-1].values + if not isinstance(window, str): + raise TypeError('window must be offset string.') + + # min_periods=2 ensures that at least two non-nan values are present + # in each window and also min() == max() == d[i] is not possible. + kws = dict(window=window, min_periods=2) + + # find all consecutive constant values in one direction... + r = customRoller(d, **kws) + m1 = r.max() - r.min() <= thresh + # and in the other + r = customRoller(d, forward=True, **kws) + m2 = r.max() - r.min() <= thresh + mask = m1 | m2 flagger = flagger.setFlags(field, mask, **kwargs) return data, flagger diff --git a/saqc/funcs/modelling.py b/saqc/funcs/modelling.py index c37bf5fef4a344a0066d672a0f6cb720bf22e219..704a8b52b2fc04b1bc4fa9783e999916a832714d 100644 --- a/saqc/funcs/modelling.py +++ b/saqc/funcs/modelling.py @@ -13,7 +13,7 @@ from saqc.lib.ts_operators import ( polyRollerIrregular, count ) -from saqc.lib.tools import seasonalMask, customRolling, FreqIndexer +from saqc.lib.tools import seasonalMask, customRoller import logging logger = logging.getLogger("SaQC") @@ -512,18 +512,13 @@ def modelling_changePointCluster(data, field, flagger, stat_func, thresh_func, b if reduce_window is None: reduce_window = f"{int(pd.Timedelta(bwd_window).total_seconds() + pd.Timedelta(fwd_window).total_seconds())}s" - indexer = FreqIndexer() - indexer.index_array = data_ser.index.to_numpy(int) - indexer.win_points = None - indexer.window_size = int(pd.Timedelta(bwd_window).total_seconds() * 10 ** 9) - indexer.forward = False - indexer.center = False - bwd_start, bwd_end = indexer.get_window_bounds(var_len, min_periods_bwd, center, closed) + # native pandas.rolling also fails + data_ser.rolling(window=bwd_window, min_periods=min_periods_bwd, closed=closed) + roller = customRoller(data_ser, window=bwd_window, min_periods=min_periods_bwd, closed=closed) + bwd_start, bwd_end = roller.window.get_window_bounds() - indexer.window_size = int(pd.Timedelta(fwd_window).total_seconds() * 10 ** 9) - indexer.forward = True - fwd_start, fwd_end = indexer.get_window_bounds(var_len, min_periods_fwd, center, closed) - fwd_start, fwd_end = np.roll(fwd_start, -1), np.roll(fwd_end, -1) + roller = customRoller(data_ser, window=fwd_window, min_periods=min_periods_fwd, closed=closed, forward=True) + fwd_start, fwd_end = roller.window.get_window_bounds() min_mask = ~((fwd_end - fwd_start <= min_periods_fwd) | (bwd_end - bwd_start <= min_periods_bwd)) fwd_end = fwd_end[min_mask] @@ -564,10 +559,11 @@ def modelling_changePointCluster(data, field, flagger, stat_func, thresh_func, b det_index = masked_index[result_arr] detected = pd.Series(True, index=det_index) if reduce_window is not False: - start, end = customRolling(detected, reduce_window, count, closed='both', min_periods=1, center=True, - index_only=True) - detected = _reduceCPCluster(stat_arr[result_arr], thresh_arr[result_arr], start, end, reduce_func, - detected.shape[0]) + l = detected.shape[0] + roller = customRoller(detected, window=reduce_window, min_periods=1, closed='both', center=True) + start, end = roller.window.get_window_bounds(num_values=l, min_periods=1, closed='both', center=True) + + detected = _reduceCPCluster(stat_arr[result_arr], thresh_arr[result_arr], start, end, reduce_func, l) det_index = det_index[detected] cluster = pd.Series(False, index=data[field].index) diff --git a/saqc/funcs/spikes_detection.py b/saqc/funcs/spikes_detection.py index b9175033d1017c36a8aad7dc5d134571f793563f..bbbe96b97a0b400b75ed061dcd7c883d373f21f0 100644 --- a/saqc/funcs/spikes_detection.py +++ b/saqc/funcs/spikes_detection.py @@ -17,14 +17,14 @@ from saqc.lib.tools import ( slidingWindowIndices, findIndex, toSequence, - customRolling + customRoller ) from outliers import smirnov_grubbs def _stray( val_frame, partition_freq=None, - partition_min=0, + partition_min=11, scoring_method="kNNMaxGap", n_neighbors=10, iter_start=0.5, @@ -59,7 +59,8 @@ def _stray( of `partition_freq` periods. if ``None`` is passed (default), all the data will be tested in one run. partition_min : int, default 0 Minimum number of periods per partition that have to be present for a valid outlier dettection to be made in - this partition. (Only of effect, if `partition_freq` is an integer.) + this partition. (Only of effect, if `partition_freq` is an integer.) Partition min value must always be + greater then the nn_neighbors value. scoring_method : {'kNNSum', 'kNNMaxGap'}, default 'kNNMaxGap' Scoring method applied. `'kNNSum'`: Assign to every point the sum of the distances to its 'n_neighbors' nearest neighbors. @@ -954,10 +955,10 @@ def spikes_flagBasic(data, field, flagger, thresh, tolerance, window, numba_kick to_roll = dataseries[to_roll] roll_mask = pd.Series(False, index=to_roll.index) roll_mask[post_jumps.index] = True - engine=None - if roll_mask.sum() > numba_kickin: - engine = 'numba' - result = customRolling(to_roll, window, spikeTester, roll_mask, closed='both', engine=engine, min_periods=2) + + roller = customRoller(to_roll, window=window, mask=roll_mask, min_periods=2, closed='both') + engine = None if roll_mask.sum() < numba_kickin else 'numba' + result = roller.apply(spikeTester, raw=True, engine=engine) # correct the result: only those values define plateaus, that do not have # values at their left starting point, that belong to other plateaus themself: diff --git a/saqc/lib/rolling.py b/saqc/lib/rolling.py new file mode 100644 index 0000000000000000000000000000000000000000..ba529fa5359f1e9707d12510939b524d0f6d4e65 --- /dev/null +++ b/saqc/lib/rolling.py @@ -0,0 +1,383 @@ +#!/usr/bin/env python + +__author__ = "Bert Palm" +__email__ = "bert.palm@ufz.de" +__copyright__ = "Copyright 2020, Helmholtz-Zentrum für Umweltforschung GmbH - UFZ" + +# We need to implement the +# - calculation/skipping of min_periods, +# because `calculate_center_offset` does ignore those and we cannot rely on rolling(min_periods), as +# pointed out in customRoller. Also we need to implement +# - centering of windows for fixed windows, +# for variable windows this is not allowed (similar to pandas). +# The close-param, for variable windows is already implemented in `calculate_center_offset`, +# and we dont allow it for fixed windows (similar to pandas). We also want to +# - fix the strange ramp-up behavior, +# which occur if the window is shifted in the data but yet is not fully inside the data. In this +# case we want to spit out nan's instead of results calculated by less than window-size many values. +# This is slightly different than the min_periods parameter, because this mainly should control Nan-behavior +# for fixed windows, and minimum needed observations (also excluding Nans) in a offset window, but should not apply +# if window-size many values couldn't be even possible due to technical reasons. This is mainly because one +# cannot know (except one knows the exact (and fixed) frequency) the number(!) of observations that can occur in a +# given offset window. That's why rolling should spit out Nan's as long as the window is not fully shifted in the data. + +import numpy as np +from typing import Union +from pandas.api.types import is_integer, is_bool +from pandas.api.indexers import BaseIndexer +from pandas.core.dtypes.generic import ABCSeries, ABCDataFrame +from pandas.core.window.indexers import calculate_variable_window_bounds +from pandas.core.window.rolling import Rolling, Window, calculate_center_offset + + +def is_slice(k): return isinstance(k, slice) + + +class _CustomBaseIndexer(BaseIndexer): + is_datetimelike = None + + def __init__(self, index_array, window_size, min_periods=None, center=False, closed=None, forward=False, + expand=False, step=None, mask=None): + super().__init__() + self.index_array = index_array + self.num_values = len(index_array) + self.window_size = window_size + self.min_periods = min_periods + self.center = center + self.closed = closed + self.forward = forward + self.expand = expand + self.step = step + self.skip = mask + self.validate() + + def validate(self) -> None: + if self.center is not None and not is_bool(self.center): + raise ValueError("center must be a boolean") + if not is_bool(self.forward): + raise ValueError("forward must be a boolean") + if not is_bool(self.expand): + raise ValueError("expand must be a boolean") + + if is_integer(self.step) or self.step is None: + self.step = slice(None, None, self.step or None) + if not is_slice(self.step): + raise TypeError('step must be integer or slice.') + if self.step == slice(None): + self.step = None + + if self.skip is not None: + if len(self.index_array) != len(self.skip): + raise ValueError('mask must have same length as data to roll over.') + self.skip = np.array(self.skip) + if self.skip.dtype != bool: + raise TypeError('mask must have boolean values only.') + self.skip = ~self.skip + + def get_window_bounds(self, num_values=0, min_periods=None, center=False, closed=None): + if min_periods is not None: + self.min_periods = max(self.min_periods, min_periods) + num_values = self.num_values + min_periods = self.min_periods + center = self.center + closed = self.closed + + start, end = self._get_bounds(num_values, min_periods, center, closed) + start, end = self._apply_skipmask(start, end) + start, end = self._apply_steps(start, end, num_values) + start, end = self._prepare_min_periods_masking(start, end, num_values) + return start, end + + def _prepare_min_periods_masking(self, start, end, num_values): + # correction for min_periods calculation + end[end > num_values] = num_values + + # this is the same as .rolling will do, so leave the work to them ;) + # additional they are able to count the nans in each window, we couldn't. + # end[end - start < self.min_periods] = 0 + return start, end + + def _get_center_window_sizes(self, winsz): + ws1 = ws2 = winsz + if self.center: + # centering of dtlike windows is just looking left and right + # with half amount of window-size + ws1 = (winsz + 1) // 2 + ws2 = winsz // 2 + if self.forward: + ws1, ws2 = ws2, ws1 + return ws1, ws2 + + def _apply_skipmask(self, start, end): + if self.skip is not None: + end[self.skip] = 0 + return start, end + + def _apply_steps(self, start, end, num_values): + if self.step is not None: + m = np.full(num_values, 1) + m[self.step] = 0 + m = m.astype(bool) + end[m] = 0 + return start, end + + def _get_bounds(self, num_values=0, min_periods=None, center=False, closed=None): + raise NotImplementedError + + +class _FixedWindowDirectionIndexer(_CustomBaseIndexer): + # automatically added in super call to init + index_array: np.array + window_size: int + # set here + is_datetimelike = False + + def validate(self) -> None: + super().validate() + # if self.closed is not None: + # raise ValueError("closed only implemented for datetimelike and offset based windows") + + def _get_bounds(self, num_values=0, min_periods=None, center=False, closed=None): + offset = calculate_center_offset(self.window_size) if center else 0 + num_values += offset + + if self.forward: + start, end = self._fw(num_values, min_periods, center, closed, offset) + else: + start, end = self._bw(num_values, min_periods, center, closed, offset) + + if center: + start, end = self._center_result(start, end, offset) + num_values -= offset + + if not self.expand: + start, end = self._remove_ramps(start, end, center) + + return start, end + + def _center_result(self, start, end, offset): + if offset > 0: + if self.forward: + start = start[:-offset] + end = end[:-offset] + else: + start = start[offset:] + end = end[offset:] + return start, end + + def _remove_ramps(self, start, end, center): + fw, bw = self.forward, not self.forward + ramp_l, ramp_r = self._get_center_window_sizes(self.window_size - 1) + if center: + fw = bw = True + + if bw and ramp_l > 0: + end[:ramp_l] = 0 + if fw and ramp_r > 0: + end[-ramp_r:] = 0 + + return start, end + + def _bw(self, num_values=0, min_periods=None, center=False, closed=None, offset=0): + # code taken from pd.core.windows.indexer.FixedWindowIndexer + start_s = np.zeros(self.window_size, dtype="int64") + start_e = (np.arange(self.window_size, num_values, dtype="int64") - self.window_size + 1) + start = np.concatenate([start_s, start_e])[:num_values] + + end_s = np.arange(self.window_size, dtype="int64") + 1 + end_e = start_e + self.window_size + end = np.concatenate([end_s, end_e])[:num_values] + # end stolen code + return start, end + + def _fw(self, num_values=0, min_periods=None, center=False, closed=None, offset=0): + start = np.arange(-offset, num_values, dtype="int64")[:num_values] + end = start + self.window_size + start[:offset] = 0 + return start, end + + +class _VariableWindowDirectionIndexer(_CustomBaseIndexer): + # automatically added in super call to init + index_array: np.array + window_size: int + # set here + is_datetimelike = True + + def validate(self) -> None: + super().validate() + if self.min_periods is None: + self.min_periods = 1 + + def _get_bounds(self, num_values=0, min_periods=None, center=False, closed=None): + ws_bw, ws_fw = self._get_center_window_sizes(self.window_size) + if center: + c1 = c2 = closed + if closed == 'neither': + c1, c2 = 'right', 'left' + + start, _ = self._bw(num_values, ws_bw, c1) + _, end = self._fw(num_values, ws_fw, c2) + + elif self.forward: + start, end = self._fw(num_values, ws_fw, closed) + else: + start, end = self._bw(num_values, ws_bw, closed) + + if not self.expand: + start, end = self._remove_ramps(start, end, center) + + return start, end + + def _remove_ramps(self, start, end, center): + ws_bw, ws_fw = self._get_center_window_sizes(self.window_size) + + if center or not self.forward: + # remove (up) ramp + # we dont want this: [1,1,1,1,1].rolling(window='2min').sum() -> [1, 2, 3, 3, 3] + # instead we want: [1,1,1,1,1].rolling(window='2min').sum() -> [nan, nan, 3, 3, 3] + tresh = self.index_array[0] + ws_bw + mask = self.index_array < tresh + end[mask] = 0 + + if center or self.forward: + # remove (down) ramp + # we dont want this: [1,1,1,1,1].rolling(window='2min', forward=True).sum() -> [3, 3, 3, 2, 1 ] + # instead we want: [1,1,1,1,1].rolling(window='2min', forward=True).sum() -> [3, 3, 3, nan, nan] + tresh = self.index_array[-1] - ws_fw + mask = self.index_array > tresh + end[mask] = 0 + + return start, end + + def _bw(self, num_values, window_size, closed=None): + arr = self.index_array + start, end = calculate_variable_window_bounds(num_values, window_size, None, None, closed, arr) + return start, end + + def _fw(self, num_values, window_size, closed=None): + arr = self.index_array[::-1] + s, _ = calculate_variable_window_bounds(num_values, window_size, None, None, closed, arr) + start = np.arange(num_values) + end = num_values - s[::-1] + + if closed in ['left', 'neither']: + start += 1 + return start, end + + +def customRoller(obj, window, min_periods=None, # aka minimum non-nan values + center=False, win_type=None, on=None, axis=0, closed=None, + forward=False, expand=True, step=None, mask=None) -> Union[Rolling, Window]: + """ + A custom rolling implementation, using pandas as base. + + Parameters + ---------- + obj : pd.Series (or pd.DataFrame) + The object to roll over. DataFrame is currently still experimental. + + window : int or offset + Size of the moving window. This is the number of observations used for calculating the statistic. + Each window will be a fixed size. + If its an offset then this will be the time period of each window. Each window will be a variable sized + based on the observations included in the time-period. This is only valid for datetimelike indexes. + + min_periods : int, default None + Minimum number of observations in window required to have a value (otherwise result is NA). + For a window that is specified by an offset, min_periods will default to 1. Otherwise, min_periods + will default to the size of the window. + + center : bool, default False + Set the labels at the center of the window. Also works for offset-based windows (in contrary to pandas). + + win_type : str, default None + Not implemented. Raise NotImplementedError if not None. + + on : str, optional + For a DataFrame, a datetime-like column or MultiIndex level on which to calculate the rolling window, + rather than the DataFrame’s index. Provided integer column is ignored and excluded from result since + an integer index is not used to calculate the rolling window. + + axis : int or str, default 0 + + closed : str, default None + Make the interval closed on the ‘right’, ‘left’, ‘both’ or ‘neither’ endpoints. For offset-based windows, + it defaults to ‘right’. For fixed windows, defaults to ‘both’. Remaining cases not implemented for fixed + windows. + + forward : bool, default False + By default a window is 'looking' backwards (in time). If True the window is looking forward in time. + + expand : bool, default True + If True the window expands/shrink up to its final window size while shifted in the data or shifted out + respectively. + For (normal) backward-windows it only expands at the left border, for forward-windows it shrinks on + the right border and for centered windows both apply. + + Also bear in mind that even if this is True, an many as `min_periods` values are necessary to get a + valid value, see there for more info. + + + step : int, slice or None, default None + If given, only every n'th step a window is calculated starting from the very first. One can + give a slice if one want to start from eg. the second (`slice(2,None,n)`) or similar. + + mask : boolean array-like + Only calculate the window if the mask is True, otherwise skip it. + + Returns + ------- + a Window or Rolling sub-classed for the particular operation + + + Notes + ----- + If for some reason the start and end numeric indices of the window are needed, one can call + `start, end = customRoller(obj, ...).window.get_window_bounds()`, which return two arrays, + holding the start and end indices. Any passed (allowed) parameter to `get_window_bounds()` is + ignored and the arguments that was passed to `customRoller()` beforehand will be used instead. + + See Also + -------- + pandas.Series.rolling + pandas.DataFrame.rolling + """ + num_params = len(locals()) - 2 # do not count window and obj + if not isinstance(obj, (ABCSeries, ABCDataFrame)): + raise TypeError(f"invalid type: {type(obj)}") + + theirs = dict(min_periods=min_periods, center=center, win_type=win_type, on=on, axis=axis, closed=closed) + ours = dict(forward=forward, expand=expand, step=step, mask=mask) + assert len(theirs) + len(ours) == num_params, "not all params covert (!)" + + # center is the only param from the pandas rolling implementation + # that we advance, namely we allow center=True on dt-indexed data + ours.update(center=theirs.pop('center')) + + # use .rolling to do all the checks like if closed is one of [left, right, neither, both], + # closed not allowed for integer windows, index is monotonic (in- or decreasing), if freq-based + # windows can be transformed to nanoseconds (eg. fails for `1y` - it could have 364 or 365 days), etc. + # Also it converts window and the index to numpy-arrays (so we don't have to do it :D). + try: + x = obj.rolling(window, center=False, **theirs) + except Exception: + raise + + if theirs.pop('win_type') is not None: + raise NotImplementedError("customRoller() does not implemented win_type.") + num_params -= 1 + + ours.update(min_periods=theirs.pop('min_periods'), closed=theirs.pop('closed')) + assert len(theirs) + len(ours) == num_params, "not all params covert (!)" + + indexer = _VariableWindowDirectionIndexer if x.is_freq_type else _FixedWindowDirectionIndexer + indexer = indexer(index_array=x._on.asi8, window_size=x.window, **ours) + + # center offset is calculated from min_periods if a indexer is passed to rolling(). + # if instead a normal window is passed, it is used for offset calculation. + # also if we pass min_periods == None or 0, all values will Nan in the result even if + # start[i]<end[i] as expected. So we cannot pass `center` to rolling. Instead we manually do the centering + # in the Indexer. To calculate min_periods (!) including NaN count (!) we need to pass min_periods, but + # ensure that it is not None nor 0. + return obj.rolling(indexer, min_periods=indexer.min_periods, **theirs) diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py index 69fa340df69acdcc0336e6832e40e8f6bacaff97..3cbe5ab766a7bdb0f58324307d36a9981d7f98a6 100644 --- a/saqc/lib/tools.py +++ b/saqc/lib/tools.py @@ -13,14 +13,12 @@ import logging import dios import collections -from pandas.api.indexers import BaseIndexer -from pandas._libs.window.indexers import calculate_variable_window_bounds from scipy.cluster.hierarchy import linkage, fcluster -from pandas.api.indexers import BaseIndexer -from pandas._libs.window.indexers import calculate_variable_window_bounds -from pandas.core.window.indexers import calculate_variable_window_bounds from saqc.lib.types import T +# keep this for external imports +from saqc.lib.rolling import customRoller + logger = logging.getLogger("SaQC") @@ -178,36 +176,6 @@ def offset2seconds(offset): return pd.Timedelta.total_seconds(pd.Timedelta(offset)) -def flagWindow(flagger_old, flagger_new, field, direction="fw", window=0, **kwargs) -> pd.Series: - # NOTE: unused -> remove? - if window == 0 or window == "": - return flagger_new - - fw, bw = False, False - mask = flagger_old.getFlags(field) != flagger_new.getFlags(field) - f = flagger_new.isFlagged(field) & mask - - if not mask.any(): - # nothing was flagged, so nothing need to be flagged additional - return flagger_new - - if isinstance(window, int): - x = f.rolling(window=window + 1).sum() - if direction in ["fw", "both"]: - fw = x.fillna(method="bfill").astype(bool) - if direction in ["bw", "both"]: - bw = x.shift(-window).fillna(method="bfill").astype(bool) - else: - # time-based windows - if direction in ["bw", "both"]: - # todo: implement time-based backward rolling - raise NotImplementedError - fw = f.rolling(window=window, closed="both").sum().astype(bool) - - fmask = bw | fw - return flagger_new.setFlags(field, fmask, **kwargs) - - def seasonalMask(dtindex, season_start, season_end, include_bounds): """ This function generates date-periodic/seasonal masks from an index passed. @@ -527,142 +495,6 @@ def evalFreqStr(freq, check, index): return f_passed -class FreqIndexer(BaseIndexer): - # indexer class capable of generating indices for frequency determined windows, - # arbitrary window skips and forward facing windows (meant to be used by pd.rolling) - def __init__(self, *args, win_points=None, **kwargs): - self.win_points = win_points - super().__init__(*args, **kwargs) - - def get_window_bounds(self, num_values, min_periods, center, closed): - i_dir = 1 - if self.forward: - i_dir = -1 - - start, end = calculate_variable_window_bounds(num_values, self.window_size, min_periods, center, closed, - self.index_array[::i_dir]) - if self.forward: - start, end = (num_values - end)[::-1], (num_values - start)[::-1] - if self.center: - end = (num_values - start)[::-1] - end = np.roll(end, -1) - end[-1] = num_values - 1 - if self.win_points is not None: - end[~self.win_points] = 0 - start[~self.win_points] = 0 - - return start, end - - -class PeriodsIndexer(BaseIndexer): - # indexer class capable of generating periods-number determined windows and - # arbitrary window skips (meant to be used by pd.rolling) - def __init__(self, *args, win_points=None, **kwargs): - self.win_points = win_points - super().__init__(*args, **kwargs) - - def get_window_bounds(self, num_values, min_periods, center, closed): - start_s = np.zeros(self.window_size, dtype="int64") - start_e = np.arange(self.window_size, num_values, dtype="int64") - self.window_size + 1 - start = np.concatenate([start_s, start_e])[:num_values] - - end_s = np.arange(self.window_size, dtype="int64") + 1 - end_e = start_e + self.window_size - end = np.concatenate([end_s, end_e])[:num_values] - if self.win_points is not None: - start[~self.win_points] = 0 - end[~self.win_points] = 0 - return start, end - - -def customRolling(to_roll, winsz, func, roll_mask=None, min_periods=1, center=False, closed=None, raw=True, engine=None, - forward=False, index_only=False): - """ - A wrapper around pandas.rolling.apply(), that allows for skipping func application on - arbitrary selections of windows. - - Parameters - ---------- - to_roll : pandas.Series - Timeseries to be "rolled over". - winsz : {int, str} - Gets passed on to the window-size parameter of pandas.Rolling. - func : Callable - Function to be rolled with. If the funcname matches a .rolling attribute, - the associated method of .rolling will be used instead of .apply(func) (=faster) - roll_mask : {None, numpy.array[bool]}, default None - A mask, indicating the rolling windows, `func` shall be applied on. - Has to be of same length as `to_roll`. - roll_mask[i] = False indicates, that the window with right end point to_roll.index[i] shall - be skipped. - Pass None(default) if you want no values to be masked. - min_periods : int, default 1 - Gets passed on to the min_periods parameter of pandas.Rolling. - (Note, that rolling with freq string defined window size and `min_periods`=None, - results in nothing being computed due to some inconsistencies in the interplay of pandas.rolling and its - indexer.) - center : bool, default False - Gets passed on to the center parameter of pandas.Rolling. - closed : {None, 'left', 'right', 'both'}, default None - Gets passed on to the closed parameter of pandas.Rolling. - raw : bool, default True - Gets passed on to the raw parameter of pandas.Rolling.apply. - engine : {None, 'cython', 'numba'}, default None - Gets passed on to the engine parameter of pandas.Rolling.apply. None defaults to 'cython'. - forward : bool, default False - If true, roll with forward facing windows. (not yet implemented for - integer defined windows.) - center : bool, default False - If true, set the label to the center of the rolling window. Also available - for frequencie defined rolling windows! (yeah!) - index_only : bool, default False - Only return rolling window indices. - - Returns - ------- - result : pandas.Series - The result of the rolling application. - - """ - i_roll = to_roll.copy() - i_roll.index = pd.RangeIndex(len(i_roll)) - engine = engine or 'cython' - - if isinstance(winsz, str): - winsz = np.int64(pd.Timedelta(winsz).total_seconds()*10**9) - indexer = FreqIndexer(window_size=winsz, - win_points=roll_mask, - index_array=to_roll.index.to_numpy(np.int64), - center=center, - closed=closed, - forward=forward) - - elif isinstance(winsz, int): - indexer = PeriodsIndexer(window_size=winsz, - win_points=roll_mask, - center=center, - closed=closed) - - if index_only: - num_values = to_roll.shape[0] - if num_values == 0: - return np.array([]), np.array([]) - else: - return indexer.get_window_bounds(num_values, min_periods, center, closed) - - i_roller = i_roll.rolling(indexer, - min_periods=min_periods, - center=center, - closed=closed) - - if hasattr(i_roller, func.__name__): - i_roll = getattr(i_roller, func.__name__)() - else: - i_roll = i_roller.apply(func, raw=raw, engine=engine) - - return pd.Series(i_roll.values, index=to_roll.index) - - def detectDeviants(data, metric, norm_spread, norm_frac, linkage_method='single', population='variables'): """ Helper function for carrying out the repeatedly upcoming task, diff --git a/test/flagger/test_flagger.py b/test/flagger/test_flagger.py index 1a431610f2bb6f8598064201fa2b921a2b43c637..2bbe9bec16d493779a0e90b699117d50e941e9be 100644 --- a/test/flagger/test_flagger.py +++ b/test/flagger/test_flagger.py @@ -717,3 +717,37 @@ def test_flagAfter(flagger): exp[3] = flagger.BAD assert (flags == exp).all() + +@pytest.mark.parametrize("flagger", TESTFLAGGER) +def test_flagBefore(flagger): + idx = pd.date_range("2000", "2001", freq='1M') + s = pd.Series(0, index=idx) + data = dios.DictOfSeries(s, columns=['a']) + exp_base = pd.Series(flagger.UNFLAGGED, index=idx) + + flagger = flagger.initFlags(data) + field, *_ = data.columns + + flags = flagger.setFlags(field, loc=s.index[8], flag_before=5).getFlags(field) + exp = exp_base.copy() + exp.iloc[8-5: 8+1] = flagger.BAD + assert (flags == exp).all() + + flags = flagger.setFlags(field, loc=s.index[8], flag_before=5, win_flag=flagger.GOOD).getFlags(field) + exp = exp_base.copy() + exp.iloc[8-5: 8+1] = flagger.GOOD + exp[8] = flagger.BAD + assert (flags == exp).all() + + # 3 month < 99 days < 4 month + flags = flagger.setFlags(field, loc=s.index[8], flag_before="99d").getFlags(field) + exp = exp_base.copy() + exp.iloc[8-3: 8+1] = flagger.BAD + assert (flags == exp).all() + + # 3 month < 99 days < 4 month + flags = flagger.setFlags(field, loc=s.index[8], flag_before="99d", win_flag=flagger.GOOD).getFlags(field) + exp = exp_base.copy() + exp.iloc[8-3: 8+1] = flagger.GOOD + exp[8] = flagger.BAD + assert (flags == exp).all() diff --git a/test/funcs/test_constants_detection.py b/test/funcs/test_constants_detection.py index 74e066de8c2c2f1d5d7eb1d4859491d1945d985b..52e2f6d9e50fab7d0ecda01adea82d60ff3614ea 100644 --- a/test/funcs/test_constants_detection.py +++ b/test/funcs/test_constants_detection.py @@ -12,14 +12,12 @@ from test.common import TESTFLAGGER, initData @pytest.fixture def data(): constants_data = initData(1, start_date="2011-01-01 00:00:00", end_date="2011-01-01 03:00:00", freq="5min") - constants_data.iloc[5:25] = 0 + constants_data.iloc[5:25] = 200 return constants_data @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_constants_flagBasic(data, flagger): - idx = np.array([5, 6, 7, 8, 9, 10, 18, 19, 20, 21]) - data.iloc[idx] = 200 expected = np.arange(5, 22) field, *_ = data.columns flagger = flagger.initFlags(data) @@ -30,7 +28,6 @@ def test_constants_flagBasic(data, flagger): @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_constants_flagVarianceBased(data, flagger): - data.iloc[5:25] = 200 expected = np.arange(5, 25) field, *_ = data.columns flagger = flagger.initFlags(data) diff --git a/test/funcs/test_proc_functions.py b/test/funcs/test_proc_functions.py index dbb9754ae961681ffbe862d22ba0e658125a46cc..457c56f06b0da92dbe372a71ed9b570aa351dbd1 100644 --- a/test/funcs/test_proc_functions.py +++ b/test/funcs/test_proc_functions.py @@ -99,11 +99,11 @@ def test_interpolateGrid(course_5, course_3, flagger): @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_offsetCorrecture(flagger): - data = pd.Series(0, index=pd.date_range('2000', freq='1Y', periods=100), name='dat') + data = pd.Series(0, index=pd.date_range('2000', freq='1d', periods=100), name='dat') data.iloc[30:40] = -100 data.iloc[70:80] = 100 data = dios.DictOfSeries(data) flagger = flagger.initFlags(data) - data, flagger = proc_offsetCorrecture(data, 'dat', flagger, 40, 20, '3Y', 1) + data, flagger = proc_offsetCorrecture(data, 'dat', flagger, 40, 20, '3d', 1) assert (data == 0).all()[0] diff --git a/test/lib/test_rolling.py b/test/lib/test_rolling.py new file mode 100644 index 0000000000000000000000000000000000000000..03da38f7c0d08144627b151dd86a8cf4c4938b2f --- /dev/null +++ b/test/lib/test_rolling.py @@ -0,0 +1,141 @@ +import pytest + +from saqc.lib.rolling import customRoller +import pandas as pd +import numpy as np + + +@pytest.fixture +def data(): + return data_() + + +def data_(): + s1 = pd.Series(1., index=pd.date_range("1999/12", periods=12, freq='1M') + pd.Timedelta('1d')) + s2 = pd.Series(1., index=pd.date_range('2000/05/15', periods=8, freq='1d')) + s = pd.concat([s1, s2]).sort_index() + s.name = 's' + s[15] = np.nan + return s + + +len_s = len(data_()) + + +def make_num_kws(): + l = [] + for window in range(len_s + 2): + for min_periods in [None] + list(range(window + 1)): + for center in [False, True]: + for closed in [None] + ['left', 'right', 'both', 'neither']: + l.append(dict(window=window, min_periods=min_periods, center=center, closed=closed)) + return l + + +def make_dt_kws(): + l = [] + for closed in [None] + ['right', 'both', 'neither', 'left']: + for window in range(1, len_s + 3): + for min_periods in [None] + list(range(window + 1)): + for win in [f'{window}d', f'{window * 31}d']: + l.append(dict(window=win, min_periods=min_periods, closed=closed)) + return l + + +def check_series(result, expected): + if not (result.isna() == expected.isna()).all(): + return False + result = result.dropna() + expected = expected.dropna() + if not (result == expected).all(): + return False + return True + + +def print_diff(s, result, expected): + df = pd.DataFrame() + df['s'] = s + df['exp'] = expected + df['res'] = result + print(df) + + +def runtest_for_kw_combi(s, kws): + print(kws) + forward = kws.pop('forward', False) + if forward: + result = customRoller(s, forward=True, **kws).sum() + expected = pd.Series(reversed(s), reversed(s.index)).rolling(**kws).sum()[::-1] + + success = check_series(result, expected) + if not success: + print_diff(s, result, expected) + assert False, f"forward=True !! {kws}" + else: + result = customRoller(s, **kws).sum() + expected = s.rolling(**kws).sum() + + success = check_series(result, expected) + if not success: + print_diff(s, result, expected) + assert False + + +@pytest.mark.parametrize("kws", make_num_kws()) +def test_pandas_conform_num(data, kws): + runtest_for_kw_combi(data, kws) + + +@pytest.mark.parametrize("kws", make_dt_kws()) +def test_pandas_conform_dt(data, kws): + runtest_for_kw_combi(data, kws) + + +@pytest.mark.parametrize("kws", make_num_kws()) +def test_forward_num(data, kws): + kws.update(forward=True) + runtest_for_kw_combi(data, kws) + + +@pytest.mark.parametrize("kws", make_dt_kws()) +def test_forward_dt(data, kws): + kws.update(forward=True) + runtest_for_kw_combi(data, kws) + + +def dt_center_kws(): + l = [] + for window in range(2, 10, 2): + for min_periods in range(1, window + 1): + l.append(dict(window=window, min_periods=min_periods)) + return l + + +@pytest.mark.parametrize("kws", dt_center_kws()) +def test_centering_w_dtindex(kws): + print(kws) + s = pd.Series(0., index=pd.date_range("2000", periods=10, freq='1H')) + s[4:7] = 1 + + w = kws.pop('window') + mp = kws.pop('min_periods') + + pd_kw = dict(window=w, center=True, min_periods=mp) + our_kw = dict(window=f'{w}h', center=True, closed='both', min_periods=mp) + expected = s.rolling(**pd_kw).sum() + result = customRoller(s, **our_kw).sum() + success = check_series(result, expected) + if not success: + print_diff(s, result, expected) + assert False + + w -= 1 + mp -= 1 + pd_kw = dict(window=w, center=True, min_periods=mp) + our_kw = dict(window=f'{w}h', center=True, closed='neither', min_periods=mp) + expected = s.rolling(**pd_kw).sum() + result = customRoller(s, **our_kw).sum() + success = check_series(result, expected) + if not success: + print_diff(s, result, expected) + assert False