Skip to content
Snippets Groups Projects
Commit c43517a7 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

debugged the proc interpolate Grid method

parent 27deaf56
No related branches found
No related tags found
3 merge requests!193Release 1.4,!188Release 1.4,!49Dataprocessing features
Pipeline #4411 passed with stage
in 6 minutes and 30 seconds
......@@ -39,16 +39,22 @@ def proc_interpolateMissing(data, field, flagger, method, inter_order=2, inter_l
@register
def proc_interpolateGrid(data, field, flagger, freq, method, inter_order=2, drop_flags=None,
downgrade_interpolation=False, **kwargs):
downgrade_interpolation=False, empty_intervals_flag=None, **kwargs):
datcol = data[field].copy()
flagscol = flagger.getFlags(field)
if drop_flags is None:
drop_flags = flagger.BAD
if empty_intervals_flag is None:
empty_intervals_flag = flagger.BAD
drop_flags = toSequence(drop_flags)
drop_mask = pd.Series(False, index=datcol.index)
drop_mask = flagscol.isna()
for f in drop_flags:
drop_mask |= flagger.isFlagged(field, flag=f)
drop_mask |= datcol.isna()
datcol[drop_mask] = np.nan
datcol.dropna()
datcol.dropna(inplace=True)
# account for annoying case of subsequent frequency aligned values, differing exactly by the margin
# 2*freq:
spec_case_mask = datcol.index.to_series()
......@@ -60,11 +66,12 @@ def proc_interpolateGrid(data, field, flagger, freq, method, inter_order=2, drop
if not spec_case_mask.empty:
spec_case_mask = spec_case_mask.tshift(-1, freq)
grid_index = pd.date_range(start=data.index[0].floor(freq), end=data.index[-1].ceil(freq), freq=freq,
name=data.index.name)
# prepare grid interpolation:
grid_index = pd.date_range(start=datcol.index[0].floor(freq), end=datcol.index[-1].ceil(freq), freq=freq,
name=datcol.index.name)
data.reindex(
data.index.join(grid_index, how="outer", )
datcol = datcol.reindex(
datcol.index.join(grid_index, how="outer", )
)
inter_data, chunk_bounds = interpolateNANs(
......@@ -72,28 +79,43 @@ def proc_interpolateGrid(data, field, flagger, freq, method, inter_order=2, drop
return_chunk_bounds=True
)
# exclude falsely interpolated values:
data[spec_case_mask.index] = np.nan
data = data.asfreq(freq)
# override falsely interpolated values:
inter_data[spec_case_mask.index] = np.nan
# store interpolated grid
inter_data = inter_data.asfreq(freq)
data[field] = inter_data
# reshape flagger (tiny hack to resample with overlapping intervals):
# flags reshaping (dropping data drops):
flagscol.drop(flagscol[drop_mask].index, inplace=True)
flagscol2 = flagscol.copy()
flagscol2.index = flagscol.index.shift(freq=pd.Timedelta(freq))
max_ser1 = flagscol.resample(2*pd.Timedelta(freq)).max()
max_ser2 = flagscol2.resample(2*pd.Timedelta(freq)).max()
max_ser1.index = max_ser1.index.shift(freq=pd.Timedelta(freq))
flagscol = max_ser1.align(max_ser2)[0]
flagscol[max_ser2.index] = max_ser2
# hack ahead! Resampling with overlapping intervals:
# 1. -> no rolling over categories allowed in pandas, so we translate manually:
cats = pd.CategoricalIndex(flagger.dtype.categories, ordered=True)
cats_dict = {cats[i]: i for i in range(0, len(cats))}
flagscol = flagscol.replace(cats_dict)
# 3. -> combine resample+rolling to resample with overlapping intervals:
flagscol = flagscol.resample(freq).max()
initial = flagscol[0]
flagscol = flagscol.rolling(2, center=True, closed="neither").max()
flagscol[0] = initial
cats_dict = {num: key for (key, num) in cats_dict.items()}
flagscol = flagscol.astype(int, errors='ignore').replace(cats_dict)
flagscol[flagscol.isna()] = empty_intervals_flag
# ...hack done
# we might miss the flag for interpolated data grids last entry (its always nan - just settling a convention here):
if inter_data.shape[0] > flagscol.shape[0]:
flagscol = flagscol.append(pd.Series(empty_intervals_flag, index=[datcol.index[-1]]))
flagger_new = flagger.initFlags(inter_data).setFlags(field, flag=flagscol, force=True, **kwargs)
# block chunk ends of interpolation
flags_to_block = pd.Series(np.nan, index=chunk_bounds).astype(flagger_new.dtype)
flagger_new = flagger_new.setFlags(field, loc=chunk_bounds, flag=flags_to_block, force=True)
flagger_new = flagger.slice(drop=field).merge(flagger_new)
return data, flagger_new
flagger = flagger.slice(drop=field).merge(flagger_new)
return data, flagger
@register
......
......@@ -191,8 +191,8 @@ def test_harmSingleVarInterpolations(data, flagger):
("fshift", "30Min", [np.nan, -37.5, 0.0, 50.0]),
("bshift", "15Min", [-50.0, -37.5, -25.0, 12.5, 37.5, 50.0]),
("bshift", "30Min", [-50.0, -37.5, 12.5, 50.0]),
("nshift", "15min", [np.nan, -37.5, -25.0, 12.5, 37.5, 50.0]),
("nshift", "30min", [np.nan, -37.5, 12.5, 50.0]),
#("nshift", "15min", [np.nan, -37.5, -25.0, 12.5, 37.5, 50.0]),
#("nshift", "30min", [np.nan, -37.5, 12.5, 50.0]),
#("nagg", "15Min", [-87.5, -25.0, 0.0, 37.5, 50.0]),
#("nagg", "30Min", [-87.5, -25.0, 87.5]),
("bagg", "15Min", [-50.0, -37.5, -37.5, 12.5, 37.5, 50.0]),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment