diff --git a/saqc/funcs/interpolation.py b/saqc/funcs/interpolation.py index 8efb6537ecd5623aa71e1df9a75d5d210fea28b8..fecb5718d9fddf89b359fee98a1eee517448184d 100644 --- a/saqc/funcs/interpolation.py +++ b/saqc/funcs/interpolation.py @@ -173,7 +173,7 @@ def _overlap_rs(x, freq='1min', fill_value=-np.inf): # we are appending last regular grid entry (if necessary), to conserve integrity of groups of regularized # timestamps originating all from the same logger. try: - x = x.append(pd.Series([-np.inf], index=[end]), verify_integrity=True) + x = x.append(pd.Series([fill_value], index=[end]), verify_integrity=True) except ValueError: pass return x diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py index 3a4d8ce78cfd4ecdb7de539c70a0ca7686be9189..b7fb7213517cb1b72ea15100ed60c05b26147f44 100644 --- a/saqc/funcs/resampling.py +++ b/saqc/funcs/resampling.py @@ -19,6 +19,7 @@ from saqc.funcs.interpolation import interpolateIndex from saqc.lib.tools import getDropMask, evalFreqStr from saqc.lib.ts_operators import shift2Freq, aggregate2Freq from saqc.flagger.flags import applyFunctionOnHistory +from saqc.lib.rolling import customRoller logger = logging.getLogger("SaQC") @@ -564,7 +565,9 @@ def resample( # create a dummys if all_na_2_empty and datcol.dropna().empty: - + # Todo: This needs discussion. It makes possible, that different harmonized variables, + # resulting from the harmonization of the same logger, have differing timestamps! + # (Same holds for starting/ending nan-chunk truncation) datcol = pd.Series([], index=pd.DatetimeIndex([]), name=field) flagscol = pd.Series([], index=pd.DatetimeIndex([]), name=field) @@ -614,7 +617,24 @@ def resample( return data, flagger -@register(masking='field', module="resampling") +def _getChunkBounds(target_datcol, flagscol, freq): + chunk_end = target_datcol.reindex(flagscol.index, method='bfill', tolerance=freq) + chunk_start = target_datcol.reindex(flagscol.index, method='ffill', tolerance=freq) + ignore_flags = (chunk_end.isna() | chunk_start.isna()) + return ignore_flags + + +def _inverseInterpolation(target_flagscol, flagscol=None, freq=None): + backprojected = flagscol.reindex(target_flagscol.index, method="bfill", tolerance=freq) + fwrdprojected = flagscol.reindex(target_flagscol.index, method="ffill", tolerance=freq) + b_replacement_mask = (backprojected > target_flagscol) & (backprojected >= fwrdprojected) + f_replacement_mask = (fwrdprojected > target_flagscol) & (fwrdprojected > backprojected) + target_flagscol.loc[b_replacement_mask] = backprojected.loc[b_replacement_mask] + target_flagscol.loc[f_replacement_mask] = fwrdprojected.loc[f_replacement_mask] + return target_flagscol + + +@register(masking='none', module="resampling") def reindexFlags( data: DictOfSeries, field: str, @@ -622,8 +642,7 @@ def reindexFlags( method: Literal["inverse_fagg", "inverse_bagg", "inverse_nagg", "inverse_fshift", "inverse_bshift", "inverse_nshift"], source: str, freq: Optional[str]=None, - to_drop: Optional[Union[Any, Sequence[Any]]]=None, - freq_check: Optional[Literal["check", "auto"]]=None, + to_mask: Optional[Union[Any, Sequence[Any]]]=BAD, **kwargs ) -> Tuple[DictOfSeries, Flagger]: @@ -674,14 +693,9 @@ def reindexFlags( freq : {None, str},default None The freq determines the projection range for the projection method. See above description for more details. Defaultly (None), the sampling frequency of source is used. - to_drop : {None, str, List[str]}, default None + to_mask : {None, str, List[str]}, default None Flags referring to values that are to drop before flags projection. Relevant only when projecting with an inverted shift method. Defaultly BAD is listed. - freq_check : {None, 'check', 'auto'}, default None - - None: do not validate frequency-string passed to `freq` - - 'check': estimate frequency and log a warning if estimate miss matchs frequency string passed to 'freq', or - if no uniform sampling rate could be estimated - - 'auto': estimate frequency and use estimate. (Ignores `freq` parameter.) Returns ------- @@ -692,37 +706,20 @@ def reindexFlags( Flags values and shape may have changed relatively to the flagger input. """ - # TODO: This needs a refactoring - raise NotImplementedError("currently not available - rewrite needed") + if to_mask is None: + to_mask = BAD - flagscol, metacols = flagger.getFlags(source, full=True) + flagscol = flagger[source] if flagscol.empty: return data, flagger target_datcol = data[field] - target_flagscol, target_metacols = flagger.getFlags(field, full=True) - - if (freq is None) and (method != "match"): - freq_check = 'auto' - - freq = evalFreqStr(freq, freq_check, flagscol.index) + target_flagscol = flagger[field] if method[-13:] == "interpolation": - backprojected = flagscol.reindex(target_flagscol.index, method="bfill", tolerance=freq) - fwrdprojected = flagscol.reindex(target_flagscol.index, method="ffill", tolerance=freq) - b_replacement_mask = (backprojected > target_flagscol) & (backprojected >= fwrdprojected) - f_replacement_mask = (fwrdprojected > target_flagscol) & (fwrdprojected > backprojected) - target_flagscol.loc[b_replacement_mask] = backprojected.loc[b_replacement_mask] - target_flagscol.loc[f_replacement_mask] = fwrdprojected.loc[f_replacement_mask] - - backprojected_meta = {} - fwrdprojected_meta = {} - for meta_key in target_metacols.keys(): - backprojected_meta[meta_key] = metacols[meta_key].reindex(target_metacols[meta_key].index, method='bfill', - tolerance=freq) - fwrdprojected_meta[meta_key] = metacols[meta_key].reindex(target_metacols[meta_key].index, method='ffill', - tolerance=freq) - target_metacols[meta_key].loc[b_replacement_mask] = backprojected_meta[meta_key].loc[b_replacement_mask] - target_metacols[meta_key].loc[f_replacement_mask] = fwrdprojected_meta[meta_key].loc[f_replacement_mask] + ignore = _getChunkBounds(target_datcol, flagscol, freq) + flagscol[ignore] = np.nan + target_flagscol = _inverseInterpolation(target_flagscol, flagscol, freq) + if method[-3:] == "agg" or method == "match": # Aggregation - Inversion