Skip to content
Snippets Groups Projects
Commit 703fd2d3 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

moved _interpolate to the ts_operators/added chunk bound calculation...

Moved _interpolate to ts_operators; added a switch for chunk bound calculation; renamed _interpolate to interpolateNANs.
parent ecb906e7
No related branches found
No related tags found
4 merge requests!193Release 1.4,!188Release 1.4,!49Dataprocessing features,!44Dataprocessing features
......@@ -9,6 +9,7 @@ import dios
from saqc.funcs.functions import flagMissing
from saqc.funcs.register import register
from saqc.lib.tools import toSequence, getFuncFromInput
from saqc.lib.ts_operators import interpolateNANs
logger = logging.getLogger("SaQC")
......@@ -362,8 +363,9 @@ def _interpolateGrid(
spec_case_mask = spec_case_mask.tshift(-1, freq)
data = _insertGrid(data, freq)
data, chunk_bounds = _interpolate(
data, chunk_bounds = interpolateNANs(
data, method, order=order, inter_limit=2, downcast_interpolation=downcast_interpolation,
return_chunk_bounds=True
)
# exclude falsely interpolated values:
......@@ -382,78 +384,6 @@ def _interpolateGrid(
return data, chunk_bounds
def _interpolate(data, method, order=2, inter_limit=2, downcast_interpolation=False):
    """
    The function interpolates nan-values (and nan-grids) in timeseries data. It can be passed all the method keywords
    from the pd.Series.interpolate method and will then apply this very method. Note that the inter_limit keyword
    really restricts the interpolation to chunks not containing more than "inter_limit" nan entries
    (thereby opposing the limit keyword of pd.Series.interpolate).

    :param data: pd.Series. The data series to be interpolated.
    :param method: String. Method keyword designating the interpolation method to use.
    :param order: Integer. If your desired interpolation method needs an order to be passed -
                        here you pass it.
    :param inter_limit: Integer. Default = 2. Limit up to which nan-gaps in the data get interpolated.
                        Its default value suits an interpolation that only will apply on an inserted
                        frequency grid.
    :param downcast_interpolation: Boolean. Default False. If True:
                        If a data chunk does not contain enough values for interpolation of the order "order",
                        the highest order possible will be selected for that chunk's interpolation.
    :return: tuple of (interpolated data, index holding start/end points of the interpolation chunks)
    """
    # gap_mask is True wherever the sample is NOT part of a nan-run of length >= inter_limit
    # (rolling counts the nans in each trailing window of size inter_limit)
    gap_mask = (data.rolling(inter_limit, min_periods=0).apply(lambda x: np.sum(np.isnan(x)), raw=True)) != inter_limit
    if inter_limit == 2:
        # cheap special case: mark both members of a nan-pair by and-ing with the shifted mask
        gap_mask = gap_mask & gap_mask.shift(-1, fill_value=True)
    else:
        # general case: back-fill the False markers over the whole window so every
        # member of an over-long nan-run gets masked out
        gap_mask = (
            gap_mask.replace(True, np.nan).fillna(method="bfill", limit=inter_limit).replace(np.nan, True).astype(bool)
        )
    # start and ending points of interpolation chunks have to be memorized to block their flagging:
    chunk_switches = gap_mask.astype(int).diff()
    chunk_starts = chunk_switches[chunk_switches == -1].index
    chunk_ends = chunk_switches[(chunk_switches.shift(-1) == 1)].index
    chunk_bounds = chunk_starts.join(chunk_ends, how="outer", sort=True)

    # drop the samples belonging to over-long nan-runs before interpolating
    data = data[gap_mask]

    if method in ["linear", "time"]:
        # these methods are handled by pandas directly; limit=1 restricts filling to single gaps
        data.interpolate(method=method, inplace=True, limit=1, limit_area="inside")
    else:
        dat_name = data.name
        # label every interpolation chunk with a unique integer id so groupby can
        # interpolate each chunk in isolation
        gap_mask = (~gap_mask).cumsum()
        data = pd.merge(gap_mask, data, how="inner", left_index=True, right_index=True)

        def _interpolWrapper(x, wrap_order=order, wrap_method=method):
            # interpolate one chunk, recursively lowering the order when the chunk is
            # too short or the method/order combination is unsupported
            if x.count() > wrap_order:
                try:
                    return x.interpolate(method=wrap_method, order=int(wrap_order))
                except (NotImplementedError, ValueError):
                    logger.warning(
                        "Interpolation with method {} is not supported at order {}. "
                        "Interpolation will be performed with order {}".format(
                            method, str(wrap_order), str(wrap_order - 1)
                        )
                    )
                    return _interpolWrapper(x, int(wrap_order - 1), wrap_method)
            elif x.size < 3:
                # too few samples to interpolate at all - return unchanged
                return x
            else:
                if downcast_interpolation:
                    # retry with the highest order the chunk's value count allows
                    return _interpolWrapper(x, int(x.count() - 1), wrap_method)
                else:
                    return x

        data = data.groupby(data.columns[0]).transform(_interpolWrapper)
        # squeezing the 1-dimensional frame resulting from groupby for consistency reasons
        data = data.squeeze(axis=1)
        data.name = dat_name
    return data, chunk_bounds
def _reshapeFlags(
flagger,
field,
......
......@@ -7,23 +7,6 @@ from sklearn.neighbors import NearestNeighbors
from scipy.stats import iqr
def _isValid(data, max_nan_total, max_nan_consec):
if (max_nan_total is np.inf) & (max_nan_consec is np.inf):
return True
nan_mask = data.isna()
if nan_mask.sum() <= max_nan_total:
if max_nan_consec is np.inf:
return True
elif nan_mask.rolling(window=max_nan_consec + 1).sum().max() <= max_nan_consec:
return True
else:
return False
else:
return False
# ts_transformations
def identity(ts):
    """Return the input series unchanged (identity transformation)."""
    return ts
......@@ -85,12 +68,6 @@ def nBallClustering(in_arr, ball_radius=None):
return exemplars, members, ex_indices
def kNN(in_arr, n_neighbors, algorithm="ball_tree"):
    """Fit a nearest-neighbor model on in_arr and return its kneighbors() result,
    i.e. the (distances, indices) arrays of each sample's n_neighbors nearest neighbors."""
    model = NearestNeighbors(n_neighbors=n_neighbors, algorithm=algorithm)
    return model.fit(in_arr).kneighbors()
def standardizeByMean(ts):
    """Standardize a series to zero mean and unit (sample) standard deviation."""
    center = ts.mean()
    spread = ts.std()
    return (ts - center) / spread
......@@ -99,8 +76,13 @@ def standardizeByMedian(ts):
return (ts - ts.median())/iqr(ts, nan_policy='omit')
def _kNN(in_arr, n_neighbors, algorithm="ball_tree"):
    """Internal helper: fit a nearest-neighbor model on in_arr and return its
    kneighbors() result (distances and indices of the n_neighbors nearest neighbors)."""
    model = NearestNeighbors(n_neighbors=n_neighbors, algorithm=algorithm)
    return model.fit(in_arr).kneighbors()
def kNNMaxGap(in_arr, n_neighbors, algorithm='ball_tree'):
dist, *_ = kNN(in_arr, n_neighbors, algorithm=algorithm)
dist, *_ = _kNN(in_arr, n_neighbors, algorithm=algorithm)
sample_size = dist.shape[0]
to_gap = np.append(np.array([[0] * sample_size]).T, dist, axis=1)
max_gap_ind = np.diff(to_gap, axis=1).argmax(axis=1)
......@@ -108,10 +90,27 @@ def kNNMaxGap(in_arr, n_neighbors, algorithm='ball_tree'):
def kNNSum(in_arr, n_neighbors, algorithm="ball_tree"):
    """Return, for every sample in in_arr, the sum of the distances to its
    n_neighbors nearest neighbors.

    Fix: the span contained both the pre-commit (``kNN``) and post-commit
    (``_kNN``) call lines from an unmarked diff; resolved to the post-commit
    version, which calls the private helper ``_kNN``.
    """
    dist, *_ = _kNN(in_arr, n_neighbors, algorithm=algorithm)
    return dist.sum(axis=1)
def _isValid(data, max_nan_total, max_nan_consec):
if (max_nan_total is np.inf) & (max_nan_consec is np.inf):
return True
nan_mask = data.isna()
if nan_mask.sum() <= max_nan_total:
if max_nan_consec is np.inf:
return True
elif nan_mask.rolling(window=max_nan_consec + 1).sum().max() <= max_nan_consec:
return True
else:
return False
else:
return False
def stdQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
"""Pandas built in function for statistical moments have quite poor nan- control, so here comes a wrapper that
will return the standart deviation for a given series input, if the total number of nans in the series does
......@@ -152,3 +151,87 @@ def meanQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
if _isValid(data, max_nan_total, max_nan_consec):
return data.mean()
return np.nan
def interpolateNANs(data, method, order=2, inter_limit=2, downcast_interpolation=False, return_chunk_bounds=False):
    """
    The function interpolates nan-values (and nan-grids) in timeseries data. It can be passed all the method keywords
    from the pd.Series.interpolate method and will then apply this very method. Note that the inter_limit keyword
    really restricts the interpolation to chunks not containing more than "inter_limit" nan entries
    (thereby opposing the limit keyword of pd.Series.interpolate).

    :param data: pd.Series. The data series to be interpolated.
    :param method: String. Method keyword designating the interpolation method to use.
    :param order: Integer. If your desired interpolation method needs an order to be passed -
                        here you pass it.
    :param inter_limit: Integer. Default = 2. Limit up to which nan-gaps in the data get interpolated.
                        Its default value suits an interpolation that only will apply on an inserted
                        frequency grid. (regularization by interpolation)
    :param downcast_interpolation: Boolean. Default False. If True:
                        If a data chunk does not contain enough values for interpolation of the order "order",
                        the highest order possible will be selected for that chunk's interpolation.
    :param return_chunk_bounds: Boolean. Default False. If True:
                        In addition to the interpolated data, the start and ending points of data chunks
                        not containing any nan-series longer than "inter_limit" values
                        are calculated and returned.
                        (This option fits requirements of the "_interpolate" function's use in the context of
                        saqc harmonization mainly.)
    :return: interpolated data; if return_chunk_bounds is True, a tuple of (data, chunk bounds index).
    """
    # gap_mask is True wherever the sample is NOT part of a nan-run of length >= inter_limit
    # (rolling counts the nans in each trailing window of size inter_limit)
    gap_mask = (data.rolling(inter_limit, min_periods=0).apply(lambda x: np.sum(np.isnan(x)), raw=True)) != inter_limit
    if inter_limit == 2:
        # cheap special case: mark both members of a nan-pair by and-ing with the shifted mask
        gap_mask = gap_mask & gap_mask.shift(-1, fill_value=True)
    else:
        # general case: back-fill the False markers over the whole window so every
        # member of an over-long nan-run gets masked out
        gap_mask = (
            gap_mask.replace(True, np.nan).fillna(method="bfill", limit=inter_limit).replace(np.nan, True).astype(bool)
        )

    if return_chunk_bounds:
        # start and ending points of interpolation chunks have to be memorized to block their flagging:
        chunk_switches = gap_mask.astype(int).diff()
        chunk_starts = chunk_switches[chunk_switches == -1].index
        chunk_ends = chunk_switches[(chunk_switches.shift(-1) == 1)].index
        chunk_bounds = chunk_starts.join(chunk_ends, how="outer", sort=True)

    # drop the samples belonging to over-long nan-runs before interpolating
    data = data[gap_mask]

    if method in ["linear", "time"]:
        # these methods are handled by pandas directly; limit=1 restricts filling to single gaps
        data.interpolate(method=method, inplace=True, limit=1, limit_area="inside")
    else:
        dat_name = data.name
        # label every interpolation chunk with a unique integer id so groupby can
        # interpolate each chunk in isolation
        gap_mask = (~gap_mask).cumsum()
        data = pd.merge(gap_mask, data, how="inner", left_index=True, right_index=True)

        def _interpolWrapper(x, wrap_order=order, wrap_method=method):
            # interpolate one chunk, recursively lowering the order when the chunk is
            # too short or the method/order combination is unsupported
            if x.count() > wrap_order:
                try:
                    return x.interpolate(method=wrap_method, order=int(wrap_order))
                except (NotImplementedError, ValueError):
                    logger.warning(
                        "Interpolation with method {} is not supported at order {}. "
                        "Interpolation will be performed with order {}".format(
                            method, str(wrap_order), str(wrap_order - 1)
                        )
                    )
                    return _interpolWrapper(x, int(wrap_order - 1), wrap_method)
            elif x.size < 3:
                # too few samples to interpolate at all - return unchanged
                return x
            else:
                if downcast_interpolation:
                    # retry with the highest order the chunk's value count allows
                    return _interpolWrapper(x, int(x.count() - 1), wrap_method)
                else:
                    return x

        data = data.groupby(data.columns[0]).transform(_interpolWrapper)
        # squeezing the 1-dimensional frame resulting from groupby for consistency reasons
        data = data.squeeze(axis=1)
        data.name = dat_name
    if return_chunk_bounds:
        return data, chunk_bounds
    else:
        return data
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment