diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py index 32f2d628aa26fc6b0a1b5b1a66e06d607721b41a..108ff59891777be6b062e1c422c64ead003dc84d 100644 --- a/saqc/funcs/harm_functions.py +++ b/saqc/funcs/harm_functions.py @@ -296,7 +296,7 @@ def _interpolateGrid( # Aggregations: if method in aggregations: - data = aggregate2Freq(data, method, agg_method, freq) + data = aggregate2Freq(data, method, freq, agg_method) # Shifts elif method in shifts: @@ -304,7 +304,6 @@ def _interpolateGrid( # Interpolations: elif method in interpolations: - # account for annoying case of subsequent frequency alligned values, differing exactly by the margin # 2*freq: spec_case_mask = data.index.to_series() @@ -414,7 +413,7 @@ def _reshapeFlags( flagger_new = flagger_new.setFlags(field, flag=fdata, force=True, **kwargs) elif method in aggregations: - fdata = aggregate2Freq(fdata, method, agg_method, freq, fill_value=missing_flag) + fdata = aggregate2Freq(fdata, method, freq, agg_method, fill_value=missing_flag) fdata = fdata.astype(flagger.dtype) # block flagging/backtracking of chunk_starts/chunk_ends diff --git a/saqc/funcs/proc_functions.py b/saqc/funcs/proc_functions.py index 079fedc4ed48dd67b27f8bde836e30203b61b465..4af2861fb953ddadd74f4573590610fef580a513 100644 --- a/saqc/funcs/proc_functions.py +++ b/saqc/funcs/proc_functions.py @@ -37,8 +37,8 @@ def proc_interpolateMissing(data, field, flagger, method, inter_order=2, inter_l @register() -def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total_d=None, max_invalid_consec_d=None, - max_invalid_consec_f=None, max_invalid_total_f=None, flag_agg_func='max', method='bagg', **kwargs): +def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total_d=np.inf, max_invalid_consec_d=np.inf, + max_invalid_consec_f=np.inf, max_invalid_total_f=np.inf, flag_agg_func='max', method='bagg', **kwargs): data = data.copy() datcol = data[field] @@ -47,6 +47,9 @@ def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total_d=N func = composeFunction(func) flag_agg_func = composeFunction(flag_agg_func) + if func == 'shift': + datcol = shift2Freq(datcol, method, freq, fill_value=fill_value) + # data resampling datcol = aggregate2Freq(datcol, method, agg_func=func, freq=freq, fill_value=np.nan, max_invalid_total=max_invalid_total_d, max_invalid_consec=max_invalid_consec_d) diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py index b2ea1db9e066e57e69e082edabc847ed1887da28..517c0ff547153d2bfd645895531292e732ef2403 100644 --- a/saqc/lib/ts_operators.py +++ b/saqc/lib/ts_operators.py @@ -230,64 +230,54 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio return data -def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan, max_invalid_total=None, max_invalid_consec=None): - - # filter data for invalid patterns - if (max_invalid_total is not None) | (max_invalid_consec is not None): - if not max_invalid_total: - max_invalid_total = np.inf - if not max_invalid_consec: - max_invalid_consec = np.inf - +def aggregate2Freq(data, method, freq, agg_func, fill_value=np.nan, max_invalid_total=np.inf, max_invalid_consec=np.inf): + # filter data for invalid patterns (since filtering is expensive we pre-check if it is demanded) + if (max_invalid_total is not np.inf) | (max_invalid_consec is not np.inf): if pd.isnull(fill_value): temp_mask = (data.isna()) else: temp_mask = (data == fill_value) + temp_mask = temp_mask.groupby(pd.Grouper(freq=freq)).transform(validationTrafo, max_nan_total=max_invalid_total, max_nan_consec=max_invalid_consec) data[temp_mask] = fill_value + # some timestamp acrobatics to feed pd.resample`s base keyword properly + seconds_total = pd.Timedelta(freq).total_seconds() + freq_string = str(int(seconds_total)) + "s" if method == "nagg": # all values within a grid points range (+/- freq/2, closed to the left) get aggregated with 'agg method' - # some timestamp acrobatics to feed the base keyword properly - seconds_total = pd.Timedelta(freq).total_seconds() - freq_string = str(int(seconds_total)) + "s" base = seconds_total / 2 - loffset = pd.Timedelta(freq) / 2 label = 'left' closed = 'left' elif method == "bagg": - seconds_total = pd.Timedelta(freq).total_seconds() - freq_string = str(int(seconds_total)) + "s" + # all values in a sampling interval get aggregated with agg_method and assigned to the last grid point base = 0 - loffset = pd.Timedelta(0) label = 'left' closed = 'left' - # all values in a sampling interval get aggregated with agg_method and assigned to the last grid point - # if method is fagg else: - # "fagg" - seconds_total = pd.Timedelta(freq).total_seconds() - freq_string = str(int(seconds_total)) + "s" + # all values in a sampling interval get aggregated with agg_method and assigned to the next grid point base = 0 - loffset = pd.Timedelta(0) label = 'right' closed = 'right' - # all values in a sampling interval get aggregated with agg_method and assigned to the next grid point - # some consistency cleanup: - # we check for empty intervals before resampling, because: - # - resample AND groupBy do insert value zero for empty intervals if resampling with any kind of "sum" - - # we want value nan - # - we are aggregating flags as well and empty intervals get BAD flag (which usually is not nan) - empty_intervals = data.resample(freq_string, loffset=loffset, base=base, closed=closed, - label=label).count() == 0 + # In the following, we check for empty intervals outside resample.apply, because: + # - resample AND groupBy do insert value zero for empty intervals if resampling with any kind of "sum" application - + # we want "fill_value" to be inserted + # - we are aggregating data and flags with this function and empty intervals usually would get assigned flagger.BAD + # flag (where resample inserts np.nan) - dataresampler = data.resample(freq_string, loffset=loffset, base=base, closed=closed, - label=label) + data_resampler = data.resample(freq_string, base=base, closed=closed, + label=label) - data = dataresampler.apply(agg_func) + empty_intervals = data_resampler.count() == 0 + data = data_resampler.apply(agg_func) + # since loffset keyword of pandas "discharges" after one use of the resampler (pandas logic) - we correct the + # resampled labels offset manually, if necessary. + if method == "nagg": + data.index = data.index.shift(freq=pd.Timedelta(freq) / 2) + empty_intervals.index = empty_intervals.index.shift(freq=pd.Timedelta(freq) / 2) data[empty_intervals] = fill_value return data