@register
def proc_interpolateGrid(data, field, flagger, freq, method, inter_order=2, drop_flags=None,
                         downgrade_interpolation=False, **kwargs):
    """
    Interpolate ``data[field]`` onto a regular time grid of frequency ``freq``.

    Values carrying one of ``drop_flags`` are removed, the remaining values are
    merged with the regular grid, gaps are filled by ``interpolateNANs`` and the
    result is sampled at the grid points. The flags of ``field`` are reshaped to
    the same grid.

    Parameters
    ----------
    data : pd.DataFrame
        Data container; expected to have a sorted, non-empty DatetimeIndex
        (``data.index[0]`` / ``data.index[-1]`` are used as the grid range).
    field : str
        Name of the column to interpolate.
    flagger
        Flagger object holding the flags that belong to ``data``.
    freq : str
        Offset string defining the target grid (e.g. ``"10min"``).
    method : str
        Interpolation method, passed through to ``interpolateNANs``.
    inter_order : int, default 2
        Interpolation order, passed through to ``interpolateNANs``.
    drop_flags : flag or sequence of flags, optional
        Values flagged with any of these are excluded from the interpolation.
        NOTE(review): ``None`` is handed to ``toSequence``/``isFlagged``
        unchanged - confirm that this selects the intended default flag.
    downgrade_interpolation : bool, default False
        Passed through to ``interpolateNANs``.
    **kwargs
        Forwarded to ``flagger.setFlags``.

    Returns
    -------
    data : pd.DataFrame
        The input frame reindexed to the regular grid, with ``field`` replaced
        by the interpolated values. All other columns only keep the values
        that already sit on a grid point.
    flagger
        Flagger in which the flags of ``field`` are replaced by the regridded
        ones.
    """
    datcol = data[field].copy()
    flagscol = flagger.getFlags(field)

    # collect everything that carries one of the flags to be dropped
    drop_mask = pd.Series(False, index=datcol.index)
    for f in toSequence(drop_flags):
        drop_mask |= flagger.isFlagged(field, flag=f)

    # ``dropna`` returns a new object - the result has to be kept, otherwise
    # the masked values would still take part in the interpolation
    datcol[drop_mask] = np.nan
    datcol = datcol.dropna()

    # account for the special case of subsequent valid values differing by
    # exactly 2 * freq: the grid point in between would be interpolated
    # (inter_limit=2 below), so it is remembered here and reset afterwards
    spec_case_mask = datcol.index.to_series()
    spec_case_mask = spec_case_mask - spec_case_mask.shift(1)
    spec_case_mask = spec_case_mask == 2 * pd.Timedelta(freq)
    spec_case_mask = spec_case_mask[spec_case_mask]
    spec_case_mask = spec_case_mask.resample(freq).asfreq().dropna()

    if not spec_case_mask.empty:
        # move the marker from the later value to the grid point in between
        # (``shift`` with ``freq`` shifts the index, like the deprecated ``tshift``)
        spec_case_mask = spec_case_mask.shift(-1, freq=freq)

    grid_index = pd.date_range(start=data.index[0].floor(freq), end=data.index[-1].ceil(freq), freq=freq,
                               name=data.index.name)

    # add the grid points (as NaN) to the series that gets interpolated;
    # ``reindex`` is not in place, so the result must be assigned
    datcol = datcol.reindex(datcol.index.join(grid_index, how="outer"))

    inter_data, chunk_bounds = interpolateNANs(
        datcol, method, order=inter_order, inter_limit=2, downgrade_interpolation=downgrade_interpolation,
        return_chunk_bounds=True
    )

    # exclude falsely interpolated values (boolean mask, so marker timestamps
    # missing from the index cannot raise):
    inter_data[inter_data.index.isin(spec_case_mask.index)] = np.nan

    # keep the grid points only and store them; the frame has to live on the
    # very same grid, otherwise the column assignment would realign the values
    # back onto the old (irregular) index
    inter_data = inter_data.reindex(grid_index)
    data = data.reindex(grid_index)
    data[field] = inter_data

    # reshape flagger (tiny hack to resample with overlapping intervals):
    # dropping is done on a new object, so that the series handed out by the
    # flagger is not modified
    flagscol = flagscol.drop(flagscol[drop_mask].index)
    flagscol2 = flagscol.copy()
    flagscol2.index = flagscol.index.shift(freq=pd.Timedelta(freq))
    max_ser1 = flagscol.resample(2 * pd.Timedelta(freq)).max()
    max_ser2 = flagscol2.resample(2 * pd.Timedelta(freq)).max()
    max_ser1.index = max_ser1.index.shift(freq=pd.Timedelta(freq))
    flagscol = max_ser1.align(max_ser2)[0]
    flagscol[max_ser2.index] = max_ser2
    # NOTE(review): ``initFlags`` receives a Series here - confirm it does not
    # require a DataFrame
    flagger_new = flagger.initFlags(inter_data).setFlags(field, flag=flagscol, force=True, **kwargs)

    # block chunk ends of interpolation
    # NOTE(review): assumes every entry of ``chunk_bounds`` is still part of
    # the regridded index - confirm against ``interpolateNANs``
    flags_to_block = pd.Series(np.nan, index=chunk_bounds).astype(flagger_new.dtype)
    flagger_new = flagger_new.setFlags(field, loc=chunk_bounds, flag=flags_to_block, force=True)

    flagger_new = flagger.slice(drop=field).merge(flagger_new)
    return data, flagger_new
pd.date_range(start=data.index[0].floor(freq), end=data.index[-1].ceil(freq), freq=freq, name=data.index.name)