diff --git a/docs/FunctionDescriptions.md b/docs/FunctionDescriptions.md index 6d089ac43ec68443afba484b0a5161cb5ad025bd..2d53066540da9e74a8b93696cb3afdfa934fb5f4 100644 --- a/docs/FunctionDescriptions.md +++ b/docs/FunctionDescriptions.md @@ -26,9 +26,9 @@ Main documentation of the implemented functions, their purpose and parameters an - [harmonize](#harmonize) - [deharmonize](#deharmonize) - [harmonize_shift2Grid](#harmonize_shift2grid) - - [harmonize_shift2Grid](#harmonize_aggregate2grid) - - [harmonize_shift2Grid](#harmonize_linear2grid) - - [harmonize_shift2Grid](#harmonize_interpolate2grid) + - [harmonize_aggregate2Grid](#harmonize_aggregate2grid) + - [harmonize_linear2Grid](#harmonize_linear2grid) + - [harmonize_interpolate2Grid](#harmonize_interpolate2grid) ## range diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py index 9f96141d1ef884d06db4dca6743fd84f5be4a03b..9933253ef3c125a1dbf5219a02ae07fe79ba39f5 100644 --- a/saqc/funcs/harm_functions.py +++ b/saqc/funcs/harm_functions.py @@ -829,7 +829,8 @@ def linear2Grid(data, field, flagger, freq, flag_assignment_method='nearest_agg' @register('harmonize_interpolate2Grid') -def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpolation_order=1, flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None, **kwargs): +def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpolation_order=1, + flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None, **kwargs): return harmonize( data, field, @@ -842,4 +843,61 @@ def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpola drop_flags=drop_flags, **kwargs) -#def aggregate(data, field, flagger, freq_base, freq_target, agg_func, **kwargs): \ No newline at end of file + +def aggregate(data, field, flagger, source_freq, target_freq, agg_func=np.mean, sample_func=np.mean, + invalid_flags=None, max_invalid=np.inf, **kwargs): + + # define the "fastest possible" aggregator + if sample_func is None: + if max_invalid < np.inf: + + def aggregator(x): + if x.isna().sum() < max_invalid: + return agg_func(x) + else: + return np.nan + else: + + def aggregator(x): + return agg_func(x) + else: + + dummy_resampler = pd.Series(np.nan, index=[pd.Timedelta('1min')]).resample('1min') + if hasattr(dummy_resampler, sample_func.__name__): + + sample_func_name = sample_func.__name__ + if max_invalid < np.inf: + def aggregator(x): + y = getattr(x.resample(source_freq), sample_func_name)() + if y.isna().sum() < max_invalid: + return agg_func(y) + else: + return np.nan + + else: + def aggregator(x): + agg_func(getattr(x.resample(source_freq), sample_func_name)()) + + else: + if max_invalid < np.inf: + def aggregator(x): + y = x.resample(source_freq).apply(sample_func) + if y.isna().sum() < max_invalid: + agg_func(y) + else: + return np.nan + else: + def aggregator(x): + return agg_func(x.resample(source_freq).apply(sample_func)) + + return harmonize( + data, + field, + flagger, + target_freq, + inter_method='bagg', + reshape_method='bagg', + inter_agg=aggregator, + reshape_agg=max, + drop_flags=invalid_flags, + **kwargs)