Skip to content
Snippets Groups Projects
Commit 14483ddc authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

moved aggregation core functionality from harmonization to ts_operators

parent a2cfdb8f
No related branches found
No related tags found
4 merge requests!193Release 1.4,!188Release 1.4,!49Dataprocessing features,!44Dataprocessing features
......@@ -9,7 +9,7 @@ import dios
from saqc.funcs.functions import flagMissing
from saqc.funcs.register import register
from saqc.lib.tools import toSequence, getFuncFromInput
from saqc.lib.ts_operators import interpolateNANs
from saqc.lib.ts_operators import interpolateNANs, aggregate2Freq
logger = logging.getLogger("SaQC")
......@@ -304,40 +304,7 @@ def _interpolateGrid(
# Aggregations:
if method in aggregations:
if method == "nagg":
# all values within a grid points range (+/- freq/2, closed to the left) get aggregated with 'agg method'
# some timestamp acrobatics to feed the base keyword properly
seconds_total = pd.Timedelta(freq).total_seconds()
freq_string = str(int(seconds_total)) + "s"
base = seconds_total / 2
loffset = pd.Timedelta(freq) / 2
label = 'left'
closed = 'left'
# calculate the series of aggregated values
elif method == "bagg":
seconds_total = pd.Timedelta(freq).total_seconds()
freq_string = str(int(seconds_total)) + "s"
base = 0
loffset = pd.Timedelta(0)
label = 'left'
closed = 'left'
# all values in a sampling interval get aggregated with agg_method and assigned to the last grid point
# if method is fagg
else:
seconds_total = pd.Timedelta(freq).total_seconds()
freq_string = str(int(seconds_total)) + "s"
base = 0
loffset = pd.Timedelta(0)
label = 'right'
closed = 'right'
# all values in a sampling interval get aggregated with agg_method and assigned to the next grid point
# some consistency cleanup:
# we are not trusting resamples interplay with sum and others - so we check for empty intervals:
to_nan = data.resample(freq_string, loffset=loffset, base=base, closed=closed,
label=label).count() == 0
data = data.resample(freq_string, loffset=loffset, base=base, closed=closed,
label=label).apply(agg_method)
data[to_nan] = np.nan
data = aggregate2Freq(data, method, agg_method, freq)
if total_range is None:
data = data.reindex(ref_index)
......@@ -492,33 +459,8 @@ def _reshapeFlags(
flagger_new = flagger_new.setFlags(field, flag=fdata, force=True, **kwargs)
elif method in aggregations:
# prepare resampling keywords
if method in ["fagg", "fagg_no_deharm"]:
closed = "right"
label = "right"
base = 0
freq_string = freq
loffset = pd.Timedelta(0)
elif method in ["bagg", "bagg_no_deharm"]:
closed = "left"
label = "left"
base = 0
freq_string = freq
loffset = pd.Timedelta(0)
# var sets for 'nagg':
else:
closed = "left"
label = "left"
seconds_total = pd.Timedelta(freq).total_seconds()
base = seconds_total / 2
freq_string = str(int(seconds_total)) + "s"
loffset = pd.Timedelta(freq) / 2
# resampling the flags series with aggregation method
agg = lambda x: agg_method(x) if not x.empty else missing_flag
resampled = fdata.resample(freq_string, closed=closed, label=label, base=base, loffset=loffset)
# NOTE: breaks for non categorical flaggers
fdata = resampled.apply(agg).astype(flagger.dtype)
fdata = aggregate2Freq(fdata, method, agg_method, freq, fill_value=missing_flag)
fdata = fdata.astype(flagger.dtype)
# some consistency clean up to ensure new flags frame matching new data frames size:
if ref_index[0] != fdata.index[0]:
......
......@@ -138,7 +138,6 @@ def meanQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
return np.nanmean(validationTrafo(data, max_nan_total, max_nan_consec))
def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolation=False, return_chunk_bounds=False):
"""
The function interpolates nan-values (and nan-grids) in timeseries data. It can be passed all the method keywords
......@@ -228,6 +227,48 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio
return data
def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan):
    """
    Aggregate a datetime-indexed series onto a regular grid of width `freq`.

    Parameters
    ----------
    data : pandas object with a datetime-like index
        The values (or flags) to aggregate.
    method : str
        Selects how values are assigned to grid points:

        * "nagg": all values within a grid point's range (+/- freq/2, closed
          to the left) are aggregated and assigned to that grid point.
        * "bagg": all values in a sampling interval are aggregated and
          assigned to the last (i.e. preceding) grid point.
        * "fagg": all values in a sampling interval are aggregated and
          assigned to the next grid point.

        A trailing "_no_deharm" (e.g. "bagg_no_deharm") is ignored, so the
        suffixed variants behave like their base method. Any other value is
        treated as "fagg".
    agg_func : callable
        Aggregation applied to every interval via ``Resampler.apply``.
    freq : str
        Offset string parseable by ``pd.Timedelta``; the grid width.
    fill_value : scalar, default np.nan
        Value assigned to intervals that contain no (non-NaN) values.

    Returns
    -------
    The aggregated data, indexed by the grid points.
    """
    suffix = "_no_deharm"
    # Callers may hand over the "*_no_deharm" variants; those only differ in
    # how results are projected back later, not in how the aggregation works.
    base_method = method[:-len(suffix)] if method.endswith(suffix) else method

    # Resample on a frequency expressed in whole seconds.
    seconds_total = pd.Timedelta(freq).total_seconds()
    freq_string = str(int(seconds_total)) + "s"

    if base_method == "nagg":
        # Shifting the timestamps by freq/2 and binning left-closed on the
        # plain grid maps every value to its nearest grid point. This replaces
        # the `base`/`loffset` resample keywords (deprecated in pandas 1.1 and
        # removed in 2.0).
        # NOTE(review): for frequencies that do not evenly divide a day, the
        # grid origin may differ from the `base`-based version when the shift
        # moves the first timestamp across midnight.
        data = data.copy()
        data.index = data.index + pd.Timedelta(freq) / 2
        side = "left"
    elif base_method == "bagg":
        side = "left"
    else:
        # "fagg"
        side = "right"

    # Some consistency cleanup - we check for empty intervals, because:
    # - resample AND groupby insert the value zero for empty intervals when
    #   aggregating with any kind of "sum" - we want `fill_value` (nan) there
    # - flags get aggregated as well, and empty intervals have to receive the
    #   flag passed as `fill_value` (which usually is not nan)
    empty_intervals = data.resample(freq_string, closed=side, label=side).count() == 0
    data = data.resample(freq_string, closed=side, label=side).apply(agg_func)
    data[empty_intervals] = fill_value

    return data
def linearInterpolation(data, inter_limit=2):
    """
    Interpolate nan-values in `data` by delegating to `interpolateNANs`
    with the 'time' method, forwarding `inter_limit` unchanged.
    """
    interpolated = interpolateNANs(data, method='time', inter_limit=inter_limit)
    return interpolated
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment