diff --git a/saqc/flagger/flags.py b/saqc/flagger/flags.py
index 7754995d78ed2e3a22794c46daac3c7d2a42522a..6f4caf4f9a22017373793d82eb6c1a700c1cbe81 100644
--- a/saqc/flagger/flags.py
+++ b/saqc/flagger/flags.py
@@ -411,7 +411,12 @@ def mergeHistoryByFunc(flags: Flags, field, source, merge_func, merge_func_kws,
         col_args_h.update(merge_func_kws)
         col_args_m.update(merge_func_kws)
         new_target_history.hist[k] = merge_func(target_history.hist[k], **col_args_h)
-        new_target_history.mask[k] = merge_func(target_history.mask[k][k], **col_args_m)
+        new_target_history.mask[k] = merge_func(target_history.mask[k], **col_args_m)
+
+    if last_column is None:
+        new_target_history.mask.iloc[:, -1:] = True
+    else:
+        new_target_history.append(last_column, force=True)
 
     flags.history[field] = new_target_history
     return flags
diff --git a/saqc/funcs/interpolation.py b/saqc/funcs/interpolation.py
index fecb5718d9fddf89b359fee98a1eee517448184d..95fb4997097a362b8c309049135564d364f70947 100644
--- a/saqc/funcs/interpolation.py
+++ b/saqc/funcs/interpolation.py
@@ -249,18 +249,20 @@ def interpolateIndex(
         to_mask = BAD
 
     datcol = data[field]
+    flagscol = flagger[field]
     if datcol.empty:
         return data, flagger
 
     start, end = datcol.index[0].floor(freq), datcol.index[-1].ceil(freq)
+    grid_index = pd.date_range(start=start, end=end, freq=freq, name=datcol.index.name)
+
+    datcol = datcol.copy()
+    datcol.drop(flagscol[flagscol >= to_mask].index, inplace=True)
     datcol.dropna(inplace=True)
-
-    grid_index = pd.date_range(start=start, end=end, freq=freq, name=datcol.index.name)
     dat_index = datcol.index
 
     # account for annoying case of subsequent frequency aligned values,
-    # which differ exactly by the margin of 2*freq
+    # that differ exactly by the margin of 2*freq
    gaps = ((dat_index[1:] - dat_index[:-1]) == 2*pd.Timedelta(freq))
     gaps = dat_index[1:][gaps]
     aligned_gaps = gaps.join(grid_index, how='inner')
@@ -286,7 +288,6 @@ def interpolateIndex(
     data[field] = inter_data[grid_index]
 
     # flags reshaping
-    flagscol = flagger[field]
     flagscol.drop(flagscol[flagscol >= to_mask].index, inplace=True)
 
     flagscol = _overlap_rs(flagscol, freq, UNFLAGGED)
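
Review note: the flags.py hunk fixes a plain indexing slip. `target_history.mask[k][k]` selects column `k` and then the element at label `k`, so `merge_func` received a single scalar instead of the boolean column it expects. A minimal pandas sketch with toy data (not the real saqc `History` object) makes the difference visible:

    import pandas as pd

    # Hypothetical stand-in for History.mask: one boolean column per flagging run.
    mask = pd.DataFrame({0: [True, False], 1: [False, True]})

    column = mask[1]      # the full boolean column, which merge_func expects
    scalar = mask[1][1]   # column 1, then the element at label 1: a single bool
    print(type(column))   # pandas Series
    print(type(scalar))   # numpy bool scalar

The new `last_column` branch either re-opens the last mask column (when `last_column is None`) or appends the passed column with `force=True`; that reading of `History.append` is inferred from the surrounding diff, not from the saqc docs.
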
diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py
index 5338ec34e58b4f7f0e375072d19206e9b5907afd..49d0888b9950dc69149fe240552302675fa3347b 100644
--- a/saqc/funcs/resampling.py
+++ b/saqc/funcs/resampling.py
@@ -16,7 +16,7 @@ from saqc.core.register import register
 from saqc.flagger import Flagger, initFlagsLike, History
 from saqc.funcs.tools import copy, drop, rename
 from saqc.funcs.interpolation import interpolateIndex
-from saqc.lib.tools import getDropMask, evalFreqStr
+from saqc.lib.tools import getDropMask, evalFreqStr, getFreqDelta
 from saqc.lib.ts_operators import shift2Freq, aggregate2Freq
 from saqc.flagger.flags import applyFunctionOnHistory, mergeHistoryByFunc
 from saqc.lib.rolling import customRoller
@@ -624,7 +624,9 @@ def _getChunkBounds(target_datcol, flagscol, freq):
     return ignore_flags
 
 
-def _inverseInterpolation(target_flagscol, source_col=None, freq=None):
+def _inverseInterpolation(target_flagscol, source_col=None, freq=None, chunk_bounds=None):
+    source_col = source_col.copy()
+    source_col[chunk_bounds] = np.nan
     backprojected = source_col.reindex(target_flagscol.index, method="bfill", tolerance=freq)
     fwrdprojected = source_col.reindex(target_flagscol.index, method="ffill", tolerance=freq)
     b_replacement_mask = (backprojected > target_flagscol) & (backprojected >= fwrdprojected)
@@ -634,6 +636,39 @@
     return target_flagscol
 
 
+def _inverseAggregation(target_flagscol, source_col=None, freq=None, method=None):
+    source_col = source_col.reindex(target_flagscol.index, method=method, tolerance=freq)
+    replacement_mask = source_col > target_flagscol
+    target_flagscol.loc[replacement_mask] = source_col.loc[replacement_mask]
+    return target_flagscol
+
+
+def _inverseShift(target_flagscol, source_col=None, freq=None, method=None, drop_mask=None):
+    target_flagscol_drops = target_flagscol[drop_mask]
+    target_flagscol.drop(drop_mask[drop_mask].index, inplace=True)
+    flags_merged = pd.merge_asof(
+        source_col,
+        pd.Series(target_flagscol.index.values, index=target_flagscol.index, name="pre_index"),
+        left_index=True,
+        right_index=True,
+        tolerance=freq,
+        direction=method,
+    )
+    flags_merged.dropna(subset=["pre_index"], inplace=True)
+    flags_merged = flags_merged.set_index(["pre_index"]).squeeze()
+
+    # write flags to target
+    replacement_mask = flags_merged > target_flagscol.loc[flags_merged.index]
+    target_flagscol.loc[replacement_mask[replacement_mask].index] = flags_merged.loc[replacement_mask]
+
+    # reinsert drops
+    target_flagscol = target_flagscol.reindex(target_flagscol.index.join(target_flagscol_drops.index, how="outer"))
+    target_flagscol.loc[target_flagscol_drops.index] = target_flagscol_drops.values
+
+    return target_flagscol
+
+
+
 @register(masking='none', module="resampling")
 def reindexFlags(
     data: DictOfSeries,
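
The three `_inverse*` helpers share one idea: project flags from the resampled (source) grid back onto the original (target) index and keep whichever flag is more severe. A toy sketch of the `_inverseAggregation` case follows, with invented timestamps and flag values (255.0 standing in for BAD):

    import pandas as pd

    # Original 3min timestamps, currently unflagged (0.0).
    target = pd.Series(0.0, index=pd.date_range("2021-01-01", periods=7, freq="3min"))
    # Flags living on the 10min aggregation grid.
    source = pd.Series([255.0, 0.0],
                       index=pd.date_range("2021-01-01", periods=2, freq="10min"))

    # Back-project within one grid period and keep the larger (worse) flag.
    back = source.reindex(target.index, method="bfill", tolerance=pd.Timedelta("10min"))
    mask = back > target
    target.loc[mask] = back.loc[mask]
    print(target)

`_inverseInterpolation` applies the same pattern twice, with a bfill and an ffill pass, since an interpolated value depends on its neighbours in both directions; it first blanks the chunk bounds that `_getChunkBounds` marked as ignorable.
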
@@ -713,81 +748,34 @@
     if flagscol.empty:
         return data, flagger
 
+    if freq is None:
+        freq = getFreqDelta(flagscol.index)
+        if freq is None and method != 'match':
+            raise ValueError('To project irregularly sampled data, either use method="match" or pass a custom '
+                             'projection range to the freq parameter')
+
     target_datcol = data[field]
     target_flagscol = flagger[field]
-
+    append_dummy = pd.Series(np.nan, target_flagscol.index)
     if method[-13:] == "interpolation":
         ignore = _getChunkBounds(target_datcol, flagscol, freq)
-        flagscol[ignore] = np.nan
-        target_flagscol = _inverseInterpolation(target_flagscol, flagscol, freq)
-        flagger = mergeHistoryByFunc(flagger, field, source, _inverseInterpolation, dict(freq=freq))
+        merge_func = _inverseInterpolation
+        merge_dict = dict(freq=freq, chunk_bounds=ignore)
+
     if method[-3:] == "agg" or method == "match":
-        # Aggregation - Inversion
         projection_method = METHOD2ARGS[method][0]
         tolerance = METHOD2ARGS[method][1](freq)
-        flagscol = flagscol.reindex(target_flagscol.index, method=projection_method, tolerance=tolerance)
-        replacement_mask = flagscol > target_flagscol
-        target_flagscol.loc[replacement_mask] = flagscol.loc[replacement_mask]
-        for meta_key in target_metacols.keys():
-            metacols[meta_key] = metacols[meta_key].reindex(target_metacols[meta_key].index, method=projection_method,
-                                                            tolerance=tolerance)
-            target_metacols[meta_key].loc[replacement_mask] = metacols[meta_key].loc[replacement_mask]
+        merge_func = _inverseAggregation
+        merge_dict = dict(freq=tolerance, method=projection_method)
+
     if method[-5:] == "shift":
-        # NOTE: although inverting a simple shift seems to be a less complex operation, it has quite some
-        # code assigned to it and appears to be more verbose than inverting aggregation -
-        # that owes itself to the problem of BAD/invalid values blocking a proper
-        # shift inversion and having to be outsorted before shift inversion and re-inserted afterwards.
-        #
-        # starting with the dropping and its memorization:
-
-        drop_mask = getDropMask(field, to_drop, flagger, BAD)
-        drop_mask |= target_datcol.isna()
-        target_flagscol_drops = target_flagscol[drop_mask]
-        target_flagscol.drop(drop_mask[drop_mask].index, inplace=True)
-
-        # shift inversion
+        drop_mask = target_datcol.isna() | (target_flagscol >= to_mask)
         projection_method = METHOD2ARGS[method][0]
         tolerance = METHOD2ARGS[method][1](freq)
-        flags_merged = pd.merge_asof(
-            flagscol,
-            pd.Series(target_flagscol.index.values, index=target_flagscol.index, name="pre_index"),
-            left_index=True,
-            right_index=True,
-            tolerance=tolerance,
-            direction=projection_method,
-        )
-        flags_merged.dropna(subset=["pre_index"], inplace=True)
-        flags_merged = flags_merged.set_index(["pre_index"]).squeeze()
-
-        # write flags to target
-        replacement_mask = flags_merged > target_flagscol.loc[flags_merged.index]
-        target_flagscol.loc[replacement_mask[replacement_mask].index] = flags_merged.loc[replacement_mask]
-
-        # reinsert drops
-        target_flagscol = target_flagscol.reindex(target_flagscol.index.join(target_flagscol_drops.index, how="outer"))
-        target_flagscol.loc[target_flagscol_drops.index] = target_flagscol_drops.values
-
-        for meta_key in target_metacols.keys():
-            target_metadrops = target_metacols[meta_key][drop_mask]
-            target_metacols[meta_key].drop(drop_mask[drop_mask].index, inplace=True)
-            meta_merged = pd.merge_asof(
-                metacols[meta_key],
-                pd.Series(target_metacols[meta_key].index.values, index=target_metacols[meta_key].index,
-                          name="pre_index"),
-                left_index=True,
-                right_index=True,
-                tolerance=tolerance,
-                direction=projection_method,
-            )
-            meta_merged.dropna(subset=["pre_index"], inplace=True)
-            meta_merged = meta_merged.set_index(["pre_index"]).squeeze()
-            # reinsert drops
-            target_metacols[meta_key][replacement_mask[replacement_mask].index] = meta_merged[replacement_mask]
-            target_metacols[meta_key] = target_metacols[meta_key].reindex(
-                target_metacols[meta_key].index.join(target_metadrops.index, how="outer"))
-            target_metacols[meta_key].loc[target_metadrops.index] = target_metadrops.values
-
-    flagger = flagger.setFlags(field, flag=target_flagscol, with_extra=True, **target_metacols, **kwargs)
+        merge_func = _inverseShift
+        merge_dict = dict(freq=tolerance, method=projection_method, drop_mask=drop_mask)
+
+    flagger = mergeHistoryByFunc(flagger, field, source, merge_func, merge_dict, last_column=append_dummy)
     return data, flagger
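
With all three inversions expressed as `merge_func`/`merge_dict` pairs, `reindexFlags` collapses into a single `mergeHistoryByFunc` call, which replays the inversion over every column of the source history instead of over just the current flags. For the shift case the workhorse is `pd.merge_asof`; a self-contained toy, with made-up timestamps and tolerance, of what it does to one source flag:

    import pandas as pd

    # One flag on the shifted grid ...
    source = pd.Series([255.0], index=pd.DatetimeIndex(["2021-01-01 00:09"]), name="flags")
    # ... and the original, irregular timestamps it may have come from.
    target_index = pd.DatetimeIndex(["2021-01-01 00:01", "2021-01-01 00:08", "2021-01-01 00:14"])

    merged = pd.merge_asof(
        source,
        pd.Series(target_index.values, index=target_index, name="pre_index"),
        left_index=True,
        right_index=True,
        tolerance=pd.Timedelta("5min"),
        direction="nearest",
    )
    # 'pre_index' now names the closest original timestamp (00:08) for the
    # 00:09 flag; rows without a match inside the tolerance hold NaT and
    # are dropped before the flags are written back.
    print(merged)

The `append_dummy` series handed in as `last_column` makes `mergeHistoryByFunc` append a fresh all-NaN column to the target history, so the merged flags enter the history as a new, traceable step rather than rewriting the last one; that interpretation of `History.append(..., force=True)` is inferred from the diff, not confirmed by the saqc documentation.
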