Skip to content
Snippets Groups Projects

Funcs dict

Merged Peter Lünenschloß requested to merge funcsDict into develop
All threads resolved!
@@ -21,12 +21,14 @@ from saqc.lib.checking import (
isValidChoice,
validateCallable,
validateChoice,
validateFuncSelection,
validateMinPeriods,
validateValueBounds,
validateWindow,
)
from saqc.lib.tools import isflagged
from saqc.lib.ts_operators import interpolateNANs, shift2Freq
from saqc.lib.ts_operators import interpolateNANs
from saqc.parsing.environ import ENV_OPERATORS
if TYPE_CHECKING:
from saqc import SaQC
@@ -63,6 +65,35 @@ def _resampleOverlapping(data: pd.Series, freq: str, fill_value):
return data.fillna(fill_value).astype(dtype)
def _shift2Freq(
data: Union[pd.Series, pd.DataFrame],
method: Literal["fshift", "bshift", "nshift"],
freq: str,
fill_value,
):
"""
shift timestamps backwards/forwards in order to align them with an equidistant
frequency grid. Resulting Nan's are replaced with the fill-value.
"""
validateWindow(freq, "freq", allow_int=False)
validateChoice(method, "method", ["fshift", "bshift", "nshift"])
methods = {
"fshift": lambda freq: ("ffill", pd.Timedelta(freq)),
"bshift": lambda freq: ("bfill", pd.Timedelta(freq)),
"nshift": lambda freq: ("nearest", pd.Timedelta(freq) / 2),
}
direction, tolerance = methods[method](freq)
target_ind = pd.date_range(
start=pd.Timestamp(data.index[0]).floor(freq),
end=pd.Timestamp(data.index[-1]).ceil(freq),
freq=freq,
name=data.index.name,
)
return data.reindex(
target_ind, method=direction, tolerance=tolerance, fill_value=fill_value
)
class InterpolationMixin:
@register(
mask=["field"],
@@ -73,7 +104,7 @@ class InterpolationMixin:
self: "SaQC",
field: str,
window: str | int,
func: Callable[[pd.Series], float] = np.median,
func: Callable[[pd.Series], float] | str = "median",
center: bool = True,
min_periods: int = 0,
flag: float = UNFLAGGED,
@@ -104,7 +135,9 @@ class InterpolationMixin:
computed.
"""
validateWindow(window)
validateCallable(func, "func")
validateFuncSelection(func, allow_operator_str=True)
if isinstance(func, str):
func = ENV_OPERATORS[func]
validateMinPeriods(min_periods)
datcol = self._data[field]
@@ -588,7 +621,7 @@ def _shift(
return saqc
# do the shift
datcol = shift2Freq(datcol, method, freq, fill_value=np.nan)
datcol = _shift2Freq(datcol, method, freq, fill_value=np.nan)
# do the shift on the history
kws = dict(method=method, freq=freq)
@@ -596,7 +629,7 @@ def _shift(
history = saqc._flags.history[field].apply(
index=datcol.index,
func_handle_df=True,
func=shift2Freq,
func=_shift2Freq,
func_kws={**kws, "fill_value": np.nan},
)
Loading