Skip to content
Snippets Groups Projects
Commit 53fc5d07 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

removed deprecated/shifted methods from the harm module

parent 0d6ecb40
No related branches found
No related tags found
4 merge requests!193Release 1.4,!188Release 1.4,!49Dataprocessing features,!44Dataprocessing features
......@@ -617,75 +617,6 @@ def _backtrackFlags(flagger_harmony, flagger_original_clean, flagger_original, f
return flagger_original.initFlags(flags=res)
def _fromMerged(data, flagger, fieldname):
    """Extract a single field's data series and flagger view from the merged container.

    Rows whose flags are NaN (merge artifacts) are excluded from the
    returned data; the flagger view is restricted with the same mask.
    """
    # non-NaN flags mark the rows that actually belong to this field
    valid = flagger.getFlags(fieldname).notna()
    field_data = data.loc[valid[valid].index, fieldname]
    field_flagger = flagger.getFlagger(field=fieldname, loc=valid)
    return field_data, field_flagger
def _toMerged(data, flagger, fieldname, data_to_insert, flagger_to_insert, target_index=None, **kwargs):
    """Merge a processed single field (data + flags) back into the multi-field container.

    Parameters
    ----------
    data : dios-like multi-column container (asserted not to be a pd.Series)
    flagger : flagger whose ``_flags`` holds the current flags frame
    fieldname : name of the column being re-inserted
    data_to_insert, flagger_to_insert : the processed series/flags for ``fieldname``
    target_index : if given, we are "de-harmonizing" — the result is reindexed
        back onto this (pre-harmonization) index
    **kwargs : passed through to ``flagMissing`` in the multi-variable de-harm case

    Returns a ``(data, flagger)`` pair.
    """
    data = data.copy()
    flags = flagger._flags
    flags_to_insert = flagger_to_insert._flags

    # this should never happen, but if this could happen in general,
    # the caller has to ensure that we get a dios:
    assert not isinstance(data, pd.Series)

    # drop the column being re-inserted; it re-enters via the merges below
    newcols = data.columns.difference([fieldname])
    data = data[newcols]
    flags = flags[newcols]

    # first case: there is no data, the data-to-insert would have
    # to be merged with, and also are we not deharmonizing:
    if data.empty and target_index is None:
        return data, flagger_to_insert

    # if thats not the case: generate the drop mask for the remaining data:
    # the following is not implemented in dios, but as soon as it is done,
    # we should use it. wait for #21 see: https://git.ufz.de/rdm/dios/issues/21
    # mask = data.isna().all(axis=1)
    # workaround:
    nans = data.isna()
    # index positions that are NaN in every remaining column ('shared' across columns)
    common_nans_index = nans[nans].index_of('shared')
    # we only want to drop lines, that do not have to be re-inserted in the merge:
    drops = common_nans_index.difference(data_to_insert.index)
    # clear mask, but keep index
    mask = data.copy()
    mask[:] = True
    # final mask: True everywhere except the all-NaN rows not covered by the insert
    mask[drops] = False

    # if we are not "de-harmonizing":
    if target_index is None:
        # erase nan rows in the data, that became redundant because of harmonization and merge with data-to-insert:
        data = pd.merge(data[mask], data_to_insert, how="outer", left_index=True, right_index=True)
        flags = pd.merge(flags[mask], flags_to_insert, how="outer", left_index=True, right_index=True)
        return data, flagger.initFlags(flags=flags)
    else:
        # trivial case: there is only one variable ("reindexing to make sure shape matches pre-harm shape"):
        if data.empty:
            data = data_to_insert.reindex(target_index).to_frame(name=fieldname)
            flags = flags_to_insert.reindex(target_index, fill_value=flagger.UNFLAGGED)
            return data, flagger.initFlags(flags=flags)
        # annoying case: more than one variables:
        # erase nan rows resulting from harmonization but keep/regain those, that were initially present in the data:
        new_index = data[mask].index.join(target_index, how="outer")
        data = data.reindex(new_index)
        flags = flags.reindex(new_index, fill_value=flagger.UNFLAGGED)
        data = pd.merge(data, data_to_insert, how="outer", left_index=True, right_index=True)
        flags = pd.merge(flags, flags_to_insert, how="outer", left_index=True, right_index=True)
        # internally harmonization memorizes its own manipulation by inserting nan flags -
        # those we will now assign the flagger.bad flag by the "missingTest":
        return flagMissing(data, fieldname, flagger.initFlags(flags=flags), nodata=np.nan, **kwargs)
@register()
def harm_shift2Grid(data, field, flagger, freq, method="nshift", drop_flags=None, **kwargs):
return harm_harmonize(
......@@ -743,88 +674,3 @@ def harm_interpolate2Grid(
**kwargs,
)
@register()
def harm_downsample(
    data,
    field,
    flagger,
    sample_freq,
    agg_freq,
    sample_func="mean",
    agg_func="mean",
    invalid_flags=None,
    max_invalid=None,
    **kwargs,
):
    """Downsample ``field`` onto an ``agg_freq`` grid via ``harm_harmonize``.

    Values are optionally pre-resampled at ``sample_freq`` with
    ``sample_func`` before being aggregated with ``agg_func``.  If the
    number of NaNs in an aggregation window reaches ``max_invalid``,
    the window's result is NaN.  ``invalid_flags`` is forwarded as
    ``drop_flags`` to the harmonization.
    """
    agg_func = getFuncFromInput(agg_func)
    max_invalid = np.inf if max_invalid is None else max_invalid
    if sample_func is not None:
        sample_func = getFuncFromInput(sample_func)

    # build the pre-sampling step (None -> no pre-sampling), preferring the
    # resampler's own C-level method over a generic .apply() when available
    if sample_func is None:
        presample = None
    else:
        probe = pd.Series(np.nan, index=[pd.Timedelta("1min")]).resample("1min")
        if hasattr(probe, sample_func.__name__):
            method_name = sample_func.__name__

            def presample(x):
                return getattr(x.resample(sample_freq), method_name)()

        else:
            func = sample_func

            def presample(x):
                return x.resample(sample_freq).apply(func)

    # wrap aggregation with the invalid-count guard only when it can trigger
    if presample is None:
        if max_invalid < np.inf:

            def aggregator(x):
                return agg_func(x) if x.isna().sum() < max_invalid else np.nan

        else:

            def aggregator(x):
                return agg_func(x)

    else:
        if max_invalid < np.inf:

            def aggregator(x):
                y = presample(x)
                return agg_func(y) if y.isna().sum() < max_invalid else np.nan

        else:

            def aggregator(x):
                return agg_func(presample(x))

    return harm_harmonize(
        data,
        field,
        flagger,
        agg_freq,
        inter_method="bagg",
        reshape_method="bagg_no_deharm",
        inter_agg=aggregator,
        reshape_agg="max",
        drop_flags=invalid_flags,
        **kwargs,
    )
......@@ -18,8 +18,7 @@ from saqc.funcs.harm_functions import (
harm_linear2Grid,
harm_interpolate2Grid,
harm_shift2Grid,
harm_aggregate2Grid,
harm_downsample,
harm_aggregate2Grid
)
RESHAPERS = ["nshift", "fshift", "bshift"]
......@@ -338,7 +337,6 @@ def test_wrapper(data, flagger):
field = data.columns[0]
freq = "15min"
flagger = flagger.initFlags(data)
harm_downsample(data, field, flagger, "15min", "30min", agg_func="sum", sample_func="mean")
harm_linear2Grid(data, field, flagger, freq, method="nagg", func="max", drop_flags=None)
harm_aggregate2Grid(data, field, flagger, freq, value_func="sum", flag_func="max", method="nagg", drop_flags=None)
harm_shift2Grid(data, field, flagger, freq, method="nshift", drop_flags=None)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment