From 3049d3a90166831f94cc6bc2c2e4beecdbf9c156 Mon Sep 17 00:00:00 2001
From: Peter Luenenschloss <peter.luenenschloss@ufz.de>
Date: Tue, 19 May 2020 14:39:33 +0200
Subject: [PATCH] smll bfx

---
 saqc/funcs/proc_functions.py      | 56 +++++++++----------------------
 saqc/lib/ts_operators.py          | 25 ++++++++++----
 test/funcs/test_proc_functions.py |  2 +-
 3 files changed, 35 insertions(+), 48 deletions(-)

diff --git a/saqc/funcs/proc_functions.py b/saqc/funcs/proc_functions.py
index d6a1f2cf8..079fedc4e 100644
--- a/saqc/funcs/proc_functions.py
+++ b/saqc/funcs/proc_functions.py
@@ -4,7 +4,7 @@ import pandas as pd
 import numpy as np
 
 from saqc.funcs.register import register
-from saqc.lib.ts_operators import interpolateNANs, validationTrafo
+from saqc.lib.ts_operators import interpolateNANs, validationTrafo, aggregate2Freq
 from saqc.lib.tools import composeFunction, toSequence
 
 
@@ -37,51 +37,27 @@ def proc_interpolateMissing(data, field, flagger, method, inter_order=2, inter_l
 
 
 @register()
-def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total=None, max_invalid_consec=None,
-                  flag_agg_func='max', **kwargs):
+def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total_d=None, max_invalid_consec_d=None,
+                  max_invalid_consec_f=None, max_invalid_total_f=None, flag_agg_func='max', method='bagg', **kwargs):
+
     data = data.copy()
     datcol = data[field]
-    d_start = datcol.index[0].floor(freq)
-    d_end = datcol.index[-1].ceil(freq)
-
-    # filter data for invalid patterns
-    if (max_invalid_total is not None) | (max_invalid_consec is not None):
-        if not max_invalid_total:
-            max_invalid_total = np.inf
-        if not max_invalid_consec:
-            max_invalid_consec = np.inf
-
-        datcol = datcol.groupby(pd.Grouper(freq=freq)).transform(validationTrafo, max_nan_total=max_invalid_total,
-                                                                 max_nan_consec=max_invalid_consec)
-        nanmask = np.isnan(datcol)
-        datcol = datcol[~nanmask]
-        datflags = flagger.getFlags()[field]
-        datflags = datflags[~nanmask]
-    datresampler = datcol.resample(freq)
-    flagsresampler = datflags.resample(freq)
-
-    # data resampling:
-    try:
-        datcol = getattr(datresampler, func)()
-    except AttributeError:
-        func = composeFunction(func)
-        datcol = datresampler.apply(func)
+    flagscol = flagger.getFlags(field)
+
+    func = composeFunction(func)
+    flag_agg_func = composeFunction(flag_agg_func)
+
+    # data resampling
+    datcol = aggregate2Freq(datcol, method, agg_func=func, freq=freq, fill_value=np.nan,
+                            max_invalid_total=max_invalid_total_d, max_invalid_consec=max_invalid_consec_d)
 
     # flags resampling:
-    try:
-        datflags = getattr(flagsresampler, flag_agg_func)()
-    except AttributeError:
-        flag_agg_func = composeFunction(flag_agg_func)
-        datflags = flagsresampler.apply(flag_agg_func)
-
-    # insert freqgrid (for consistency reasons -> in above step, start and ending chunks can get lost due to invalid
-    # intervals):
-    grid = pd.date_range(d_start, d_end, freq=freq)
-    datcol = datcol.reindex(grid)
-    datflags = datflags.reindex(grid)
+    flagscol = aggregate2Freq(flagscol, method, agg_func=flag_agg_func, freq=freq, fill_value=flagger.BAD,
+                              max_invalid_total=max_invalid_total_f, max_invalid_consec=max_invalid_consec_f)
 
+    # data/flags reshaping:
     data[field] = datcol
-    reshape_flagger = flagger.initFlags(datcol).setFlags(field, flag=datflags, force=True, **kwargs)
+    reshape_flagger = flagger.initFlags(datcol).setFlags(field, flag=flagscol, force=True, **kwargs)
     flagger = flagger.getFlagger(drop=field).setFlagger(reshape_flagger)
     return data, flagger
 
diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py
index a47d2206d..32039555a 100644
--- a/saqc/lib/ts_operators.py
+++ b/saqc/lib/ts_operators.py
@@ -7,6 +7,7 @@ import numba as nb
 import math
 from sklearn.neighbors import NearestNeighbors
 from scipy.stats import iqr
+#from saqc.lib.tools import composeFunction
 import logging
 
 logger = logging.getLogger("SaQC")
@@ -129,15 +130,15 @@ def validationTrafo(data, max_nan_total, max_nan_consec):
 
 
 def stdQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
-    return np.nanstd(validationTrafo(data, max_nan_total, max_nan_consec), ddof=1)
+    return np.nanstd(data[~validationTrafo(data.isna(), max_nan_total, max_nan_consec)], ddof=1)
 
 
 def varQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
-    return np.nanvar(validationTrafo(data, max_nan_total, max_nan_consec), ddof=1)
+    return np.nanvar(data[~validationTrafo(data.isna(), max_nan_total, max_nan_consec)], ddof=1)
 
 
 def meanQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
-    return np.nanmean(validationTrafo(data, max_nan_total, max_nan_consec))
+    return np.nanmean(data[~validationTrafo(data.isna(), max_nan_total, max_nan_consec)])
 
 
 def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolation=False, return_chunk_bounds=False):
@@ -232,14 +233,19 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio
 
 def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan, max_invalid_total=None, max_invalid_consec=None):
     # filter data for invalid patterns
+    #import pdb
+    #pdb.set_trace()
     if (max_invalid_total is not None) | (max_invalid_consec is not None):
         if not max_invalid_total:
            max_invalid_total = np.inf
        if not max_invalid_consec:
            max_invalid_consec = np.inf
 
-        temp_mask = (data == fill_value)
-        temp_mask.groupby(pd.Grouper(freq=freq)).transform(validationTrafo, max_nan_total=max_invalid_total,
+        if pd.isnull(fill_value):
+            temp_mask = (data.isna())
+        else:
+            temp_mask = (data == fill_value)
+        temp_mask = temp_mask.groupby(pd.Grouper(freq=freq)).transform(validationTrafo, max_nan_total=max_invalid_total,
                                                            max_nan_consec=max_invalid_consec)
         data[temp_mask] = fill_value
 
@@ -278,12 +284,17 @@ def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan, max_invalid_
 
     empty_intervals = data.resample(freq_string, loffset=loffset, base=base, closed=closed,
                                     label=label).count() == 0
-    data = data.resample(freq_string, loffset=loffset, base=base, closed=closed,
-                         label=label).apply(agg_func)
+
+    dataresampler = data.resample(freq_string, loffset=loffset, base=base, closed=closed,
+                                  label=label)
+
+    data = dataresampler.apply(agg_func)
+
     data[empty_intervals] = fill_value
 
     return data
 
+
 def linearInterpolation(data, inter_limit=2):
     return interpolateNANs(data, 'time', inter_limit=inter_limit)
 
diff --git a/test/funcs/test_proc_functions.py b/test/funcs/test_proc_functions.py
index 5b9ef99c7..2d76f4dd9 100644
--- a/test/funcs/test_proc_functions.py
+++ b/test/funcs/test_proc_functions.py
@@ -53,7 +53,7 @@ def test_resample(course_5, flagger):
     field = data.columns[0]
     data = dios.DictOfSeries(data)
     flagger = flagger.initFlags(data)
-    data1, *_ = proc_resample(data, field, flagger, '10min', 'mean', max_invalid_total=2, max_invalid_consec=1)
+    data1, *_ = proc_resample(data, field, flagger, '10min', 'mean', max_invalid_total_d=2, max_invalid_consec_d=1)
     assert ~np.isnan(data1[field].iloc[0])
     assert np.isnan(data1[field].iloc[1])
     assert np.isnan(data1[field].iloc[2])
\ No newline at end of file
-- 
GitLab
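
Note (not part of the patch): the core of the change above is that aggregate2Freq now builds its invalid-value mask with data.isna() whenever fill_value is NaN instead of comparing against fill_value directly, and that proc_resample exposes separate invalid-value limits for data (max_invalid_total_d / max_invalid_consec_d) and flags (max_invalid_total_f / max_invalid_consec_f). The sketch below re-implements only the masking idea with plain pandas so the behaviour can be tried in isolation; interval_is_invalid and mask_invalid_intervals are made-up names, not saqc API, and the exact limit comparisons (strictly greater vs. greater-or-equal) in saqc's validationTrafo may differ.

import numpy as np
import pandas as pd

def interval_is_invalid(invalid_mask, max_invalid_total, max_invalid_consec):
    # True if the interval holds too many invalid values in total,
    # or too long a consecutive run of them (hypothetical helper, not saqc code)
    if invalid_mask.sum() > max_invalid_total:
        return True
    if not invalid_mask.any():
        return False
    run_ids = (~invalid_mask).cumsum()[invalid_mask]
    return run_ids.value_counts().max() > max_invalid_consec

def mask_invalid_intervals(data, freq, fill_value=np.nan,
                           max_invalid_total=np.inf, max_invalid_consec=np.inf):
    # mark values as invalid (== fill_value, or NaN when fill_value is NaN),
    # then overwrite every freq-interval that violates the limits with fill_value
    data = data.copy()
    invalid = data.isna() if pd.isnull(fill_value) else (data == fill_value)
    bad_intervals = invalid.groupby(pd.Grouper(freq=freq)).transform(
        lambda m: interval_is_invalid(m, max_invalid_total, max_invalid_consec)
    ).astype(bool)
    data[bad_intervals] = fill_value
    return data

idx = pd.date_range("2020-01-01", periods=6, freq="5min")
series = pd.Series([1.0, np.nan, np.nan, 4.0, 5.0, np.nan], index=idx)
# the first 15min interval contains two NaNs (more than max_invalid_total), so its
# valid value 1.0 is masked as well; the second interval (one NaN) is left untouched
print(mask_invalid_intervals(series, "15min", max_invalid_total=1))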