diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py index e67fe51579d1b9770b8bf6ffa492c663a04be354..7ea8ddf28abb42573752eeca5c3f34ea641f8ba6 100644 --- a/saqc/funcs/harm_functions.py +++ b/saqc/funcs/harm_functions.py @@ -617,75 +617,6 @@ def _backtrackFlags(flagger_harmony, flagger_original_clean, flagger_original, f return flagger_original.initFlags(flags=res) -def _fromMerged(data, flagger, fieldname): - # we need a not-na mask for the flags data to be retrieved: - mask = flagger.getFlags(fieldname).notna() - return data.loc[mask[mask].index, fieldname], flagger.getFlagger(field=fieldname, loc=mask) - - -def _toMerged(data, flagger, fieldname, data_to_insert, flagger_to_insert, target_index=None, **kwargs): - - data = data.copy() - flags = flagger._flags - flags_to_insert = flagger_to_insert._flags - - # this should never happen, but if this could happen in general, - # the caller have to ensure, that we get a dios - assert not isinstance(data, pd.Series) - - newcols = data.columns.difference([fieldname]) - data = data[newcols] - flags = flags[newcols] - - # first case: there is no data, the data-to-insert would have - # to be merged with, and also are we not deharmonizing: - if data.empty and target_index is None: - return data, flagger_to_insert - - - # if thats not the case: generate the drop mask for the remaining data: - - # the following is not implemented in dios, but as soon as it is done, - # we should use it. wait for #21 see: https://git.ufz.de/rdm/dios/issues/21 - # mask = data.isna().all(axis=1) - # workaround: - nans = data.isna() - common_nans_index = nans[nans].index_of('shared') - - # we only want to drop lines, that do not have to be re-inserted in the merge: - drops = common_nans_index.difference(data_to_insert.index) - # clear mask, but keep index - mask = data.copy() - mask[:] = True - # final mask: - mask[drops] = False - - # if we are not "de-harmonizing": - if target_index is None: - # erase nan rows in the data, that became redundant because of harmonization and merge with data-to-insert: - data = pd.merge(data[mask], data_to_insert, how="outer", left_index=True, right_index=True) - flags = pd.merge(flags[mask], flags_to_insert, how="outer", left_index=True, right_index=True) - return data, flagger.initFlags(flags=flags) - - else: - # trivial case: there is only one variable ("reindexing to make sure shape matches pre-harm shape"): - if data.empty: - data = data_to_insert.reindex(target_index).to_frame(name=fieldname) - flags = flags_to_insert.reindex(target_index, fill_value=flagger.UNFLAGGED) - return data, flagger.initFlags(flags=flags) - # annoying case: more than one variables: - # erase nan rows resulting from harmonization but keep/regain those, that were initially present in the data: - new_index = data[mask].index.join(target_index, how="outer") - data = data.reindex(new_index) - flags = flags.reindex(new_index, fill_value=flagger.UNFLAGGED) - data = pd.merge(data, data_to_insert, how="outer", left_index=True, right_index=True) - flags = pd.merge(flags, flags_to_insert, how="outer", left_index=True, right_index=True) - - # internally harmonization memorizes its own manipulation by inserting nan flags - - # those we will now assign the flagger.bad flag by the "missingTest": - return flagMissing(data, fieldname, flagger.initFlags(flags=flags), nodata=np.nan, **kwargs) - - @register() def harm_shift2Grid(data, field, flagger, freq, method="nshift", drop_flags=None, **kwargs): return harm_harmonize( @@ -743,88 +674,3 @@ def harm_interpolate2Grid( **kwargs, ) - -@register() -def harm_downsample( - data, - field, - flagger, - sample_freq, - agg_freq, - sample_func="mean", - agg_func="mean", - invalid_flags=None, - max_invalid=None, - **kwargs, -): - - agg_func = getFuncFromInput(agg_func) - - if max_invalid is None: - max_invalid = np.inf - - if sample_func is not None: - sample_func = getFuncFromInput(sample_func) - - # define the "fastest possible" aggregator - if sample_func is None: - if max_invalid < np.inf: - - def aggregator(x): - if x.isna().sum() < max_invalid: - return agg_func(x) - else: - return np.nan - - else: - - def aggregator(x): - return agg_func(x) - - else: - - dummy_resampler = pd.Series(np.nan, index=[pd.Timedelta("1min")]).resample("1min") - if hasattr(dummy_resampler, sample_func.__name__): - - sample_func_name = sample_func.__name__ - if max_invalid < np.inf: - - def aggregator(x): - y = getattr(x.resample(sample_freq), sample_func_name)() - if y.isna().sum() < max_invalid: - return agg_func(y) - else: - return np.nan - - else: - - def aggregator(x): - return agg_func(getattr(x.resample(sample_freq), sample_func_name)()) - - else: - if max_invalid < np.inf: - - def aggregator(x): - y = x.resample(sample_freq).apply(sample_func) - if y.isna().sum() < max_invalid: - return agg_func(y) - else: - return np.nan - - else: - - def aggregator(x): - return agg_func(x.resample(sample_freq).apply(sample_func)) - - return harm_harmonize( - data, - field, - flagger, - agg_freq, - inter_method="bagg", - reshape_method="bagg_no_deharm", - inter_agg=aggregator, - reshape_agg="max", - drop_flags=invalid_flags, - **kwargs, - ) diff --git a/test/funcs/test_harm_funcs.py b/test/funcs/test_harm_funcs.py index 989d7b3a711cf9613b6d850dfd275ceb51fa791a..9fb00325df0910ce13476813fff782dfb51b5629 100644 --- a/test/funcs/test_harm_funcs.py +++ b/test/funcs/test_harm_funcs.py @@ -18,8 +18,7 @@ from saqc.funcs.harm_functions import ( harm_linear2Grid, harm_interpolate2Grid, harm_shift2Grid, - harm_aggregate2Grid, - harm_downsample, + harm_aggregate2Grid ) RESHAPERS = ["nshift", "fshift", "bshift"] @@ -338,7 +337,6 @@ def test_wrapper(data, flagger): field = data.columns[0] freq = "15min" flagger = flagger.initFlags(data) - harm_downsample(data, field, flagger, "15min", "30min", agg_func="sum", sample_func="mean") harm_linear2Grid(data, field, flagger, freq, method="nagg", func="max", drop_flags=None) harm_aggregate2Grid(data, field, flagger, freq, value_func="sum", flag_func="max", method="nagg", drop_flags=None) harm_shift2Grid(data, field, flagger, freq, method="nshift", drop_flags=None)