Skip to content
Snippets Groups Projects
Commit 2e2eb3ca authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

inverse interpolation running

parent fe30a3d8
No related branches found
No related tags found
3 merge requests!271Static expansion of regular expressions,!260Follow-Up Translations,!237Flagger Translations
......@@ -376,12 +376,9 @@ def applyFunctionOnHistory(flags: Flags, column, hist_func, hist_kws, mask_func,
return flags
def mergeHistoryByFunc(flags: Flags, field, source, merge_func, merge_func_kws, last_column=None):
def appendHistory(flags: Flags, column, append_hist):
"""
Merges the information of one history (source) into the other (field). (Without altering fields indices)
Field indices remain unchanged. The merge is performed, via manipulating the field history values
column wise according to `merge_func`.
Function, specialized for used in deharm context.
Parameters
......@@ -398,25 +395,11 @@ def mergeHistoryByFunc(flags: Flags, field, source, merge_func, merge_func_kws,
"""
flags = flags.copy()
target_history = flags.history[field]
source_history = flags.history[source]
new_target_history = History()
# import pdb
# pdb.set_trace()
for k in target_history.hist.columns:
col_args_h = dict(source_col=source_history.hist[k])
col_args_m = dict(source_col=source_history.mask[k])
col_args_h.update(merge_func_kws)
col_args_m.update(merge_func_kws)
new_target_history.hist[k] = merge_func(target_history.hist[k], **col_args_h)
new_target_history.mask[k] = merge_func(target_history.mask[k], **col_args_m)
if last_column is None:
new_target_history.mask.iloc[:, -1:] = True
else:
new_target_history.append(last_column, force=True)
flags.history[field] = new_target_history
new_history = flags.history[column]
for app_k in [k for k in append_hist.columns if k not in new_history.columns]:
new_history.hist[app_k] = append_hist.hist[app_k]
new_history.mask[app_k] = append_hist.mask[app_k]
flags.history[column] = new_history
return flags
# for now we keep this name
......
......@@ -18,7 +18,7 @@ from saqc.funcs.tools import copy, drop, rename
from saqc.funcs.interpolation import interpolateIndex
from saqc.lib.tools import getDropMask, evalFreqStr, getFreqDelta
from saqc.lib.ts_operators import shift2Freq, aggregate2Freq
from saqc.flagger.flags import applyFunctionOnHistory, mergeHistoryByFunc
from saqc.flagger.flags import applyFunctionOnHistory, appendHistory
from saqc.lib.rolling import customRoller
logger = logging.getLogger("SaQC")
......@@ -621,16 +621,13 @@ def _getChunkBounds(target_datcol, flagscol, freq):
return ignore_flags
def _inverseInterpolation(target_flagscol, source_col=None, freq=None, chunk_bounds=None):
def _inverseInterpolation(source_col, freq=None, chunk_bounds=None, target_flagscol=None):
source_col = source_col.copy()
source_col[chunk_bounds] = np.nan
if len(chunk_bounds) > 0:
source_col[chunk_bounds] = np.nan
backprojected = source_col.reindex(target_flagscol.index, method="bfill", tolerance=freq)
fwrdprojected = source_col.reindex(target_flagscol.index, method="ffill", tolerance=freq)
b_replacement_mask = (backprojected > target_flagscol) & (backprojected >= fwrdprojected)
f_replacement_mask = (fwrdprojected > target_flagscol) & (fwrdprojected > backprojected)
target_flagscol.loc[b_replacement_mask] = backprojected.loc[b_replacement_mask]
target_flagscol.loc[f_replacement_mask] = fwrdprojected.loc[f_replacement_mask]
return target_flagscol
return pd.concat([backprojected, fwrdprojected], axis=1).max(axis=1)
def _inverseAggregation(target_flagscol, source_col=None, freq=None, method=None):
......@@ -743,24 +740,27 @@ def reindexFlags(
target_datcol = data[field]
target_flagscol = flagger[field]
append_dummy = pd.Series(np.nan, target_flagscol.index)
blank_dummy = pd.Series(np.nan, target_flagscol.index)
if method[-13:] == "interpolation":
ignore = _getChunkBounds(target_datcol, flagscol, freq)
merge_func = _inverseInterpolation
merge_dict = dict(freq=freq, chunk_bounds=ignore)
merge_dict = dict(freq=freq, chunk_bounds=ignore, target_flagscol=blank_dummy)
mask_dict = {**merge_dict, 'chunk_bounds':[]}
if method[-3:] == "agg" or method == "match":
projection_method = METHOD2ARGS[method][0]
tolerance = METHOD2ARGS[method][1](freq)
merge_func = _inverseAggregation
merge_dict = dict(freq=tolerance, method=projection_method)
merge_dict = mask_dict = dict(freq=tolerance, method=projection_method, target_flagscol=blank_dummy)
if method[-5:] == "shift":
drop_mask = (target_datcol.isna() | isflagged(target_flagscol, kwargs['to_mask']))
projection_method = METHOD2ARGS[method][0]
tolerance = METHOD2ARGS[method][1](freq)
merge_func = _inverseShift
merge_dict = dict(freq=tolerance, method=projection_method, drop_mask=drop_mask)
merge_dict = mask_dict = dict(freq=tolerance, method=projection_method, drop_mask=drop_mask, target_flagscol=blank_dummy)
flagger = mergeHistoryByFunc(flagger, field, source, merge_func, merge_dict, last_column=append_dummy)
tmp_flagger = applyFunctionOnHistory(flagger, source, merge_func, merge_dict, merge_func, mask_dict,
last_column=blank_dummy)
flagger = appendHistory(flagger, field, tmp_flagger.history[source])
return data, flagger
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment