diff --git a/CHANGELOG.md b/CHANGELOG.md index 064466aa5585f4e72ea7eff9d9053e0cfd087dc1..5f83a8da16918b1de013fbce889b0b6d413aa6d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ SPDX-License-Identifier: GPL-3.0-or-later - `setFlags`: function to replace `flagManual` - `flagUniLOF`: added parameter `slope_correct` to correct for overflagging at relatively steep data value slopes - `History`: added option to change aggregation behavior +- "horizontal" axis / multivariate mode for `rolling` - Translation scheme `AnnotatedFloatScheme` ### Changed - `SaQC.flags` always returns a `DictOfSeries` diff --git a/docs/resources/images/horizontalAxisRollingExample.png b/docs/resources/images/horizontalAxisRollingExample.png new file mode 100644 index 0000000000000000000000000000000000000000..b5f1dff483e43e6bfc0718f6d6f50c32c0fae312 Binary files /dev/null and b/docs/resources/images/horizontalAxisRollingExample.png differ diff --git a/docs/resources/images/horizontalAxisRollingExample.png.license b/docs/resources/images/horizontalAxisRollingExample.png.license new file mode 100644 index 0000000000000000000000000000000000000000..f8c6bf8cd36fb9a9a0a0dd474407f40908bf5d1f --- /dev/null +++ b/docs/resources/images/horizontalAxisRollingExample.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ + +SPDX-License-Identifier: GPL-3.0-or-later \ No newline at end of file diff --git a/saqc/funcs/rolling.py b/saqc/funcs/rolling.py index 2a1b95ef203131aa564c59575f0bb413bf4c8fc2..dfcdfd23cd747f59b6ccece0a89ffd2e847d2fd3 100644 --- a/saqc/funcs/rolling.py +++ b/saqc/funcs/rolling.py @@ -12,6 +12,7 @@ from typing import TYPE_CHECKING, Callable, Union import numpy as np import pandas as pd +import saqc from saqc.core import DictOfSeries, Flags, register from saqc.lib.checking import ( validateCallable, @@ -26,11 +27,14 @@ if TYPE_CHECKING: class RollingMixin: - @register(mask=["field"], demask=[], squeeze=[]) + @register( + mask=["field"], demask=[], squeeze=[], multivariate=True, handles_target=True + ) def rolling( self: "SaQC", - field: str, + field: str | list[str], window: str | int, + target: str | list[str] = None, func: Callable[[pd.Series], np.ndarray] | str = "mean", min_periods: int = 0, center: bool = True, @@ -39,7 +43,9 @@ class RollingMixin: """ Calculate a rolling-window function on the data. - Note, that the data gets assigned the worst flag present in the original data. + Note, that the new data gets assigned the worst flag present in the window it was aggregated from. + + Note, That you also can select multiple fields to get a rolling calculation over those. Parameters ---------- @@ -58,21 +64,116 @@ class RollingMixin: center : If True, center the rolling window. + + Notes + ----- + .. figure:: /resources/images/horizontalAxisRollingExample.png + + Rolling over multiple variables. """ # HINT: checking in _roll - self._data, self._flags = _roll( - data=self._data, - field=field, - flags=self._flags, - window=window, - func=func, - min_periods=min_periods, - center=center, - **kwargs, - ) + if target and (len(target) > 1) and (len(field) != len(target)): + raise ValueError( + f"""If multiple targets are given, per-field application of rolling is conducted and the number of + fields has to equal the number of targets.\n Got: \n Fields={field} \n Targets={target}""" + ) + + if target and (len(field) > 1) and (len(target) == 1): + target = target[0] + if target not in self._data.columns: + self[target] = saqc.SaQC( + pd.Series( + np.nan, index=self.data[field].to_pandas().index, name=target + ) + ) + + self._data, self._flags = _hroll( + data=self._data, + field=field, + flags=self._flags, + target=target, + window=window, + func=func, + min_periods=min_periods, + center=center, + ) + + else: + if target: + for ft in zip(field, target): + self = self.copyField(ft[0], target=ft[1], overwrite=True) + field = target + for f in field: + self._data, self._flags = _roll( + data=self._data, + field=f, + flags=self._flags, + window=window, + func=func, + min_periods=min_periods, + center=center, + **kwargs, + ) return self +def _hroll( + data: DictOfSeries, + field: str, + flags: Flags, + target: str, + window: str | int, + func: Callable[[pd.Series], np.ndarray] | str = "mean", + min_periods: int = 0, + center: bool = True, + **kwargs, +): + + if isinstance(window, str): + freq = getFreqDelta(data[field].to_pandas().index) + if freq is None: + raise ValueError( + f"Rolling over more than one column is only supported if either the data has a unitary" + f'sampling rate, or window is an integer. "{window}" was passed and combined {field} ' + f"index is not unitarily sampled" + ) + else: + window = int(np.floor(pd.Timedelta(window) / freq)) + + views = np.lib.stride_tricks.sliding_window_view( + data[field].to_pandas(), (window, len(field)) + ) + f_views = np.lib.stride_tricks.sliding_window_view( + pd.DataFrame({f: flags[f] for f in field}), (window, len(field)) + ) + frame = pd.DataFrame( + views.reshape(views.shape[0], views.shape[1] * views.shape[2] * views.shape[3]) + ) + if isinstance(func, str) and hasattr(pd.DataFrame, func): + result = getattr(frame, func)(axis=1) + else: + result = frame.apply(func, axis=1) + + insuff_periods_mask = ~(~frame.isna()).sum(axis=1) >= min_periods + result[insuff_periods_mask] = np.nan + f_result = f_views.max(axis=(2, 3)).squeeze() + + d_out = pd.Series(np.nan, index=data[field].to_pandas().index) + d_out[window - 1 :] = result + if center: + d_out = d_out.shift(-int(np.floor(window / 2))) + + f_out = pd.Series(np.nan, index=data[field].to_pandas().index) + f_out[window - 1 :] = f_result + if center: + f_out = f_out.shift(-int(np.floor(window / 2))) + + data[target] = d_out + flags[target] = f_out + + return data, flags + + def _roll( data: DictOfSeries, field: str, diff --git a/tests/funcs/test_proc_functions.py b/tests/funcs/test_proc_functions.py index c47d974daf1b347d702be183039dfca1cc444cf4..df960969b062e7a76e2a53e938f0200843d62ebf 100644 --- a/tests/funcs/test_proc_functions.py +++ b/tests/funcs/test_proc_functions.py @@ -20,6 +20,27 @@ from saqc.lib.ts_operators import linearInterpolation, polynomialInterpolation from tests.fixtures import char_dict, course_3, course_5 # noqa, todo: fix fixtures +@pytest.mark.parametrize( + ("window", "center", "expected"), + [ + (1, True, [3, 2, 3, 2]), + (2, False, [np.nan, 5, 5, 5]), + (3, True, [np.nan, 8, 7, np.nan]), + ("20min", True, [5, 5, 5, np.nan]), + ], +) +def test_multivariateRolling(window, center, expected): + data = pd.DataFrame( + {"a": [1, np.nan, 3, 4], "b": [1, 2, 3, 4], "c": [1, 2, 3, np.nan]}, + index=pd.date_range("2000", periods=4, freq="10min"), + ) + qc = saqc.SaQC(data) + qc = qc.rolling( + ["a", "b", "c"], func="count", target="count", window=window, center=center + ) + assert np.array_equal(qc.data["count"].values, expected, equal_nan=True) + + def test_rollingInterpolateMissing(course_5): data, characteristics = course_5(periods=10, nan_slice=[5, 6]) field = data.columns[0]