diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py index 9799df41664494eb0264fed2eaf18bcc80d1285c..bc7f2b9452ff7e97bb2dcedc016e0c3154cf4dd4 100644 --- a/saqc/funcs/harm_functions.py +++ b/saqc/funcs/harm_functions.py @@ -6,7 +6,7 @@ import numpy as np import logging import dios -from saqc.funcs.functions import flagMissing + from saqc.funcs.register import register from saqc.lib.tools import toSequence, getFuncFromInput from saqc.lib.ts_operators import interpolateNANs, aggregate2Freq @@ -86,7 +86,6 @@ def harmWrapper(heap={}): method=inter_method, order=inter_order, agg_method=inter_agg, - total_range=(dat_col.index[0], dat_col.index[-1]), downcast_interpolation=inter_downcast, ) @@ -94,7 +93,7 @@ def harmWrapper(heap={}): flagger_merged_clean_reshaped = _reshapeFlags( flagger_merged_clean, field, - ref_index=dat_col.index, + freq=dat_col.index.freq, method=reshape_method, agg_method=reshape_agg, missing_flag=reshape_missing_flag, @@ -221,7 +220,7 @@ def _insertGrid(data, freq): def _interpolateGrid( - data, freq, method, order=1, agg_method=sum, total_range=None, downcast_interpolation=False, + data, freq, method, order=1, agg_method=sum, downcast_interpolation=False, ): """The function calculates grid point values for a passed pd.Series (['data']) by applying the selected interpolation/fill method. (passed to key word 'method'). The interpolation will apply for grid points @@ -270,10 +269,6 @@ def _interpolateGrid( :param agg_method: Func. Default = sum. If an aggregation method is selected for grid point filling, you need to pass the aggregation method to this very parameter. Note that it should be able to handle empty argument series passed as well as np.nan passed. - :param total_range 2-Tuple of pandas Timestamps. - The total range of all the data in the Dataframe that is currently processed. If not - None, the resulting harmonization grid of the current data column will range over the total - Data-range. This ensures not having nan-entries in the flags dataframe after harmonization. :return: dios.DictOfSeries. ['data']. """ @@ -298,15 +293,11 @@ def _interpolateGrid( "akima", ] data = data.copy() - ref_index = _makeGrid(data.index[0], data.index[-1], freq, name=data.index.name) - if total_range is not None: - total_index = _makeGrid(total_range[0], total_range[1], freq, name=data.index.name) # Aggregations: if method in aggregations: data = aggregate2Freq(data, method, agg_method, freq) - if total_range is None: - data = data.reindex(ref_index) + # Shifts elif method in shifts: @@ -322,7 +313,8 @@ def _interpolateGrid( direction = "nearest" tolerance = pd.Timedelta(freq) / 2 - data = data.reindex(ref_index, method=direction, tolerance=tolerance) + ref_ind = _makeGrid(data.index[0], data.index[-1], freq, name=data.index.name) + data = data.reindex(ref_ind, method=direction, tolerance=tolerance) # Interpolations: elif method in interpolations: @@ -346,16 +338,12 @@ def _interpolateGrid( # exclude falsely interpolated values: data[spec_case_mask.index] = np.nan - - if total_range is None: - data = data.asfreq(freq, fill_value=np.nan) + data = data.asfreq(freq) else: methods = "\n".join([", ".join(shifts), ", ".join(aggregations), ", ".join(interpolations)]) raise ValueError(f"Unknown interpolation method: '{method}', please select from:\n{methods}") - if total_range is not None: - data = data.reindex(total_index) return data, chunk_bounds @@ -363,7 +351,7 @@ def _interpolateGrid( def _reshapeFlags( flagger, field, - ref_index, + freq, method="fshift", agg_method=max, missing_flag=None, @@ -425,10 +413,6 @@ def _reshapeFlags( ] shifts = ["fshift", "bshift", "nshift"] - freq = ref_index.freq - - # fixme: NOTE: now with dios we just work on the series in question and leave - # other indexes untouched... flags = flagger.getFlags() fdata = flags[field] @@ -449,7 +433,8 @@ def _reshapeFlags( # if you want to keep previous comments # only newly generated missing flags get commented: - fdata = fdata.reindex(ref_index, tolerance=tolerance, method=direction, fill_value=np.nan) + ref_ind = _makeGrid(fdata.index[0], fdata.index[-1], freq, name=fdata.index.name) + fdata = fdata.reindex(ref_ind, tolerance=tolerance, method=direction, fill_value=np.nan) flags[field] = fdata flagger_new = flagger.initFlags(flags=flags) @@ -462,12 +447,6 @@ def _reshapeFlags( fdata = aggregate2Freq(fdata, method, agg_method, freq, fill_value=missing_flag) fdata = fdata.astype(flagger.dtype) - # some consistency clean up to ensure new flags frame matching new data frames size: - if ref_index[0] != fdata.index[0]: - fdata = pd.Series(data=flagger.BAD, index=[ref_index[0]]).astype(flagger.dtype).append(fdata) - if ref_index[-1] != fdata.index[-1]: - fdata = fdata.append(pd.Series(data=flagger.BAD, index=[ref_index[-1]]).astype(flagger.dtype)) - # block flagging/backtracking of chunk_starts/chunk_ends if block_flags is not None: fdata[block_flags] = np.nan diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py index f2f9d2ecd255916e365d9d7ceb170e5f1fc897a7..a4c21d6a41d91301025ce3715c9f2239f0f500cd 100644 --- a/saqc/lib/ts_operators.py +++ b/saqc/lib/ts_operators.py @@ -233,8 +233,6 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan, max_invalid_total=None, max_invalid_consec=None): # filter data for invalid patterns - #import pdb - #pdb.set_trace() if (max_invalid_total is not None) | (max_invalid_consec is not None): if not max_invalid_total: max_invalid_total = np.inf diff --git a/test/funcs/test_harm_funcs.py b/test/funcs/test_harm_funcs.py index 9fb00325df0910ce13476813fff782dfb51b5629..3d30cd5070e8848d8be2966bce8bab3cbbc82c82 100644 --- a/test/funcs/test_harm_funcs.py +++ b/test/funcs/test_harm_funcs.py @@ -202,7 +202,8 @@ def test_harmSingleVarInterpolations(data, flagger, interpolation, freq): data, flagger = harm_harmonize( data, "data", flagger, freq, interpolation, "fshift", reshape_shift_comment=False, inter_agg="sum", ) - + #import pdb + #pdb.set_trace() if interpolation == "fshift": if freq == "15min": exp = pd.Series([np.nan, -37.5, -25.0, 0.0, 37.5, 50.0], index=test_index) @@ -226,10 +227,10 @@ def test_harmSingleVarInterpolations(data, flagger, interpolation, freq): assert data[field].equals(exp) if interpolation == "nagg": if freq == "15min": - exp = pd.Series([np.nan, -87.5, -25.0, 0.0, 37.5, 50.0], index=test_index) + exp = pd.Series([-87.5, -25.0, 0.0, 37.5, 50.0], index=test_index[1:]) assert data[field].equals(exp) if freq == "30min": - exp = pd.Series([np.nan, -87.5, -25.0, 87.5], index=test_index) + exp = pd.Series([-87.5, -25.0, 87.5], index=test_index[1:]) assert data[field].equals(exp) if interpolation == "bagg": if freq == "15min":