From ff451ffdf03bd175c9a0e994cc05bf51381ab387 Mon Sep 17 00:00:00 2001 From: Peter Luenenschloss <peter.luenenschloss@ufz.de> Date: Wed, 20 May 2020 12:36:17 +0200 Subject: [PATCH] remved total_index/total_range/ref_index salad --- saqc/funcs/harm_functions.py | 41 +++++++++-------------------------- saqc/lib/ts_operators.py | 2 -- test/funcs/test_harm_funcs.py | 7 +++--- 3 files changed, 14 insertions(+), 36 deletions(-) diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py index 9799df416..bc7f2b945 100644 --- a/saqc/funcs/harm_functions.py +++ b/saqc/funcs/harm_functions.py @@ -6,7 +6,7 @@ import numpy as np import logging import dios -from saqc.funcs.functions import flagMissing + from saqc.funcs.register import register from saqc.lib.tools import toSequence, getFuncFromInput from saqc.lib.ts_operators import interpolateNANs, aggregate2Freq @@ -86,7 +86,6 @@ def harmWrapper(heap={}): method=inter_method, order=inter_order, agg_method=inter_agg, - total_range=(dat_col.index[0], dat_col.index[-1]), downcast_interpolation=inter_downcast, ) @@ -94,7 +93,7 @@ def harmWrapper(heap={}): flagger_merged_clean_reshaped = _reshapeFlags( flagger_merged_clean, field, - ref_index=dat_col.index, + freq=dat_col.index.freq, method=reshape_method, agg_method=reshape_agg, missing_flag=reshape_missing_flag, @@ -221,7 +220,7 @@ def _insertGrid(data, freq): def _interpolateGrid( - data, freq, method, order=1, agg_method=sum, total_range=None, downcast_interpolation=False, + data, freq, method, order=1, agg_method=sum, downcast_interpolation=False, ): """The function calculates grid point values for a passed pd.Series (['data']) by applying the selected interpolation/fill method. (passed to key word 'method'). The interpolation will apply for grid points @@ -270,10 +269,6 @@ def _interpolateGrid( :param agg_method: Func. Default = sum. If an aggregation method is selected for grid point filling, you need to pass the aggregation method to this very parameter. Note that it should be able to handle empty argument series passed as well as np.nan passed. - :param total_range 2-Tuple of pandas Timestamps. - The total range of all the data in the Dataframe that is currently processed. If not - None, the resulting harmonization grid of the current data column will range over the total - Data-range. This ensures not having nan-entries in the flags dataframe after harmonization. :return: dios.DictOfSeries. ['data']. """ @@ -298,15 +293,11 @@ def _interpolateGrid( "akima", ] data = data.copy() - ref_index = _makeGrid(data.index[0], data.index[-1], freq, name=data.index.name) - if total_range is not None: - total_index = _makeGrid(total_range[0], total_range[1], freq, name=data.index.name) # Aggregations: if method in aggregations: data = aggregate2Freq(data, method, agg_method, freq) - if total_range is None: - data = data.reindex(ref_index) + # Shifts elif method in shifts: @@ -322,7 +313,8 @@ def _interpolateGrid( direction = "nearest" tolerance = pd.Timedelta(freq) / 2 - data = data.reindex(ref_index, method=direction, tolerance=tolerance) + ref_ind = _makeGrid(data.index[0], data.index[-1], freq, name=data.index.name) + data = data.reindex(ref_ind, method=direction, tolerance=tolerance) # Interpolations: elif method in interpolations: @@ -346,16 +338,12 @@ def _interpolateGrid( # exclude falsely interpolated values: data[spec_case_mask.index] = np.nan - - if total_range is None: - data = data.asfreq(freq, fill_value=np.nan) + data = data.asfreq(freq) else: methods = "\n".join([", ".join(shifts), ", ".join(aggregations), ", ".join(interpolations)]) raise ValueError(f"Unknown interpolation method: '{method}', please select from:\n{methods}") - if total_range is not None: - data = data.reindex(total_index) return data, chunk_bounds @@ -363,7 +351,7 @@ def _interpolateGrid( def _reshapeFlags( flagger, field, - ref_index, + freq, method="fshift", agg_method=max, missing_flag=None, @@ -425,10 +413,6 @@ def _reshapeFlags( ] shifts = ["fshift", "bshift", "nshift"] - freq = ref_index.freq - - # fixme: NOTE: now with dios we just work on the series in question and leave - # other indexes untouched... flags = flagger.getFlags() fdata = flags[field] @@ -449,7 +433,8 @@ def _reshapeFlags( # if you want to keep previous comments # only newly generated missing flags get commented: - fdata = fdata.reindex(ref_index, tolerance=tolerance, method=direction, fill_value=np.nan) + ref_ind = _makeGrid(fdata.index[0], fdata.index[-1], freq, name=fdata.index.name) + fdata = fdata.reindex(ref_ind, tolerance=tolerance, method=direction, fill_value=np.nan) flags[field] = fdata flagger_new = flagger.initFlags(flags=flags) @@ -462,12 +447,6 @@ def _reshapeFlags( fdata = aggregate2Freq(fdata, method, agg_method, freq, fill_value=missing_flag) fdata = fdata.astype(flagger.dtype) - # some consistency clean up to ensure new flags frame matching new data frames size: - if ref_index[0] != fdata.index[0]: - fdata = pd.Series(data=flagger.BAD, index=[ref_index[0]]).astype(flagger.dtype).append(fdata) - if ref_index[-1] != fdata.index[-1]: - fdata = fdata.append(pd.Series(data=flagger.BAD, index=[ref_index[-1]]).astype(flagger.dtype)) - # block flagging/backtracking of chunk_starts/chunk_ends if block_flags is not None: fdata[block_flags] = np.nan diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py index f2f9d2ecd..a4c21d6a4 100644 --- a/saqc/lib/ts_operators.py +++ b/saqc/lib/ts_operators.py @@ -233,8 +233,6 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan, max_invalid_total=None, max_invalid_consec=None): # filter data for invalid patterns - #import pdb - #pdb.set_trace() if (max_invalid_total is not None) | (max_invalid_consec is not None): if not max_invalid_total: max_invalid_total = np.inf diff --git a/test/funcs/test_harm_funcs.py b/test/funcs/test_harm_funcs.py index 9fb00325d..3d30cd507 100644 --- a/test/funcs/test_harm_funcs.py +++ b/test/funcs/test_harm_funcs.py @@ -202,7 +202,8 @@ def test_harmSingleVarInterpolations(data, flagger, interpolation, freq): data, flagger = harm_harmonize( data, "data", flagger, freq, interpolation, "fshift", reshape_shift_comment=False, inter_agg="sum", ) - + #import pdb + #pdb.set_trace() if interpolation == "fshift": if freq == "15min": exp = pd.Series([np.nan, -37.5, -25.0, 0.0, 37.5, 50.0], index=test_index) @@ -226,10 +227,10 @@ def test_harmSingleVarInterpolations(data, flagger, interpolation, freq): assert data[field].equals(exp) if interpolation == "nagg": if freq == "15min": - exp = pd.Series([np.nan, -87.5, -25.0, 0.0, 37.5, 50.0], index=test_index) + exp = pd.Series([-87.5, -25.0, 0.0, 37.5, 50.0], index=test_index[1:]) assert data[field].equals(exp) if freq == "30min": - exp = pd.Series([np.nan, -87.5, -25.0, 87.5], index=test_index) + exp = pd.Series([-87.5, -25.0, 87.5], index=test_index[1:]) assert data[field].equals(exp) if interpolation == "bagg": if freq == "15min": -- GitLab