Skip to content
Snippets Groups Projects
Commit 33ac032e authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

Merge branch 'TTM' into 'develop'

foundational timeseries model Regressor

See merge request !863
parents 7bfe7237 d7bfe657
No related branches found
No related tags found
1 merge request!863foundational timeseries model Regressor
Pipeline #280954 failed with stages
in 1 minute and 18 seconds
......@@ -80,7 +80,7 @@ coverage:
- export DISPLAY=:99
- Xvfb :99 &
- pip install pytest-cov coverage
- pytest --cov=saqc tests --ignore=tests/fuzzy -Werror
- pytest --cov=saqc tests --ignore=tests/fuzzy tests/extras -Werror
after_script:
- coverage xml
# regex to find the coverage percentage in the job output
......@@ -99,7 +99,7 @@ python39:
script:
- export DISPLAY=:99
- Xvfb :99 &
- pytest tests -Werror --junitxml=report.xml
- pytest tests -Werror --junitxml=report.xml --ignore=tests/extras
- python -m saqc --config docs/resources/data/config.csv --data docs/resources/data/data.csv --outfile /tmp/test.csv
artifacts:
when: always
......@@ -113,7 +113,7 @@ python310:
script:
- export DISPLAY=:99
- Xvfb :99 &
- pytest tests -Werror --junitxml=report.xml
- pytest tests -Werror --junitxml=report.xml --ignore=tests/extras
- python -m saqc --config docs/resources/data/config.csv --data docs/resources/data/data.csv --outfile /tmp/test.csv
artifacts:
when: always
......@@ -126,7 +126,7 @@ python311:
script:
- export DISPLAY=:99
- Xvfb :99 &
- pytest tests -Werror --junitxml=report.xml
- pytest tests -Werror --junitxml=report.xml --ignore=tests/extras
- python -m saqc --config docs/resources/data/config.csv --data docs/resources/data/data.csv --outfile /tmp/test.csv
artifacts:
when: always
......@@ -139,7 +139,7 @@ python312:
script:
- export DISPLAY=:99
- Xvfb :99 &
- pytest tests -Werror --junitxml=report.xml
- pytest tests -Werror --junitxml=report.xml --ignore=tests/extras
- python -m saqc --config docs/resources/data/config.csv --data docs/resources/data/data.csv --outfile /tmp/test.csv
artifacts:
when: always
......
docs/resources/images/fitFMpic.png

831 KiB

SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
SPDX-License-Identifier: GPL-3.0-or-later
\ No newline at end of file
......@@ -7,7 +7,7 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, Literal, Tuple, Union
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union
import numpy as np
import pandas as pd
......@@ -19,7 +19,7 @@ from saqc.lib.checking import (
validateValueBounds,
validateWindow,
)
from saqc.lib.tools import getFreqDelta
from saqc.lib.tools import getFreqDelta, toSequence
from saqc.lib.ts_operators import (
butterFilter,
polyRoller,
......@@ -30,6 +30,10 @@ from saqc.lib.ts_operators import (
if TYPE_CHECKING:
from saqc import SaQC
DEFAULT_MOMENT = dict(
pretrained_model_name_or_path="AutonLab/MOMENT-1-large", revision="main"
)
FILL_METHODS = Literal[
"linear",
"nearest",
......@@ -44,6 +48,7 @@ FILL_METHODS = Literal[
class CurvefitMixin:
@register(mask=["field"], demask=[], squeeze=[])
def fitPolynomial(
self: "SaQC",
......@@ -151,6 +156,153 @@ class CurvefitMixin:
)
return self
@register(mask=["field"], demask=[], squeeze=[], multivariate=True)
def fitMomentFM(
self: "SaQC",
field: str | list[str],
ratio: int = 4,
context: int = 512,
agg: Literal["center", "mean", "median", "std"] = "mean",
model_spec: Optional[dict] = None,
**kwargs,
):
"""
Fits the data by reconstructing it with the Moment Foundational Timeseries Model (MomentFM).
The function applies MomentFM [1] in its reconstruction mode on a window of size `context`, striding through the data
with step size `context`/`ratio`
Parameters
----------
ratio :
The number of samples generated for any values reconstruction. Must be a divisor of `context`.
Effectively controlls the stride-width of the reconstruction window through the data.
context :
size of the context window with regard to wich any value is reconstructed.
agg :
How to aggregate the different reconstructions for the same value.
* 'center': use the value that was constructed in a window centering around the origin value
* 'mean': assign the mean over all reconstructed values
* 'median': assign the median over all reconstructed values
* 'std': assign the standard deviation over all reconstructed values
model_spec :
Dictionary with the fields:
* `pretrained_model_name_or_path`
* `revision`
Defaults to global Parameter `DEFAULT_MOMENT=dict(pretrained_model_name_or_path="AutonLab/MOMENT-1-large", revision="main"`
Examples
--------
.. figure:: /resources/images/fitFMpic.png
Notes
-----
[1] https://arxiv.org/abs/2402.03885
[2] https://github.com/moment-timeseries-foundation-model/moment
"""
_model_scope = 512
model_spec = DEFAULT_MOMENT if model_spec is None else model_spec
try:
import torch
from momentfm import MOMENTPipeline
from momentfm.data.informer_dataset import InformerDataset
from torch.utils.data import DataLoader
except ImportError as error_msg:
print("Foundational Timeseries Regressor requirements not sufficed:\n")
print(error_msg)
print(
'Install the Requirements manually or by pip installing "saqc" package with extras "FM" (pip install saqc[FM])'
)
if context > _model_scope:
raise ValueError(
f'Parameter "context" must not be greater {_model_scope}. Got {context}.'
)
if context % ratio > 0:
raise ValueError(
f'Parameter "ratio" must be a divisor of "context". Got context={context} and ratio={ratio} -> divides as: {context / ratio}.'
)
if agg not in ["mean", "median", "std", "center"]:
raise ValueError(
f'Parameter "agg" needs be one out of ["mean", "median", "std", "center"]. Got "agg": {agg}.'
)
field = toSequence(field)
dat = self._data[field].to_pandas()
# input mask, in case context is < 512
_input_mask = np.ones(_model_scope)
_input_mask[context:] = 0
# model instance
model = MOMENTPipeline.from_pretrained(
**model_spec,
model_kwargs={"task_name": "reconstruction"},
)
model.init()
# derive rec window stride width from ratio
stepsz = context // ratio
# na mask that will tell the model where data values are missing
_na_mask = ~(dat.isna().any(axis=1))
# generate sliding view on na mask equaling model input-patch size
na_mask = np.lib.stride_tricks.sliding_window_view(
(_na_mask).values, window_shape=_model_scope, axis=0
)
# generate stack of model input patches from the data
dv = np.lib.stride_tricks.sliding_window_view(
dat.values, window_shape=(_model_scope, len(dat.columns)), axis=(0, 1)
)
dv = np.swapaxes(dv, -1, -2).squeeze(1).astype("float32")
# filter input stacks to represent stepsz - sized reconstruction window stride
dv = dv[::stepsz]
na_mask = na_mask[::stepsz].astype(int)
# mask values to achieve truncated samples (if context < 512)
input_mask = np.ones(na_mask.shape)
input_mask[:, :] = _input_mask
# to torch
dv = torch.tensor(dv)
na_mask = torch.tensor(na_mask)
input_mask = torch.tensor(input_mask)
# get reconstruction for sample stack
output = model(x_enc=dv, mask=na_mask, input_mask=input_mask)
reconstruction = output.reconstruction.detach().cpu().numpy()
reconstruction = reconstruction[:, :, _input_mask.astype(bool)]
# derive number of reconstruction windows covering the same value
partition_count = context // stepsz
# aggregate overlapping reconstruction windows to 1-d data
for ef in enumerate(dat.columns):
rec_arr = np.zeros([dat.shape[0], partition_count]).astype(float)
rec_arr[:] = np.nan
count_arr = rec_arr.copy()
# arange reconstruction windows in array, so that same array row refers to same reconstructed time
for s in range(rec_arr.shape[1]):
d = reconstruction[s::partition_count, ef[0]].squeeze().flatten()
offset = s * stepsz
d_cut = min(offset + len(d), rec_arr.shape[0]) - offset
rec_arr[offset : offset + len(d), s] = d[:d_cut]
count_arr[offset : offset + len(d), s] = np.abs(
np.arange(d_cut) % context - 0.5 * context
)
# aggregate the rows with selected aggregation
if agg == "center":
c_select = count_arr.argmin(axis=1)
rec = rec_arr[np.arange(len(c_select)), c_select]
else:
rec = getattr(np, "nan" + agg)(rec_arr, axis=1)
self._data[ef[1]] = pd.Series(rec, index=dat.index, name=ef[1])
return self
def _fitPolynomial(
data: DictOfSeries,
......
......@@ -55,6 +55,9 @@ setup(
"scipy",
"typing_extensions",
],
extras_require={
"FM": ["momentfm"],
},
license_files=("LICENSE.md", "LICENSES/GPL-3.0-or-later.txt"),
entry_points={
"console_scripts": ["saqc=saqc.__main__:main"],
......
#! /usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-
#! /usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import pytest
from saqc import SaQC
@pytest.fixture
def data():
dat = pd.DataFrame(
{"d" + str(k): np.random.random(1000) for k in range(2)},
index=pd.date_range("2000", freq="10min", periods=1000),
)
dat.iloc[np.random.randint(0, 1000, 10), 0] = np.nan
return dat
@pytest.mark.parametrize("field", ["d0", ["d1", "d0"]])
@pytest.mark.parametrize("ratio", [2, 4])
@pytest.mark.parametrize("context", [512, 256])
def test_fitFMmoment(data, field, ratio, context):
qc = SaQC(data)
qc.fitMomentFM(field, ratio, context)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment