Skip to content
Snippets Groups Projects
Commit ff451ffd authored by Peter Lünenschloß
Browse files

removed total_index/total_range/ref_index salad

parent 630a4cfd
No related branches found
No related tags found
3 merge requests!193Release 1.4,!188Release 1.4,!49Dataprocessing features
Pipeline #4177 passed with stage
in 7 minutes and 28 seconds
...@@ -6,7 +6,7 @@ import numpy as np ...@@ -6,7 +6,7 @@ import numpy as np
import logging import logging
import dios import dios
from saqc.funcs.functions import flagMissing
from saqc.funcs.register import register from saqc.funcs.register import register
from saqc.lib.tools import toSequence, getFuncFromInput from saqc.lib.tools import toSequence, getFuncFromInput
from saqc.lib.ts_operators import interpolateNANs, aggregate2Freq from saqc.lib.ts_operators import interpolateNANs, aggregate2Freq
...@@ -86,7 +86,6 @@ def harmWrapper(heap={}): ...@@ -86,7 +86,6 @@ def harmWrapper(heap={}):
method=inter_method, method=inter_method,
order=inter_order, order=inter_order,
agg_method=inter_agg, agg_method=inter_agg,
total_range=(dat_col.index[0], dat_col.index[-1]),
downcast_interpolation=inter_downcast, downcast_interpolation=inter_downcast,
) )
...@@ -94,7 +93,7 @@ def harmWrapper(heap={}): ...@@ -94,7 +93,7 @@ def harmWrapper(heap={}):
flagger_merged_clean_reshaped = _reshapeFlags( flagger_merged_clean_reshaped = _reshapeFlags(
flagger_merged_clean, flagger_merged_clean,
field, field,
ref_index=dat_col.index, freq=dat_col.index.freq,
method=reshape_method, method=reshape_method,
agg_method=reshape_agg, agg_method=reshape_agg,
missing_flag=reshape_missing_flag, missing_flag=reshape_missing_flag,
...@@ -221,7 +220,7 @@ def _insertGrid(data, freq): ...@@ -221,7 +220,7 @@ def _insertGrid(data, freq):
def _interpolateGrid( def _interpolateGrid(
data, freq, method, order=1, agg_method=sum, total_range=None, downcast_interpolation=False, data, freq, method, order=1, agg_method=sum, downcast_interpolation=False,
): ):
"""The function calculates grid point values for a passed pd.Series (['data']) by applying """The function calculates grid point values for a passed pd.Series (['data']) by applying
the selected interpolation/fill method. (passed to key word 'method'). The interpolation will apply for grid points the selected interpolation/fill method. (passed to key word 'method'). The interpolation will apply for grid points
...@@ -270,10 +269,6 @@ def _interpolateGrid( ...@@ -270,10 +269,6 @@ def _interpolateGrid(
:param agg_method: Func. Default = sum. If an aggregation method is selected for grid point filling, :param agg_method: Func. Default = sum. If an aggregation method is selected for grid point filling,
you need to pass the aggregation method to this very parameter. Note that it should be able you need to pass the aggregation method to this very parameter. Note that it should be able
to handle empty argument series passed as well as np.nan passed. to handle empty argument series passed as well as np.nan passed.
:param total_range 2-Tuple of pandas Timestamps.
The total range of all the data in the Dataframe that is currently processed. If not
None, the resulting harmonization grid of the current data column will range over the total
Data-range. This ensures not having nan-entries in the flags dataframe after harmonization.
:return: dios.DictOfSeries. ['data']. :return: dios.DictOfSeries. ['data'].
""" """
...@@ -298,15 +293,11 @@ def _interpolateGrid( ...@@ -298,15 +293,11 @@ def _interpolateGrid(
"akima", "akima",
] ]
data = data.copy() data = data.copy()
ref_index = _makeGrid(data.index[0], data.index[-1], freq, name=data.index.name)
if total_range is not None:
total_index = _makeGrid(total_range[0], total_range[1], freq, name=data.index.name)
# Aggregations: # Aggregations:
if method in aggregations: if method in aggregations:
data = aggregate2Freq(data, method, agg_method, freq) data = aggregate2Freq(data, method, agg_method, freq)
if total_range is None:
data = data.reindex(ref_index)
# Shifts # Shifts
elif method in shifts: elif method in shifts:
...@@ -322,7 +313,8 @@ def _interpolateGrid( ...@@ -322,7 +313,8 @@ def _interpolateGrid(
direction = "nearest" direction = "nearest"
tolerance = pd.Timedelta(freq) / 2 tolerance = pd.Timedelta(freq) / 2
data = data.reindex(ref_index, method=direction, tolerance=tolerance) ref_ind = _makeGrid(data.index[0], data.index[-1], freq, name=data.index.name)
data = data.reindex(ref_ind, method=direction, tolerance=tolerance)
# Interpolations: # Interpolations:
elif method in interpolations: elif method in interpolations:
...@@ -346,16 +338,12 @@ def _interpolateGrid( ...@@ -346,16 +338,12 @@ def _interpolateGrid(
# exclude falsely interpolated values: # exclude falsely interpolated values:
data[spec_case_mask.index] = np.nan data[spec_case_mask.index] = np.nan
data = data.asfreq(freq)
if total_range is None:
data = data.asfreq(freq, fill_value=np.nan)
else: else:
methods = "\n".join([", ".join(shifts), ", ".join(aggregations), ", ".join(interpolations)]) methods = "\n".join([", ".join(shifts), ", ".join(aggregations), ", ".join(interpolations)])
raise ValueError(f"Unknown interpolation method: '{method}', please select from:\n{methods}") raise ValueError(f"Unknown interpolation method: '{method}', please select from:\n{methods}")
if total_range is not None:
data = data.reindex(total_index)
return data, chunk_bounds return data, chunk_bounds
...@@ -363,7 +351,7 @@ def _interpolateGrid( ...@@ -363,7 +351,7 @@ def _interpolateGrid(
def _reshapeFlags( def _reshapeFlags(
flagger, flagger,
field, field,
ref_index, freq,
method="fshift", method="fshift",
agg_method=max, agg_method=max,
missing_flag=None, missing_flag=None,
...@@ -425,10 +413,6 @@ def _reshapeFlags( ...@@ -425,10 +413,6 @@ def _reshapeFlags(
] ]
shifts = ["fshift", "bshift", "nshift"] shifts = ["fshift", "bshift", "nshift"]
freq = ref_index.freq
# fixme: NOTE: now with dios we just work on the series in question and leave
# other indexes untouched...
flags = flagger.getFlags() flags = flagger.getFlags()
fdata = flags[field] fdata = flags[field]
...@@ -449,7 +433,8 @@ def _reshapeFlags( ...@@ -449,7 +433,8 @@ def _reshapeFlags(
# if you want to keep previous comments # if you want to keep previous comments
# only newly generated missing flags get commented: # only newly generated missing flags get commented:
fdata = fdata.reindex(ref_index, tolerance=tolerance, method=direction, fill_value=np.nan) ref_ind = _makeGrid(fdata.index[0], fdata.index[-1], freq, name=fdata.index.name)
fdata = fdata.reindex(ref_ind, tolerance=tolerance, method=direction, fill_value=np.nan)
flags[field] = fdata flags[field] = fdata
flagger_new = flagger.initFlags(flags=flags) flagger_new = flagger.initFlags(flags=flags)
...@@ -462,12 +447,6 @@ def _reshapeFlags( ...@@ -462,12 +447,6 @@ def _reshapeFlags(
fdata = aggregate2Freq(fdata, method, agg_method, freq, fill_value=missing_flag) fdata = aggregate2Freq(fdata, method, agg_method, freq, fill_value=missing_flag)
fdata = fdata.astype(flagger.dtype) fdata = fdata.astype(flagger.dtype)
# some consistency clean up to ensure new flags frame matching new data frames size:
if ref_index[0] != fdata.index[0]:
fdata = pd.Series(data=flagger.BAD, index=[ref_index[0]]).astype(flagger.dtype).append(fdata)
if ref_index[-1] != fdata.index[-1]:
fdata = fdata.append(pd.Series(data=flagger.BAD, index=[ref_index[-1]]).astype(flagger.dtype))
# block flagging/backtracking of chunk_starts/chunk_ends # block flagging/backtracking of chunk_starts/chunk_ends
if block_flags is not None: if block_flags is not None:
fdata[block_flags] = np.nan fdata[block_flags] = np.nan
......
...@@ -233,8 +233,6 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio ...@@ -233,8 +233,6 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio
def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan, max_invalid_total=None, max_invalid_consec=None): def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan, max_invalid_total=None, max_invalid_consec=None):
# filter data for invalid patterns # filter data for invalid patterns
#import pdb
#pdb.set_trace()
if (max_invalid_total is not None) | (max_invalid_consec is not None): if (max_invalid_total is not None) | (max_invalid_consec is not None):
if not max_invalid_total: if not max_invalid_total:
max_invalid_total = np.inf max_invalid_total = np.inf
......
...@@ -202,7 +202,8 @@ def test_harmSingleVarInterpolations(data, flagger, interpolation, freq): ...@@ -202,7 +202,8 @@ def test_harmSingleVarInterpolations(data, flagger, interpolation, freq):
data, flagger = harm_harmonize( data, flagger = harm_harmonize(
data, "data", flagger, freq, interpolation, "fshift", reshape_shift_comment=False, inter_agg="sum", data, "data", flagger, freq, interpolation, "fshift", reshape_shift_comment=False, inter_agg="sum",
) )
#import pdb
#pdb.set_trace()
if interpolation == "fshift": if interpolation == "fshift":
if freq == "15min": if freq == "15min":
exp = pd.Series([np.nan, -37.5, -25.0, 0.0, 37.5, 50.0], index=test_index) exp = pd.Series([np.nan, -37.5, -25.0, 0.0, 37.5, 50.0], index=test_index)
...@@ -226,10 +227,10 @@ def test_harmSingleVarInterpolations(data, flagger, interpolation, freq): ...@@ -226,10 +227,10 @@ def test_harmSingleVarInterpolations(data, flagger, interpolation, freq):
assert data[field].equals(exp) assert data[field].equals(exp)
if interpolation == "nagg": if interpolation == "nagg":
if freq == "15min": if freq == "15min":
exp = pd.Series([np.nan, -87.5, -25.0, 0.0, 37.5, 50.0], index=test_index) exp = pd.Series([-87.5, -25.0, 0.0, 37.5, 50.0], index=test_index[1:])
assert data[field].equals(exp) assert data[field].equals(exp)
if freq == "30min": if freq == "30min":
exp = pd.Series([np.nan, -87.5, -25.0, 87.5], index=test_index) exp = pd.Series([-87.5, -25.0, 87.5], index=test_index[1:])
assert data[field].equals(exp) assert data[field].equals(exp)
if interpolation == "bagg": if interpolation == "bagg":
if freq == "15min": if freq == "15min":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment