diff --git a/saqc/core/modules/breaks.py b/saqc/core/modules/breaks.py index 02b237a6046550a99d729c2b8a9439a11190f819..b07edba09759687f21746222bea0cf10c2a70562 100644 --- a/saqc/core/modules/breaks.py +++ b/saqc/core/modules/breaks.py @@ -5,6 +5,7 @@ from typing import Tuple import numpy as np from dios import DictOfSeries +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.lib.types import FreqString, IntegerWindow, ColumnName @@ -16,6 +17,7 @@ class Breaks(ModuleBase): self, field: ColumnName, nodata: float = np.nan, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagMissing", locals()) @@ -25,6 +27,7 @@ class Breaks(ModuleBase): field: ColumnName, gap_window: FreqString, group_window: FreqString, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagIsolated", locals()) @@ -35,6 +38,7 @@ class Breaks(ModuleBase): thresh: float, winsz: FreqString, min_periods: IntegerWindow = 1, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagJumps", locals()) diff --git a/saqc/core/modules/changepoints.py b/saqc/core/modules/changepoints.py index bab02fc8664f4f1cb0f3c31409a82f76909920da..19ed26d296d28baeb6a7182e1069ed39a2082ad8 100644 --- a/saqc/core/modules/changepoints.py +++ b/saqc/core/modules/changepoints.py @@ -7,6 +7,7 @@ import numpy as np from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.lib.types import FreqString, IntegerWindow @@ -26,6 +27,7 @@ class ChangePoints(ModuleBase): try_to_jit: bool = True, # TODO rm, not a user decision reduce_window: FreqString = None, reduce_func: Callable[[np.ndarray, np.ndarray], int] = lambda x, _: x.argmax(), + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagChangePoints", locals()) @@ -45,6 +47,7 @@ class ChangePoints(ModuleBase): model_by_resids: bool = False, flag_changepoints: bool = False, assign_cluster: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("assignChangePointCluster", locals()) diff --git a/saqc/core/modules/constants.py b/saqc/core/modules/constants.py index 09f55eb0064a5671aa1fdfe1addd1241864b2f06..22239aa09e09ba4299ae7fe410c70f56731ff894 100644 --- a/saqc/core/modules/constants.py +++ b/saqc/core/modules/constants.py @@ -4,6 +4,7 @@ from typing import Tuple from dios import DictOfSeries +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.lib.types import FreqString, ColumnName @@ -14,10 +15,11 @@ class Constants(ModuleBase): def flagByVariance( self, field: ColumnName, - window: FreqString="12h", - thresh: float=0.0005, - max_missing: int=None, - max_consec_missing: int=None, + window: FreqString = "12h", + thresh: float = 0.0005, + max_missing: int = None, + max_consec_missing: int = None, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagByVariance", locals()) @@ -27,6 +29,7 @@ class Constants(ModuleBase): field: ColumnName, thresh: float, window: FreqString, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagConstants", locals()) diff --git a/saqc/core/modules/curvefit.py b/saqc/core/modules/curvefit.py index 595126406d2071b786f7e07d180e76886938b7f6..c24ce08b046fbb490f26550eeca2aeccd728db69 100644 --- a/saqc/core/modules/curvefit.py +++ b/saqc/core/modules/curvefit.py @@ -5,17 +5,22 @@ from typing import Union, Tuple from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase class Curvefit(ModuleBase): - def fitPolynomial(self, field: str, - winsz: Union[int, str], - polydeg: int, - numba: Literal[True, False, "auto"] = "auto", - eval_flags: bool = True, - min_periods: int = 0, - return_residues: bool = False, - **kwargs) -> Tuple[DictOfSeries, Flagger]: + def fitPolynomial( + self, + field: str, + winsz: Union[int, str], + polydeg: int, + numba: Literal[True, False, "auto"] = "auto", + eval_flags: bool = True, + min_periods: int = 0, + return_residues: bool = False, + flag: float = BAD, + **kwargs + ) -> Tuple[DictOfSeries, Flagger]: return self.defer("fitPolynomial", locals()) diff --git a/saqc/core/modules/drift.py b/saqc/core/modules/drift.py index 3c422fc93378197bee658052829d36428a874dcf..e063e62f327759857fae71f52d4e604e96582479 100644 --- a/saqc/core/modules/drift.py +++ b/saqc/core/modules/drift.py @@ -6,6 +6,7 @@ from typing import Sequence, Callable, Optional, Tuple import numpy as np from scipy.spatial.distance import pdist +from saqc.constants import * from saqc.core.modules.base import ModuleBase from saqc.funcs import LinkageString, DictOfSeries, Flagger from saqc.lib.types import ColumnName, FreqString, CurveFitter @@ -21,6 +22,7 @@ class Drift(ModuleBase): norm_frac: float = 0.5, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), linkage_method: LinkageString = "single", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagDriftFromNorm", locals()) @@ -32,6 +34,7 @@ class Drift(ModuleBase): segment_freq: FreqString, thresh: float, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagDriftFromReference", locals()) @@ -46,6 +49,7 @@ class Drift(ModuleBase): norm_frac: float = 0.5, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), linkage_method: LinkageString = "single", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagDriftFromScaledNorm", locals()) @@ -56,6 +60,7 @@ class Drift(ModuleBase): maint_data_field: ColumnName, cal_mean: int = 5, flag_maint_period: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("correctExponentialDrift", locals()) diff --git a/saqc/core/modules/flagtools.py b/saqc/core/modules/flagtools.py index 426dfb27642e60d587a65372aeeb40bb6928ef8f..7cc2b16337eff358929e103b4968f93ee94b7dda 100644 --- a/saqc/core/modules/flagtools.py +++ b/saqc/core/modules/flagtools.py @@ -41,7 +41,8 @@ class FlagTools(ModuleBase): self, field: ColumnName, mdata: Union[pd.Series, pd.DataFrame, DictOfSeries], mflag: Any = 1, - method=Literal["plain", "ontime", "left-open", "right-open"], + method: Literal["plain", "ontime", "left-open", "right-open"] = 'plain', + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagManual", locals()) diff --git a/saqc/core/modules/generic.py b/saqc/core/modules/generic.py index 3f44c45f767954387060dea41613cc00549940c9..da80700c325b233793a217bd73dd6ad0f7755f39 100644 --- a/saqc/core/modules/generic.py +++ b/saqc/core/modules/generic.py @@ -13,10 +13,21 @@ from saqc.core.modules.base import ModuleBase class Generic(ModuleBase): - def process(self, field: str, func: Callable[[pd.Series], pd.Series], - nodata: float = np.nan, **kwargs) -> Tuple[DictOfSeries, Flagger]: + def process( + self, + field: str, + func: Callable[[pd.Series], pd.Series], + nodata: float = np.nan, + **kwargs + ) -> Tuple[DictOfSeries, Flagger]: return self.defer("process", locals()) - def flag(self, field: str, func: Callable[[pd.Series], pd.Series], - nodata: float = np.nan, flag=BAD, **kwargs) -> Tuple[DictOfSeries, Flagger]: + def flag( + self, + field: str, + func: Callable[[pd.Series], pd.Series], + nodata: float = np.nan, + flag: float = BAD, + **kwargs + ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flag", locals()) diff --git a/saqc/core/modules/outliers.py b/saqc/core/modules/outliers.py index 08efb10686ad4d6dcd6baabdc3cfa0702c33b979..bf81520390f82b5dcb2f2a17e7a5bf918944da1d 100644 --- a/saqc/core/modules/outliers.py +++ b/saqc/core/modules/outliers.py @@ -8,6 +8,7 @@ import pandas as pd from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.lib.types import IntegerWindow, FreqString, ColumnName @@ -22,6 +23,7 @@ class Outliers(ModuleBase): partition_min: int = 11, iter_start: float = 0.5, alpha: float = 0.05, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagByStray", locals()) @@ -42,6 +44,7 @@ class Outliers(ModuleBase): reduction_drop_flagged: bool = False, # TODO: still a case ? reduction_thresh: float = 3.5, reduction_min_periods: int = 1, + flag: float = BAD, **kwargs, ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagMVScores", locals()) @@ -57,12 +60,18 @@ class Outliers(ModuleBase): min_slope: Optional[float] = None, min_slope_weight: float = 0.8, numba_boost: bool = True, # TODO: rm, not a user decision + flag: float = BAD, **kwargs, ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagRaise", locals()) def flagMAD( - self, field: ColumnName, window: FreqString, z: float = 3.5, **kwargs + self, + field: ColumnName, + window: FreqString, + z: float = 3.5, + flag: float = BAD, + **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagMAD", locals()) @@ -74,6 +83,7 @@ class Outliers(ModuleBase): window: Union[IntegerWindow, FreqString], rel_thresh: Optional[float] = None, numba_kickin: int = 200000, # TODO: rm, not a user decision + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagOffset", locals()) @@ -85,6 +95,7 @@ class Outliers(ModuleBase): alpha: float = 0.05, min_periods: int = 8, check_lagged: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagByGrubbs", locals()) @@ -94,6 +105,7 @@ class Outliers(ModuleBase): field: ColumnName, min: float = -np.inf, max: float = np.inf, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagRange", locals()) @@ -104,6 +116,7 @@ class Outliers(ModuleBase): fields: Sequence[ColumnName], thresh: float, cross_stat: Literal["modZscore", "Zscore"] = "modZscore", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagCrossStatistic", locals()) diff --git a/saqc/core/modules/pattern.py b/saqc/core/modules/pattern.py index 06c9ab26cc2b018d039447405277049d9dea1487..38d0839454ee27340ff830794b0e331ea8abfbef 100644 --- a/saqc/core/modules/pattern.py +++ b/saqc/core/modules/pattern.py @@ -5,6 +5,7 @@ from typing import Sequence, Tuple from dios import DictOfSeries +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase @@ -15,8 +16,9 @@ class Pattern(ModuleBase): self, field: str, ref_field: str, - widths: Sequence[int]=(1, 2, 4, 8), - waveform: str="mexh", + widths: Sequence[int] = (1, 2, 4, 8), + waveform: str = "mexh", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagPatternByDTW", locals()) @@ -25,8 +27,9 @@ class Pattern(ModuleBase): self, field: str, ref_field: str, - max_distance: float=0.03, - normalize: bool=True, + max_distance: float = 0.03, + normalize: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("flagPatternByWavelet", locals()) diff --git a/saqc/core/modules/resampling.py b/saqc/core/modules/resampling.py index be4859bb57cc2de8020a739bff8dcf8fd048240c..9822bbd8b146af03088548b6db3e7ca8f1436e8e 100644 --- a/saqc/core/modules/resampling.py +++ b/saqc/core/modules/resampling.py @@ -8,6 +8,7 @@ import pandas as pd from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase from saqc.funcs.interpolation import _SUPPORTED_METHODS @@ -22,6 +23,7 @@ class Resampling(ModuleBase): value_func, flag_func: Callable[[pd.Series], float] = np.nanmax, method: Literal["fagg", "bagg", "nagg"] = "nagg", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("aggregate", locals()) diff --git a/saqc/core/modules/residues.py b/saqc/core/modules/residues.py index 85d7426f0b167b7892e0a4ea889759b259e24809..877323546bbf4dc89c6d622cb45c4e45f261b415 100644 --- a/saqc/core/modules/residues.py +++ b/saqc/core/modules/residues.py @@ -7,6 +7,7 @@ import numpy as np from dios import DictOfSeries from typing_extensions import Literal +from saqc.constants import * from saqc import Flagger from saqc.core.modules.base import ModuleBase @@ -21,6 +22,7 @@ class Residues(ModuleBase): numba: Literal[True, False, "auto"] = "auto", # TODO: rm, not a a user decision eval_flags: bool = True, # TODO, not valid anymore, if still needed, maybe assign user-passed ``flag``? min_periods: Optional[int] = 0, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("calculatePolynomialResidues", locals()) @@ -33,6 +35,7 @@ class Residues(ModuleBase): eval_flags: bool = True, min_periods: Optional[int] = 0, center: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: return self.defer("calculateRollingResidues", locals()) diff --git a/saqc/core/modules/rolling.py b/saqc/core/modules/rolling.py index f9c6be16370d36b989e379fa261de91c4aa383bf..d29cb40180785cf9fe4eb25b1cd94b5a4cf2045a 100644 --- a/saqc/core/modules/rolling.py +++ b/saqc/core/modules/rolling.py @@ -6,6 +6,7 @@ from typing import Union, Callable import numpy as np import pandas as pd +from saqc.constants import * from saqc.core.modules.base import ModuleBase @@ -19,6 +20,7 @@ class Rolling(ModuleBase): min_periods: int=0, center: bool=True, return_residues=False, # TODO: this should not be public, a wrapper would be better + flag: float = BAD, **kwargs ): return self.defer("roll", locals()) diff --git a/saqc/funcs/breaks.py b/saqc/funcs/breaks.py index 6c394e3e75d12ad3a3a20c114b10164e2e3a03b2..8f10e9b7208b7a3d11d9a7911b1f6336e8a696ca 100644 --- a/saqc/funcs/breaks.py +++ b/saqc/funcs/breaks.py @@ -16,6 +16,7 @@ import pandas.tseries.frequencies from dios import DictOfSeries +from saqc.constants import * from saqc.lib.tools import groupConsecutives from saqc.lib.types import FreqString, ColumnName, IntegerWindow from saqc.funcs.changepoints import assignChangePointCluster @@ -29,6 +30,7 @@ def flagMissing( field: ColumnName, flagger: Flagger, nodata: float = np.nan, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -44,6 +46,8 @@ def flagMissing( A flagger object, holding flags and additional Informations related to `data`. nodata : any, default np.nan A value that defines missing data. + flag : float, default BAD + flag to set. Returns ------- @@ -59,7 +63,7 @@ def flagMissing( else: mask = datacol == nodata - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -70,6 +74,7 @@ def flagIsolated( flagger: Flagger, gap_window: FreqString, group_window: FreqString, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -92,6 +97,8 @@ def flagIsolated( group_window : str The maximum temporal extension allowed for a group that is isolated by gaps of size 'gap_window', to be actually flagged as isolated group. See condition (1). + flag : float, default BAD + flag to set. Returns ------- @@ -130,7 +137,7 @@ def flagIsolated( if right.all(): flags[start:stop] = True - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -142,6 +149,7 @@ def flagJumps( thresh: float, winsz: FreqString, min_periods: IntegerWindow = 1, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -163,6 +171,8 @@ def flagJumps( min_periods : int, default 1 Minimum number of periods that have to be present in a window of size `winsz`, so that the mean value obtained from that window is regarded valid. + flag : float, default BAD + flag to set. """ return assignChangePointCluster( data, field, flagger, @@ -173,6 +183,6 @@ def flagJumps( flag_changepoints=True, model_by_resids=False, assign_cluster=False, + flag=flag, **kwargs ) - diff --git a/saqc/funcs/changepoints.py b/saqc/funcs/changepoints.py index 7025ad71228801a2dc0a4b4af5cf7db4e4eaaf79..4ef620f54515a8362fc5a4a19bd47f9085b64223 100644 --- a/saqc/funcs/changepoints.py +++ b/saqc/funcs/changepoints.py @@ -33,6 +33,7 @@ def flagChangePoints( try_to_jit: bool = True, # TODO rm, not a user decision reduce_window: FreqString = None, reduce_func: Callable[[np.ndarray, np.ndarray], int] = lambda x, _: x.argmax(), + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -79,19 +80,31 @@ def flagChangePoints( First input parameter will hold the result from the stat_func evaluation for every reduction window. Second input parameter holds the result from the thresh_func evaluation. The default reduction function just selects the value that maximizes the stat_func. - + flag : float, default BAD + flag to set. Returns ------- - """ return assignChangePointCluster( - data, field, flagger, stat_func=stat_func, thresh_func=thresh_func, - bwd_window=bwd_window, min_periods_bwd=min_periods_bwd, - fwd_window=fwd_window, min_periods_fwd=min_periods_fwd, closed=closed, - try_to_jit=try_to_jit, reduce_window=reduce_window, - reduce_func=reduce_func, flag_changepoints=True, model_by_resids=False, - assign_cluster=False, **kwargs + data, + field, + flagger, + stat_func=stat_func, + thresh_func=thresh_func, + bwd_window=bwd_window, + min_periods_bwd=min_periods_bwd, + fwd_window=fwd_window, + min_periods_fwd=min_periods_fwd, + closed=closed, + try_to_jit=try_to_jit, + reduce_window=reduce_window, + reduce_func=reduce_func, + flag_changepoints=True, + model_by_resids=False, + assign_cluster=False, + flag=flag, + **kwargs ) @@ -111,6 +124,7 @@ def assignChangePointCluster( model_by_resids: bool = False, flag_changepoints: bool = False, assign_cluster: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -160,15 +174,16 @@ def assignChangePointCluster( reduction window. Second input parameter holds the result from the thresh_func evaluation. The default reduction function just selects the value that maximizes the stat_func. flag_changepoints : bool, default False - If true, the points, where there is a change in data modelling regime detected get flagged BAD. + If true, the points, where there is a change in data modelling regime detected gets flagged. model_by_resids : bool, default False If True, the data is replaced by the stat_funcs results instead of regime labels. assign_cluster : bool, default True Is set to False, if called by function that oly wants to calculate flags. + flag : float, default BAD + flag to set. Returns ------- - """ data = data.copy() data_ser = data[field].dropna() @@ -242,8 +257,7 @@ def assignChangePointCluster( flagger[:, field] = UNFLAGGED if flag_changepoints: - # TODO: does not respect kwargs[flag] - flagger[det_index, field] = BAD + flagger[det_index, field] = flag return data, flagger diff --git a/saqc/funcs/constants.py b/saqc/funcs/constants.py index 02327498f6e7e474b76f7df4d6653d4c1fc2b96c..5d0b30804f36b9081cf51bd0273ef70a933daf74 100644 --- a/saqc/funcs/constants.py +++ b/saqc/funcs/constants.py @@ -9,6 +9,7 @@ import pandas as pd from dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.lib.ts_operators import varQC @@ -23,6 +24,7 @@ def flagConstants( flagger: Flagger, thresh: float, window: FreqString, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -48,6 +50,8 @@ def flagConstants( Upper bound for the maximum total change of an interval to be flagged constant. window : str Lower bound for the size of an interval to be flagged constant. + flag : float, default BAD + flag to set. Returns ------- @@ -73,7 +77,7 @@ def flagConstants( m2 = r.max() - r.min() <= thresh mask = m1 | m2 - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -82,10 +86,11 @@ def flagByVariance( data: DictOfSeries, field: ColumnName, flagger: Flagger, - window: FreqString="12h", - thresh: float=0.0005, - max_missing: int=None, - max_consec_missing: int=None, + window: FreqString = "12h", + thresh: float = 0.0005, + max_missing: int = None, + max_consec_missing: int = None, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -114,6 +119,8 @@ def flagByVariance( Maximum number of consecutive nan values allowed in an interval to retrieve a valid variance from it. (Intervals with a number of nans exceeding "max_consec_missing" have no chance to get flagged a plateau!) + flag : float, default BAD + flag to set. Returns ------- @@ -154,5 +161,5 @@ def flagByVariance( # result: plateaus = (plateaus[plateaus == 1.0]).index - flagger[plateaus, field] = kwargs['flag'] + flagger[plateaus, field] = flag return data, flagger diff --git a/saqc/funcs/curvefit.py b/saqc/funcs/curvefit.py index fc04cbeac17e63b7ed7a25c7d079fb2a9564ca62..f77b75346d66afda2457146b8705874e090a5ebf 100644 --- a/saqc/funcs/curvefit.py +++ b/saqc/funcs/curvefit.py @@ -4,29 +4,37 @@ from math import floor from typing import Tuple, Union from typing_extensions import Literal - import numpy as np import pandas as pd - from dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register - from saqc.lib.tools import getFreqDelta from saqc.flagger import Flagger -from saqc.lib.ts_operators import polyRollerIrregular, polyRollerNumba, polyRoller, polyRollerNoMissingNumba, \ +from saqc.lib.ts_operators import ( + polyRollerIrregular, + polyRollerNumba, + polyRoller, + polyRollerNoMissingNumba, polyRollerNoMissing +) @register(masking='field', module="curvefit") -def fitPolynomial(data: DictOfSeries, field: str, flagger: Flagger, - winsz: Union[int, str], - polydeg: int, - numba: Literal[True, False, "auto"] = "auto", - eval_flags: bool = True, - min_periods: int = 0, - return_residues: bool = False, - **kwargs) -> Tuple[DictOfSeries, Flagger]: +def fitPolynomial( + data: DictOfSeries, + field: str, + flagger: Flagger, + winsz: Union[int, str], + polydeg: int, + numba: Literal[True, False, "auto"] = "auto", + eval_flags: bool = True, + min_periods: int = 0, + return_residues: bool = False, + flag: float = BAD, + **kwargs +) -> Tuple[DictOfSeries, Flagger]: """ Function fits a polynomial model to the data and returns the fitted data curve. @@ -91,6 +99,8 @@ def fitPolynomial(data: DictOfSeries, field: str, flagger: Flagger, set the minimum number of periods to the number of values in an offset defined window size, pass np.nan. return_residues : bool, default False Internal parameter. Makes the method return the residues instead of the fit. + flag : float, default BAD + flag to set. Returns ------- @@ -149,8 +159,8 @@ def fitPolynomial(data: DictOfSeries, field: str, flagger: Flagger, lambda x, y: x[y], raw=True, args=(center_index,) ) - # we need a missing value marker that is not nan, because nan values dont get passed by pandas rolling - # method + # we need a missing value marker that is not nan, + # because nan values dont get passed by pandas rolling method miss_marker = to_fit.min() miss_marker = np.floor(miss_marker - 1) na_mask = to_fit.isna() @@ -192,8 +202,6 @@ def fitPolynomial(data: DictOfSeries, field: str, flagger: Flagger, data[field] = residues if eval_flags: - # with the new flagger we dont have to care - # about to set NaNs to the original flags anymore # TODO: we does not get any flags here, because of masking=field worst = flagger[field].rolling(winsz, center=True, min_periods=min_periods).max() flagger[field] = worst diff --git a/saqc/funcs/drift.py b/saqc/funcs/drift.py index 14417c3f0c6ca97fc65282d41e045df55fc77716..d8605f67d000f1c4f0411cffa27a34e6fe785c84 100644 --- a/saqc/funcs/drift.py +++ b/saqc/funcs/drift.py @@ -1,17 +1,19 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- + +import numpy as np +import pandas as pd import functools +from dios import DictOfSeries + from typing import Optional, Tuple, Sequence, Callable, Optional from typing_extensions import Literal -import numpy as np -import pandas as pd from scipy import stats from scipy.optimize import curve_fit from scipy.spatial.distance import pdist -from dios import DictOfSeries - +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.funcs.resampling import shift @@ -35,6 +37,7 @@ def flagDriftFromNorm( norm_frac: float = 0.5, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), linkage_method: LinkageString = "single", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -76,7 +79,8 @@ def flagDriftFromNorm( The keyword gets passed on to scipy.hierarchy.linkage. See its documentation to learn more about the different keywords (References [1]). See wikipedia for an introduction to hierarchical clustering (References [2]). - kwargs + flag : float, default BAD + flag to set. Returns ------- @@ -122,7 +126,6 @@ def flagDriftFromNorm( Introduction to Hierarchical clustering: [2] https://en.wikipedia.org/wiki/Hierarchical_clustering """ - data_to_flag = data[fields].to_df() data_to_flag.dropna(inplace=True) @@ -135,7 +138,7 @@ def flagDriftFromNorm( drifters = detectDeviants(segment[1], metric, norm_spread, norm_frac, linkage_method, 'variables') for var in drifters: - flagger[segment[1].index, fields[var]] = kwargs['flag'] + flagger[segment[1].index, fields[var]] = flag return data, flagger @@ -149,6 +152,7 @@ def flagDriftFromReference( segment_freq: FreqString, thresh: float, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -175,7 +179,8 @@ def flagDriftFromReference( A distance function. It should be a function of 2 1-dimensional arrays and return a float scalar value. This value is interpreted as the distance of the two input arrays. The default is the averaged manhatten metric. See the Notes section to get an idea of why this could be a good choice. - kwargs + flag : float, default BAD + flag to set. Returns ------- @@ -211,7 +216,7 @@ def flagDriftFromReference( dist = metric(segment[1].iloc[:, i].values, segment[1].loc[:, field].values) if dist > thresh: - flagger[segment[1].index, fields[i]] = kwargs['flag'] + flagger[segment[1].index, fields[i]] = flag return data, flagger @@ -228,6 +233,7 @@ def flagDriftFromScaledNorm( norm_frac: float = 0.5, metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: pdist(np.array([x, y]), metric='cityblock') / len(x), linkage_method: LinkageString = "single", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -277,7 +283,8 @@ def flagDriftFromScaledNorm( The keyword gets passed on to scipy.hierarchy.linkage. See its documentation to learn more about the different keywords (References [1]). See wikipedia for an introduction to hierarchical clustering (References [2]). - kwargs + flag : float, default BAD + flag to set. Returns ------- @@ -327,7 +334,7 @@ def flagDriftFromScaledNorm( drifters = detectDeviants(segment[1], metric, norm_spread, norm_frac, linkage_method, 'variables') for var in drifters: - flagger[segment[1].index, fields[var]] = kwargs['flag'] + flagger[segment[1].index, fields[var]] = flag return data, flagger @@ -340,6 +347,7 @@ def correctExponentialDrift( maint_data_field: ColumnName, cal_mean: int = 5, flag_maint_period: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -390,6 +398,8 @@ def correctExponentialDrift( directly before maintenance event. This values are needed for shift calibration. (see above description) flag_maint_period : bool, default False Whether or not to flag the values obtained while maintenance. + flag : float, default BAD + flag to set. Returns ------- @@ -436,7 +446,7 @@ def correctExponentialDrift( to_flag = drift_frame["drift_group"] to_flag = to_flag.drop(to_flag[: maint_data.index[0]].index) to_flag = to_flag.dropna() - flagger[to_flag, field] = kwargs['flag'] + flagger[to_flag, field] = flag return data, flagger @@ -487,7 +497,6 @@ def correctRegimeAnomaly( x_date : bool, default False If True, use "seconds from epoch" as x input to the model func, instead of "seconds from regime start". - Returns ------- data : dios.DictOfSeries @@ -592,7 +601,6 @@ def correctOffset( start and right before the end of any regime is ignored when calculating a regimes mean for data correcture. This is to account for the unrelyability of data near the changepoints of regimes. - Returns ------- data : dios.DictOfSeries @@ -600,7 +608,6 @@ def correctOffset( Data values may have changed relatively to the data input. flagger : saqc.flagger.Flagger The flagger object, holding flags and additional Informations related to `data`. - """ data, flagger = copy(data, field, flagger, field + '_CPcluster') data, flagger = assignChangePointCluster( @@ -659,6 +666,7 @@ def flagRegimeAnomaly( linkage_method: LinkageString = "single", metric: Callable[[np.ndarray, np.ndarray], float] = lambda x, y: np.abs(np.nanmean(x) - np.nanmean(y)), norm_frac: float = 0.5, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -695,6 +703,8 @@ def flagRegimeAnomaly( norm_frac : float Has to be in [0,1]. Determines the minimum percentage of samples, the "normal" group has to comprise to be the normal group actually. + flag : float, default BAD + flag to set. Returns ------- @@ -714,6 +724,7 @@ def flagRegimeAnomaly( norm_frac=norm_frac, set_cluster=False, set_flags=True, + flag=flag, **kwargs ) @@ -730,6 +741,7 @@ def assignRegimeAnomaly( norm_frac: float = 0.5, set_cluster: bool = True, set_flags: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -775,10 +787,11 @@ def assignRegimeAnomaly( set_flags : bool, default True Wheather or not to flag abnormal values (do not flag them, if you want to correct them afterwards, becasue flagged values usually are not visible in further tests.). + flag : float, default BAD + flag to set. Returns ------- - data : dios.DictOfSeries A dictionary of pandas.Series, holding all the data. flagger : saqc.flagger.Flagger @@ -792,7 +805,7 @@ def assignRegimeAnomaly( if set_flags: for p in plateaus: - flagger[cluster_dios.iloc[:, p].index, field] = kwargs['flags'] + flagger[cluster_dios.iloc[:, p].index, field] = flag if set_cluster: for p in plateaus: diff --git a/saqc/funcs/flagtools.py b/saqc/funcs/flagtools.py index 5c2b341a97cf2d572dd83e63d23bdee3691c8e1c..56c6a689c0a489f0d36a1833a10ef801b82e5d5c 100644 --- a/saqc/funcs/flagtools.py +++ b/saqc/funcs/flagtools.py @@ -76,6 +76,7 @@ def clearFlags(data: DictOfSeries, field: ColumnName, flagger: Flagger, **kwargs flagUnflagged : set flag value at all unflagged positions """ if 'flag' in kwargs: + kwargs = {**kwargs} # copy flag = kwargs.pop('flag') warnings.warn(f'`flag={flag}` is ignored here.') @@ -98,7 +99,7 @@ def flagUnflagged( flagger : saqc.flagger.Flagger A flagger object, holding flags and additional informations related to `data`. flag : float, default BAD - flag value to set, has NO default + flag value to set kwargs : Dict unused @@ -149,7 +150,8 @@ def flagManual( data: DictOfSeries, field: ColumnName, flagger: Flagger, mdata: Union[pd.Series, pd.DataFrame, DictOfSeries], mflag: Any = 1, - method=Literal["plain", "ontime", "left-open", "right-open"], + method: Literal["plain", "ontime", "left-open", "right-open"] = 'plain', + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -172,6 +174,7 @@ def flagManual( The "manually generated" data mflag : scalar The flag that indicates data points in `mdata`, of wich the projection in data should be flagged. + method : {'plain', 'ontime', 'left-open', 'right-open'}, default plain Defines how mdata is projected on data. Except for the 'plain' method, the methods assume mdata to have an index. @@ -183,6 +186,9 @@ def flagManual( the value at t_1 gets projected onto all data timestamps t with t_1 <= t < t_2. * 'left-open': like 'right-open', but the projected interval now covers all t with t_1 < t <= t_2. + flag : float, default BAD + flag to set. + Returns ------- data : original data @@ -277,7 +283,7 @@ def flagManual( mask = mdata == mflag mask = mask.reindex(dat.index).fillna(False) - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger diff --git a/saqc/funcs/generic.py b/saqc/funcs/generic.py index 1058da7403f7bc2fdad57598f9dd8d9145990c29..b8677f199e3893634137f9580fd41f1354e3f98c 100644 --- a/saqc/funcs/generic.py +++ b/saqc/funcs/generic.py @@ -81,8 +81,14 @@ def _execGeneric(flagger: Flagger, data: DictOfSeries, func: Callable[[pd.Series @register(masking='all', module="generic") -def process(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd.Series], pd.Series], - nodata: float = np.nan, **kwargs) -> Tuple[DictOfSeries, Flagger]: +def process( + data: DictOfSeries, + field: str, + flagger: Flagger, + func: Callable[[pd.Series], pd.Series], + nodata: float = np.nan, + **kwargs +) -> Tuple[DictOfSeries, Flagger]: """ generate/process data with generically defined functions. @@ -131,7 +137,6 @@ def process(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd You also can pass numpy and pandas functions: >>> lambda temperature, uncertainty: np.round(temperature) * np.sqrt(uncertainty) - """ data[field] = _execGeneric(flagger, data, func, field, nodata).squeeze() @@ -145,8 +150,15 @@ def process(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd @register(masking='all', module="generic") -def flag(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd.Series], pd.Series], - nodata: float = np.nan, flag=BAD, **kwargs) -> Tuple[DictOfSeries, Flagger]: +def flag( + data: DictOfSeries, + field: str, + flagger: Flagger, + func: Callable[[pd.Series], pd.Series], + nodata: float = np.nan, + flag: float = BAD, + **kwargs +) -> Tuple[DictOfSeries, Flagger]: # TODO : fix docstring, check if all still works """ a function to flag a data column by evaluation of a generic expression. @@ -181,6 +193,8 @@ def flag(data: DictOfSeries, field: str, flagger: Flagger, func: Callable[[pd.Se See the examples section to learn more. nodata : any, default np.nan The value that indicates missing/invalid data + flag : float, default BAD + flag to set. Returns ------- diff --git a/saqc/funcs/outliers.py b/saqc/funcs/outliers.py index 189995cc58a7c2410bb228b0419e2b6f2c65e9a3..d8b2fcd26c80a33d3f98db12a636de232620bef0 100644 --- a/saqc/funcs/outliers.py +++ b/saqc/funcs/outliers.py @@ -36,6 +36,7 @@ def flagByStray( partition_min: int = 11, iter_start: float = 0.5, alpha: float = 0.05, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -75,6 +76,9 @@ def flagByStray( Level of significance by which it is tested, if a score might be drawn from another distribution, than the majority of the data. + flag : float, default BAD + flag to set. + References ---------- [1] Talagala, P. D., Hyndman, R. J., & Smith-Miles, K. (2019). Anomaly detection in high dimensional data. @@ -121,7 +125,7 @@ def flagByStray( for iter_index in range(i_start - 1, sample_size): if gaps[iter_index] > log_alpha * ghat[iter_index]: index = partition.index[sorted_i[iter_index:]] - flagger[index, field] = kwargs['flag'] + flagger[index, field] = flag break return data, flagger @@ -137,6 +141,7 @@ def _evalStrayLabels( reduction_thresh: float = 3.5, reduction_min_periods: int = 1, at_least_one: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -173,6 +178,8 @@ def _evalStrayLabels( at_least_one : bool, default True If none of the variables, the outlier label shall be reduced to, is an outlier with regard to the test, all (True) or none (False) of the variables are flagged + flag : float, default BAD + flag to set. References ---------- @@ -185,7 +192,7 @@ def _evalStrayLabels( if reduction_range is None: for field in to_flag_frame.columns: - flagger[to_flag_frame.index, field] = kwargs['flag'] + flagger[to_flag_frame.index, field] = flag return data, flagger for var in fields: @@ -233,7 +240,7 @@ def _evalStrayLabels( for field in to_flag_frame.columns: col = to_flag_frame[field] - flagger[col[col].index, field] = kwargs['flag'] + flagger[col[col].index, field] = flag return data, flagger @@ -367,6 +374,7 @@ def flagMVScores( reduction_drop_flagged: bool = False, # TODO: still a case ? reduction_thresh: float = 3.5, reduction_min_periods: int = 1, + flag: float = BAD, **kwargs, ) -> Tuple[DictOfSeries, Flagger]: """ @@ -430,6 +438,8 @@ def flagMVScores( Only effective when `reduction_range` is not ``None``. Minimum number of meassurements necessarily present in a reduction interval for reduction actually to be performed. + flag : float, default BAD + flag to set. Returns ------- @@ -488,7 +498,9 @@ def flagMVScores( partition_freq=stray_partition, partition_min=stray_partition_min, iter_start=iter_start, - alpha=alpha, **kwargs) + alpha=alpha, + flag=flag, + **kwargs) data, flagger = _evalStrayLabels( data, 'kNN_scores', flagger, @@ -496,7 +508,9 @@ def flagMVScores( reduction_range=reduction_range, reduction_drop_flagged=reduction_drop_flagged, reduction_thresh=reduction_thresh, - reduction_min_periods=reduction_min_periods, **kwargs) + reduction_min_periods=reduction_min_periods, + flag=flag, + **kwargs) return data, flagger @@ -514,6 +528,7 @@ def flagRaise( min_slope: Optional[float] = None, min_slope_weight: float = 0.8, numba_boost: bool = True, # TODO: rm, not a user decision + flag: float = BAD, **kwargs, ) -> Tuple[DictOfSeries, Flagger]: """ @@ -553,6 +568,9 @@ def flagRaise( min_slope_weight : float, default 0.8 See third condition listed in the notes below. numba_boost : bool, default True + deprecated ? + flag : float, default BAD + flag to set. Returns ------- @@ -662,14 +680,20 @@ def flagRaise( # check means against critical raise value: to_flag = dataseries >= weighted_rolling_mean + (raise_series / mean_raise_factor) to_flag &= raise_series.notna() - flagger[to_flag[to_flag].index, field] = kwargs['flag'] + flagger[to_flag[to_flag].index, field] = flag return data, flagger @register(masking='field', module="outliers") def flagMAD( - data: DictOfSeries, field: ColumnName, flagger: Flagger, window: FreqString, z: float = 3.5, **kwargs + data: DictOfSeries, + field: ColumnName, + flagger: Flagger, + window: FreqString, + z: float = 3.5, + flag: float = BAD, + **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ The function represents an implementation of the modyfied Z-score outlier detection method. @@ -690,6 +714,8 @@ def flagMAD( Offset string. Denoting the windows size that the "Z-scored" values have to lie in. z: float, default 3.5 The value the Z-score is tested against. Defaulting to 3.5 (Recommendation of [1]) + flag : float, default BAD + flag to set. Returns ------- @@ -721,7 +747,7 @@ def flagMAD( index = mask.index mask.loc[index < index[0] + pd.to_timedelta(window)] = False - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -735,6 +761,7 @@ def flagOffset( window: Union[IntegerWindow, FreqString], rel_thresh: Optional[float] = None, numba_kickin: int = 200000, # TODO: rm, not a user decision + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -776,7 +803,8 @@ def flagOffset( When there are detected more than `numba_kickin` incidents of potential spikes, the pandas.rolling - part of computation gets "jitted" with numba. Default value hast proven to be around the break even point between "jit-boost" and "jit-costs". - + flag : float, default BAD + flag to set. Returns ------- @@ -877,7 +905,7 @@ def flagOffset( cresult = calcResult(result) cresult = cresult[cresult].index - flagger[cresult, field] = kwargs['flag'] + flagger[cresult, field] = flag return data, flagger @@ -890,6 +918,7 @@ def flagByGrubbs( alpha: float = 0.05, min_periods: int = 8, check_lagged: bool = False, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -927,6 +956,8 @@ def flagByGrubbs( If True, every value gets checked twice for being an outlier. Ones in the initial rolling window and one more time in a rolling window that is lagged by half the windows delimeter (winsz/2). Recommended for avoiding false positives at the window edges. Only available when rolling with integer defined window size. + flag : float, default BAD + flag to set. Returns ------- @@ -983,7 +1014,7 @@ def flagByGrubbs( to_flag &= to_flag_lagged - flagger[to_flag, field] = kwargs['flag'] + flagger[to_flag, field] = flag return data, flagger @@ -994,6 +1025,7 @@ def flagRange( flagger: Flagger, min: float = -np.inf, max: float = np.inf, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -1011,6 +1043,8 @@ def flagRange( Lower bound for valid data. max : float Upper bound for valid data. + flag : float, default BAD + flag to set. Returns ------- @@ -1024,7 +1058,7 @@ def flagRange( # using .values is much faster datacol = data[field].values mask = (datacol < min) | (datacol > max) - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -1036,6 +1070,7 @@ def flagCrossStatistic( fields: Sequence[ColumnName], thresh: float, cross_stat: Literal["modZscore", "Zscore"] = "modZscore", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -1071,6 +1106,9 @@ def flagCrossStatistic( * ``'Zscore'``: Score values by how many times the standard deviation they differ from the median. See References [1] + flag : float, default BAD + flag to set. + Returns ------- data : dios.DictOfSeries @@ -1109,6 +1147,6 @@ def flagCrossStatistic( mask = diff_scores > thresh for var in fields: - flagger[mask[var], var] = kwargs['flag'] + flagger[mask[var], var] = flag return data, flagger diff --git a/saqc/funcs/pattern.py b/saqc/funcs/pattern.py index a33cdceaef496a8d4bcb1087da297b6167403419..6562fd0d3415867726178c84015c426c3f2cfb1b 100644 --- a/saqc/funcs/pattern.py +++ b/saqc/funcs/pattern.py @@ -8,6 +8,7 @@ import pywt from mlxtend.evaluate import permutation_test from dios.dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.lib.tools import customRoller @@ -19,8 +20,9 @@ def flagPatternByDTW( field: str, flagger: Flagger, ref_field: str, - widths: Sequence[int]=(1, 2, 4, 8), - waveform: str="mexh", + widths: Sequence[int] = (1, 2, 4, 8), + waveform: str = "mexh", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -46,6 +48,8 @@ def flagPatternByDTW( Widths for wavelet decomposition. [1] recommends a dyadic scale. Default: (1,2,4,8) waveform: str. Wavelet to be used for decomposition. Default: 'mexh'. See [2] for a list. + flag : float, default BAD + flag to set. kwargs @@ -94,7 +98,7 @@ def flagPatternByDTW( sz = len(ref) mask = customRoller(dat, window=sz, min_periods=sz).apply(isPattern, raw=True) - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger @@ -104,8 +108,9 @@ def flagPatternByWavelet( field: str, flagger: Flagger, ref_field: str, - max_distance: float=0.03, - normalize: bool=True, + max_distance: float = 0.03, + normalize: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ Pattern Recognition via Dynamic Time Warping. @@ -130,9 +135,8 @@ def flagPatternByWavelet( Maximum dtw-distance between partition and pattern, so that partition is recognized as pattern. Default: 0.03 normalize: boolean. Normalizing dtw-distance (see [1]). Default: True - - - kwargs + flag : float, default BAD + flag to set. Returns ------- @@ -166,5 +170,5 @@ def flagPatternByWavelet( sz = len(ref) mask = customRoller(dat, window=sz, min_periods=sz).apply(isPattern, raw=True) - flagger[mask, field] = kwargs['flag'] + flagger[mask, field] = flag return data, flagger diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py index b5d2a109f8fae80565387a2cac71179efba18606..f69b12bb8681b71160cd4b633421d977da227c3b 100644 --- a/saqc/funcs/resampling.py +++ b/saqc/funcs/resampling.py @@ -43,6 +43,7 @@ def aggregate( value_func, flag_func: Callable[[pd.Series], float] = np.nanmax, method: Literal["fagg", "bagg", "nagg"] = "nagg", + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -94,6 +95,10 @@ def aggregate( Specifies which intervals to be aggregated for a certain timestamp. (preceeding, succeeding or "surrounding" interval). See description above for more details. + flag : float, default BAD + flag to set. + + Returns ------- data : dios.DictOfSeries @@ -106,7 +111,13 @@ def aggregate( data, flagger = copy(data, field, flagger, field + '_original') return resample( - data, field, flagger, freq=freq, agg_func=value_func, flag_agg_func=flag_func, method=method, **kwargs + data, field, flagger, + freq=freq, + agg_func=value_func, + flag_agg_func=flag_func, + method=method, + flag=flag, + **kwargs ) @@ -674,7 +685,7 @@ def reindexFlags( target_datcol = data[field] target_flagscol = flagger[field] - dummy = pd.Series(np.nan, target_flagscol.index) + dummy = pd.Series(np.nan, target_flagscol.index, dtype=float) if method[-13:] == "interpolation": ignore = _getChunkBounds(target_datcol, flagscol, freq) diff --git a/saqc/funcs/residues.py b/saqc/funcs/residues.py index 6abcfd2d64c6e5880511f6c48e3d9eec0d3b8167..0b0046bea5e1baf6437272441b3c9d34b0fb3e68 100644 --- a/saqc/funcs/residues.py +++ b/saqc/funcs/residues.py @@ -8,6 +8,7 @@ import numpy as np from dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.funcs.rolling import roll @@ -24,6 +25,7 @@ def calculatePolynomialResidues( numba: Literal[True, False, "auto"] = "auto", # TODO: rm, not a a user decision eval_flags: bool = True, # TODO, not valid anymore, if still needed, maybe assign user-passed ``flag``? min_periods: Optional[int] = 0, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -88,6 +90,8 @@ def calculatePolynomialResidues( fit to be performed. If there are not enough values, np.nan gets assigned. Default (0) results in fitting regardless of the number of values present (results in overfitting for too sparse intervals). To automatically set the minimum number of periods to the number of values in an offset defined window size, pass np.nan. + flag : float, default BAD + flag to set. Returns ------- @@ -107,6 +111,7 @@ def calculatePolynomialResidues( eval_flags=eval_flags, min_periods=min_periods, return_residues=True, + flag=flag, **kwargs ) @@ -121,6 +126,7 @@ def calculateRollingResidues( eval_flags: bool = True, min_periods: Optional[int] = 0, center: bool = True, + flag: float = BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ TODO: docstring needed""" @@ -132,5 +138,6 @@ def calculateRollingResidues( min_periods=min_periods, center=center, return_residues=True, + flag=flag, **kwargs ) diff --git a/saqc/funcs/rolling.py b/saqc/funcs/rolling.py index 6a40c93c2cd0cf02b249800d8e657fc9b8e811d6..6d58dfbc69a52aa607a5ee774a7093fbbab69235 100644 --- a/saqc/funcs/rolling.py +++ b/saqc/funcs/rolling.py @@ -5,9 +5,9 @@ from typing import Union, Callable import numpy as np import pandas as pd - from dios import DictOfSeries +from saqc.constants import * from saqc.core.register import register from saqc.flagger import Flagger from saqc.lib.tools import getFreqDelta @@ -24,6 +24,7 @@ def roll( min_periods: int=0, center: bool=True, return_residues=False, # TODO: this should not be public, a wrapper would be better + flag: float = BAD, **kwargs ): """ @@ -59,6 +60,8 @@ def roll( center : bool, default True Wheather or not to center the window the mean is calculated of around the reference value. If False, the reference value is placed to the right of the window (classic rolling mean with lag.) + flag : float, default BAD + flag to set. Returns ------- @@ -69,7 +72,6 @@ def roll( The flagger object, holding flags and additional Informations related to `data`. Flags values may have changed relatively to the flagger input. """ - data = data.copy() to_fit = data[field] if to_fit.empty: @@ -122,9 +124,6 @@ def roll( data[field] = means if eval_flags: - # with the new flagger we dont have to care - # about to set NaNs to the original flags anymore - # TODO: we does not get any flags here, because of masking=field worst = flagger[field].rolling(winsz, center=True, min_periods=min_periods).max() flagger[field] = worst