
Inter limit fix

Merged Peter Lünenschloß requested to merge interLimitFix into develop
Files 3
+70 −52
@@ -276,6 +276,31 @@ def meanQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
)
def _interpolWrapper(x, order=2, method="time", downgrade_interpolation=False):
"""
Wrapper that automatically lowers the interpolation order, or returns the input data
uninterpolated, if the data configuration breaks the interpolation method at the selected order.
"""
if order < 0:
return x
elif x.count() > order:
try:
return x.interpolate(method=method, order=int(order))
except (NotImplementedError, ValueError):
warnings.warn(
f"Interpolation with method {method} is not supported at order "
f"{order}. and will be performed at order {order - 1}"
)
return _interpolWrapper(x, int(order - 1), method)
elif x.size < 3:
return x
else:
if downgrade_interpolation:
return _interpolWrapper(x, int(x.count() - 1), method)
else:
return x
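# A hypothetical call illustrating the fallback chain: a chunk holding only three
# valid values cannot be fitted at order 3, so with downgrade_interpolation=True the
# wrapper retries at order x.count() - 1 == 2, while the default
# downgrade_interpolation=False hands the chunk back untouched:
#
#   chunk = pd.Series([1.0, np.nan, 4.0, np.nan, 16.0])
#   _interpolWrapper(chunk, order=3, method="spline", downgrade_interpolation=True)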
def interpolateNANs(
data, method, order=2, inter_limit=2, downgrade_interpolation=False
):
@@ -290,77 +315,70 @@ def interpolateNANs(
:param method: String. Method keyword designating interpolation method to use.
:param order: Integer. If your desired interpolation method needs an order to be passed -
here you pass it.
:param inter_limit: Integer. Default = 2. Limit up to which consecutive nan - values in the data get
:param inter_limit: Integer. Default = 2. Only runs of fewer than "inter_limit" consecutive NaN values get
replaced by interpolation.
Its default value suits an interpolation that will only apply to points of an
inserted frequency grid (regularization by interpolation).
Gaps wider than "limit" will NOT be interpolated at all.
Gaps of size "limit" or greater will NOT be interpolated at all.
:param downgrade_interpolation: Boolean. Default False. If True:
If a data chunk does not contain enough values for an interpolation of order "order",
the highest possible order will be selected for that chunk's interpolation.
:return:
"""
inter_limit = int(inter_limit or len(data) + 1)
data = pd.Series(data, copy=True)
gap_mask = data.isna().rolling(inter_limit, min_periods=0).sum() != inter_limit
if inter_limit == 2:
gap_mask = gap_mask & gap_mask.shift(-1, fill_value=True)
if inter_limit is None:
# if there is actually no limit set on the gaps to be interpolated, generate a dummy mask for the gaps
gap_mask = pd.Series(True, index=data.index, name=data.name)
elif inter_limit < 2:
return data
else:
gap_mask = (
gap_mask.replace(True, np.nan)
.fillna(method="bfill", limit=inter_limit)
.replace(np.nan, True)
.astype(bool)
)
# if there is a limit on the gaps to be interpolated, generate a mask that evaluates to False at the right
# end of each too-large gap, using a rolling-sum combination
gap_mask = data.isna().rolling(inter_limit, min_periods=0).sum() != inter_limit
if inter_limit == 2:
# for the common case of inter_limit=2 (the "harmonisation" default), we efficiently back-propagate the False
# value to cover the whole too-large gap by a shift and a conjunction.
gap_mask &= gap_mask & gap_mask.shift(-1, fill_value=True)
else:
# If the gap size is bigger, we use pandas' backfill to propagate the False values backwards.
# Therefore we replace the True values with np.nan so that they are interpreted as missing periods.
gap_mask = (
gap_mask.replace(True, np.nan)
.fillna(method="bfill", limit=inter_limit - 1)
.replace(np.nan, True)
.astype(bool)
)
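# e.g. with inter_limit=3 and data.isna() == [F, T, T, T, F, T, F], the rolling sum flags
# only the right end of the 3-gap (index 3); the backfill with limit=2 then extends the
# False flag over indices 1-3, while the isolated NaN at index 5 stays interpolatable.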
# memorizing the index for later reindexing
pre_index = data.index
if data[gap_mask].empty:
# drop the gaps that are too large with regard to inter_limit from the data to be interpolated
data = data[gap_mask]
if data.empty:
return data
else:
data = data[gap_mask]
if method in ["linear", "time"]:
data.interpolate(
method=method, inplace=True, limit=inter_limit - 1, limit_area="inside"
)
# in the case of linear interpolation, not much can go wrong/break, so this branch finishes
# efficiently by just calling pandas' interpolation routine to fill the gaps remaining in the data:
data.interpolate(method=method, inplace=True, limit_area="inside")
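# note: limit_area="inside" restricts filling to NaNs that are surrounded by valid values,
# so leading and trailing NaNs of the series are left untouched.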
else:
dat_name = data.name
gap_mask = (~gap_mask).cumsum()
data = pd.merge(gap_mask, data, how="inner", left_index=True, right_index=True)
def _interpolWrapper(x, wrap_order=order, wrap_method=method):
if wrap_order < 0:
return x
elif x.count() > wrap_order:
try:
return x.interpolate(method=wrap_method, order=int(wrap_order))
except (NotImplementedError, ValueError):
warnings.warn(
f"Interpolation with method {method} is not supported at order "
f"{wrap_order}. and will be performed at order {wrap_order - 1}"
)
return _interpolWrapper(x, int(wrap_order - 1), wrap_method)
elif x.size < 3:
return x
else:
if downgrade_interpolation:
return _interpolWrapper(x, int(x.count() - 1), wrap_method)
else:
return x
data = data.groupby(data.columns[0]).transform(_interpolWrapper)
# squeezing the 1-dimensional frame resulting from groupby for consistency
# reasons
data = data.squeeze(axis=1)
data.name = dat_name
# if the interpolation method depends not only on the left and right border points of a gap,
# but on more points, it has to be applied to every data chunk separated by the too-big gaps individually.
# So we use the gap_mask to group the data into chunks and perform the interpolation on every chunk separately
# via the .transform method of the grouper.
gap_mask = (~gap_mask).cumsum()[data.index]
data = data.groupby(by=gap_mask).transform(
_interpolWrapper,
**{
"order": order,
"method": method,
"downgrade_inerpolation": downgrade_interpolation,
},
)
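# e.g. for gap_mask == [T, F, F, F, T, T, T], (~gap_mask).cumsum() yields [0, 1, 2, 3, 3, 3, 3];
# restricted to the kept rows this becomes [0, 3, 3, 3], so the data left and right of the
# dropped gap end up in separate groups and are interpolated independently.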
# finally reinsert the dropped data gaps
data = data.reindex(pre_index)
return data
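
For reference, a minimal sketch of how the changed routine could be exercised (the series, values and parameter choices below are made up for illustration; interpolateNANs is assumed to be imported from the module patched above):

    import numpy as np
    import pandas as pd

    # hourly series with one isolated NaN and one run of three NaNs
    idx = pd.date_range("2021-01-01", periods=8, freq="h")
    raw = pd.Series([1.0, np.nan, 3.0, 4.0, np.nan, np.nan, np.nan, 8.0], index=idx)

    # with inter_limit=3 the isolated NaN gets filled, while the 3-gap
    # (size >= inter_limit) is dropped from the interpolation and reinserted as NaN
    filled = interpolateNANs(raw, method="time", inter_limit=3)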