diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py index bbf7bca7fb21aeaf2e530ff8c7de630114c1e02d..9799df41664494eb0264fed2eaf18bcc80d1285c 100644 --- a/saqc/funcs/harm_functions.py +++ b/saqc/funcs/harm_functions.py @@ -9,7 +9,7 @@ import dios from saqc.funcs.functions import flagMissing from saqc.funcs.register import register from saqc.lib.tools import toSequence, getFuncFromInput -from saqc.lib.ts_operators import interpolateNANs +from saqc.lib.ts_operators import interpolateNANs, aggregate2Freq logger = logging.getLogger("SaQC") @@ -304,40 +304,7 @@ def _interpolateGrid( # Aggregations: if method in aggregations: - if method == "nagg": - # all values within a grid points range (+/- freq/2, closed to the left) get aggregated with 'agg method' - # some timestamp acrobatics to feed the base keyword properly - seconds_total = pd.Timedelta(freq).total_seconds() - freq_string = str(int(seconds_total)) + "s" - base = seconds_total / 2 - loffset = pd.Timedelta(freq) / 2 - label = 'left' - closed = 'left' - # calculate the series of aggregated values - elif method == "bagg": - seconds_total = pd.Timedelta(freq).total_seconds() - freq_string = str(int(seconds_total)) + "s" - base = 0 - loffset = pd.Timedelta(0) - label = 'left' - closed = 'left' - # all values in a sampling interval get aggregated with agg_method and assigned to the last grid point - # if method is fagg - else: - seconds_total = pd.Timedelta(freq).total_seconds() - freq_string = str(int(seconds_total)) + "s" - base = 0 - loffset = pd.Timedelta(0) - label = 'right' - closed = 'right' - # all values in a sampling interval get aggregated with agg_method and assigned to the next grid point - # some consistency cleanup: - # we are not trusting resamples interplay with sum and others - so we check for empty intervals: - to_nan = data.resample(freq_string, loffset=loffset, base=base, closed=closed, - label=label).count() == 0 - data = data.resample(freq_string, loffset=loffset, base=base, closed=closed, - label=label).apply(agg_method) - data[to_nan] = np.nan + data = aggregate2Freq(data, method, agg_method, freq) if total_range is None: data = data.reindex(ref_index) @@ -492,33 +459,8 @@ def _reshapeFlags( flagger_new = flagger_new.setFlags(field, flag=fdata, force=True, **kwargs) elif method in aggregations: - # prepare resampling keywords - if method in ["fagg", "fagg_no_deharm"]: - closed = "right" - label = "right" - base = 0 - freq_string = freq - loffset = pd.Timedelta(0) - elif method in ["bagg", "bagg_no_deharm"]: - closed = "left" - label = "left" - base = 0 - freq_string = freq - loffset = pd.Timedelta(0) - # var sets for 'nagg': - else: - closed = "left" - label = "left" - seconds_total = pd.Timedelta(freq).total_seconds() - base = seconds_total / 2 - freq_string = str(int(seconds_total)) + "s" - loffset = pd.Timedelta(freq) / 2 - - # resampling the flags series with aggregation method - agg = lambda x: agg_method(x) if not x.empty else missing_flag - resampled = fdata.resample(freq_string, closed=closed, label=label, base=base, loffset=loffset) - # NOTE: breaks for non categorical flaggers - fdata = resampled.apply(agg).astype(flagger.dtype) + fdata = aggregate2Freq(fdata, method, agg_method, freq, fill_value=missing_flag) + fdata = fdata.astype(flagger.dtype) # some consistency clean up to ensure new flags frame matching new data frames size: if ref_index[0] != fdata.index[0]: diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py index 2f847387b51b315f5550eaa27f57aaa4be069481..8a552423b88c914b152bc363af23d9d5c2ad36d6 100644 --- a/saqc/lib/ts_operators.py +++ b/saqc/lib/ts_operators.py @@ -138,7 +138,6 @@ def meanQC(data, max_nan_total=np.inf, max_nan_consec=np.inf): return np.nanmean(validationTrafo(data, max_nan_total, max_nan_consec)) - def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolation=False, return_chunk_bounds=False): """ The function interpolates nan-values (and nan-grids) in timeseries data. It can be passed all the method keywords @@ -228,6 +227,48 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio return data +def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan): + if method == "nagg": + # all values within a grid points range (+/- freq/2, closed to the left) get aggregated with 'agg method' + # some timestamp acrobatics to feed the base keyword properly + seconds_total = pd.Timedelta(freq).total_seconds() + freq_string = str(int(seconds_total)) + "s" + base = seconds_total / 2 + loffset = pd.Timedelta(freq) / 2 + label = 'left' + closed = 'left' + elif method == "bagg": + seconds_total = pd.Timedelta(freq).total_seconds() + freq_string = str(int(seconds_total)) + "s" + base = 0 + loffset = pd.Timedelta(0) + label = 'left' + closed = 'left' + # all values in a sampling interval get aggregated with agg_method and assigned to the last grid point + # if method is fagg + else: + # "fagg" + seconds_total = pd.Timedelta(freq).total_seconds() + freq_string = str(int(seconds_total)) + "s" + base = 0 + loffset = pd.Timedelta(0) + label = 'right' + closed = 'right' + # all values in a sampling interval get aggregated with agg_method and assigned to the next grid point + # some consistency cleanup: + # we check for empty intervals before resampling, because: + # - resample AND groupBy do insert value zero for empty intervals if resampling with any kind of "sum" - + # we want value nan + # - we are aggregating flags as well and empty intervals get BAD flag (which usually is not nan) + + empty_intervals = data.resample(freq_string, loffset=loffset, base=base, closed=closed, + label=label).count() == 0 + data = data.resample(freq_string, loffset=loffset, base=base, closed=closed, + label=label).apply(agg_func) + data[empty_intervals] = fill_value + + return data + def linearInterpolation(data, inter_limit=2): return interpolateNANs(data, 'time', inter_limit=inter_limit)