Skip to content
Snippets Groups Projects
Commit 27deaf56 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

Moved interpolation functionality out of the harmonization module

parent c0663723
No related branches found
No related tags found
3 merge requests: !193 Release 1.4, !188 Release 1.4, !49 Dataprocessing features
Pipeline #4377 failed with stage
in 6 minutes and 53 seconds
......@@ -37,6 +37,65 @@ def proc_interpolateMissing(data, field, flagger, method, inter_order=2, inter_l
return data, flagger
@register
def proc_interpolateGrid(data, field, flagger, freq, method, inter_order=2, drop_flags=None,
                         downgrade_interpolation=False, **kwargs):
    """Interpolate ``data[field]`` onto a regular time grid of spacing ``freq``.

    Values carrying one of ``drop_flags`` (and missing values) are removed,
    the remaining values are extended by the grid timestamps, the gaps are
    filled by ``interpolateNANs`` and the result is reduced to the grid.
    The flags of ``field`` are afterwards projected onto the same grid.

    Parameters
    ----------
    data :
        Container indexed by timestamps and addressable by ``field``.
        NOTE(review): the code relies on ``data.reindex``, so this is
        presumably a ``pd.DataFrame`` -- confirm against the callers.
    field :
        Name of the column to interpolate.
    flagger :
        Project flagger object; ``getFlags``, ``isFlagged``, ``initFlags``,
        ``setFlags``, ``slice`` and ``merge`` are used.
    freq :
        Grid spacing; must be accepted by both ``pd.Timedelta`` and
        ``pd.date_range``.
    method, inter_order, downgrade_interpolation :
        Forwarded to ``interpolateNANs`` (``inter_order`` as ``order``).
    drop_flags :
        Flag or flags whose values are excluded from the interpolation.
        Normalised with ``toSequence``.
        NOTE(review): the handling of ``None`` depends on ``toSequence`` --
        confirm.
    **kwargs :
        Forwarded to ``setFlags`` when the projected flags are written.

    Returns
    -------
    tuple
        ``(data, flagger_new)`` -- the data re-indexed to the grid with
        ``field`` replaced by the interpolated values, and a flagger in which
        ``field`` has been replaced by the re-gridded flags.
    """
    datcol = data[field].copy()
    flagscol = flagger.getFlags(field)

    # Collect all rows that carry one of the flags to be dropped.
    drop_flags = toSequence(drop_flags)
    drop_mask = pd.Series(False, index=datcol.index)
    for f in drop_flags:
        drop_mask |= flagger.isFlagged(field, flag=f)
    datcol[drop_mask] = np.nan
    # Fix: ``dropna`` returns a new object; the result used to be discarded.
    datcol = datcol.dropna()

    # account for annoying case of subsequent frequency aligned values, differing exactly by the margin
    # 2*freq: the grid point in between must not receive an interpolated value.
    spec_case_mask = datcol.index.to_series()
    spec_case_mask = spec_case_mask - spec_case_mask.shift(1)
    spec_case_mask = spec_case_mask == 2 * pd.Timedelta(freq)
    spec_case_mask = spec_case_mask[spec_case_mask]
    # keep only those hits that lie on the grid themselves
    spec_case_mask = spec_case_mask.resample(freq).asfreq().dropna()
    if not spec_case_mask.empty:
        # Move the marker one step back, onto the grid point between the two values.
        # ``shift(..., freq=...)`` replaces the deprecated ``tshift``.
        spec_case_mask = spec_case_mask.shift(-1, freq=freq)

    grid_index = pd.date_range(start=data.index[0].floor(freq), end=data.index[-1].ceil(freq), freq=freq,
                               name=data.index.name)

    # Fix: the series handed to the interpolation has to contain the grid
    # timestamps. Previously the whole frame was re-indexed and the result
    # was thrown away, so no grid point ever reached the interpolation.
    datcol = datcol.reindex(datcol.index.join(grid_index, how="outer"))

    # NOTE(review): ``inter_limit`` is fixed to 2 here and not configurable.
    inter_data, chunk_bounds = interpolateNANs(
        datcol, method, order=inter_order, inter_limit=2, downgrade_interpolation=downgrade_interpolation,
        return_chunk_bounds=True
    )

    # exclude falsely interpolated values:
    # Fix: the markers are timestamps and therefore address rows of the
    # interpolated series, not columns of ``data``.
    inter_data[spec_case_mask.index] = np.nan

    # Reduce both the interpolated series and the frame to the very same grid.
    # ``asfreq`` would start at the first raw timestamp, which is not
    # necessarily a grid point.
    # NOTE(review): as before, all other columns of ``data`` are reduced to
    # the grid as well.
    inter_data = inter_data.reindex(grid_index)
    data = data.reindex(grid_index)
    data[field] = inter_data

    # reshape flagger (tiny hack to resample with overlapping intervals):
    # the flags and a copy shifted by ``freq`` are both aggregated (max) over
    # bins of width ``2 * freq`` and then combined into one series.
    # ``drop`` is not applied in place, so the object returned by ``getFlags``
    # is left untouched.
    flagscol = flagscol.drop(flagscol[drop_mask].index)
    flagscol2 = flagscol.copy()
    flagscol2.index = flagscol.index.shift(freq=pd.Timedelta(freq))
    max_ser1 = flagscol.resample(2 * pd.Timedelta(freq)).max()
    max_ser2 = flagscol2.resample(2 * pd.Timedelta(freq)).max()
    max_ser1.index = max_ser1.index.shift(freq=pd.Timedelta(freq))
    flagscol = max_ser1.align(max_ser2)[0]
    flagscol[max_ser2.index] = max_ser2
    flagger_new = flagger.initFlags(inter_data).setFlags(field, flag=flagscol, force=True, **kwargs)

    # block chunk ends of interpolation
    flags_to_block = pd.Series(np.nan, index=chunk_bounds).astype(flagger_new.dtype)
    flagger_new = flagger_new.setFlags(field, loc=chunk_bounds, flag=flags_to_block, force=True)

    # Replace the flags of ``field`` in the original flagger by the re-gridded ones.
    flagger_new = flagger.slice(drop=field).merge(flagger_new)
    return data, flagger_new
@register
def proc_resample(data, field, flagger, freq, func=np.mean, max_invalid_total_d=np.inf, max_invalid_consec_d=np.inf,
max_invalid_consec_f=np.inf, max_invalid_total_f=np.inf, flag_agg_func=max, method='bagg', **kwargs):
......@@ -45,9 +104,10 @@ def proc_resample(data, field, flagger, freq, func=np.mean, max_invalid_total_d=
datcol = data[field]
flagscol = flagger.getFlags(field)
if func == "shift":
datcol = shift2Freq(datcol, method, freq, fill_value=np.nan)
flagscol =shift2Freq(flagscol, method, freq, fill_value=flagger.BAD)
flagscol = shift2Freq(flagscol, method, freq, fill_value=flagger.BAD)
else:
datcol = aggregate2Freq(datcol, method, freq, func, fill_value=np.nan,
......
......@@ -293,10 +293,15 @@ def shift2Freq(data, method, freq, fill_value=np.nan):
direction = "bfill"
tolerance = pd.Timedelta(freq)
else:
elif method == "nearest":
direction = "nearest"
tolerance = pd.Timedelta(freq) / 2
else:
# method == nearest2
direction = "nearest"
tolerance = pd.Timedelta(freq)
target_ind = pd.date_range(start=data.index[0].floor(freq), end=data.index[-1].ceil(freq),
freq=freq,
name=data.index.name)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment