diff --git a/saqc/flagger/flags.py b/saqc/flagger/flags.py index e6d3076facdbd590fb00db361571027f45f92863..c1dcb1ed6a966c8a52fa86bdffa5a26b4fe6d517 100644 --- a/saqc/flagger/flags.py +++ b/saqc/flagger/flags.py @@ -376,12 +376,9 @@ def applyFunctionOnHistory(flags: Flags, column, hist_func, hist_kws, mask_func, return flags -def mergeHistoryByFunc(flags: Flags, field, source, merge_func, merge_func_kws, last_column=None): +def appendHistory(flags: Flags, column, append_hist): """ - Merges the information of one history (source) into the other (field). (Without altering fields indices) - - Field indices remain unchanged. The merge is performed, via manipulating the field history values - column wise according to `merge_func`. + Function, specialized for used in deharm context. Parameters @@ -398,25 +395,11 @@ def mergeHistoryByFunc(flags: Flags, field, source, merge_func, merge_func_kws, """ flags = flags.copy() - target_history = flags.history[field] - source_history = flags.history[source] - new_target_history = History() - # import pdb - # pdb.set_trace() - for k in target_history.hist.columns: - col_args_h = dict(source_col=source_history.hist[k]) - col_args_m = dict(source_col=source_history.mask[k]) - col_args_h.update(merge_func_kws) - col_args_m.update(merge_func_kws) - new_target_history.hist[k] = merge_func(target_history.hist[k], **col_args_h) - new_target_history.mask[k] = merge_func(target_history.mask[k], **col_args_m) - - if last_column is None: - new_target_history.mask.iloc[:, -1:] = True - else: - new_target_history.append(last_column, force=True) - - flags.history[field] = new_target_history + new_history = flags.history[column] + for app_k in [k for k in append_hist.columns if k not in new_history.columns]: + new_history.hist[app_k] = append_hist.hist[app_k] + new_history.mask[app_k] = append_hist.mask[app_k] + flags.history[column] = new_history return flags # for now we keep this name diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py index 848cf6ee9a1c846fa294828ba4d79672876e2639..decac74df30bbd4968b7b8be669a2cb715a53517 100644 --- a/saqc/funcs/resampling.py +++ b/saqc/funcs/resampling.py @@ -18,7 +18,7 @@ from saqc.funcs.tools import copy, drop, rename from saqc.funcs.interpolation import interpolateIndex from saqc.lib.tools import getDropMask, evalFreqStr, getFreqDelta from saqc.lib.ts_operators import shift2Freq, aggregate2Freq -from saqc.flagger.flags import applyFunctionOnHistory, mergeHistoryByFunc +from saqc.flagger.flags import applyFunctionOnHistory, appendHistory from saqc.lib.rolling import customRoller logger = logging.getLogger("SaQC") @@ -621,16 +621,13 @@ def _getChunkBounds(target_datcol, flagscol, freq): return ignore_flags -def _inverseInterpolation(target_flagscol, source_col=None, freq=None, chunk_bounds=None): +def _inverseInterpolation(source_col, freq=None, chunk_bounds=None, target_flagscol=None): source_col = source_col.copy() - source_col[chunk_bounds] = np.nan + if len(chunk_bounds) > 0: + source_col[chunk_bounds] = np.nan backprojected = source_col.reindex(target_flagscol.index, method="bfill", tolerance=freq) fwrdprojected = source_col.reindex(target_flagscol.index, method="ffill", tolerance=freq) - b_replacement_mask = (backprojected > target_flagscol) & (backprojected >= fwrdprojected) - f_replacement_mask = (fwrdprojected > target_flagscol) & (fwrdprojected > backprojected) - target_flagscol.loc[b_replacement_mask] = backprojected.loc[b_replacement_mask] - target_flagscol.loc[f_replacement_mask] = fwrdprojected.loc[f_replacement_mask] - return target_flagscol + return pd.concat([backprojected, fwrdprojected], axis=1).max(axis=1) def _inverseAggregation(target_flagscol, source_col=None, freq=None, method=None): @@ -743,24 +740,27 @@ def reindexFlags( target_datcol = data[field] target_flagscol = flagger[field] - append_dummy = pd.Series(np.nan, target_flagscol.index) + blank_dummy = pd.Series(np.nan, target_flagscol.index) if method[-13:] == "interpolation": ignore = _getChunkBounds(target_datcol, flagscol, freq) merge_func = _inverseInterpolation - merge_dict = dict(freq=freq, chunk_bounds=ignore) + merge_dict = dict(freq=freq, chunk_bounds=ignore, target_flagscol=blank_dummy) + mask_dict = {**merge_dict, 'chunk_bounds':[]} if method[-3:] == "agg" or method == "match": projection_method = METHOD2ARGS[method][0] tolerance = METHOD2ARGS[method][1](freq) merge_func = _inverseAggregation - merge_dict = dict(freq=tolerance, method=projection_method) + merge_dict = mask_dict = dict(freq=tolerance, method=projection_method, target_flagscol=blank_dummy) if method[-5:] == "shift": drop_mask = (target_datcol.isna() | isflagged(target_flagscol, kwargs['to_mask'])) projection_method = METHOD2ARGS[method][0] tolerance = METHOD2ARGS[method][1](freq) merge_func = _inverseShift - merge_dict = dict(freq=tolerance, method=projection_method, drop_mask=drop_mask) + merge_dict = mask_dict = dict(freq=tolerance, method=projection_method, drop_mask=drop_mask, target_flagscol=blank_dummy) - flagger = mergeHistoryByFunc(flagger, field, source, merge_func, merge_dict, last_column=append_dummy) + tmp_flagger = applyFunctionOnHistory(flagger, source, merge_func, merge_dict, merge_func, mask_dict, + last_column=blank_dummy) + flagger = appendHistory(flagger, field, tmp_flagger.history[source]) return data, flagger