From dd787ea04cf878aab223d3c2bf88726c37f9a4bf Mon Sep 17 00:00:00 2001
From: luenensc <peter.luenenschloss@ufz.de>
Date: Sat, 14 Jan 2023 01:42:03 +0100
Subject: [PATCH] removed automatic interpolation downgrade mechanic/ added extrapolation mechanic

---
Review notes (text between "---" and the diffstat is not part of the commit message):
* saqc/funcs/interpolation.py, second hunk: the stale
  `downgrade_interpolation=downgrade` keyword is removed as well, because
  interpolateNANs no longer accepts that parameter (TypeError otherwise).
* saqc/lib/ts_operators.py: forward extrapolation now interpolates
  `data[trail_idx]` (was `data[lead_idx]`: NameError for 'forward', wrong chunk
  for 'both'); the chunk-wise branch calls `.transform` on the existing grouper
  instead of `.groupby` on a groupby object (AttributeError).
* saqc/lib/ts_operators.py: `_interpolWrapper` docstring no longer describes
  the removed downgrade mechanic.
* Line breaks, indentation and blank context lines of this patch were
  reconstructed; if context does not match the target tree exactly, try
  `git apply --ignore-whitespace`.

 saqc/funcs/interpolation.py    |  6 +--
 saqc/lib/ts_operators.py       | 83 +++++++++++++++++-----------------
 tests/lib/test_ts_operators.py |  2 +-
 3 files changed, 44 insertions(+), 47 deletions(-)

diff --git a/saqc/funcs/interpolation.py b/saqc/funcs/interpolation.py
index e0907e261..52f17bda2 100644
--- a/saqc/funcs/interpolation.py
+++ b/saqc/funcs/interpolation.py
@@ -186,8 +186,7 @@ class InterpolationMixin:
             self._data[field],
             method,
             order=order,
-            inter_limit=limit,
-            downgrade_interpolation=downgrade,
+            gap_limit=limit,
         )
 
         interpolated = self._data[field].isna() & inter_data.notna()
@@ -281,7 +280,6 @@ class InterpolationMixin:
             data=datcol,
             method=method,
             order=order,
-            inter_limit=limit,
-            downgrade_interpolation=downgrade,
+            gap_limit=limit,
         )
 
diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py
index 91f41281d..3a51e750c 100644
--- a/saqc/lib/ts_operators.py
+++ b/saqc/lib/ts_operators.py
@@ -276,46 +276,37 @@ def meanQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
     )
 
 
-def _interpolWrapper(x, order=2, method="time", downgrade_interpolation=False):
+def _interpolWrapper(x, order=1, method="time", limit_area='inside', limit_direction=None):
     """
-    Function that automatically modifies the interpolation level or returns uninterpolated
-    input data if the data configuration breaks the interpolation method at the selected degree.
+    Interpolate `x` with `pd.Series.interpolate`, or return it unchanged if it holds
+    fewer than 3 entries or fewer valid values than `method` needs at `order`.
     """
-    if order < 0:
-        return x
-    elif x.count() > order:
-        try:
-            return x.interpolate(method=method, order=int(order))
-        except (NotImplementedError, ValueError):
-            warnings.warn(
-                f"Interpolation with method {method} is not supported at order "
-                f"{order}. and will be performed at order {order - 1}"
-            )
-            return _interpolWrapper(x, int(order - 1), method)
-    elif x.size < 3:
+
+    min_vals_dict = {'nearest': 2, 'slinear': 2, 'quadratic': 3, 'cubic':4, 'spline':order+1, 'polynomial':order+1,
+                     'piecewise_polynomial': 2, 'pchip': 2, 'akima': 2, 'cubicspline': 2}
+    min_vals = min_vals_dict.get(method, 0)
+
+    if (x.size < 3) | (x.count() < min_vals):
         return x
     else:
-        if downgrade_interpolation:
-            return _interpolWrapper(x, int(x.count() - 1), method)
-        else:
-            return x
+        return x.interpolate(method=method, order=order, limit_area=limit_area, limit_direction=limit_direction)
 
 
 def interpolateNANs(
-    data, method, order=2, inter_limit=2, downgrade_interpolation=False
+    data, method, order=2, gap_limit=2, extrapolate=None
 ):
     """
     The function interpolates nan-values (and nan-grids) in timeseries data. It can
     be passed all the method keywords from the pd.Series.interpolate method and will
     than apply this very methods. Note, that the limit keyword really restricts
-    the interpolation to chunks, not containing more than "limit" nan entries (
+    the interpolation to gaps, not containing more than "limit" nan entries (
     thereby not being identical to the "limit" keyword of pd.Series.interpolate).
 
     :param data: pd.Series or np.array. The data series to be interpolated
     :param method: String. Method keyword designating interpolation method to use.
     :param order: Integer. If your desired interpolation method needs an order to be passed -
     here you pass it.
-    :param inter_limit: Integer. Default = 2. Number up to which consecutive nan - values in the data get
+    :param gap_limit: Integer. Default = 2. Number up to which consecutive nan - values in the data get
     replaced by interpolation.
     Its default value suits an interpolation that only will apply to points of an
     inserted frequency grid. (regularization by interpolation)
@@ -327,18 +318,18 @@ def interpolateNANs(
     :return:
     """
     data = pd.Series(data, copy=True)
-
-    if inter_limit is None:
+    limit_area = "inside" if not extrapolate else "outside"
+    if gap_limit is None:
         # if there is actually no limit set to the gaps to-be interpolated, generate a dummy mask for the gaps
         gap_mask = pd.Series(True, index=data.index, name=data.name)
-    elif inter_limit < 2:
+    elif gap_limit < 2:
         return data
     else:
         # if there is a limit to the gaps to be interpolated, generate a mask that evaluates to False at the right side
         # of each too-large gap with a rolling.sum combo
-        gap_mask = data.isna().rolling(inter_limit, min_periods=0).sum() != inter_limit
-        if inter_limit == 2:
-            # for the common case of inter_limit=2 (default "harmonisation"), we efficiently bag propagate the False
+        gap_mask = data.isna().rolling(gap_limit, min_periods=0).sum() != gap_limit
+        if gap_limit == 2:
+            # for the common case of gap_limit=2 (default "harmonisation"), we efficiently back propagate the False
             # value to fill the whole too-large gap by a shift and a conjunction.
             gap_mask &= gap_mask & gap_mask.shift(-1, fill_value=True)
         else:
@@ -346,14 +337,14 @@ def interpolateNANs(
             # Therefor we replace the True values with np.nan so hat they are interpreted as missing periods.
             gap_mask = (
                 gap_mask.replace(True, np.nan)
-                .fillna(method="bfill", limit=inter_limit - 1)
+                .fillna(method="bfill", limit=gap_limit - 1)
                 .replace(np.nan, True)
                 .astype(bool)
             )
 
     # memorizing the index for later reindexing
     pre_index = data.index
-    # drop the gaps that are too large with regard to the inter_limit from the data-to-be interpolated
+    # drop the gaps that are too large with regard to the gap_limit from the data-to-be interpolated
     data = data[gap_mask]
     if data.empty:
         return data
@@ -361,22 +352,30 @@ def interpolateNANs(
     if method in ["linear", "time"]:
         # in the case of linear interpolation, not much can go wrong/break so this conditional branch has efficient
         # finish by just calling pandas interpolation routine to fill the gaps remaining in the data:
-        data.interpolate(method=method, inplace=True, limit_area="inside")
+        data.interpolate(method=method, inplace=True, limit_area=limit_area, limit_direction=extrapolate)
 
     else:
-        # if the method that is interpolated with depends on not only the left and right border points of any gap,
+        # if the method that is interpolated with, depends on not only the left and right border points of any gap,
         # but includes more points, it has to be applied on any data chunk seperated by the too-big gaps individually.
         # So we use the gap_mask to group the data into chunks and perform the interpolation on every chunk seperatly
         # with the .transform method of the grouper.
         gap_mask = (~gap_mask).cumsum()[data.index]
-        data = data.groupby(by=gap_mask).transform(
-            _interpolWrapper,
-            **{
-                "order": order,
-                "method": method,
-                "downgrade_interpolation": downgrade_interpolation,
-            },
-        )
+        chunk_groups = data.groupby(by=gap_mask)
+        if extrapolate:
+            if extrapolate in ['both', 'backward']:
+                lead_idx = gap_mask[gap_mask==gap_mask.min()].index
+                data[lead_idx] = _interpolWrapper(data[lead_idx], order=order, method=method, limit_area=limit_area, limit_direction='backward')
+            if extrapolate in ['both', 'forward']:
+                trail_idx = gap_mask[gap_mask==gap_mask.max()].index
+                data[trail_idx] = _interpolWrapper(data[trail_idx], order=order, method=method, limit_area=limit_area, limit_direction='forward')
+        else:
+            data = chunk_groups.transform(
+                _interpolWrapper,
+                **{
+                    "order": order,
+                    "method": method,
+                },
+            )
     # finally reinsert the dropped data gaps
     data = data.reindex(pre_index)
     return data
@@ -617,10 +616,10 @@ def linearDriftModel(x, origin, target):
 
 
 def linearInterpolation(data, inter_limit=2):
-    return interpolateNANs(data, "time", inter_limit=inter_limit)
+    return interpolateNANs(data, "time", gap_limit=inter_limit)
 
 
 def polynomialInterpolation(data, inter_limit=2, inter_order=2):
     return interpolateNANs(
-        data, "polynomial", inter_limit=inter_limit, order=inter_order
+        data, "polynomial", gap_limit=inter_limit, order=inter_order
     )
diff --git a/tests/lib/test_ts_operators.py b/tests/lib/test_ts_operators.py
index 044ab7297..dffb19aa6 100644
--- a/tests/lib/test_ts_operators.py
+++ b/tests/lib/test_ts_operators.py
@@ -228,7 +228,7 @@ def test_rateOfChange(data, expected):
     ],
 )
 def test_interpolatNANs(limit, data, expected):
-    got = interpolateNANs(pd.Series(data), inter_limit=limit, method="linear")
+    got = interpolateNANs(pd.Series(data), gap_limit=limit, method="linear")
     try:
         assert got.equals(pd.Series(expected, dtype=float))
     except:
-- 
GitLab