Skip to content
Snippets Groups Projects
Commit 7670fcb9 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

streamlined/documented interpolate NaN funcion

parent 730e4d99
No related branches found
No related tags found
1 merge request!600Inter limit fix
Pipeline #142245 failed with stages
in 1 minute and 4 seconds
......@@ -276,6 +276,30 @@ def meanQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
)
def _interpolWrapper(x, order=2, method='time', downgrade_interpolation=False):
"""
Function that automatically modifies the interpolation level or returns uninterpolated
input data if the data configuration breaks the interpolation method at the selected degree.
"""
if order < 0:
return x
elif x.count() > order:
try:
return x.interpolate(method=method, order=int(order))
except (NotImplementedError, ValueError):
warnings.warn(
f"Interpolation with method {method} is not supported at order "
f"{order}. and will be performed at order {order - 1}"
)
return _interpolWrapper(x, int(order - 1), method)
elif x.size < 3:
return x
else:
if downgrade_interpolation:
return _interpolWrapper(x, int(x.count() - 1), method)
else:
return x
def interpolateNANs(
data, method, order=2, inter_limit=2, downgrade_interpolation=False
):
......@@ -301,66 +325,54 @@ def interpolateNANs(
:return:
"""
inter_limit = int(inter_limit or len(data) + 1)
data = pd.Series(data, copy=True)
gap_mask = data.isna().rolling(inter_limit, min_periods=0).sum() != inter_limit
if inter_limit == 2:
gap_mask = gap_mask & gap_mask.shift(-1, fill_value=True)
if inter_limit is None:
# if there is actually no limit set to the gaps to-be interpolated, generate a dummy mask for the gaps
gap_mask = pd.Series(True, index=data.index, name=data.name)
else:
gap_mask = (
gap_mask.replace(True, np.nan)
.fillna(method="bfill", limit=inter_limit - 1)
.replace(np.nan, True)
.astype(bool)
)
# if there is a limit to the gaps to be interpolated, generate a mask that evaluates to False at the right side
# of each too-large gap with a rolling.sum combo
gap_mask = data.isna().rolling(inter_limit, min_periods=0).sum() != inter_limit
if inter_limit < 20:
# for the common case of inter_limit=2 (default "harmonisation"), we efficiently bag propagate the False
# value to fill the whole too-large gap by a shift and a conjunction.
gap_mask &= gap_mask & gap_mask.shift(-1, fill_value=True)
else:
# If the gap_size is bigger we use pandas backfill-interpolation to propagate the False values back.
# Therefor we replace the True values with np.nan so hat they are interpreted as missing periods.
gap_mask = (
gap_mask.replace(True, np.nan)
.fillna(method="bfill", limit=inter_limit - 1)
.replace(np.nan, True)
.astype(bool)
)
# memorizing the index for later reindexing
pre_index = data.index
if data[gap_mask].empty:
# drop the gaps that are too large with regard to the inter_limit from the data-to-be interpolated
data = data[gap_mask]
if data.empty:
return data
else:
data = data[gap_mask]
if method in ["linear", "time"]:
# in the case of linear interpolation, not much can go wrong/break so this conditional branch has efficient
# finish by just calling pandas interpolation routine to fill the gaps remaining in the data:
data.interpolate(
method=method, inplace=True, limit=inter_limit - 1, limit_area="inside"
method=method, inplace=True, limit_area="inside"
)
else:
dat_name = data.name
gap_mask = (~gap_mask).cumsum()
data = pd.merge(gap_mask, data, how="inner", left_index=True, right_index=True)
def _interpolWrapper(x, wrap_order=order, wrap_method=method):
if wrap_order < 0:
return x
elif x.count() > wrap_order:
try:
return x.interpolate(method=wrap_method, order=int(wrap_order))
except (NotImplementedError, ValueError):
warnings.warn(
f"Interpolation with method {method} is not supported at order "
f"{wrap_order}. and will be performed at order {wrap_order - 1}"
)
return _interpolWrapper(x, int(wrap_order - 1), wrap_method)
elif x.size < 3:
return x
else:
if downgrade_interpolation:
return _interpolWrapper(x, int(x.count() - 1), wrap_method)
else:
return x
data = data.groupby(data.columns[0]).transform(_interpolWrapper)
# squeezing the 1-dimensional frame resulting from groupby for consistency
# reasons
data = data.squeeze(axis=1)
data.name = dat_name
# if the method that is inerpolated with depends on not only the left and right border points of any gap,
# but includes more points, it has to be applied on any data chunk seperated by the too-big gaps individually.
# So we use the gap_mask to group the data into chunks and perform the interpolation on every chunk seperatly
# with the .transform method of the grouper.
gap_mask = (~gap_mask).cumsum()[data.index]
data = data.groupby(by=gap_mask).transform(_interpolWrapper, **{'order':order,
'method':method,
'downgrade_inerpolation':downgrade_interpolation})
# finally reinsert the dropped data gaps
data = data.reindex(pre_index)
return data
......
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
import numpy as np
import pandas as pd
import pytest
from numpy.testing import assert_array_equal, assert_equal
from pandas.testing import assert_series_equal
import saqc.lib.ts_operators as tsops
from saqc.lib.ts_operators import interpolateNANs
def test_butterFilter():
......@@ -193,3 +195,72 @@ def test_rateOfChange(data, expected):
result = rateOfChange(data)
assert_series_equal(result, expected, check_names=False)
@pytest.mark.parametrize(
"limit,area,direction,data,expected",
[
(
1,
"inside",
None,
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
),
(
2,
"inside",
None,
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
),
(
3,
"inside",
None,
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
[np.nan, 0, 1, 2, 3, 4, np.nan],
),
(
None,
"inside",
None,
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
[np.nan, 0, 1, 2, 3, 4, np.nan],
),
(
None,
"outside",
"forward",
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
[np.nan, 0, np.nan, np.nan, np.nan, 4, 4],
),
(
None,
"outside",
"backward",
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
[0, 0, np.nan, np.nan, np.nan, 4, np.nan],
),
(
None,
"outside",
"both",
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
[0, 0, np.nan, np.nan, np.nan, 4, 4],
),
(
None,
None,
"both",
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
[0, 0, 1, 2, 3, 4, 4],
),
],
)
def test_interpolatNANs(limit, area, direction, data, expected):
got = interpolateNANs(
pd.Series(data), inter_limit=limit
)
assert got.equals(pd.Series(expected, dtype=float))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment