Skip to content
Snippets Groups Projects

Several fixes and refactorings to the interpolation methods

Closed David Schäfer requested to merge interpolation_fixes into develop
+ 37
86
@@ -9,9 +9,10 @@
"""
The module gathers all kinds of timeseries transformations.
"""
from __future__ import annotations
import re
import sys
import warnings
from typing import Union
import numba as nb
@@ -21,6 +22,7 @@ import pandas as pd
from scipy.signal import butter, filtfilt
from scipy.stats import iqr, median_abs_deviation
from sklearn.neighbors import NearestNeighbors
from typing_extensions import Literal
from saqc.lib.tools import getFreqDelta
@@ -277,91 +279,42 @@ def meanQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
def interpolateNANs(
data, method, order=2, inter_limit=2, downgrade_interpolation=False
):
"""
The function interpolates nan-values (and nan-grids) in timeseries data. It can
be passed all the method keywords from the pd.Series.interpolate method and will
then apply that very method. Note that the limit keyword really restricts
the interpolation to chunks not containing more than "limit" nan entries
(thereby not being identical to the "limit" keyword of pd.Series.interpolate).
:param data: pd.Series or np.array. The data series to be interpolated
:param method: String. Method keyword designating interpolation method to use.
:param order: Integer. If your desired interpolation method needs an order to be passed -
here you pass it.
:param inter_limit: Integer. Default = 2. Limit up to which consecutive nan-values in the data get
replaced by interpolation.
Its default value suits an interpolation that will only apply to points of an
inserted frequency grid (regularization by interpolation).
Gaps wider than "inter_limit" will NOT be interpolated at all.
:param downgrade_interpolation: Boolean. Default False. If True:
If a data chunk does not contain enough values for an interpolation of order "order",
the highest order possible will be selected for that chunk's interpolation.
:return:
"""
inter_limit = int(inter_limit or len(data) + 1)
data = pd.Series(data, copy=True)
gap_mask = data.isna().rolling(inter_limit, min_periods=0).sum() != inter_limit
if inter_limit == 2:
gap_mask = gap_mask & gap_mask.shift(-1, fill_value=True)
else:
gap_mask = (
gap_mask.replace(True, np.nan)
.fillna(method="bfill", limit=inter_limit)
.replace(np.nan, True)
.astype(bool)
)
pre_index = data.index
if data[gap_mask].empty:
return data
data: pd.Series,
method: str = "linear",
limit: int | None = 2,
limit_area: Literal["inside", "outside", None] = "inside",
limit_direction: Literal["forward", "backward", "both"] | None = None,
**kwargs,
) -> pd.Series:
limit = int(limit or len(data) + 1)
# 1. interpolate
out: pd.Series = data.interpolate(
method=method,
inplace=False,
limit=limit,
limit_area=limit_area,
limit_direction=limit_direction,
**kwargs,
)
else:
data = data[gap_mask]
# 2. remove interpolated values from gaps larger than `limit`
if limit < len(data):
if method in ["linear", "time"]:
def func(values, index):
return np.all(values) and len(values) > limit
data.interpolate(
method=method, inplace=True, limit=inter_limit - 1, limit_area="inside"
nans = data.isna()
mask = (
nans.groupby((nans.shift() != nans).cumsum())
.transform(func, engine="numba")
.astype(bool)
)
else:
dat_name = data.name
gap_mask = (~gap_mask).cumsum()
data = pd.merge(gap_mask, data, how="inner", left_index=True, right_index=True)
def _interpolWrapper(x, wrap_order=order, wrap_method=method):
if wrap_order < 0:
return x
elif x.count() > wrap_order:
try:
return x.interpolate(method=wrap_method, order=int(wrap_order))
except (NotImplementedError, ValueError):
warnings.warn(
f"Interpolation with method {method} is not supported at order "
f"{wrap_order}. and will be performed at order {wrap_order - 1}"
)
return _interpolWrapper(x, int(wrap_order - 1), wrap_method)
elif x.size < 3:
return x
else:
if downgrade_interpolation:
return _interpolWrapper(x, int(x.count() - 1), wrap_method)
else:
return x
data = data.groupby(data.columns[0]).transform(_interpolWrapper)
# squeezing the 1-dimensional frame resulting from groupby for consistency
# reasons
data = data.squeeze(axis=1)
data.name = dat_name
data = data.reindex(pre_index)
out.loc[mask] = np.nan
return data
return out
def aggregate2Freq(
@@ -598,11 +551,9 @@ def linearDriftModel(x, origin, target):
return origin + x * target
def linearInterpolation(data, inter_limit=2):
    """
    Interpolate nan-values in ``data`` with the "time" method.

    Thin convenience wrapper: all work is delegated to ``interpolateNANs``.

    :param data: The data to be interpolated; handed on unchanged.
    :param inter_limit: Integer. Default = 2. Forwarded to ``interpolateNANs``
        as its ``inter_limit`` argument.
    :return: The result of the ``interpolateNANs`` call.
    """
    interpolated = interpolateNANs(data, method="time", inter_limit=inter_limit)
    return interpolated
def linearInterpolation(data, inter_limit=1):
    """
    Interpolate nan-values in ``data`` with the "time" method.

    Thin convenience wrapper: all work is delegated to ``interpolateNANs``.

    :param data: The data to be interpolated; handed on unchanged.
    :param inter_limit: Integer. Default = 1. Forwarded to ``interpolateNANs``
        as its ``limit`` argument.
    :return: The result of the ``interpolateNANs`` call.
    """
    interpolated = interpolateNANs(data, method="time", limit=inter_limit)
    return interpolated
def polynomialInterpolation(data, inter_limit=2, inter_order=2):
    """
    Interpolate nan-values in ``data`` with the "polynomial" method.

    Thin convenience wrapper: all work is delegated to ``interpolateNANs``.

    :param data: The data to be interpolated; handed on unchanged.
    :param inter_limit: Integer. Default = 2. Forwarded to ``interpolateNANs``
        as its ``inter_limit`` argument.
    :param inter_order: Integer. Default = 2. Forwarded to ``interpolateNANs``
        as its ``order`` argument.
    :return: The result of the ``interpolateNANs`` call.
    """
    # Collect the forwarded settings first so the delegating call stays short.
    forwarded = {"inter_limit": inter_limit, "order": inter_order}
    return interpolateNANs(data, "polynomial", **forwarded)
def polynomialInterpolation(data, inter_limit=1, inter_order=2):
    """
    Interpolate nan-values in ``data`` with the "polynomial" method.

    Thin convenience wrapper: all work is delegated to ``interpolateNANs``.

    :param data: The data to be interpolated; handed on unchanged.
    :param inter_limit: Integer. Default = 1. Forwarded to ``interpolateNANs``
        as its ``limit`` argument.
    :param inter_order: Integer. Default = 2. Forwarded to ``interpolateNANs``
        as the keyword argument ``order``.
    :return: The result of the ``interpolateNANs`` call.
    """
    # Collect the forwarded settings first so the delegating call stays short.
    forwarded = {"limit": inter_limit, "order": inter_order}
    return interpolateNANs(data, "polynomial", **forwarded)
Loading