Skip to content
Snippets Groups Projects
Commit d92e0952 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

Moved the core of the shifting methods from several places in harm into a single function (shift2Freq) in ts_operators.

parent ff451ffd
No related branches found
No related tags found
3 merge requests: !193 Release 1.4, !188 Release 1.4, !49 Dataprocessing features
......@@ -9,7 +9,7 @@ import dios
from saqc.funcs.register import register
from saqc.lib.tools import toSequence, getFuncFromInput
from saqc.lib.ts_operators import interpolateNANs, aggregate2Freq
from saqc.lib.ts_operators import interpolateNANs, aggregate2Freq, shift2Freq
logger = logging.getLogger("SaQC")
......@@ -298,23 +298,9 @@ def _interpolateGrid(
if method in aggregations:
data = aggregate2Freq(data, method, agg_method, freq)
# Shifts
elif method in shifts:
if method == "fshift":
direction = "ffill"
tolerance = pd.Timedelta(freq)
elif method == "bshift":
direction = "bfill"
tolerance = pd.Timedelta(freq)
# if method = nshift
else:
direction = "nearest"
tolerance = pd.Timedelta(freq) / 2
ref_ind = _makeGrid(data.index[0], data.index[-1], freq, name=data.index.name)
data = data.reindex(ref_ind, method=direction, tolerance=tolerance)
data = shift2Freq(data, method, freq)
# Interpolations:
elif method in interpolations:
......@@ -418,23 +404,7 @@ def _reshapeFlags(
if method in shifts:
# forward/backward projection of every intervals last/first flag - rest will be dropped
if method == "fshift":
direction = "ffill"
tolerance = pd.Timedelta(freq)
elif method == "bshift":
direction = "bfill"
tolerance = pd.Timedelta(freq)
# varset for nshift
else:
direction = "nearest"
tolerance = pd.Timedelta(freq) / 2
# if you want to keep previous comments
# only newly generated missing flags get commented:
ref_ind = _makeGrid(fdata.index[0], fdata.index[-1], freq, name=fdata.index.name)
fdata = fdata.reindex(ref_ind, tolerance=tolerance, method=direction, fill_value=np.nan)
fdata = shift2Freq(fdata, method, freq)
flags[field] = fdata
flagger_new = flagger.initFlags(flags=flags)
......
......@@ -293,6 +293,26 @@ def aggregate2Freq(data, method, agg_func, freq, fill_value=np.nan, max_invalid_
return data
def shift2Freq(data, method, freq, fill_value=np.nan):
    """Shift the values of ``data`` onto a regular grid of frequency ``freq``.

    The target grid runs from the first index value floored to ``freq`` to the
    last index value ceiled to ``freq``. Every grid point receives a value
    from ``data`` according to ``method``:

    * ``"fshift"``: the last value at or before the grid point, if it is no
      further than one ``freq`` away (``reindex`` with ``method="ffill"``).
    * ``"bshift"``: the next value at or after the grid point, if it is no
      further than one ``freq`` away (``reindex`` with ``method="bfill"``).
    * anything else (intended for ``"nshift"``): the nearest value, if it is
      no further than half a ``freq`` away (``reindex`` with
      ``method="nearest"``).

    Parameters
    ----------
    data : pandas object with a datetime-like index
        The data to shift. Its first and last index entries must support
        ``.floor``/``.ceil``, and the object must support ``.reindex``.
    method : str
        One of ``"fshift"``, ``"bshift"``; any other value selects the
        nearest-neighbour shift.
    freq : str
        Offset string defining the target grid; also passed to
        ``pd.Timedelta`` to derive the matching tolerance.
    fill_value : scalar, default np.nan
        Value passed to ``reindex`` as ``fill_value``.

    Returns
    -------
    The reindexed data. For empty input a copy of ``data`` is returned, since
    there are no timestamps to span a grid from.
    """
    # Without this guard, data.index[0] below raises IndexError on empty input.
    if len(data.index) == 0:
        return data.copy()

    grid_step = pd.Timedelta(freq)
    if method == "fshift":
        direction, tolerance = "ffill", grid_step
    elif method == "bshift":
        direction, tolerance = "bfill", grid_step
    else:
        # Every other method name is handled as a nearest-neighbour shift.
        direction, tolerance = "nearest", grid_step / 2

    target_ind = pd.date_range(
        start=data.index[0].floor(freq),
        end=data.index[-1].ceil(freq),
        freq=freq,
        name=data.index.name,
    )
    return data.reindex(target_ind, method=direction, tolerance=tolerance, fill_value=fill_value)
def linearInterpolation(data, inter_limit=2):
    """Delegate to ``interpolateNANs`` using the ``'time'`` method.

    ``data`` is passed through unchanged and ``inter_limit`` is forwarded as
    the keyword argument of the same name; the helper's result is returned
    as-is.
    """
    interpolated = interpolateNANs(data, 'time', inter_limit=inter_limit)
    return interpolated
......
......@@ -202,8 +202,6 @@ def test_harmSingleVarInterpolations(data, flagger, interpolation, freq):
data, flagger = harm_harmonize(
data, "data", flagger, freq, interpolation, "fshift", reshape_shift_comment=False, inter_agg="sum",
)
#import pdb
#pdb.set_trace()
if interpolation == "fshift":
if freq == "15min":
exp = pd.Series([np.nan, -37.5, -25.0, 0.0, 37.5, 50.0], index=test_index)
......@@ -284,9 +282,6 @@ def test_multivariatHarmonization(multi_data, flagger, shift_comment):
for c in multi_data.columns:
harm_start = multi_data[c].index[0].floor(freq=freq)
harm_end = multi_data[c].index[-1].ceil(freq=freq)
test_index = pd.date_range(start=harm_start, end=harm_end, freq=freq)
assert multi_data[c].index.equals(test_index)
assert pd.Timedelta(pd.infer_freq(multi_data[c].index)) == pd.Timedelta(freq)
multi_data, flagger = harm_deharmonize(multi_data, "data3", flagger, co_flagging=False)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment