diff --git a/requirements.txt b/requirements.txt
index 40f1779b3bf2ae4c20cc7063e8a790705fbc10f2..c7c4c5537a16bc4807289041d2658b96b3136c60 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+hypothesis==5.43.4
 attrs==20.3.0
 Click==7.1.2
 cycler==0.10.0
@@ -5,18 +6,18 @@ dios==0.6.0
 dtw==1.4.0
 kiwisolver==1.3.1
 llvmlite==0.35.0
-importlib-metadata==3.4.0
-joblib==1.0.0
+importlib-metadata==3.7.0
+joblib==1.0.1
 matplotlib==3.3.4
 mlxtend==0.18.0
-more-itertools==8.6.0
+more-itertools==8.7.0
 numba==0.52.0
-numpy==1.20.0
+numpy==1.20.1
 outlier==0.2
 utils==1.0.1
 outlier-utils==0.0.3
 packaging==20.9
-pandas==1.2.1
+pandas==1.2.2
 pluggy==0.13.1
 pyparsing==2.4.7
 py==1.10.0
@@ -25,11 +26,11 @@ pytest-lazy-fixture==0.6.3
 pytest==6.2.2
 python-dateutil==2.8.1
 python-intervals==1.10.0.post1
-pytz==2020.5
+pytz==2021.1
 PyWavelets==1.1.1
 zipp==3.4.0
 wcwidth==0.2.5
-scipy==1.6.0
+scipy==1.6.1
 scikit-learn==0.24.1
 six==1.15.0
 typing_extensions==3.7.4.3
diff --git a/saqc/core/lib.py b/saqc/core/lib.py
index 7ced1f9ce7f46e28695446f78f9c0ae23bff6773..7ee39f76805694d978599956623a03970bb86cb9 100644
--- a/saqc/core/lib.py
+++ b/saqc/core/lib.py
@@ -40,6 +40,9 @@ class SaQCFunction:
         self.args = args
         self.keywords = keywords
 
+    def __repr__(self):
+        return f"{self.__class__.__name__}.{self.func.__name__}"
+
     def bind(self, *args, **keywords):
         return SaQCFunction(
             self.name, self.func,
diff --git a/saqc/funcs/breaks.py b/saqc/funcs/breaks.py
index f1d9dc4e950c3ddaa30790f18600321009eb8284..7f21609be012d904c184e272a86063dedc842b73 100644
--- a/saqc/funcs/breaks.py
+++ b/saqc/funcs/breaks.py
@@ -1,26 +1,35 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
+
 """Detecting breakish changes in timeseries value courses.
 
 This module provides functions to detect and flag breakish changes in the data value course, like gaps
 (:py:func:`flagMissing`), jumps/drops (:py:func:`flagJumps`) or isolated values (:py:func:`flagIsolated`).
 """
-from dios import DictOfSeries
+from typing import Tuple
+
 import numpy as np
 import pandas as pd
-from typing import Tuple
+from dios import DictOfSeries
 
 from saqc.lib.tools import groupConsecutives
+from saqc.lib.types import FreqString, ColumnName, IntegerWindow
 from saqc.funcs.changepoints import assignChangePointCluster
 from saqc.core.register import register
 from saqc.flagger import Flagger
 
 
 @register(masking='field', module="breaks")
-def flagMissing(data: DictOfSeries, field: str, flagger: Flagger, nodata: float=np.nan, **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagMissing(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    nodata: float=np.nan,
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     The function flags all values indicating missing data.
 
@@ -55,7 +64,14 @@
 
 
 @register(masking='field', module="breaks")
-def flagIsolated(data: DictOfSeries, field: str, flagger: Flagger, gap_window: str, group_window: str, **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagIsolated(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    gap_window: FreqString,
+    group_window: FreqString,
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     The function flags arbitrarily large groups of values, if they are surrounded by sufficiently large data gaps.
@@ -123,8 +139,15 @@ def flagIsolated(data: DictOfSeries, field: str, flagger: Flagger, gap_window: s
 
 
 @register(masking='field', module="breaks")
-def flagJumps(data: DictOfSeries, field: str, flagger: Flagger, thresh: float, winsz: str, min_periods: int=1,
-              **kwargs):
+def flagJumps(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    thresh: float,
+    winsz: FreqString,
+    min_periods: IntegerWindow=1,
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     Flag datapoints, where the mean of the values significantly changes (where the value course "jumps").
 
@@ -134,7 +157,7 @@
         A dictionary of pandas.Series, holding all the data.
     field : str
         The reference variable, the deviation from which determines the flagging.
-    flagger : saqc.flagger
+    flagger : saqc.flagger.Flagger
         A flagger object, holding flags and additional information related to `data`.
     thresh : float
         The threshold, the mean of the values has to change by, to trigger flagging.
@@ -146,14 +169,16 @@
         the mean value obtained from that window is regarded valid.
     """
 
-    data, flagger = assignChangePointCluster(data, field, flagger,
-                                             stat_func=lambda x, y: np.abs(np.mean(x) - np.mean(y)),
-                                             thresh_func=lambda x, y: thresh,
-                                             bwd_window=winsz,
-                                             min_periods_bwd=min_periods,
-                                             flag_changepoints=True,
-                                             model_by_resids=False,
-                                             assign_cluster=False,
-                                             **kwargs)
+    data, flagger = assignChangePointCluster(
+        data, field, flagger,
+        stat_func=lambda x, y: np.abs(np.mean(x) - np.mean(y)),
+        thresh_func=lambda x, y: thresh,
+        bwd_window=winsz,
+        min_periods_bwd=min_periods,
+        flag_changepoints=True,
+        model_by_resids=False,
+        assign_cluster=False,
+        **kwargs
+    )
 
     return data, flagger
diff --git a/saqc/funcs/changepoints.py b/saqc/funcs/changepoints.py
index dcd41a66f1a4e6a5009b9f7736b842479271bfa4..b6e3f8aa435197e57e8b9f95ab6978d8b1126d90 100644
--- a/saqc/funcs/changepoints.py
+++ b/saqc/funcs/changepoints.py
@@ -6,7 +6,7 @@ import logging
 import pandas as pd
 import numpy as np
 import numba
-from typing import Callable, Union, Tuple
+from typing import Callable, Union, Tuple, Optional
 from typing_extensions import Literal
 
 from dios import DictOfSeries
@@ -14,23 +14,26 @@ from dios import DictOfSeries
 from saqc.core.register import register
 from saqc.lib.tools import customRoller
 from saqc.flagger import Flagger
+from saqc.lib.types import ColumnName, FreqString, IntegerWindow
 
 logger = logging.getLogger("SaQC")
 
 
 @register(masking='field', module="changepoints")
-def flagChangePoints(data: DictOfSeries, field: str, flagger: Flagger,
-                     stat_func: Callable[[np.array], np.array],
-                     thresh_func: Callable[[np.array], np.array],
-                     bwd_window: str,
-                     min_periods_bwd: Union[str, int],
-                     fwd_window: str=None,
-                     min_periods_fwd: Union[str, int]=None,
-                     closed: Literal["right", "left", "both", "neither"]="both",
-                     try_to_jit: bool=True,
-                     reduce_window: str=None,
-                     reduce_func: Callable[[np.array, np.array], np.array]=lambda x, y: x.argmax(),
-                     **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagChangePoints(
+    data: DictOfSeries, field: str, flagger: Flagger,
+    stat_func: Callable[[np.ndarray, np.ndarray], float],
+    thresh_func: Callable[[np.ndarray, np.ndarray], float],
+    bwd_window: FreqString,
+    min_periods_bwd: IntegerWindow,
+    fwd_window: Optional[FreqString]=None,
+    min_periods_fwd: Optional[IntegerWindow]=None,
+    closed: Literal["right", "left", "both", "neither"]="both",
+    try_to_jit: bool=True,
+    reduce_window: Optional[FreqString]=None,
+    reduce_func: Callable[[np.ndarray, np.ndarray], int]=lambda x, _: x.argmax(),
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     Flag datapoints, where the parametrization of the process that is assumed to generate the data significantly
     changes.
 
@@ -70,7 +73,7 @@
         will be selected the value with index `reduce_func(x, y)` and the others will be dropped.
         If `reduce_window` is None, the reduction window size equals the
         twin window size, the changepoints have been detected with.
-    reduce_func : Callable[[numpy.array, numpy.array], np.array], default lambda x, y: x.argmax()
+    reduce_func : Callable[[numpy.ndarray, numpy.ndarray], int], default lambda x, y: x.argmax()
         A function that must return an index value upon input of two arrays x and y.
         First input parameter will hold the result from the stat_func evaluation for every
         reduction window. Second input parameter holds the result from the thresh_func evaluation.
@@ -82,31 +85,35 @@
     """
 
-    data, flagger = assignChangePointCluster(data, field, flagger, stat_func=stat_func, thresh_func=thresh_func,
-                                             bwd_window=bwd_window, min_periods_bwd=min_periods_bwd,
-                                             fwd_window=fwd_window, min_periods_fwd=min_periods_fwd, closed=closed,
-                                             try_to_jit=try_to_jit, reduce_window=reduce_window,
-                                             reduce_func=reduce_func, flag_changepoints=True, model_by_resids=False,
-                                             assign_cluster=False, **kwargs)
+    data, flagger = assignChangePointCluster(
+        data, field, flagger, stat_func=stat_func, thresh_func=thresh_func,
+        bwd_window=bwd_window, min_periods_bwd=min_periods_bwd,
+        fwd_window=fwd_window, min_periods_fwd=min_periods_fwd, closed=closed,
+        try_to_jit=try_to_jit, reduce_window=reduce_window,
+        reduce_func=reduce_func, flag_changepoints=True, model_by_resids=False,
+        assign_cluster=False, **kwargs
+    )
     return data, flagger
 
 
 @register(masking='field', module="changepoints")
-def assignChangePointCluster(data: DictOfSeries, field: str, flagger: Flagger,
-                             stat_func: Callable[[np.array, np.array], float],
-                             thresh_func: Callable[[np.array, np.array], float],
-                             bwd_window: str,
-                             min_periods_bwd: Union[str, int],
-                             fwd_window: str=None,
-                             min_periods_fwd: Union[str, int]=None,
-                             closed: Literal["right", "left", "both", "neither"]="both",
-                             try_to_jit: bool=True,
-                             reduce_window: str=None,
-                             reduce_func: Callable[[np.array, np.array], np.array]=lambda x, y: x.argmax(),
-                             model_by_resids: bool=False,
-                             flag_changepoints: bool=False,
-                             assign_cluster: bool=True,
-                             **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def assignChangePointCluster(
+    data: DictOfSeries, field: str, flagger: Flagger,
+    stat_func: Callable[[np.ndarray, np.ndarray], float],
+    thresh_func: Callable[[np.ndarray, np.ndarray], float],
+    bwd_window: str,
+    min_periods_bwd: int,
+    fwd_window: Optional[str]=None,
+    min_periods_fwd: Optional[int]=None,
+    closed: Literal["right", "left", "both", "neither"]="both",
+    try_to_jit: bool=True,
+    reduce_window: Optional[str]=None,
+    reduce_func: Callable[[np.ndarray, np.ndarray], int]=lambda x, _: x.argmax(),
+    model_by_resids: bool=False,
+    flag_changepoints: bool=False,
+    assign_cluster: bool=True,
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     Assigns labels to the data, aiming to reflect continuous regimes of the processes the data is assumed to be
@@ -132,12 +139,12 @@ def assignChangePointCluster(data: DictOfSeries, field: str, flagger: Flagger,
         changepoint.
     bwd_window : str
         The left (backwards facing) windows temporal extension (freq-string).
-    min_periods_bwd : {str, int}
+    min_periods_bwd : int
         Minimum number of periods that have to be present in a backwards facing window, for a changepoint test to be
         performed.
     fwd_window : {None, str}, default None
         The right (forward facing) windows temporal extension (freq-string).
-    min_periods_fwd : {None, str, int}, default None
+    min_periods_fwd : {None, int}, default None
         Minimum number of periods that have to be present in a forward facing window, for a changepoint test to be
         performed.
     closed : {'right', 'left', 'both', 'neither'}, default 'both'
@@ -197,16 +204,14 @@
             stat_func = jit_sf
             thresh_func = jit_tf
             try_to_jit = True
-        except (numba.core.errors.TypingError, IndexError):
+        except (numba.core.errors.TypingError, numba.core.errors.UnsupportedError, IndexError):
             try_to_jit = False
             logging.warning('Could not jit passed statistic - omitting jitting!')
 
     if try_to_jit:
-        stat_arr, thresh_arr = _slidingWindowSearchNumba(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func,
-                                                         check_len)
+        stat_arr, thresh_arr = _slidingWindowSearchNumba(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, check_len)
     else:
-        stat_arr, thresh_arr = _slidingWindowSearch(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func,
-                                                    check_len)
+        stat_arr, thresh_arr = _slidingWindowSearch(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, check_len)
     result_arr = stat_arr > thresh_arr
 
     if model_by_resids:
@@ -272,4 +277,5 @@ def _reduceCPCluster(stat_arr, thresh_arr, start, end, obj_func, num_val):
         pos = s + obj_func(x, y) + 1
         out_arr[s:e] = False
         out_arr[pos] = True
+
     return out_arr
diff --git a/saqc/funcs/constants.py b/saqc/funcs/constants.py
index 9244d8c827a765609af82974b3faa700490d1072..9e2864ed7bc1a5e1b78474fdc289d6de1a59c62f 100644
--- a/saqc/funcs/constants.py
+++ b/saqc/funcs/constants.py
@@ -13,10 +13,18 @@
 from saqc.core.register import register
 from saqc.flagger import Flagger
 from saqc.lib.ts_operators import varQC
 from saqc.lib.tools import customRoller, getFreqDelta
+from saqc.lib.types import FreqString, ColumnName
 
 
 @register(masking='field', module="constants")
-def flagConstants(data: DictOfSeries, field: str, flagger: Flagger, thresh: float, window: str, **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagConstants(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    thresh: float,
+    window: FreqString,
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     This function flags plateaus/series of constant values of length `window` if
     their maximum total change is smaller than thresh.
@@ -72,9 +80,14 @@
 
 
 @register(masking='field', module="constants")
 def flagByVariance(
-    data: DictOfSeries, field: str, flagger: Flagger,
-    window: str="12h", thresh: float=0.0005,
-    max_missing: int=None, max_consec_missing: int=None, **kwargs
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    window: FreqString="12h",
+    thresh: float=0.0005,
+    max_missing: int=None,
+    max_consec_missing: int=None,
+    **kwargs
 ) -> Tuple[DictOfSeries, Flagger]:
     """
diff --git a/saqc/funcs/drift.py b/saqc/funcs/drift.py
index 9c7c26b58b140d3ec77a2f6b77894353fafc9801..fd7d0dcb913f1886027f7899030683e5e69a243d 100644
--- a/saqc/funcs/drift.py
+++ b/saqc/funcs/drift.py
@@ -1,7 +1,7 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
 
 import functools
-from typing import Optional, Tuple, Sequence, Callable, Any, Optional
+from typing import Optional, Tuple, Sequence, Callable
 from typing_extensions import Literal
 
 import numpy as np
@@ -18,18 +18,26 @@
 from saqc.funcs.resampling import shift
 from saqc.funcs.changepoints import assignChangePointCluster
 from saqc.funcs.tools import drop, copy
 from saqc.lib.tools import detectDeviants
+from saqc.lib.types import FreqString, ColumnName, CurveFitter, TimestampColumnName
 from saqc.lib.ts_operators import expModelFunc
 
 
+LinkageString = Literal["single", "complete", "average", "weighted", "centroid", "median", "ward"]
+
+
 @register(masking='all', module="drift")
-def flagDriftFromNorm(data: DictOfSeries, field: str, flagger: Flagger,
-                      fields: Sequence[str],
-                      segment_freq: str,
-                      norm_spread: float,
-                      norm_frac: float=0.5,
-                      metric: Callable[[np.array, np.array], float]=lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x),
-                      linkage_method: Literal["single", "complete", "average", "weighted", "centroid", "median", "ward"]="single",
-                      **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagDriftFromNorm(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    fields: Sequence[ColumnName],
+    segment_freq: FreqString,
+    norm_spread: float,
+    norm_frac: float=0.5,
+    metric: Callable[[np.ndarray, np.ndarray], float]=lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x),
+    linkage_method: LinkageString="single",
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     The function flags value courses that significantly deviate from a group of normal value courses.
 
@@ -131,12 +139,16 @@
 
 
 @register(masking='all', module="drift")
-def flagDriftFromReference(data: DictOfSeries, field: str, flagger: Flagger,
-                           fields: Sequence[str],
-                           segment_freq: str,
-                           thresh: float,
-                           metric: Callable[[np.array, np.array], float]=lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x),
-                           **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagDriftFromReference(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    fields: Sequence[ColumnName],
+    segment_freq: FreqString,
+    thresh: float,
+    metric: Callable[[np.ndarray, np.ndarray], float]=lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x),
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     The function flags value courses that deviate from a reference course by a margin exceeding a certain threshold.
@@ -200,15 +212,19 @@
 
 
 @register(masking='all', module="drift")
-def flagDriftFromScaledNorm(data: DictOfSeries, field: str, flagger: Flagger,
-                            fields_scale1: Sequence[str],
-                            fields_scale2: Sequence[str],
-                            segment_freq: str,
-                            norm_spread: float,
-                            norm_frac: float=0.5,
-                            metric: Callable[[np.array, np.array], float]=lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x),
-                            linkage_method: Literal["single", "complete", "average", "weighted", "centroid", "median", "ward"]="single",
-                            **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagDriftFromScaledNorm(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    fields_scale1: Sequence[ColumnName],
+    fields_scale2: Sequence[ColumnName],
+    segment_freq: FreqString,
+    norm_spread: float,
+    norm_frac: float=0.5,
+    metric: Callable[[np.ndarray, np.ndarray], float]=lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x),
+    linkage_method: LinkageString="single",
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
 
@@ -312,9 +328,15 @@
 
 
 @register(masking='all', module="drift")
-def correctExponentialDrift(data: DictOfSeries, field: str, flagger: Flagger,
-                            maint_data_field: str, cal_mean: int=5, flag_maint_period: bool=False,
-                            **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def correctExponentialDrift(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    maint_data_field: ColumnName,
+    cal_mean: int=5,
+    flag_maint_period: bool=False,
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     The function fits an exponential model to chunks of data[field].
     It is assumed that, between maintenance events, there is a drift effect shifting the measurements in a way that
@@ -412,11 +434,15 @@
 
 
 @register(masking='all', module="drift")
-def correctRegimeAnomaly(data: DictOfSeries, field: str, flagger: Flagger,
-                         cluster_field: str,
-                         model: Callable[[np.array, Any], np.array],
-                         regime_transmission: Optional[str]=None,
-                         x_date: bool=False) -> Tuple[DictOfSeries, Flagger]:
+def correctRegimeAnomaly(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    cluster_field: ColumnName,
+    model: CurveFitter,
+    regime_transmission: Optional[FreqString]=None,
+    x_date: bool=False
+) -> Tuple[DictOfSeries, Flagger]:
     """
     Function fits the passed model to the different regimes in data[field] and tries to correct
     those values that have been assigned a negative label by data[cluster_field].
@@ -521,14 +547,17 @@
 
 
 @register(masking='all', module="drift")
-def correctOffset(data: DictOfSeries, field: str, flagger: Flagger,
-                  max_mean_jump: float,
-                  normal_spread: float,
-                  search_winsz: str,
-                  min_periods: int,
-                  regime_transmission: Optional[str]=None,
-                  **kwargs
-                  ) -> Tuple[DictOfSeries, Flagger]:
+def correctOffset(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    max_mean_jump: float,
+    normal_spread: float,
+    search_winsz: FreqString,
+    min_periods: int,
+    regime_transmission: Optional[FreqString]=None,
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
 
     Parameters
@@ -567,15 +596,19 @@
     """
     data, flagger = copy(data, field, flagger, field + '_CPcluster')
-    data, flagger = assignChangePointCluster(data, field + '_CPcluster', flagger,
-                                             lambda x, y: np.abs(np.mean(x) - np.mean(y)),
-                                             lambda x, y: max_mean_jump,
-                                             bwd_window=search_winsz,
-                                             min_periods_bwd=min_periods)
+    data, flagger = assignChangePointCluster(
+        data, field + '_CPcluster', flagger,
+        lambda x, y: np.abs(np.mean(x) - np.mean(y)),
+        lambda x, y: max_mean_jump,
+        bwd_window=search_winsz,
+        min_periods_bwd=min_periods
+    )
     data, flagger = assignRegimeAnomaly(data, field, flagger, field + '_CPcluster', normal_spread)
-    data, flagger = correctRegimeAnomaly(data, field, flagger, field + '_CPcluster',
-                                         lambda x, p1: np.array([p1] * x.shape[0]),
-                                         regime_transmission=regime_transmission)
+    data, flagger = correctRegimeAnomaly(
+        data, field, flagger, field + '_CPcluster',
+        lambda x, p1: np.array([p1] * x.shape[0]),
+        regime_transmission=regime_transmission
+    )
     data, flagger = drop(data, field + '_CPcluster', flagger)
 
     return data, flagger
@@ -610,13 +643,17 @@ def _drift_fit(x, shift_target, cal_mean):
 
 
 @register(masking='all', module="drift")
-def flagRegimeAnomaly(data: DictOfSeries, field: str, flagger: Flagger,
-                      cluster_field: str,
-                      norm_spread: float,
-                      linkage_method: Literal["single", "complete", "average", "weighted", "centroid", "median", "ward"]="single",
-                      metric: Callable[[np.array, np.array], float]=lambda x, y: np.abs(np.nanmean(x) - np.nanmean(y)),
-                      norm_frac: float=0.5,
-                      **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagRegimeAnomaly(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    cluster_field: ColumnName,
+    norm_spread: float,
+    linkage_method: LinkageString="single",
+    metric: Callable[[np.ndarray, np.ndarray], float]=lambda x, y: np.abs(np.nanmean(x) - np.nanmean(y)),
+    norm_frac: float=0.5,
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     A function to flag values belonging to an anomalous regime regarding modelling regimes of field.
@@ -671,15 +708,19 @@
 
 
 @register(masking='all', module="drift")
-def assignRegimeAnomaly(data: DictOfSeries, field: str, flagger: Flagger,
-                        cluster_field: str,
-                        norm_spread: float,
-                        linkage_method: Literal["single", "complete", "average", "weighted", "centroid", "median", "ward"]="single",
-                        metric: Callable[[np.array, np.array], float]=lambda x, y: np.abs(np.nanmean(x) - np.nanmean(y)),
-                        norm_frac: float=0.5,
-                        set_cluster: bool=True,
-                        set_flags: bool=False,
-                        **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def assignRegimeAnomaly(
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    cluster_field: ColumnName,
+    norm_spread: float,
+    linkage_method: LinkageString="single",
+    metric: Callable[[np.ndarray, np.ndarray], float]=lambda x, y: np.abs(np.nanmean(x) - np.nanmean(y)),
+    norm_frac: float=0.5,
+    set_cluster: bool=True,
+    set_flags: bool=False,
+    **kwargs
+) -> Tuple[DictOfSeries, Flagger]:
     """
     A function to detect values belonging to an anomalous regime regarding modelling regimes of field.
diff --git a/saqc/funcs/flagtools.py b/saqc/funcs/flagtools.py
index d50984cde3fff2ff295520f8f7242dc1cf6d5230..364256e2926c49ef87f5c092b34a838144c5631a 100644
--- a/saqc/funcs/flagtools.py
+++ b/saqc/funcs/flagtools.py
@@ -12,20 +12,22 @@ from saqc.core.register import register
 from saqc.flagger import Flagger
+from saqc.lib.types import ColumnName
+
 
 @register(masking='field', module="flagtools")
-def clearFlags(data: DictOfSeries, field: str, flagger: Flagger, **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def clearFlags(data: DictOfSeries, field: ColumnName, flagger: Flagger, **kwargs) -> Tuple[DictOfSeries, Flagger]:
     flagger = flagger.clearFlags(field, **kwargs)
     return data, flagger
 
 
 @register(masking='field', module="flagtools")
-def forceFlags(data: DictOfSeries, field: str, flagger: Flagger, flag: Any, **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def forceFlags(data: DictOfSeries, field: ColumnName, flagger: Flagger, flag: Any, **kwargs) -> Tuple[DictOfSeries, Flagger]:
     flagger = flagger.clearFlags(field).setFlags(field, flag=flag, inplace=True, **kwargs)
     return data, flagger
 
 
 @register(masking='field', module="flagtools")
-def flagDummy(data: DictOfSeries, field: str, flagger: Flagger, **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagDummy(data: DictOfSeries, field: ColumnName, flagger: Flagger, **kwargs) -> Tuple[DictOfSeries, Flagger]:
     """
     Function does nothing but returning data and flagger.
 
@@ -49,7 +51,7 @@
 
 
 @register(masking='field', module="flagtools")
-def flagForceFail(data: DictOfSeries, field: str, flagger: Flagger, **kwargs):
+def flagForceFail(data: DictOfSeries, field: ColumnName, flagger: Flagger, **kwargs):
     """
     Function raises a runtime error.
 
@@ -67,7 +69,7 @@
 
 
 @register(masking='field', module="flagtools")
-def flagUnflagged(data: DictOfSeries, field: str, flagger: Flagger, flag: Optional[Any]=None, **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagUnflagged(data: DictOfSeries, field: ColumnName, flagger: Flagger, flag: Optional[Any]=None, **kwargs) -> Tuple[DictOfSeries, Flagger]:
     """
     Function sets the GOOD flag to all values flagged better than GOOD.
     If there is an entry 'flag' in the kwargs dictionary passed, the
@@ -99,7 +101,7 @@
 
 
 @register(masking='field', module="flagtools")
-def flagGood(data: DictOfSeries, field: str, flagger: Flagger, flag: Optional[Any]=None, **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagGood(data: DictOfSeries, field: ColumnName, flagger: Flagger, flag: Optional[Any]=None, **kwargs) -> Tuple[DictOfSeries, Flagger]:
     """
     Function sets the GOOD flag to all values flagged better than GOOD.
 
@@ -125,7 +127,7 @@
 
 
 @register(masking='field', module="flagtools")
 def flagManual(
-    data: DictOfSeries, field: str, flagger: Flagger,
+    data: DictOfSeries, field: ColumnName, flagger: Flagger,
     mdata: Union[pd.Series, pd.DataFrame, DictOfSeries],
     mflag: Any = 1,
     method=Literal["plain", "ontime", "left-open", "right-open"],
diff --git a/saqc/funcs/outliers.py b/saqc/funcs/outliers.py
index 827c6c267220111eb34b25adce349824e853b2e9..eb42a63bdca8b8a1bcc8dccefbb08085444cfab0 100644
--- a/saqc/funcs/outliers.py
+++ b/saqc/funcs/outliers.py
@@ -22,6 +22,7 @@ from saqc.lib.tools import (
     findIndex,
     getFreqDelta
 )
+from saqc.lib.types import ColumnName, FreqString, IntegerWindow
 from saqc.funcs.scores import assignKNNScore
 import saqc.lib.ts_operators as ts_ops
 
@@ -29,9 +30,9 @@ import saqc.lib.ts_operators as ts_ops
 @register(masking='field', module="outliers")
 def flagByStray(
     data: DictOfSeries,
-    field: str,
+    field: ColumnName,
     flagger: Flagger,
-    partition_freq: Optional[Union[str, int]]=None,
+    partition_freq: Optional[Union[IntegerWindow, FreqString]]=None,
     partition_min: int=11,
     iter_start: float=0.5,
     alpha: float=0.05,
@@ -337,18 +338,18 @@ def _expFit(val_frame, scoring_method="kNNMaxGap", n_neighbors=10, iter_start=0.
 @register(masking='all', module="outliers")
 def flagMVScores(
     data: DictOfSeries,
-    field: str,
+    field: ColumnName,
     flagger: Flagger,
-    fields: Sequence[str],
+    fields: Sequence[ColumnName],
     trafo: Callable[[pd.Series], pd.Series]=lambda x: x,
     alpha: float=0.05,
     n_neighbors: int=10,
     scoring_func: Callable[[pd.Series], float]=np.sum,
     iter_start: float=0.5,
-    stray_partition: Optional[Union[str, int]]=None,
+    stray_partition: Optional[Union[IntegerWindow, FreqString]]=None,
    stray_partition_min: int=11,
     trafo_on_partition: bool=True,
-    reduction_range: Optional[str]=None,
+    reduction_range: Optional[FreqString]=None,
     reduction_drop_flagged: bool=False,
     reduction_thresh: float=3.5,
     reduction_min_periods: int=1,
@@ -477,18 +478,18 @@
 
 @register(masking='field', module="outliers")
 def flagRaise(
-    data: DictOfSeries,
-    field: str,
-    flagger: Flagger,
-    thresh: float,
-    raise_window: str,
-    intended_freq: str,
-    average_window: Optional[str]=None,
-    mean_raise_factor: float=2.,
-    min_slope: Optional[float]=None,
-    min_slope_weight: float=0.8,
-    numba_boost: bool=True,
-    **kwargs,
+    data: DictOfSeries,
+    field: ColumnName,
+    flagger: Flagger,
+    thresh: float,
+    raise_window: FreqString,
+    intended_freq: FreqString,
+    average_window: Optional[FreqString]=None,
+    mean_raise_factor: float=2.,
+    min_slope: Optional[float]=None,
+    min_slope_weight: float=0.8,
+    numba_boost: bool=True,
+    **kwargs,
 ) -> Tuple[DictOfSeries, Flagger]:
     """
     The function flags raises and drops in value courses that exceed a certain threshold
@@ -642,7 +643,7 @@
 
 
 @register(masking='field', module="outliers")
-def flagMAD(data: DictOfSeries, field: str, flagger: Flagger, window: str, z: float=3.5, **kwargs) -> Tuple[DictOfSeries, Flagger]:
+def flagMAD(data: DictOfSeries, field: ColumnName, flagger: Flagger, window: FreqString, z: float=3.5, **kwargs) -> Tuple[DictOfSeries, Flagger]:
     """
 
@@ -703,11 +704,11 @@
 @register(masking='field', module="outliers")
 def flagOffset(
     data: DictOfSeries,
-    field: str,
+    field: ColumnName,
     flagger: Flagger,
     thresh: float,
     tolerance: float,
-    window: Union[int, str],
+    window: Union[IntegerWindow, FreqString],
     rel_thresh: Optional[float]=None,
     numba_kickin: int=200000,
     **kwargs
@@ -856,9 +857,9 @@
 @register(masking='field', module="outliers")
 def flagByGrubbs(
     data: DictOfSeries,
-    field: str,
+    field: ColumnName,
     flagger: Flagger,
-    winsz: Union[str, int],
+    winsz: Union[FreqString, IntegerWindow],
     alpha: float=0.05,
     min_periods: int=8,
     check_lagged: bool=False,
@@ -959,7 +960,7 @@
 @register(masking='field', module="outliers")
 def flagRange(
     data: DictOfSeries,
-    field: str,
+    field: ColumnName,
     flagger: Flagger,
     min: float=-np.inf,
     max: float=np.inf,
@@ -1000,9 +1001,9 @@
 @register(masking='all', module="outliers")
 def flagCrossStatistic(
     data: DictOfSeries,
-    field: str,
+    field: ColumnName,
     flagger: Flagger,
-    fields: Sequence[str],
+    fields: Sequence[ColumnName],
     thresh: float,
     cross_stat: Literal["modZscore", "Zscore"]="modZscore",
     **kwargs
diff --git a/saqc/funcs/residues.py b/saqc/funcs/residues.py
index 79a7a826c67080bf8942e19d6e9501963f72d1b5..77cdcd9488e6bb634dd43aa8bee1e1868227e923 100644
--- a/saqc/funcs/residues.py
+++ b/saqc/funcs/residues.py
@@ -111,7 +111,7 @@ def calculateRollingResidues(
     field: str,
     flagger: Flagger,
     winsz: Union[str, int],
-    func: Callable[[np.array], np.array]=np.mean,
+    func: Callable[[np.ndarray], np.ndarray]=np.mean,
     eval_flags: bool=True,
     min_periods: Optional[int]=0,
     center: bool=True,
diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py
index f728779405775d17aa31e2cd0abb3c60a8c9ea1c..5698e815188dd24e4395b9809197f138f9559bea 100644
--- a/saqc/lib/tools.py
+++ b/saqc/lib/tools.py
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 
 import re
+import datetime
 from typing import Sequence, Union, Any, Iterator
 import itertools
@@ -37,6 +38,13 @@ def toSequence(value: Union[T, Sequence[T]], default: Union[T, Sequence[T]] = No
     return value
 
 
+def toOffset(freq_string: str, raw: bool = False) -> Union[datetime.timedelta, pd.DateOffset]:
+    offset = pd.tseries.frequencies.to_offset(freq_string)
+    if raw:
+        return offset
+    return offset.delta.to_pytimedelta()
+
+
 @nb.jit(nopython=True, cache=True)
 def findIndex(iterable, value, start):
     i = start
diff --git a/saqc/lib/types.py b/saqc/lib/types.py
index 67bddeba6271ae2a34afb9fdae6c494768d29f03..650bfb0e779397b26e12b24710536b7ec1df0a3c 100644
--- a/saqc/lib/types.py
+++ b/saqc/lib/types.py
@@ -1,16 +1,30 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
 
-from typing import TypeVar, Union
+from typing import TypeVar, Union, NewType, Tuple
+from typing_extensions import Protocol, Literal
 
 import numpy as np
 import pandas as pd
-import dios
-from saqc.flagger.flags import Flagger
+from dios import DictOfSeries
 
 T = TypeVar("T")
 ArrayLike = TypeVar("ArrayLike", np.ndarray, pd.Series, pd.DataFrame)
-PandasLike = TypeVar("PandasLike", pd.Series, pd.DataFrame, dios.DictOfSeries)
-DiosLikeT = Union[dios.DictOfSeries, pd.DataFrame]
+PandasLike = TypeVar("PandasLike", pd.Series, pd.DataFrame, DictOfSeries)
+DiosLikeT = Union[DictOfSeries, pd.DataFrame]
 
-FuncReturnT = [dios.DictOfSeries, Flagger]
+FuncReturnT = Tuple[DictOfSeries, "Flagger"]
+
+# we only support fixed length offsets
+FreqString = NewType("FreqString", Literal["D", "H", "T", "min", "S", "L", "ms", "U", "us", "N"])
+
+# we define a bunch of type aliases, mostly needed to generate appropriate fuzzy data through hypothesis
+ColumnName = NewType("ColumnName", str)
+IntegerWindow = NewType("IntegerWindow", int)
+TimestampColumnName = TypeVar("TimestampColumnName", bound=str)
+
+
+# needed for deeper type hinting magic
+class CurveFitter(Protocol):
+    def __call__(self, data: np.ndarray, *params: float) -> np.ndarray:
+        ...
+
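For illustration, a plain function satisfies the new `CurveFitter` protocol by shape alone; the exponential model below is a made-up sketch, not part of this diff:

    import numpy as np
    from saqc.lib.types import CurveFitter

    def expModel(data: np.ndarray, *params: float) -> np.ndarray:
        # matches CurveFitter.__call__: the x-data comes first,
        # followed by the free model parameters as floats
        a, b = params
        return a * np.exp(b * data)

    # Protocol matching is structural, so no inheritance is needed:
    fitter: CurveFitter = expModel

This is what allows `correctRegimeAnomaly` above to annotate its `model` argument as `CurveFitter` while still accepting ordinary functions and lambdas.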
diff --git a/setup.py b/setup.py
index 0048952077db64fb09076749b32a99b8894e10e4..1aef9e71bb4959720f116c55807b9db8c6be9a32 100644
--- a/setup.py
+++ b/setup.py
@@ -26,7 +26,8 @@
         "pyarrow",
         "python-intervals",
         "astor",
-        "dios"
+        "dios",
+        "typing_extensions",
     ],
     license="GPLv3",
     entry_points={"console_scripts": ["saqc=saqc.__main__:main"],},
diff --git a/test/common.py b/test/common.py
index d5867e94476b9e7744826fd6f7d40f88770a0fbd..e07cc5cfb579e12f9f775e7112eab1f02ebca88b 100644
--- a/test/common.py
+++ b/test/common.py
@@ -2,13 +2,32 @@
 # -*- coding: utf-8 -*-
 
 import io
+import numbers
+from typing import get_type_hints
 
 import numpy as np
 import pandas as pd
 import dios
+from hypothesis.strategies import (
+    lists,
+    sampled_from,
+    composite,
+    from_regex,
+    datetimes,
+    integers,
+    register_type_strategy,
+    from_type,
+)
+from hypothesis.extra.numpy import arrays, from_dtype
+from hypothesis.strategies._internal.types import _global_type_lookup
+
+from dios import DictOfSeries
+
+from saqc.core.register import FUNC_MAP
+from saqc.core.lib import SaQCFunction
+from saqc.lib.types import FreqString, ColumnName, IntegerWindow
 from saqc.flagger import (
-    PositionalFlagger,
     CategoricalFlagger,
     SimpleFlagger,
     DmpFlagger,
@@ -24,6 +43,7 @@ TESTFLAGGER = (
     DmpFlagger(),
 )
 
+
 def flagAll(data, field, flagger, **kwargs):
     # NOTE: remember to rename flag -> flag_values
     return data, flagger.setFlags(field=field, flag=flagger.BAD)
@@ -48,3 +68,142 @@
     f.write(content)
     f.seek(0)
     return f
+
+
+MAX_EXAMPLES = 50  # 100000
+
+
+@composite
+def dioses(draw, min_cols=1):
+    """
+    initialize data according to the current restrictions
+    """
+    # NOTE:
+    # The following restrictions showed up and should be enforced during init:
+    # - Column names need to satisfy the following regex: [A-Za-z0-9_-]+
+    # - DatetimeIndex needs to be sorted
+    # - Integer values larger than 2**53 lead to numerical instabilities during
+    #   the integer->float->integer type conversion in _maskData/_unmaskData.
+
+    cols = draw(lists(columnNames(), unique=True, min_size=min_cols))
+    columns = {
+        c: draw(dataSeries(min_size=3))
+        for c in cols
+    }
+    return DictOfSeries(columns)
+
+
+@composite
+def dataSeries(draw, min_size=0, max_size=100, dtypes=("float32", "float64", "int32", "int64")):
+    if np.isscalar(dtypes):
+        dtypes = (dtypes,)
+
+    dtype = np.dtype(draw(sampled_from(dtypes)))
+    if issubclass(dtype.type, numbers.Integral):
+        info = np.iinfo(dtype)
+    elif issubclass(dtype.type, numbers.Real):
+        info = np.finfo(dtype)
+    else:
+        raise ValueError("only numerical dtypes are supported")
+    # we don't want to fail just because of overflows
+    elements = from_dtype(dtype, min_value=info.min+1, max_value=info.max-1)
+
+    index = draw(daterangeIndexes(min_size=min_size, max_size=max_size))
+    values = draw(arrays(dtype=dtype, elements=elements, shape=len(index)))
+    return pd.Series(data=values, index=index)
+
+
+@composite
+def columnNames(draw):
+    return draw(from_regex(r"[A-Za-z0-9_-]+", fullmatch=True))
+
+
+@composite
+def flaggers(draw, data):
+    """
+    initialize a flagger and set some flags
+    """
+    # flagger = draw(sampled_from(TESTFLAGGER)).initFlags(data)
+    flagger = draw(sampled_from([SimpleFlagger()])).initFlags(data)
+    for col, srs in data.items():
+        loc_st = lists(sampled_from(sorted(srs.index)), unique=True, max_size=len(srs)-1)
+        flagger = flagger.setFlags(field=col, loc=draw(loc_st))
+    return flagger
+
+
+@composite
+def functions(draw, module: str=None):
+    samples = tuple(FUNC_MAP.values())
+    if module:
+        samples = tuple(f for f in samples if f.name.startswith(module))
+    # samples = [FUNC_MAP["drift.correctExponentialDrift"]]
+    return draw(sampled_from(samples))
+
+
+@composite
+def daterangeIndexes(draw, min_size=0, max_size=100):
+    min_date = pd.Timestamp("1900-01-01").to_pydatetime()
+    max_date = pd.Timestamp("2099-12-31").to_pydatetime()
+    start = draw(datetimes(min_value=min_date, max_value=max_date))
+    periods = draw(integers(min_value=min_size, max_value=max_size))
+    freq = draw(sampled_from(["D", "H", "T", "min", "S", "L", "ms", "U", "us", "N"]))
+    return pd.date_range(start, periods=periods, freq=freq)
+
+
+@composite
+def frequencyStrings(draw, _):
+    freq = draw(sampled_from(["D", "H", "T", "min", "S", "L", "ms", "U", "us", "N"]))
+    mult = draw(integers(min_value=1, max_value=10))
+    value = f"{mult}{freq}"
+    return value
+
+
+@composite
+def dataFieldFlagger(draw):
+    data = draw(dioses())
+    field = draw(sampled_from(sorted(data.columns)))
+    flagger = draw(flaggers(data))
+    return data, field, flagger
+
+
+@composite
+def functionCalls(draw, module: str=None):
+    func = draw(functions(module))
+    kwargs = draw(functionKwargs(func))
+    return func, kwargs
+
+
+@composite
+def functionKwargs(draw, func: SaQCFunction):
+    data = draw(dioses())
+    field = draw(sampled_from(sorted(data.columns)))
+
+    kwargs = {
+        "data": data,
+        "field": field,
+        "flagger": draw(flaggers(data))
+    }
+
+    column_name_strategy = lambda _: sampled_from(sorted(c for c in data.columns if c != field))
+    integer_window_strategy = lambda _: integers(min_value=1, max_value=len(data[field]) - 1)
+
+    register_type_strategy(FreqString, frequencyStrings)
+    register_type_strategy(ColumnName, column_name_strategy)
+    register_type_strategy(IntegerWindow, integer_window_strategy)
+
+    for k, v in get_type_hints(func.func).items():
+        if k not in {"data", "field", "flagger", "return"}:
+            value = draw(from_type(v))
+            # if v is TimestampColumnName:
+            #     value = draw(columnNames())
+            #     # we don't want to overwrite 'field'
+            #     assume(value != field)
+            #     # let's generate and add a timestamp column
+            #     data[value] = draw(dataSeries(dtypes="datetime64[ns]", length=len(data[field])))
+            #     # data[value] = draw(dataSeries(dtypes="datetime64[ns]"))
+            kwargs[k] = value
+
+    del _global_type_lookup[FreqString]
+    del _global_type_lookup[ColumnName]
+    del _global_type_lookup[IntegerWindow]
+
+    return kwargs
diff --git a/test/core/test_masking.py b/test/core/test_masking.py
index 270aba430a40133b0b9ae36eb8f017e4a7923148..4d285eabf28cc44a7e688dd178e2d55b383bd1c0 100644
--- a/test/core/test_masking.py
+++ b/test/core/test_masking.py
@@ -3,117 +3,147 @@
 
 import logging
 
-import pytest
 import pandas as pd
 
-from saqc import SaQC, register
-from test.common import initData, TESTFLAGGER
+from hypothesis import given, settings
+from hypothesis.strategies import (
+    sampled_from,
+    composite,
+)
+
+from saqc.core.core import _maskData, _unmaskData
 
-logging.disable(logging.CRITICAL)
+from test.common import dataFieldFlagger, MAX_EXAMPLES
 
-@pytest.fixture
-def data():
-    return initData(3)
+logging.disable(logging.CRITICAL)
 
 
-@pytest.mark.parametrize("flagger", TESTFLAGGER)
-def test_masking(data, flagger):
+@settings(max_examples=MAX_EXAMPLES, deadline=None)
+@given(data_field_flagger=dataFieldFlagger())
+def test_maskingMasksData(data_field_flagger):
     """
-    test if flagged values are exluded during the preceding tests
+    test if flagged values are replaced by np.nan
     """
-    flagger = flagger.initFlags(data)
-    var1 = 'var1'
-    mn = min(data[var1])
-    mx = max(data[var1]) / 2
+    data_in, field, flagger = data_field_flagger
+    data_masked, _ = _maskData(data_in, flagger, columns=[field], to_mask=flagger.BAD)
+    assert data_masked.aloc[flagger.isFlagged(flag=flagger.BAD)].isna().all(axis=None)
 
-    qc = SaQC(flagger, data)
-    qc = qc.outliers.flagRange(var1, mn, mx)
-    # min is not considered because its the smalles possible value.
-    # if masking works, `data > max` will be masked,
-    # so the following will deliver True for in range (data < max),
-    # otherwise False, like an inverse range-test
-    qc = qc.generic.process("dummy", func=lambda var1: var1 >= mn)
 
-    pdata, pflagger = qc.getResult(raw=True)
-    out_of_range = pflagger.isFlagged(var1)
-    in_range = ~out_of_range
+@settings(max_examples=MAX_EXAMPLES, deadline=None)
+@given(data_field_flagger=dataFieldFlagger())
+def test_dataMutationPreventsUnmasking(data_field_flagger):
+    """ test if (un)masking works as expected on data-changes.
 
-    assert (pdata.loc[out_of_range, "dummy"] == False).all()
-    assert (pdata.loc[in_range, "dummy"] == True).all()
+    if `data` is mutated after `_maskData`, `_unmaskData` should be a no-op
+    """
+    filler = -9999
 
+    data_in, field, flagger = data_field_flagger
+    data_masked, mask = _maskData(data_in, flagger, columns=[field], to_mask=flagger.BAD)
+    data_masked[field] = filler
+    data_out = _unmaskData(data_in, mask, data_masked, flagger, to_mask=flagger.BAD)
+    assert (data_out[field] == filler).all(axis=None)
 
-@pytest.mark.parametrize("flagger", TESTFLAGGER)
-def test_masking_UnmaskingOnDataChange(data, flagger):
-    """ test if (un)masking works as expected on data-change.
 
-    If the data change in the func, unmasking should respect this changes and
-    should not reapply original data, instead take the new data (and flags) as is.
-    Also if flags change, the data should be taken as is.
+@settings(max_examples=MAX_EXAMPLES, deadline=None)
+@given(data_field_flagger=dataFieldFlagger())
+def test_flaggerMutationPreventsUnmasking(data_field_flagger):
+    """ test if (un)masking works as expected on flagger-changes.
+
+    if `flagger` is mutated after `_maskData`, `_unmaskData` should be a no-op
     """
-    FILLER = -9999
-
-    @register(masking='all')
-    def changeData(data, field, flagger, **kwargs):
-        mask = data.isna()
-        data.aloc[mask] = FILLER
-        return data, flagger
-
-    @register(masking='all')
-    def changeFlags(data, field, flagger, **kwargs):
-        mask = data.isna()
-        flagger = flagger.setFlags(field, loc=mask[field], flag=flagger.UNFLAGGED, force=True)
-        return data, flagger
-
-    var = data.columns[0]
-    var_data = data[var]
-    mn, mx = var_data.max() * .25, var_data.max() * .75
-    range_mask = (var_data < mn) | (var_data > mx)
-
-    qc = SaQC(flagger, data)
-    qc = qc.outliers.flagRange(var, mn, mx)
-    qcD = qc.changeData(var)
-    qcF = qc.changeFlags(var)
-
-    data, flagger = qcD.getResult()
-    assert (data[var][range_mask] == FILLER).all(axis=None)
-    # only flags change so the data should be still NaN, because
-    # the unmasking was disabled, but the masking indeed was happening
-    data, flagger = qcF.getResult()
-    assert data[var][range_mask].isna().all(axis=None)
-
-
-@pytest.mark.parametrize("flagger", TESTFLAGGER)
-def test_shapeDiffUnmasking(data, flagger):
-    """ test if (un)masking works as expected on index-change.
+    data_in, field, flagger = data_field_flagger
+    data_masked, mask = _maskData(data_in, flagger, columns=[field], to_mask=flagger.BAD)
+    flagger = flagger.setFlags(field, flag=flagger.UNFLAGGED, force=True)
+    data_out = _unmaskData(data_in, mask, data_masked, flagger, to_mask=flagger.BAD)
+    assert (data_out.loc[flagger.isFlagged(field, flag=flagger.BAD), field].isna()).all(axis=None)
+
+
+@settings(max_examples=MAX_EXAMPLES, deadline=None)
+@given(data_field_flagger=dataFieldFlagger())
+def test_reshapingPreventsUnmasking(data_field_flagger):
+    """ test if (un)masking works as expected on index-changes.
 
     If the index of data (and flags) change in the func, the unmasking
     should not reapply original data, instead take the new data (and flags) as is.
""" - FILLER = -1111 + filler = -1111 + + data_in, field, flagger = data_field_flagger + data_masked, mask = _maskData(data_in, flagger, columns=[field], to_mask=flagger.BAD) + + # mutate indexes of `data` and `flagger` + index = data_masked[field].index.to_series() + index.iloc[-len(data_masked[field])//2:] += pd.Timedelta("7.5Min") + data_masked[field] = pd.Series(data=filler, index=index) + flags = flagger.getFlags() + flags[field] = pd.Series(data=flags[field].values, index=index) + flagger = flagger.initFlags(flags=flags) + + data_out = _unmaskData(data_in, mask, data_masked, flagger, to_mask=flagger.BAD) + assert (data_out[field] == filler).all(axis=None) + + +@settings(max_examples=MAX_EXAMPLES, deadline=None) +@given(data_field_flagger=dataFieldFlagger()) +def test_unmaskingInvertsMasking(data_field_flagger): + """ + unmasking data should invert the masking + """ + data_in, field, flagger = data_field_flagger + data_masked, mask = _maskData(data_in, flagger, columns=[field], to_mask=flagger.BAD) + data_out = _unmaskData(data_in, mask, data_masked, flagger, to_mask=flagger.BAD) + assert data_in.to_df().equals(data_out.to_df()) + + +# @settings(max_examples=MAX_EXAMPLES, deadline=None) +# @given(data_field_flagger=dataFieldFlagger(), func_kwargs=flagFuncsKwargs()) +# def test_maskingPreservesData(data_field_flagger, func_kwargs): +# """ +# no mutations on pre-flagged data + +# calling a function on pre-flagged data should yield the same +# behavior as calling this function on data where the flagged values +# are removed +# """ + +# data_in, field, flagger = data_field_flagger + +# data_masked, mask = _maskData(data_in, flagger, columns=[field], to_mask=flagger.BAD) +# func, kwargs = func_kwargs +# data_masked, _ = func(data_masked, field, flagger, **kwargs) +# data_out = _unmaskData(data_in, mask, data_masked, flagger, to_mask=flagger.BAD) + +# flags_in = flagger.isFlagged(flag=flagger.BAD) +# assert data_in.aloc[flags_in].equals(data_out.aloc[flags_in]) - @register(masking='none') - def pseudoHarmo(data, field, flagger, **kwargs): - index = data[field].index.to_series() - index.iloc[-len(data[field])//2:] += pd.Timedelta("7.5Min") - data[field] = pd.Series(data=FILLER, index=index) +# @settings(max_examples=MAX_EXAMPLES, deadline=None) +# @given(data_field_flagger=dataFieldFlagger(), func_kwargs=flagFuncsKwargs()) +# def test_maskingEqualsRemoval(data_field_flagger, func_kwargs): +# """ +# calling a function on pre-flagged data should yield the same +# results as calling this function on data where the flagged values +# are removed +# """ +# func, kwargs = func_kwargs - flags = flagger.getFlags() - flags[field] = pd.Series(data=flags[field].values, index=index) +# data, field, flagger = data_field_flagger +# flagged_in = flagger.isFlagged(flag=flagger.BAD, comparator=">=") - flagger = flagger.initFlags(flags=flags) - return data, flagger +# # mask and call +# data_left, _ = _maskData(data, flagger, columns=[field], to_mask=flagger.BAD) +# data_left, _ = func(data_left, field, flagger, **kwargs) - var = data.columns[0] - var_data = data[var] - mn, mx = var_data.max() * .25, var_data.max() * .75 +# # remove and call +# data_right = data.aloc[~flagged_in] +# flagger_right = flagger.initFlags(flagger.getFlags().aloc[~flagged_in]) +# data_right, _ = func(data_right, field, flagger_right, **kwargs) - qc = SaQC(flagger, data) - qc = qc.outliers.flagRange(var, mn, mx) - qc = qc.pseudoHarmo(var) +# # NOTE: we need to handle the implicit type conversion in `_maskData` +# data_left_compare = 
+#     data_left_compare = data_left.aloc[~flagged_in]
+#     data_left_compare[field] = data_left_compare[field].astype(data[field].dtype)
 
-    data, flagger = qc.getResult(raw=True)
-    assert (data[var] == FILLER).all(axis=None)
+#     assert data_right.equals(data_left_compare)
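For context, the strategies added in test/common.py are meant to compose into property tests like the following sketch; the test name, the module filter, and the final assertion are illustrative assumptions, not part of this diff:

    from hypothesis import given, settings

    from test.common import functionCalls, MAX_EXAMPLES


    @settings(max_examples=MAX_EXAMPLES, deadline=None)
    @given(func_kwargs=functionCalls(module="breaks"))
    def test_breaksFunctionsRunOnFuzzedInput(func_kwargs):
        # functionCalls draws a registered SaQCFunction plus fuzzed
        # data/field/flagger kwargs (via functionKwargs, which registers
        # strategies for FreqString, ColumnName and IntegerWindow);
        # the call should run through without raising
        func, kwargs = func_kwargs
        data, flagger = func.func(**kwargs)
        assert data is not None and flagger is not None

The temporary register_type_strategy/_global_type_lookup dance in functionKwargs is what lets hypothesis resolve the NewType aliases from saqc/lib/types.py per drawn function, scoped to the generated data and field.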