Skip to content
Snippets Groups Projects
Commit a53913c5 authored by David Schäfer's avatar David Schäfer
Browse files

Merge branch 'ts_mods' into 'develop'

ts_operators suggestions

See merge request !51
parents 801254f5 b592334e
No related branches found
No related tags found
3 merge requests!193Release 1.4,!188Release 1.4,!51ts_operators suggestions
Pipeline #6044 passed with stage
in 16 minutes and 15 seconds
......@@ -6,6 +6,8 @@ The module gathers all kinds of timeseries tranformations.
"""
import logging
import re
import pandas as pd
import numpy as np
import numba as nb
......@@ -163,13 +165,12 @@ def validationTrafo(data, max_nan_total, max_nan_consec):
return data
elif _maxConsecutiveNan(np.asarray(data), max_nan_consec):
data[:] = False
return data
else:
data[:] = True
return data
else:
data[:] = True
return data
return data
def stdQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
......@@ -248,10 +249,8 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio
return x.interpolate(method=wrap_method, order=int(wrap_order))
except (NotImplementedError, ValueError):
logger.warning(
"Interpolation with method {} is not supported at order {}. "
"Interpolation will be performed at order {}".format(
method, str(wrap_order), str(wrap_order - 1)
)
f"Interpolation with method {method} is not supported at order {wrap_order}. "
f"and will be performed at order {wrap_order-1}"
)
return _interpolWrapper(x, int(wrap_order - 1), wrap_method)
elif x.size < 3:
......@@ -269,8 +268,7 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio
data = data.reindex(pre_index)
if return_chunk_bounds:
return data, chunk_bounds
else:
return data
else: return data
def aggregate2Freq(
......@@ -280,6 +278,12 @@ def aggregate2Freq(
# Timestamps that have no values projected on them, get "fill_value" assigned. Also,
# "fill_value" serves as replacement for "invalid" intervals
methods = {
"nagg": lambda seconds_total: (seconds_total/2, "left", "left"),
"bagg": lambda _: (0, "left", "left"),
"fagg": lambda _: (0, "right", "right"),
}
# filter data for invalid patterns (since filtering is expensive we pre-check if it is demanded)
if (max_invalid_total is not np.inf) | (max_invalid_consec is not np.inf):
if pd.isnull(fill_value):
......@@ -292,24 +296,8 @@ def aggregate2Freq(
)
data[temp_mask] = fill_value
# some timestamp acrobatics to feed pd.resample`s base keyword properly
seconds_total = pd.Timedelta(freq).total_seconds()
freq_string = str(int(seconds_total)) + "s"
if method == "nagg":
# all values within a grid points range (+/- freq/2, closed to the left) get aggregated with 'agg method'
base = seconds_total / 2
label = "left"
closed = "left"
elif method == "bagg":
# all values in a sampling interval get aggregated with agg_method and assigned to the last grid point
base = 0
label = "left"
closed = "left"
else:
# all values in a sampling interval get aggregated with agg_method and assigned to the next grid point
base = 0
label = "right"
closed = "right"
base, label, closed = methods[method](seconds_total)
# In the following, we check for empty intervals outside resample.apply, because:
# - resample AND groupBy do insert value zero for empty intervals if resampling with any kind of "sum" application -
......@@ -317,23 +305,16 @@ def aggregate2Freq(
# - we are aggregating data and flags with this function and empty intervals usually would get assigned flagger.BAD
# flag (where resample inserts np.nan or 0)
data_resampler = data.resample(freq_string, base=base, closed=closed, label=label)
data_resampler = data.resample(f"{seconds_total:.0f}s", base=base, closed=closed, label=label)
empty_intervals = data_resampler.count() == 0
# great performance gain can be achieved, when avoiding .apply and using pd.resampler
# methods instead. (this covers all the basic func aggregations, such as median, mean, sum, count, ...)
try:
# get rid of nan_prefix attached to numpys nanfuncs ("ignore nan is pointless down here -
# resample doesnt pass no nans to the func applied)
if agg_func.__name__[:3] == "nan":
check_name = agg_func.__name__[3:]
else:
check_name = agg_func.__name__
# another nasty special case: if function "count" was passed, we not want empty intervals to be replaced by nan:
if check_name == "count":
check_name = re.sub("^nan", "", agg_func.__name__)
# a nasty special case: if function "count" was passed, we not want empty intervals to be replaced by nan:
if check_name == 'count':
empty_intervals[:] = False
data = getattr(data_resampler, check_name)()
except AttributeError:
data = data_resampler.apply(agg_func)
......@@ -352,26 +333,16 @@ def shift2Freq(data, method, freq, fill_value=np.nan):
# shift timestamps backwards/forwards in order to allign them with an equidistant
# frequencie grid.
# Shifts
if method == "fshift":
direction = "ffill"
tolerance = pd.Timedelta(freq)
elif method == "bshift":
direction = "bfill"
tolerance = pd.Timedelta(freq)
elif method == "nshift":
direction = "nearest"
tolerance = pd.Timedelta(freq) / 2
else:
# method == nearest2
direction = "nearest"
tolerance = pd.Timedelta(freq)
methods = {
"fshift": lambda freq: ("ffill", pd.Timedelta(freq)),
"bshift": lambda freq: ("bfill", pd.Timedelta(freq)),
"nshift": lambda freq: ("nearest", pd.Timedelta(freq)/2),
}
direction, tolerance = methods[method](freq)
target_ind = pd.date_range(
start=data.index[0].floor(freq), end=data.index[-1].ceil(freq), freq=freq, name=data.index.name
start=data.index[0].floor(freq), end=data.index[-1].ceil(freq),
freq=freq,
name=data.index.name
)
return data.reindex(target_ind, method=direction, tolerance=tolerance, fill_value=fill_value)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment