diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index d66374875a7bbb0334e3529d19251d9ab33e53fd..86a4ade8900e91ff2c448d55d339ad9eca779cb5 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -80,7 +80,7 @@ coverage:
     - export DISPLAY=:99
     - Xvfb :99 &
     - pip install pytest-cov coverage
-    - pytest --cov=saqc tests --ignore=tests/fuzzy -Werror
+    - pytest --cov=saqc tests --ignore=tests/fuzzy tests/extras -Werror
   after_script:
     - coverage xml
   # regex to find the coverage percentage in the job output
@@ -99,7 +99,7 @@ python39:
   script:
     - export DISPLAY=:99
     - Xvfb :99 &
-    - pytest tests -Werror --junitxml=report.xml
+    - pytest tests -Werror --junitxml=report.xml --ignore=tests/extras
     - python -m saqc --config docs/resources/data/config.csv --data docs/resources/data/data.csv --outfile /tmp/test.csv
   artifacts:
     when: always
@@ -113,7 +113,7 @@ python310:
   script:
     - export DISPLAY=:99
     - Xvfb :99 &
-    - pytest tests -Werror --junitxml=report.xml
+    - pytest tests -Werror --junitxml=report.xml --ignore=tests/extras
     - python -m saqc --config docs/resources/data/config.csv --data docs/resources/data/data.csv --outfile /tmp/test.csv
   artifacts:
     when: always
@@ -126,7 +126,7 @@ python311:
   script:
     - export DISPLAY=:99
     - Xvfb :99 &
-    - pytest tests -Werror --junitxml=report.xml
+    - pytest tests -Werror --junitxml=report.xml --ignore=tests/extras
    - python -m saqc --config docs/resources/data/config.csv --data docs/resources/data/data.csv --outfile /tmp/test.csv
   artifacts:
     when: always
@@ -139,7 +139,7 @@ python312:
   script:
     - export DISPLAY=:99
     - Xvfb :99 &
-    - pytest tests -Werror --junitxml=report.xml
+    - pytest tests -Werror --junitxml=report.xml --ignore=tests/extras
     - python -m saqc --config docs/resources/data/config.csv --data docs/resources/data/data.csv --outfile /tmp/test.csv
   artifacts:
     when: always
diff --git a/docs/resources/images/fitFMpic.png b/docs/resources/images/fitFMpic.png
new file mode 100644
index 0000000000000000000000000000000000000000..11f9cc8dad3d89694fa1ff5111c33940b9ee3acc
Binary files /dev/null and b/docs/resources/images/fitFMpic.png differ
diff --git a/docs/resources/images/fitFMpic.png.license b/docs/resources/images/fitFMpic.png.license
new file mode 100644
index 0000000000000000000000000000000000000000..f8c6bf8cd36fb9a9a0a0dd474407f40908bf5d1f
--- /dev/null
+++ b/docs/resources/images/fitFMpic.png.license
@@ -0,0 +1,3 @@
+SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
+
+SPDX-License-Identifier: GPL-3.0-or-later
\ No newline at end of file
diff --git a/saqc/funcs/curvefit.py b/saqc/funcs/curvefit.py
index f13887d1586516c46c948871b06238603433fa65..ca69c006d4c880386ca46ea64608798a24e6d2a2 100644
--- a/saqc/funcs/curvefit.py
+++ b/saqc/funcs/curvefit.py
@@ -7,7 +7,7 @@
 # -*- coding: utf-8 -*-
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Literal, Tuple, Union
+from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -19,7 +19,7 @@ from saqc.lib.checking import (
     validateValueBounds,
     validateWindow,
 )
-from saqc.lib.tools import getFreqDelta
+from saqc.lib.tools import getFreqDelta, toSequence
 from saqc.lib.ts_operators import (
     butterFilter,
     polyRoller,
@@ -30,6 +30,10 @@ from saqc.lib.ts_operators import (
 if TYPE_CHECKING:
     from saqc import SaQC
 
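+# Default MOMENT model specification, used by ``fitMomentFM`` whenever no
+# explicit ``model_spec`` is passed.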
+DEFAULT_MOMENT = dict(
+    pretrained_model_name_or_path="AutonLab/MOMENT-1-large", revision="main"
+)
+
 FILL_METHODS = Literal[
     "linear",
     "nearest",
@@ -44,6 +48,7 @@ class CurvefitMixin:
+
     @register(mask=["field"], demask=[], squeeze=[])
     def fitPolynomial(
         self: "SaQC",
@@ -151,6 +156,153 @@ class CurvefitMixin:
         )
         return self
 
+    @register(mask=["field"], demask=[], squeeze=[], multivariate=True)
+    def fitMomentFM(
+        self: "SaQC",
+        field: str | list[str],
+        ratio: int = 4,
+        context: int = 512,
+        agg: Literal["center", "mean", "median", "std"] = "mean",
+        model_spec: Optional[dict] = None,
+        **kwargs,
+    ):
+        """
+        Fit the data by reconstructing it with the MOMENT foundation model for time series (MomentFM).
+
+        The function applies MomentFM [1] in its reconstruction mode on a window of size `context`,
+        striding through the data with a step size of `context`/`ratio`. With the defaults
+        (``context=512``, ``ratio=4``) the window advances by 128 samples, so every value is covered
+        by four overlapping reconstructions, which are combined according to `agg`.
+
+        Parameters
+        ----------
+        field :
+            Variable(s) to reconstruct.
+
+        ratio :
+            The number of reconstructions generated for each value. Must be a divisor of `context`.
+            Effectively controls the stride width of the reconstruction window through the data.
+
+        context :
+            Size of the context window with regard to which any value is reconstructed.
+            Must not exceed 512, the native input size of MomentFM.
+
+        agg :
+            How to aggregate the different reconstructions of the same value.
+
+            * 'center': use the reconstruction from the window in which the value lies closest to the window center
+            * 'mean': assign the mean over all reconstructed values
+            * 'median': assign the median over all reconstructed values
+            * 'std': assign the standard deviation over all reconstructed values
+
+        model_spec :
+            Dictionary with the fields:
+
+            * `pretrained_model_name_or_path`
+            * `revision`
+
+            Defaults to the module-level constant
+            ``DEFAULT_MOMENT = dict(pretrained_model_name_or_path="AutonLab/MOMENT-1-large", revision="main")``.
+
+        Examples
+        --------
+        .. figure:: /resources/images/fitFMpic.png
+
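+        A minimal usage sketch (illustrative only; assumes the optional "FM"
+        requirements are installed, e.g. via ``pip install saqc[FM]``; the
+        variable name ``"sensor"`` is arbitrary):
+
+        .. code-block:: python
+
+            import numpy as np
+            import pandas as pd
+            from saqc import SaQC
+
+            data = pd.DataFrame(
+                {"sensor": np.random.random(1000)},
+                index=pd.date_range("2000", freq="10min", periods=1000),
+            )
+            qc = SaQC(data)
+            qc = qc.fitMomentFM("sensor", ratio=4, context=512, agg="mean")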
+
+        Notes
+        -----
+        [1] https://arxiv.org/abs/2402.03885
+        [2] https://github.com/moment-timeseries-foundation-model/moment
+        """
+
+        _model_scope = 512
+
+        model_spec = DEFAULT_MOMENT if model_spec is None else model_spec
+
+        try:
+            import torch
+            from momentfm import MOMENTPipeline
+            from momentfm.data.informer_dataset import InformerDataset
+            from torch.utils.data import DataLoader
+        except ImportError as error_msg:
+            raise ImportError(
+                "Foundational Timeseries Regressor requirements not satisfied. "
+                "Install the requirements manually or install the saqc package "
+                "with the 'FM' extra (pip install saqc[FM])."
+            ) from error_msg
+
+        if context > _model_scope:
+            raise ValueError(
+                f'Parameter "context" must not be greater than {_model_scope}. Got {context}.'
+            )
+        if context % ratio > 0:
+            raise ValueError(
+                f'Parameter "ratio" must be a divisor of "context". Got context={context} and ratio={ratio} -> divides as: {context / ratio}.'
+            )
+        if agg not in ["mean", "median", "std", "center"]:
+            raise ValueError(
+                f'Parameter "agg" must be one of ["mean", "median", "std", "center"]. Got "agg": {agg}.'
+            )
+
+        field = toSequence(field)
+        dat = self._data[field].to_pandas()
+        # input mask, in case context is < 512
+        _input_mask = np.ones(_model_scope)
+        _input_mask[context:] = 0
+        # model instance
+        model = MOMENTPipeline.from_pretrained(
+            **model_spec,
+            model_kwargs={"task_name": "reconstruction"},
+        )
+        model.init()
+
+        # derive the reconstruction window stride width from ratio
+        stepsz = context // ratio
+        # na mask that will tell the model where data values are missing
+        _na_mask = ~(dat.isna().any(axis=1))
+        # generate a sliding view on the na mask equaling the model input-patch size
+        na_mask = np.lib.stride_tricks.sliding_window_view(
+            (_na_mask).values, window_shape=_model_scope, axis=0
+        )
+        # generate a stack of model input patches from the data
+        dv = np.lib.stride_tricks.sliding_window_view(
+            dat.values, window_shape=(_model_scope, len(dat.columns)), axis=(0, 1)
+        )
+        dv = np.swapaxes(dv, -1, -2).squeeze(1).astype("float32")
+        # filter input stacks to represent a stepsz-sized reconstruction window stride
+        dv = dv[::stepsz]
+        na_mask = na_mask[::stepsz].astype(int)
+        # mask values to achieve truncated samples (if context < 512)
+        input_mask = np.ones(na_mask.shape)
+        input_mask[:, :] = _input_mask
+        # to torch
+        dv = torch.tensor(dv)
+        na_mask = torch.tensor(na_mask)
+        input_mask = torch.tensor(input_mask)
+        # get the reconstruction for the sample stack
+        output = model(x_enc=dv, mask=na_mask, input_mask=input_mask)
+        reconstruction = output.reconstruction.detach().cpu().numpy()
+        reconstruction = reconstruction[:, :, _input_mask.astype(bool)]
+        # derive the number of reconstruction windows covering the same value
+        partition_count = context // stepsz
+        # aggregate overlapping reconstruction windows to 1-d data
+        for i, col in enumerate(dat.columns):
+            rec_arr = np.zeros([dat.shape[0], partition_count]).astype(float)
+            rec_arr[:] = np.nan
+            count_arr = rec_arr.copy()
+            # arrange reconstruction windows in an array, so that the same array row refers to the same reconstructed timestamp
+            for s in range(rec_arr.shape[1]):
+                d = reconstruction[s::partition_count, i].squeeze().flatten()
+                offset = s * stepsz
+                d_cut = min(offset + len(d), rec_arr.shape[0]) - offset
+                rec_arr[offset : offset + len(d), s] = d[:d_cut]
+                count_arr[offset : offset + len(d), s] = np.abs(
+                    np.arange(d_cut) % context - 0.5 * context
+                )
+
+            # aggregate the rows with the selected aggregation
+            if agg == "center":
+                c_select = count_arr.argmin(axis=1)
+                rec = rec_arr[np.arange(len(c_select)), c_select]
+            else:
+                rec = getattr(np, "nan" + agg)(rec_arr, axis=1)
+
+            self._data[col] = pd.Series(rec, index=dat.index, name=col)
+
+        return self
+
 
 def _fitPolynomial(
     data: DictOfSeries,
diff --git a/setup.py b/setup.py
index adffaf7168ace1a074c6eb990438f0425bda23f8..30df5548dad4759cf9ca8e8564d7d7936d2f9463 100644
--- a/setup.py
+++ b/setup.py
@@ -55,6 +55,9 @@ setup(
         "scipy",
         "typing_extensions",
     ],
+    extras_require={
+        "FM": ["momentfm"],
+    },
     license_files=("LICENSE.md", "LICENSES/GPL-3.0-or-later.txt"),
     entry_points={
         "console_scripts": ["saqc=saqc.__main__:main"],
diff --git a/tests/extras/__init__.py b/tests/extras/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..1d74ada495ad00a20887cdee50a426347f37efae
--- /dev/null
+++ b/tests/extras/__init__.py
@@ -0,0 +1,7 @@
+#! /usr/bin/env python
+
+# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# -*- coding: utf-8 -*-
diff --git a/tests/extras/test_FM.py b/tests/extras/test_FM.py
new file mode 100644
index 0000000000000000000000000000000000000000..bb027991ea84a742e5bd9e24ff337150cbc2d5c6
--- /dev/null
+++ b/tests/extras/test_FM.py
@@ -0,0 +1,31 @@
+#! /usr/bin/env python
+
+# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# -*- coding: utf-8 -*-
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from saqc import SaQC
+
+
+@pytest.fixture
+def data():
+    dat = pd.DataFrame(
+        {"d" + str(k): np.random.random(1000) for k in range(2)},
+        index=pd.date_range("2000", freq="10min", periods=1000),
+    )
+    dat.iloc[np.random.randint(0, 1000, 10), 0] = np.nan
+    return dat
+
+
+@pytest.mark.parametrize("field", ["d0", ["d1", "d0"]])
+@pytest.mark.parametrize("ratio", [2, 4])
+@pytest.mark.parametrize("context", [512, 256])
+def test_fitFMmoment(data, field, ratio, context):
+    qc = SaQC(data)
+    qc.fitMomentFM(field, ratio, context)
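+
+
+@pytest.mark.parametrize("context", [513, 1024])
+def test_fitFMmoment_context_bounds(data, context):
+    # Hedged addition (not part of the original changeset): a `context` larger
+    # than the MomentFM model scope of 512 is expected to be rejected with a
+    # ValueError by the parameter validation in fitMomentFM.
+    qc = SaQC(data)
+    with pytest.raises(ValueError):
+        qc.fitMomentFM("d0", context=context)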