From baf2063d654380d437d55331621661f6215c6240 Mon Sep 17 00:00:00 2001 From: Bert Palm <bert.palm@ufz.de> Date: Tue, 23 Mar 2021 23:39:18 +0100 Subject: [PATCH] exposed 'flag' to signature --- saqc/core/modules/breaks.py | 4 ++ saqc/core/modules/changepoints.py | 3 ++ saqc/core/modules/constants.py | 11 ++++-- saqc/core/modules/curvefit.py | 21 ++++++---- saqc/core/modules/drift.py | 5 +++ saqc/core/modules/flagtools.py | 3 +- saqc/core/modules/generic.py | 19 +++++++-- saqc/core/modules/outliers.py | 15 +++++++- saqc/core/modules/pattern.py | 11 ++++-- saqc/core/modules/resampling.py | 2 + saqc/core/modules/residues.py | 3 ++ saqc/core/modules/rolling.py | 2 + saqc/funcs/breaks.py | 16 ++++++-- saqc/funcs/changepoints.py | 38 ++++++++++++------ saqc/funcs/constants.py | 19 ++++++--- saqc/funcs/curvefit.py | 40 +++++++++++-------- saqc/funcs/drift.py | 47 +++++++++++++++-------- saqc/funcs/flagtools.py | 12 ++++-- saqc/funcs/generic.py | 24 +++++++++--- saqc/funcs/outliers.py | 64 ++++++++++++++++++++++++------- saqc/funcs/pattern.py | 22 ++++++----- saqc/funcs/resampling.py | 15 +++++++- saqc/funcs/residues.py | 7 ++++ saqc/funcs/rolling.py | 9 ++--- 24 files changed, 299 insertions(+), 113 deletions(-) diff --git a/saqc/core/modules/breaks.py b/saqc/core/modules/breaks.py index 02b237a60..b07edba09 100644 --- a/saqc/core/modules/breaks.py +++ b/saqc/core/modules/breaks.py @@ -5,6 +5,7 @@ from typing import Tuple import numpy as np from dios import DictOfSeries +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.lib.types import FreqString, IntegerWindow, ColumnName @@ -16,6 +17,7 @@ class Breaks(ModuleBase): self, field: ColumnName, nodata: float = np.nan, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagMissing", locals()) @@ -25,6 +27,7 @@ class Breaks(ModuleBase): field: ColumnName, gap_window: FreqString, group_window: FreqString, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagIsolated", locals()) @@ -35,6 +38,7 @@ class Breaks(ModuleBase): thresh: float, winsz: FreqString, min_periods: IntegerWindow = 1, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagJumps", locals()) diff --git a/saqc/core/modules/changepoints.py b/saqc/core/modules/changepoints.py index bab02fc86..19ed26d29 100644 --- a/saqc/core/modules/changepoints.py +++ b/saqc/core/modules/changepoints.py @@ -7,6 +7,7 @@ import numpy as np from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.lib.types import FreqString, IntegerWindow @@ -26,6 +27,7 @@ class ChangePoints(ModuleBase): try_to_jit: bool = True, # TODO rm, not a user decision reduce_window: FreqString = None, reduce_func: Callable[[np.ndarray, np.ndarray], int] = lambda x, _: x.argmax(), + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagChangePoints", locals()) @@ -45,6 +47,7 @@ class ChangePoints(ModuleBase): model_by_resids: bool = False, flag_changepoints: bool = False, assign_cluster: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("assignChangePointCluster", locals()) diff --git a/saqc/core/modules/constants.py b/saqc/core/modules/constants.py index 09f55eb00..22239aa09 100644 --- a/saqc/core/modules/constants.py +++ b/saqc/core/modules/constants.py @@ -4,6 +4,7 @@ from 
typing import Tuple from dios import DictOfSeries +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.lib.types import FreqString, ColumnName @@ -14,10 +15,11 @@ class Constants(ModuleBase): def flagByVariance( self, field: ColumnName, - window: FreqString="12h", - thresh: float=0.0005, - max_missing: int=None, - max_consec_missing: int=None, + window: FreqString = "12h", + thresh: float = 0.0005, + max_missing: int = None, + max_consec_missing: int = None, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagByVariance", locals()) @@ -27,6 +29,7 @@ class Constants(ModuleBase): field: ColumnName, thresh: float, window: FreqString, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagConstants", locals()) diff --git a/saqc/core/modules/curvefit.py b/saqc/core/modules/curvefit.py index 595126406..c24ce08b0 100644 --- a/saqc/core/modules/curvefit.py +++ b/saqc/core/modules/curvefit.py @@ -5,17 +5,22 @@ from typing import Union, Tuple from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase class Curvefit(ModuleBase): - def fitPolynomial(self, field: str, - winsz: Union[int, str], - polydeg: int, - numba: Literal[True, False, "auto"] = "auto", - eval_flags: bool = True, - min_periods: int = 0, - return_residues: bool = False, - **kwargs) -> Tuple[DictOfSeries, Flagger]: + def fitPolynomial( + self, + field: str, + winsz: Union[int, str], + polydeg: int, + numba: Literal[True, False, "auto"] = "auto", + eval_flags: bool = True, + min_periods: int = 0, + return_residues: bool = False, + flag: float = BAD, + **kwargs + ) -> Tuple[DictOfSeries, Flagger]: return self.defer("fitPolynomial", locals()) diff --git a/saqc/core/modules/drift.py b/saqc/core/modules/drift.py index 3c422fc93..e063e62f3 100644 --- a/saqc/core/modules/drift.py +++ b/saqc/core/modules/drift.py @@ -6,6 +6,7 @@ from typing import Sequence, Callable, Optional, Tuple import numpy as np from scipy.spatial.distance import pdist +from saqc.constants import * from saqc.core.modules.base import ModuleBase from saqc.funcs import LinkageString, DictOfSeries, Flagger from saqc.lib.types import ColumnName, FreqString, CurveFitter @@ -21,6 +22,7 @@ class Drift(ModuleBase): norm_frac: float = 0.5, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), linkage_method: LinkageString = "single", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagDriftFromNorm", locals()) @@ -32,6 +34,7 @@ class Drift(ModuleBase): segment_freq: FreqString, thresh: float, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagDriftFromReference", locals()) @@ -46,6 +49,7 @@ class Drift(ModuleBase): norm_frac: float = 0.5, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), linkage_method: LinkageString = "single", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagDriftFromScaledNorm", locals()) @@ -56,6 +60,7 @@ class Drift(ModuleBase): maint_data_field: ColumnName, cal_mean: int = 5, flag_maint_period: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, 
Flagger]: return self.defer("correctExponentialDrift", locals()) diff --git a/saqc/core/modules/flagtools.py b/saqc/core/modules/flagtools.py index 426dfb276..7cc2b1633 100644 --- a/saqc/core/modules/flagtools.py +++ b/saqc/core/modules/flagtools.py @@ -41,7 +41,8 @@ class FlagTools(ModuleBase): self, field: ColumnName, mdata: Union[pd.Series, pd.DataFrame, DictOfSeries], mflag: Any = 1, - method=Literal["plain", "ontime", "left-open", "right-open"], + method: Literal["plain", "ontime", "left-open", "right-open"] = 'plain', + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagManual", locals()) diff --git a/saqc/core/modules/generic.py b/saqc/core/modules/generic.py index 3f44c45f7..da80700c3 100644 --- a/saqc/core/modules/generic.py +++ b/saqc/core/modules/generic.py @@ -13,10 +13,21 @@ from saqc.core.modules.base import ModuleBase class Generic(ModuleBase): - def process(self, field: str, func: Callable[[pd.Series], pd.Series], - nodata: float = np.nan, **kwargs) -> Tuple[DictOfSeries, Flagger]: + def process( + self, + field: str, + func: Callable[[pd.Series], pd.Series], + nodata: float = np.nan, + **kwargs + ) -> Tuple[DictOfSeries, Flagger]: return self.defer("process", locals()) - def flag(self, field: str, func: Callable[[pd.Series], pd.Series], - nodata: float = np.nan, flag=BAD, **kwargs) -> Tuple[DictOfSeries, Flagger]: + def flag( + self, + field: str, + func: Callable[[pd.Series], pd.Series], + nodata: float = np.nan, + flag: float = BAD, + **kwargs + ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flag", locals()) diff --git a/saqc/core/modules/outliers.py b/saqc/core/modules/outliers.py index 08efb1068..bf8152039 100644 --- a/saqc/core/modules/outliers.py +++ b/saqc/core/modules/outliers.py @@ -8,6 +8,7 @@ import pandas as pd from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.lib.types import IntegerWindow, FreqString, ColumnName @@ -22,6 +23,7 @@ class Outliers(ModuleBase): partition_min: int = 11, iter_start: float = 0.5, alpha: float = 0.05, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagByStray", locals()) @@ -42,6 +44,7 @@ class Outliers(ModuleBase): reduction_drop_flagged: bool = False, # TODO: still a case ? 
reduction_thresh: float = 3.5, reduction_min_periods: int = 1, + flag: float = BAD, **kwargs, ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagMVScores", locals()) @@ -57,12 +60,18 @@ class Outliers(ModuleBase): min_slope: Optional[float] = None, min_slope_weight: float = 0.8, numba_boost: bool = True, # TODO: rm, not a user decision + flag: float = BAD, **kwargs, ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagRaise", locals()) def flagMAD( - self, field: ColumnName, window: FreqString, z: float = 3.5, **kwargs + self, + field: ColumnName, + window: FreqString, + z: float = 3.5, + flag: float = BAD, + **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagMAD", locals()) @@ -74,6 +83,7 @@ class Outliers(ModuleBase): window: Union[IntegerWindow, FreqString], rel_thresh: Optional[float] = None, numba_kickin: int = 200000, # TODO: rm, not a user decision + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagOffset", locals()) @@ -85,6 +95,7 @@ class Outliers(ModuleBase): alpha: float = 0.05, min_periods: int = 8, check_lagged: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagByGrubbs", locals()) @@ -94,6 +105,7 @@ class Outliers(ModuleBase): field: ColumnName, min: float = -np.inf, max: float = np.inf, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagRange", locals()) @@ -104,6 +116,7 @@ class Outliers(ModuleBase): fields: Sequence[ColumnName], thresh: float, cross_stat: Literal["modZscore", "Zscore"] = "modZscore", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagCrossStatistic", locals()) diff --git a/saqc/core/modules/pattern.py b/saqc/core/modules/pattern.py index 06c9ab26c..38d083945 100644 --- a/saqc/core/modules/pattern.py +++ b/saqc/core/modules/pattern.py @@ -5,6 +5,7 @@ from typing import Sequence, Tuple from dios import DictOfSeries +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase @@ -15,8 +16,9 @@ class Pattern(ModuleBase): self, field: str, ref_field: str, - widths: Sequence[int]=(1, 2, 4, 8), - waveform: str="mexh", + widths: Sequence[int] = (1, 2, 4, 8), + waveform: str = "mexh", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagPatternByDTW", locals()) @@ -25,8 +27,9 @@ class Pattern(ModuleBase): self, field: str, ref_field: str, - max_distance: float=0.03, - normalize: bool=True, + max_distance: float = 0.03, + normalize: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagPatternByWavelet", locals()) diff --git a/saqc/core/modules/resampling.py b/saqc/core/modules/resampling.py index be4859bb5..9822bbd8b 100644 --- a/saqc/core/modules/resampling.py +++ b/saqc/core/modules/resampling.py @@ -8,6 +8,7 @@ import pandas as pd from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.funcs.interpolation import _SUPPORTED_METHODS @@ -22,6 +23,7 @@ class Resampling(ModuleBase): value_func, flag_func: Callable[[pd.Series], float] = np.nanmax, method: Literal["fagg", "bagg", "nagg"] = "nagg", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("aggregate", locals()) diff --git a/saqc/core/modules/residues.py b/saqc/core/modules/residues.py index 85d7426f0..877323546 100644 --- 
a/saqc/core/modules/residues.py +++ b/saqc/core/modules/residues.py @@ -7,6 +7,7 @@ import numpy as np from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase @@ -21,6 +22,7 @@ class Residues(ModuleBase): numba: Literal[True, False, "auto"] = "auto", # TODO: rm, not a a user decision eval_flags: bool = True, # TODO, not valid anymore, if still needed, maybe assign user-passed ``flag``? min_periods: Optional[int] = 0, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("calculatePolynomialResidues", locals()) @@ -33,6 +35,7 @@ class Residues(ModuleBase): eval_flags: bool = True, min_periods: Optional[int] = 0, center: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("calculateRollingResidues", locals()) diff --git a/saqc/core/modules/rolling.py b/saqc/core/modules/rolling.py index f9c6be163..d29cb4018 100644 --- a/saqc/core/modules/rolling.py +++ b/saqc/core/modules/rolling.py @@ -6,6 +6,7 @@ from typing import Union, Callable import numpy as np import pandas as pd +from saqc.constants import * from saqc.core.modules.base import ModuleBase @@ -19,6 +20,7 @@ class Rolling(ModuleBase): min_periods: int=0, center: bool=True, return_residues=False, # TODO: this should not be public, a wrapper would be better + flag: float = BAD, **kwargs ): return self.defer("roll", locals()) diff --git a/saqc/funcs/breaks.py b/saqc/funcs/breaks.py index 6c394e3e7..8f10e9b72 100644 --- a/saqc/funcs/breaks.py +++ b/saqc/funcs/breaks.py @@ -16,6 +16,7 @@ import pandas.tseries.frequencies from dios import DictOfSeries +from saqc.constants import * from saqc.lib.tools import groupConsecutives from saqc.lib.types import FreqString, ColumnName, IntegerWindow from saqc.funcs.changepoints import assignChangePointCluster @@ -29,6 +30,7 @@ def flagMissing( field: ColumnName, flagger: Flagger, nodata: float = np.nan, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -44,6 +46,8 @@ def flagMissing( A flagger object, holding flags and additional Informations related to `data`. nodata : any, default np.nan A value that defines missing data. + flag : float, default BAD + flag to set. Returns ------- @@ -59,7 +63,7 @@ def flagMissing( else: mask = datacol == nodata - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -70,6 +74,7 @@ def flagIsolated( flagger: Flagger, gap_window: FreqString, group_window: FreqString, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -92,6 +97,8 @@ def flagIsolated( group_window : str The maximum temporal extension allowed for a group that is isolated by gaps of size 'gap_window', to be actually flagged as isolated group. See condition (1). + flag : float, default BAD + flag to set. Returns ------- @@ -130,7 +137,7 @@ def flagIsolated( if right.all(): flags[start:stop] = True - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -142,6 +149,7 @@ def flagJumps( thresh: float, winsz: FreqString, min_periods: IntegerWindow = 1, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -163,6 +171,8 @@ def flagJumps( min_periods : int, default 1 Minimum number of periods that have to be present in a window of size `winsz`, so that the mean value obtained from that window is regarded valid. + flag : float, default BAD + flag to set. 
""" return assignChangePointCluster( data, field, flagger, @@ -173,6 +183,6 @@ def flagJumps( flag_changepoints=True, model_by_resids=False, assign_cluster=False, + flag=flag, **kwargs ) - diff --git a/saqc/funcs/changepoints.py b/saqc/funcs/changepoints.py index 7025ad712..4ef620f54 100644 --- a/saqc/funcs/changepoints.py +++ b/saqc/funcs/changepoints.py @@ -33,6 +33,7 @@ def flagChangePoints( try_to_jit: bool = True, # TODO rm, not a user decision reduce_window: FreqString = None, reduce_func: Callable[[np.ndarray, np.ndarray], int] = lambda x, _: x.argmax(), + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -79,19 +80,31 @@ def flagChangePoints( First input parameter will hold the result from the stat_func evaluation for every reduction window. Second input parameter holds the result from the thresh_func evaluation. The default reduction function just selects the value that maximizes the stat_func. - + flag : float, default BAD + flag to set. Returns ------- - """ return assignChangePointCluster( - data, field, flagger, stat_func=stat_func, thresh_func=thresh_func, - bwd_window=bwd_window, min_periods_bwd=min_periods_bwd, - fwd_window=fwd_window, min_periods_fwd=min_periods_fwd, closed=closed, - try_to_jit=try_to_jit, reduce_window=reduce_window, - reduce_func=reduce_func, flag_changepoints=True, model_by_resids=False, - assign_cluster=False, **kwargs + data, + field, + flagger, + stat_func=stat_func, + thresh_func=thresh_func, + bwd_window=bwd_window, + min_periods_bwd=min_periods_bwd, + fwd_window=fwd_window, + min_periods_fwd=min_periods_fwd, + closed=closed, + try_to_jit=try_to_jit, + reduce_window=reduce_window, + reduce_func=reduce_func, + flag_changepoints=True, + model_by_resids=False, + assign_cluster=False, + flag=flag, + **kwargs ) @@ -111,6 +124,7 @@ def assignChangePointCluster( model_by_resids: bool = False, flag_changepoints: bool = False, assign_cluster: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -160,15 +174,16 @@ def assignChangePointCluster( reduction window. Second input parameter holds the result from the thresh_func evaluation. The default reduction function just selects the value that maximizes the stat_func. flag_changepoints : bool, default False - If true, the points, where there is a change in data modelling regime detected get flagged BAD. + If true, the points, where there is a change in data modelling regime detected gets flagged. model_by_resids : bool, default False If True, the data is replaced by the stat_funcs results instead of regime labels. assign_cluster : bool, default True Is set to False, if called by function that oly wants to calculate flags. + flag : float, default BAD + flag to set. 
Returns ------- - """ data = data.copy() data_ser = data[field].dropna() @@ -242,8 +257,7 @@ def assignChangePointCluster( flagger[:, field] = UNFLAGGED if flag_changepoints: - # TODO: does not respect kwargs[flag] - flagger[det_index, field] = BAD + flagger[det_index, field] = flag return data, flagger diff --git a/saqc/funcs/constants.py b/saqc/funcs/constants.py index 02327498f..5d0b30804 100644 --- a/saqc/funcs/constants.py +++ b/saqc/funcs/constants.py @@ -9,6 +9,7 @@ import pandas as pd from dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.lib.ts_operators import varQC @@ -23,6 +24,7 @@ def flagConstants( flagger: Flagger, thresh: float, window: FreqString, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -48,6 +50,8 @@ def flagConstants( Upper bound for the maximum total change of an interval to be flagged constant. window : str Lower bound for the size of an interval to be flagged constant. + flag : float, default BAD + flag to set. Returns ------- @@ -73,7 +77,7 @@ def flagConstants( m2 = r.max() - r.min() <= thresh mask = m1 | m2 - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -82,10 +86,11 @@ def flagByVariance( data: DictOfSeries, field: ColumnName, flagger: Flagger, - window: FreqString="12h", - thresh: float=0.0005, - max_missing: int=None, - max_consec_missing: int=None, + window: FreqString = "12h", + thresh: float = 0.0005, + max_missing: int = None, + max_consec_missing: int = None, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -114,6 +119,8 @@ def flagByVariance( Maximum number of consecutive nan values allowed in an interval to retrieve a valid variance from it. (Intervals with a number of nans exceeding "max_consec_missing" have no chance to get flagged a plateau!) + flag : float, default BAD + flag to set. 
Returns ------- @@ -154,5 +161,5 @@ def flagByVariance( # result: plateaus = (plateaus[plateaus == 1.0]).index - flagger[plateaus, field] = kwargs['flag'] + flagger[plateaus, field] = flag return data, flagger diff --git a/saqc/funcs/curvefit.py b/saqc/funcs/curvefit.py index fc04cbeac..f77b75346 100644 --- a/saqc/funcs/curvefit.py +++ b/saqc/funcs/curvefit.py @@ -4,29 +4,37 @@ from math import floor from typing import Tuple, Union from typing_extensions import Literal - import numpy as np import pandas as pd - from dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register - from saqc.lib.tools import getFreqDelta from saqc.flagger import Flagger -from saqc.lib.ts_operators import polyRollerIrregular, polyRollerNumba, polyRoller, polyRollerNoMissingNumba, \ +from saqc.lib.ts_operators import ( + polyRollerIrregular, + polyRollerNumba, + polyRoller, + polyRollerNoMissingNumba, polyRollerNoMissing +) @register(masking='field', module="curvefit") -def fitPolynomial(data: DictOfSeries, field: str, flagger: Flagger, - winsz: Union[int, str], - polydeg: int, - numba: Literal[True, False, "auto"] = "auto", - eval_flags: bool = True, - min_periods: int = 0, - return_residues: bool = False, - **kwargs) -> Tuple[DictOfSeries, Flagger]: +def fitPolynomial( + data: DictOfSeries, + field: str, + flagger: Flagger, + winsz: Union[int, str], + polydeg: int, + numba: Literal[True, False, "auto"] = "auto", + eval_flags: bool = True, + min_periods: int = 0, + return_residues: bool = False, + flag: float = BAD, + **kwargs +) -> Tuple[DictOfSeries, Flagger]: """ Function fits a polynomial model to the data and returns the fitted data curve. @@ -91,6 +99,8 @@ def fitPolynomial(data: DictOfSeries, field: str, flagger: Flagger, set the minimum number of periods to the number of values in an offset defined window size, pass np.nan. return_residues : bool, default False Internal parameter. Makes the method return the residues instead of the fit. + flag : float, default BAD + flag to set. Returns ------- @@ -149,8 +159,8 @@ def fitPolynomial(data: DictOfSeries, field: str, flagger: Flagger, lambda x, y: x[y], raw=True, args=(center_index,) ) - # we need a missing value marker that is not nan, because nan values dont get passed by pandas rolling - # method + # we need a missing value marker that is not nan, + # because nan values dont get passed by pandas rolling method miss_marker = to_fit.min() miss_marker = np.floor(miss_marker - 1) na_mask = to_fit.isna() @@ -192,8 +202,6 @@ def fitPolynomial(data: DictOfSeries, field: str, flagger: Flagger, data[field] = residues if eval_flags: - # with the new flagger we dont have to care - # about to set NaNs to the original flags anymore # TODO: we does not get any flags here, because of masking=field worst = flagger[field].rolling(winsz, center=True, min_periods=min_periods).max() flagger[field] = worst diff --git a/saqc/funcs/drift.py b/saqc/funcs/drift.py index 14417c3f0..d8605f67d 100644 --- a/saqc/funcs/drift.py +++ b/saqc/funcs/drift.py @@ -1,17 +1,19 @@ #! 
/usr/bin/env python # -*- coding: utf-8 -*- + +import numpy as np +import pandas as pd import functools +from dios import DictOfSeries + from typing import Optional, Tuple, Sequence, Callable, Optional from typing_extensions import Literal -import numpy as np -import pandas as pd from scipy import stats from scipy.optimize import curve_fit from scipy.spatial.distance import pdist -from dios import DictOfSeries - +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.funcs.resampling import shift @@ -35,6 +37,7 @@ def flagDriftFromNorm( norm_frac: float = 0.5, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), linkage_method: LinkageString = "single", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -76,7 +79,8 @@ def flagDriftFromNorm( The keyword gets passed on to scipy.hierarchy.linkage. See its documentation to learn more about the different keywords (References [1]). See wikipedia for an introduction to hierarchical clustering (References [2]). - kwargs + flag : float, default BAD + flag to set. Returns ------- @@ -122,7 +126,6 @@ def flagDriftFromNorm( Introduction to Hierarchical clustering: [2] https://en.wikipedia.org/wiki/Hierarchical_clustering """ - data_to_flag = data[fields].to_df() data_to_flag.dropna(inplace=True) @@ -135,7 +138,7 @@ def flagDriftFromNorm( drifters = detectDeviants(segment[1], metric, norm_spread, norm_frac, linkage_method, 'variables') for var in drifters: - flagger[segment[1].index, fields[var]] = kwargs['flag'] + flagger[segment[1].index, fields[var]] = flag return data, flagger @@ -149,6 +152,7 @@ def flagDriftFromReference( segment_freq: FreqString, thresh: float, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -175,7 +179,8 @@ def flagDriftFromReference( A distance function. It should be a function of 2 1-dimensional arrays and return a float scalar value. This value is interpreted as the distance of the two input arrays. The default is the averaged manhatten metric. See the Notes section to get an idea of why this could be a good choice. - kwargs + flag : float, default BAD + flag to set. Returns ------- @@ -211,7 +216,7 @@ def flagDriftFromReference( dist = metric(segment[1].iloc[:, i].values, segment[1].loc[:, field].values) if dist > thresh: - flagger[segment[1].index, fields[i]] = kwargs['flag'] + flagger[segment[1].index, fields[i]] = flag return data, flagger @@ -228,6 +233,7 @@ def flagDriftFromScaledNorm( norm_frac: float = 0.5, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), linkage_method: LinkageString = "single", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -277,7 +283,8 @@ def flagDriftFromScaledNorm( The keyword gets passed on to scipy.hierarchy.linkage. See its documentation to learn more about the different keywords (References [1]). See wikipedia for an introduction to hierarchical clustering (References [2]). - kwargs + flag : float, default BAD + flag to set. 
Returns ------- @@ -327,7 +334,7 @@ def flagDriftFromScaledNorm( drifters = detectDeviants(segment[1], metric, norm_spread, norm_frac, linkage_method, 'variables') for var in drifters: - flagger[segment[1].index, fields[var]] = kwargs['flag'] + flagger[segment[1].index, fields[var]] = flag return data, flagger @@ -340,6 +347,7 @@ def correctExponentialDrift( maint_data_field: ColumnName, cal_mean: int = 5, flag_maint_period: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -390,6 +398,8 @@ def correctExponentialDrift( directly before maintenance event. This values are needed for shift calibration. (see above description) flag_maint_period : bool, default False Whether or not to flag the values obtained while maintenance. + flag : float, default BAD + flag to set. Returns ------- @@ -436,7 +446,7 @@ def correctExponentialDrift( to_flag = drift_frame["drift_group"] to_flag = to_flag.drop(to_flag[: maint_data.index[0]].index) to_flag = to_flag.dropna() - flagger[to_flag, field] = kwargs['flag'] + flagger[to_flag, field] = flag return data, flagger @@ -487,7 +497,6 @@ def correctRegimeAnomaly( x_date : bool, default False If True, use "seconds from epoch" as x input to the model func, instead of "seconds from regime start". - Returns ------- data : dios.DictOfSeries @@ -592,7 +601,6 @@ def correctOffset( start and right before the end of any regime is ignored when calculating a regimes mean for data correcture. This is to account for the unrelyability of data near the changepoints of regimes. - Returns ------- data : dios.DictOfSeries @@ -600,7 +608,6 @@ def correctOffset( Data values may have changed relatively to the data input. flagger : saqc.flagger.Flagger The flagger object, holding flags and additional Informations related to `data`. - """ data, flagger = copy(data, field, flagger, field + '_CPcluster') data, flagger = assignChangePointCluster( @@ -659,6 +666,7 @@ def flagRegimeAnomaly( linkage_method: LinkageString = "single", metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: np.abs(np.nanmean(x) - np.nanmean(y)), norm_frac: float = 0.5, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -695,6 +703,8 @@ def flagRegimeAnomaly( norm_frac : float Has to be in [0,1]. Determines the minimum percentage of samples, the "normal" group has to comprise to be the normal group actually. + flag : float, default BAD + flag to set. Returns ------- @@ -714,6 +724,7 @@ def flagRegimeAnomaly( norm_frac=norm_frac, set_cluster=False, set_flags=True, + flag=flag, **kwargs ) @@ -730,6 +741,7 @@ def assignRegimeAnomaly( norm_frac: float = 0.5, set_cluster: bool = True, set_flags: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -775,10 +787,11 @@ def assignRegimeAnomaly( set_flags : bool, default True Wheather or not to flag abnormal values (do not flag them, if you want to correct them afterwards, becasue flagged values usually are not visible in further tests.). + flag : float, default BAD + flag to set. Returns ------- - data : dios.DictOfSeries A dictionary of pandas.Series, holding all the data. 
flagger : saqc.flagger.Flagger @@ -792,7 +805,7 @@ def assignRegimeAnomaly( if set_flags: for p in plateaus: - flagger[cluster_dios.iloc[:, p].index, field] = kwargs['flags'] + flagger[cluster_dios.iloc[:, p].index, field] = flag if set_cluster: for p in plateaus: diff --git a/saqc/funcs/flagtools.py b/saqc/funcs/flagtools.py index 5c2b341a9..56c6a689c 100644 --- a/saqc/funcs/flagtools.py +++ b/saqc/funcs/flagtools.py @@ -76,6 +76,7 @@ def clearFlags(data: DictOfSeries, field: ColumnName, flagger: Flagger, **kwargs flagUnflagged : set flag value at all unflagged positions """ if 'flag' in kwargs: + kwargs = {**kwargs} # copy flag = kwargs.pop('flag') warnings.warn(f'`flag={flag}` is ignored here.') @@ -98,7 +99,7 @@ def flagUnflagged( flagger : saqc.flagger.Flagger A flagger object, holding flags and additional informations related to `data`. flag : float, default BAD - flag value to set, has NO default + flag value to set kwargs : Dict unused @@ -149,7 +150,8 @@ def flagManual( data: DictOfSeries, field: ColumnName, flagger: Flagger, mdata: Union[pd.Series, pd.DataFrame, DictOfSeries], mflag: Any = 1, - method=Literal["plain", "ontime", "left-open", "right-open"], + method: Literal["plain", "ontime", "left-open", "right-open"] = 'plain', + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -172,6 +174,7 @@ def flagManual( The "manually generated" data mflag : scalar The flag that indicates data points in `mdata`, of wich the projection in data should be flagged. + method : {'plain', 'ontime', 'left-open', 'right-open'}, default plain Defines how mdata is projected on data. Except for the 'plain' method, the methods assume mdata to have an index. @@ -183,6 +186,9 @@ def flagManual( the value at t_1 gets projected onto all data timestamps t with t_1 <= t < t_2. * 'left-open': like 'right-open', but the projected interval now covers all t with t_1 < t <= t_2. + flag : float, default BAD + flag to set. + Returns ------- data : original data @@ -277,7 +283,7 @@ def flagManual( mask = mdata == mflag mask = mask.reindex(dat.index).fillna(False) - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger diff --git a/saqc/funcs/generic.py b/saqc/funcs/generic.py index 1058da740..b8677f199 100644 --- a/saqc/funcs/generic.py +++ b/saqc/funcs/generic.py @@ -81,8 +81,14 @@ def _execGeneric(flagger: Flagger, data: DictOfSeries, func: Callable[[pd.Series @register(masking='all', module="generic") -def process(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd.Series], pd.Series], - nodata: float = np.nan, **kwargs) -> Tuple[DictOfSeries, Flagger]: +def process( + data: DictOfSeries, + field: str, + flagger: Flagger, + func: Callable[[pd.Series], pd.Series], + nodata: float = np.nan, + **kwargs +) -> Tuple[DictOfSeries, Flagger]: """ generate/process data with generically defined functions. 
@@ -131,7 +137,6 @@ def process(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd You also can pass numpy and pandas functions: >>> lambda temperature, uncertainty: np.round(temperature) * np.sqrt(uncertainty) - """ data[field] = _execGeneric(flagger, data, func, field, nodata).squeeze() @@ -145,8 +150,15 @@ def process(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd @register(masking='all', module="generic") -def flag(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd.Series], pd.Series], - nodata: float = np.nan, flag=BAD, **kwargs) -> Tuple[DictOfSeries, Flagger]: +def flag( + data: DictOfSeries, + field: str, + flagger: Flagger, + func: Callable[[pd.Series], pd.Series], + nodata: float = np.nan, + flag: float = BAD, + **kwargs +) -> Tuple[DictOfSeries, Flagger]: # TODO : fix docstring, check if all still works """ a function to flag a data column by evaluation of a generic expression. @@ -181,6 +193,8 @@ def flag(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd.Se See the examples section to learn more. nodata : any, default np.nan The value that indicates missing/invalid data + flag : float, default BAD + flag to set. Returns ------- diff --git a/saqc/funcs/outliers.py b/saqc/funcs/outliers.py index 189995cc5..d8b2fcd26 100644 --- a/saqc/funcs/outliers.py +++ b/saqc/funcs/outliers.py @@ -36,6 +36,7 @@ def flagByStray( partition_min: int = 11, iter_start: float = 0.5, alpha: float = 0.05, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -75,6 +76,9 @@ def flagByStray( Level of significance by which it is tested, if a score might be drawn from another distribution, than the majority of the data. + flag : float, default BAD + flag to set. + References ---------- [1] Talagala, P. D., Hyndman, R. J., & Smith-Miles, K. (2019). Anomaly detection in high dimensional data. @@ -121,7 +125,7 @@ def flagByStray( for iter_index in range(i_start - 1, sample_size): if gaps[iter_index] > log_alpha * ghat[iter_index]: index = partition.index[sorted_i[iter_index:]] - flagger[index, field] = kwargs['flag'] + flagger[index, field] = flag break return data, flagger @@ -137,6 +141,7 @@ def _evalStrayLabels( reduction_thresh: float = 3.5, reduction_min_periods: int = 1, at_least_one: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -173,6 +178,8 @@ def _evalStrayLabels( at_least_one : bool, default True If none of the variables, the outlier label shall be reduced to, is an outlier with regard to the test, all (True) or none (False) of the variables are flagged + flag : float, default BAD + flag to set. References ---------- @@ -185,7 +192,7 @@ def _evalStrayLabels( if reduction_range is None: for field in to_flag_frame.columns: - flagger[to_flag_frame.index, field] = kwargs['flag'] + flagger[to_flag_frame.index, field] = flag return data, flagger for var in fields: @@ -233,7 +240,7 @@ def _evalStrayLabels( for field in to_flag_frame.columns: col = to_flag_frame[field] - flagger[col[col].index, field] = kwargs['flag'] + flagger[col[col].index, field] = flag return data, flagger @@ -367,6 +374,7 @@ def flagMVScores( reduction_drop_flagged: bool = False, # TODO: still a case ? reduction_thresh: float = 3.5, reduction_min_periods: int = 1, + flag: float = BAD, **kwargs, ) -> Tuple[DictOfSeries, Flagger]: """ @@ -430,6 +438,8 @@ def flagMVScores( Only effective when `reduction_range` is not ``None``. 
Minimum number of meassurements necessarily present in a reduction interval for reduction actually to be performed. + flag : float, default BAD + flag to set. Returns ------- @@ -488,7 +498,9 @@ def flagMVScores( partition_freq=stray_partition, partition_min=stray_partition_min, iter_start=iter_start, - alpha=alpha, **kwargs) + alpha=alpha, + flag=flag, + **kwargs) data, flagger = _evalStrayLabels( data, 'kNN_scores', flagger, @@ -496,7 +508,9 @@ def flagMVScores( reduction_range=reduction_range, reduction_drop_flagged=reduction_drop_flagged, reduction_thresh=reduction_thresh, - reduction_min_periods=reduction_min_periods, **kwargs) + reduction_min_periods=reduction_min_periods, + flag=flag, + **kwargs) return data, flagger @@ -514,6 +528,7 @@ def flagRaise( min_slope: Optional[float] = None, min_slope_weight: float = 0.8, numba_boost: bool = True, # TODO: rm, not a user decision + flag: float = BAD, **kwargs, ) -> Tuple[DictOfSeries, Flagger]: """ @@ -553,6 +568,9 @@ def flagRaise( min_slope_weight : float, default 0.8 See third condition listed in the notes below. numba_boost : bool, default True + deprecated ? + flag : float, default BAD + flag to set. Returns ------- @@ -662,14 +680,20 @@ def flagRaise( # check means against critical raise value: to_flag = dataseries >= weighted_rolling_mean + (raise_series / mean_raise_factor) to_flag &= raise_series.notna() - flagger[to_flag[to_flag].index, field] = kwargs['flag'] + flagger[to_flag[to_flag].index, field] = flag return data, flagger @register(masking='field', module="outliers") def flagMAD( - data: DictOfSeries, field: ColumnName, flagger: Flagger, window: FreqString, z: float = 3.5, **kwargs + data: DictOfSeries, + field: ColumnName, + flagger: Flagger, + window: FreqString, + z: float = 3.5, + flag: float = BAD, + **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ The function represents an implementation of the modyfied Z-score outlier detection method. @@ -690,6 +714,8 @@ def flagMAD( Offset string. Denoting the windows size that the "Z-scored" values have to lie in. z: float, default 3.5 The value the Z-score is tested against. Defaulting to 3.5 (Recommendation of [1]) + flag : float, default BAD + flag to set. Returns ------- @@ -721,7 +747,7 @@ def flagMAD( index = mask.index mask.loc[index < index[0] + pd.to_timedelta(window)] = False - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -735,6 +761,7 @@ def flagOffset( window: Union[IntegerWindow, FreqString], rel_thresh: Optional[float] = None, numba_kickin: int = 200000, # TODO: rm, not a user decision + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -776,7 +803,8 @@ def flagOffset( When there are detected more than `numba_kickin` incidents of potential spikes, the pandas.rolling - part of computation gets "jitted" with numba. Default value hast proven to be around the break even point between "jit-boost" and "jit-costs". - + flag : float, default BAD + flag to set. Returns ------- @@ -877,7 +905,7 @@ def flagOffset( cresult = calcResult(result) cresult = cresult[cresult].index - flagger[cresult, field] = kwargs['flag'] + flagger[cresult, field] = flag return data, flagger @@ -890,6 +918,7 @@ def flagByGrubbs( alpha: float = 0.05, min_periods: int = 8, check_lagged: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -927,6 +956,8 @@ def flagByGrubbs( If True, every value gets checked twice for being an outlier. 
Ones in the initial rolling window and one more time in a rolling window that is lagged by half the windows delimeter (winsz/2). Recommended for avoiding false positives at the window edges. Only available when rolling with integer defined window size. + flag : float, default BAD + flag to set. Returns ------- @@ -983,7 +1014,7 @@ def flagByGrubbs( to_flag &= to_flag_lagged - flagger[to_flag, field] = kwargs['flag'] + flagger[to_flag, field] = flag return data, flagger @@ -994,6 +1025,7 @@ def flagRange( flagger: Flagger, min: float = -np.inf, max: float = np.inf, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -1011,6 +1043,8 @@ def flagRange( Lower bound for valid data. max : float Upper bound for valid data. + flag : float, default BAD + flag to set. Returns ------- @@ -1024,7 +1058,7 @@ def flagRange( # using .values is much faster datacol = data[field].values mask = (datacol < min) | (datacol > max) - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -1036,6 +1070,7 @@ def flagCrossStatistic( fields: Sequence[ColumnName], thresh: float, cross_stat: Literal["modZscore", "Zscore"] = "modZscore", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -1071,6 +1106,9 @@ def flagCrossStatistic( * ``'Zscore'``: Score values by how many times the standard deviation they differ from the median. See References [1] + flag : float, default BAD + flag to set. + Returns ------- data : dios.DictOfSeries @@ -1109,6 +1147,6 @@ def flagCrossStatistic( mask = diff_scores > thresh for var in fields: - flagger[mask[var], var] = kwargs['flag'] + flagger[mask[var], var] = flag return data, flagger diff --git a/saqc/funcs/pattern.py b/saqc/funcs/pattern.py index a33cdceae..6562fd0d3 100644 --- a/saqc/funcs/pattern.py +++ b/saqc/funcs/pattern.py @@ -8,6 +8,7 @@ import pywt from mlxtend.evaluate import permutation_test from dios.dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.lib.tools import customRoller @@ -19,8 +20,9 @@ def flagPatternByDTW( field: str, flagger: Flagger, ref_field: str, - widths: Sequence[int]=(1, 2, 4, 8), - waveform: str="mexh", + widths: Sequence[int] = (1, 2, 4, 8), + waveform: str = "mexh", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -46,6 +48,8 @@ def flagPatternByDTW( Widths for wavelet decomposition. [1] recommends a dyadic scale. Default: (1,2,4,8) waveform: str. Wavelet to be used for decomposition. Default: 'mexh'. See [2] for a list. + flag : float, default BAD + flag to set. kwargs @@ -94,7 +98,7 @@ def flagPatternByDTW( sz = len(ref) mask = customRoller(dat, window=sz, min_periods=sz).apply(isPattern, raw=True) - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -104,8 +108,9 @@ def flagPatternByWavelet( field: str, flagger: Flagger, ref_field: str, - max_distance: float=0.03, - normalize: bool=True, + max_distance: float = 0.03, + normalize: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ Pattern Recognition via Dynamic Time Warping. @@ -130,9 +135,8 @@ def flagPatternByWavelet( Maximum dtw-distance between partition and pattern, so that partition is recognized as pattern. Default: 0.03 normalize: boolean. Normalizing dtw-distance (see [1]). Default: True - - - kwargs + flag : float, default BAD + flag to set. 
Returns ------- @@ -166,5 +170,5 @@ def flagPatternByWavelet( sz = len(ref) mask = customRoller(dat, window=sz, min_periods=sz).apply(isPattern, raw=True) - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py index b5d2a109f..f69b12bb8 100644 --- a/saqc/funcs/resampling.py +++ b/saqc/funcs/resampling.py @@ -43,6 +43,7 @@ def aggregate( value_func, flag_func: Callable[[pd.Series], float] = np.nanmax, method: Literal["fagg", "bagg", "nagg"] = "nagg", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -94,6 +95,10 @@ def aggregate( Specifies which intervals to be aggregated for a certain timestamp. (preceeding, succeeding or "surrounding" interval). See description above for more details. + flag : float, default BAD + flag to set. + + Returns ------- data : dios.DictOfSeries @@ -106,7 +111,13 @@ def aggregate( data, flagger = copy(data, field, flagger, field + '_original') return resample( - data, field, flagger, freq=freq, agg_func=value_func, flag_agg_func=flag_func, method=method, **kwargs + data, field, flagger, + freq=freq, + agg_func=value_func, + flag_agg_func=flag_func, + method=method, + flag=flag, + **kwargs ) @@ -674,7 +685,7 @@ def reindexFlags( target_datcol = data[field] target_flagscol = flagger[field] - dummy = pd.Series(np.nan, target_flagscol.index) + dummy = pd.Series(np.nan, target_flagscol.index, dtype=float) if method[-13:] == "interpolation": ignore = _getChunkBounds(target_datcol, flagscol, freq) diff --git a/saqc/funcs/residues.py b/saqc/funcs/residues.py index 6abcfd2d6..0b0046bea 100644 --- a/saqc/funcs/residues.py +++ b/saqc/funcs/residues.py @@ -8,6 +8,7 @@ import numpy as np from dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.funcs.rolling import roll @@ -24,6 +25,7 @@ def calculatePolynomialResidues( numba: Literal[True, False, "auto"] = "auto", # TODO: rm, not a a user decision eval_flags: bool = True, # TODO, not valid anymore, if still needed, maybe assign user-passed ``flag``? min_periods: Optional[int] = 0, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -88,6 +90,8 @@ def calculatePolynomialResidues( fit to be performed. If there are not enough values, np.nan gets assigned. Default (0) results in fitting regardless of the number of values present (results in overfitting for too sparse intervals). To automatically set the minimum number of periods to the number of values in an offset defined window size, pass np.nan. + flag : float, default BAD + flag to set. 
Returns ------- @@ -107,6 +111,7 @@ def calculatePolynomialResidues( eval_flags=eval_flags, min_periods=min_periods, return_residues=True, + flag=flag, **kwargs ) @@ -121,6 +126,7 @@ def calculateRollingResidues( eval_flags: bool = True, min_periods: Optional[int] = 0, center: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ TODO: docstring needed""" @@ -132,5 +138,6 @@ def calculateRollingResidues( min_periods=min_periods, center=center, return_residues=True, + flag=flag, **kwargs ) diff --git a/saqc/funcs/rolling.py b/saqc/funcs/rolling.py index 6a40c93c2..6d58dfbc6 100644 --- a/saqc/funcs/rolling.py +++ b/saqc/funcs/rolling.py @@ -5,9 +5,9 @@ from typing import Union, Callable import numpy as np import pandas as pd - from dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.lib.tools import getFreqDelta @@ -24,6 +24,7 @@ def roll( min_periods: int=0, center: bool=True, return_residues=False, # TODO: this should not be public, a wrapper would be better + flag: float = BAD, **kwargs ): """ @@ -59,6 +60,8 @@ def roll( center : bool, default True Wheather or not to center the window the mean is calculated of around the reference value. If False, the reference value is placed to the right of the window (classic rolling mean with lag.) + flag : float, default BAD + flag to set. Returns ------- @@ -69,7 +72,6 @@ def roll( The flagger object, holding flags and additional Informations related to `data`. Flags values may have changed relatively to the flagger input. """ - data = data.copy() to_fit = data[field] if to_fit.empty: @@ -122,9 +124,6 @@ def roll( data[field] = means if eval_flags: - # with the new flagger we dont have to care - # about to set NaNs to the original flags anymore - # TODO: we does not get any flags here, because of masking=field worst = flagger[field].rolling(winsz, center=True, min_periods=min_periods).max() flagger[field] = worst -- GitLab