
Compare revisions

Showing with 1210 additions and 144 deletions
......@@ -13,13 +13,11 @@ import numpy as np
import pandas as pd
from saqc import BAD, UNFLAGGED
from saqc.core import DictOfSeries, Flags, flagging, register
from saqc.core import flagging, register
from saqc.lib.checking import (
isInBounds,
validateCallable,
validateChoice,
validateMinPeriods,
validateValueBounds,
validateWindow,
)
......
......@@ -77,9 +77,8 @@ class ConstantsMixin:
starting_points_mask = removeRollingRamps(starting_points_mask, window=window)
# mimic forward rolling by roll over inverse [::-1]
rolling = starting_points_mask[::-1].rolling(
window=window, min_periods=min_periods
)
rolling = starting_points_mask[::-1].rolling(window=window, min_periods=0)
# mimic any()
mask = (rolling.sum()[::-1] > 0) & d.notna()
......
......@@ -7,7 +7,7 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, Literal, Tuple, Union
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union
import numpy as np
import pandas as pd
......@@ -19,7 +19,7 @@ from saqc.lib.checking import (
validateValueBounds,
validateWindow,
)
from saqc.lib.tools import getFreqDelta
from saqc.lib.tools import getFreqDelta, toSequence
from saqc.lib.ts_operators import (
butterFilter,
polyRoller,
......@@ -30,6 +30,10 @@ from saqc.lib.ts_operators import (
if TYPE_CHECKING:
from saqc import SaQC
DEFAULT_MOMENT = dict(
pretrained_model_name_or_path="AutonLab/MOMENT-1-large", revision="main"
)
FILL_METHODS = Literal[
"linear",
"nearest",
......@@ -44,6 +48,7 @@ FILL_METHODS = Literal[
class CurvefitMixin:
@register(mask=["field"], demask=[], squeeze=[])
def fitPolynomial(
self: "SaQC",
......@@ -151,6 +156,153 @@ class CurvefitMixin:
)
return self
@register(mask=["field"], demask=[], squeeze=[], multivariate=True)
def fitMomentFM(
self: "SaQC",
field: str | list[str],
ratio: int = 4,
context: int = 512,
agg: Literal["center", "mean", "median", "std"] = "mean",
model_spec: Optional[dict] = None,
**kwargs,
):
"""
Fits the data by reconstructing it with the MOMENT foundational time series model (MomentFM).
The function applies MomentFM [1] in its reconstruction mode on a window of size `context`, striding through the data
with step size `context`/`ratio`.
Parameters
----------
ratio :
The number of samples generated for any value's reconstruction. Must be a divisor of `context`.
Effectively controls the stride width of the reconstruction window through the data.
context :
Size of the context window with regard to which any value is reconstructed.
agg :
How to aggregate the different reconstructions of the same value.
* 'center': use the value that was reconstructed in a window centered around the original value
* 'mean': assign the mean over all reconstructed values
* 'median': assign the median over all reconstructed values
* 'std': assign the standard deviation over all reconstructed values
model_spec :
Dictionary with the fields:
* `pretrained_model_name_or_path`
* `revision`
Defaults to the module-level parameter `DEFAULT_MOMENT=dict(pretrained_model_name_or_path="AutonLab/MOMENT-1-large", revision="main")`.
Examples
--------
.. figure:: /resources/images/fitFMpic.png
Notes
-----
[1] https://arxiv.org/abs/2402.03885
[2] https://github.com/moment-timeseries-foundation-model/moment
"""
_model_scope = 512
model_spec = DEFAULT_MOMENT if model_spec is None else model_spec
try:
import torch
from momentfm import MOMENTPipeline
from momentfm.data.informer_dataset import InformerDataset
from torch.utils.data import DataLoader
except ImportError as error_msg:
print("Foundational Timeseries Regressor requirements not sufficed:\n")
print(error_msg)
print(
'Install the Requirements manually or by pip installing "saqc" package with extras "FM" (pip install saqc[FM])'
)
if context > _model_scope:
raise ValueError(
f'Parameter "context" must not be greater {_model_scope}. Got {context}.'
)
if context % ratio > 0:
raise ValueError(
f'Parameter "ratio" must be a divisor of "context". Got context={context} and ratio={ratio} -> divides as: {context / ratio}.'
)
if agg not in ["mean", "median", "std", "center"]:
raise ValueError(
f'Parameter "agg" needs be one out of ["mean", "median", "std", "center"]. Got "agg": {agg}.'
)
field = toSequence(field)
dat = self._data[field].to_pandas()
# input mask, in case context is < 512
_input_mask = np.ones(_model_scope)
_input_mask[context:] = 0
# model instance
model = MOMENTPipeline.from_pretrained(
**model_spec,
model_kwargs={"task_name": "reconstruction"},
)
model.init()
# derive rec window stride width from ratio
stepsz = context // ratio
# na mask that will tell the model where data values are missing
_na_mask = ~(dat.isna().any(axis=1))
# generate sliding view on na mask equaling model input-patch size
na_mask = np.lib.stride_tricks.sliding_window_view(
(_na_mask).values, window_shape=_model_scope, axis=0
)
# generate stack of model input patches from the data
dv = np.lib.stride_tricks.sliding_window_view(
dat.values, window_shape=(_model_scope, len(dat.columns)), axis=(0, 1)
)
dv = np.swapaxes(dv, -1, -2).squeeze(1).astype("float32")
# filter input stacks to represent stepsz - sized reconstruction window stride
dv = dv[::stepsz]
na_mask = na_mask[::stepsz].astype(int)
# mask values to achieve truncated samples (if context < 512)
input_mask = np.ones(na_mask.shape)
input_mask[:, :] = _input_mask
# to torch
dv = torch.tensor(dv)
na_mask = torch.tensor(na_mask)
input_mask = torch.tensor(input_mask)
# get reconstruction for sample stack
output = model(x_enc=dv, mask=na_mask, input_mask=input_mask)
reconstruction = output.reconstruction.detach().cpu().numpy()
reconstruction = reconstruction[:, :, _input_mask.astype(bool)]
# derive number of reconstruction windows covering the same value
partition_count = context // stepsz
# aggregate overlapping reconstruction windows to 1-d data
for ef in enumerate(dat.columns):
rec_arr = np.zeros([dat.shape[0], partition_count]).astype(float)
rec_arr[:] = np.nan
count_arr = rec_arr.copy()
# arrange reconstruction windows in an array, so that the same row refers to the same reconstructed timestamp
for s in range(rec_arr.shape[1]):
d = reconstruction[s::partition_count, ef[0]].squeeze().flatten()
offset = s * stepsz
d_cut = min(offset + len(d), rec_arr.shape[0]) - offset
rec_arr[offset : offset + len(d), s] = d[:d_cut]
count_arr[offset : offset + len(d), s] = np.abs(
np.arange(d_cut) % context - 0.5 * context
)
# aggregate the rows with selected aggregation
if agg == "center":
c_select = count_arr.argmin(axis=1)
rec = rec_arr[np.arange(len(c_select)), c_select]
else:
rec = getattr(np, "nan" + agg)(rec_arr, axis=1)
self._data[ef[1]] = pd.Series(rec, index=dat.index, name=ef[1])
return self
def _fitPolynomial(
data: DictOfSeries,
......
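For orientation, a minimal usage sketch of the new `fitMomentFM` method (a sketch only, assuming the optional FM requirements `torch` and `momentfm` are installed; series name, length and frequency are illustrative):

import pandas as pd
import saqc

# 600 equidistant values; MomentFM reconstructs them in windows of `context` samples,
# striding by context // ratio and aggregating overlapping reconstructions by their mean
dat = pd.Series(range(600), name="data", index=pd.date_range("2000", freq="10min", periods=600))
qc = saqc.SaQC(dat)
qc = qc.fitMomentFM("data", ratio=4, context=512, agg="mean")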
......@@ -17,7 +17,6 @@ from typing_extensions import Literal
from saqc import BAD, FILTER_ALL, UNFLAGGED
from saqc.core import DictOfSeries, flagging, register
from saqc.core.flags import Flags
from saqc.core.history import History
from saqc.lib.checking import validateChoice, validateWindow
from saqc.lib.tools import (
......@@ -446,7 +445,7 @@ class FlagtoolsMixin:
fields, targets, broadcasting = multivariateParameters(field, target)
meta = {
"func": f"transferFlags",
"func": "transferFlags",
"args": (),
"kwargs": {
"field": field,
......@@ -631,7 +630,12 @@ class FlagtoolsMixin:
**kwargs,
) -> "SaQC":
"""
Flag all values, if all the given ``field`` values are already flagged.
Logical AND operation for Flags.
Flag the variable(s) `field` at every period at which `field` is flagged in all of the SaQC objects in
`group`.
See the Examples section for examples.
Parameters
----------
......@@ -639,6 +643,64 @@ class FlagtoolsMixin:
A collection of ``SaQC`` objects. Flag checks are performed on all ``SaQC`` objects
based on the variables specified in ``field``. Whenever all monitored variables
are flagged, the associated timestamps will receive a flag.
Examples
--------
Flag data if the values are above a certain threshold (determined by :py:meth:`~saqc.SaQC.flagRange`) AND if the values are
constant for 3 periods (determined by :py:meth:`~saqc.SaQC.flagConstants`):
.. doctest:: andGroupExample
>>> dat = pd.Series([1,0,0,0,1,2,3,4,5,5,5,4], name='data', index=pd.date_range('2000', freq='10min', periods=12))
>>> qc = saqc.SaQC(dat)
>>> qc = qc.andGroup('data', group=[qc.flagRange('data', max=4), qc.flagConstants('data', thresh=0, window=3)])
>>> qc.flags['data']
2000-01-01 00:00:00 -inf
2000-01-01 00:10:00 -inf
2000-01-01 00:20:00 -inf
2000-01-01 00:30:00 -inf
2000-01-01 00:40:00 -inf
2000-01-01 00:50:00 -inf
2000-01-01 01:00:00 -inf
2000-01-01 01:10:00 -inf
2000-01-01 01:20:00 255.0
2000-01-01 01:30:00 255.0
2000-01-01 01:40:00 255.0
2000-01-01 01:50:00 -inf
Freq: 10min, dtype: float64
Mask data so that a test result only gets assigned during daytime (between 6 and 18 o'clock, for example).
The daytime condition is generated via :py:meth:`~saqc.SaQC.flagGeneric`:
.. doctest:: andGroupExample
>>> from saqc.lib.tools import periodicMask
>>> mask_func = lambda x: ~periodicMask(x.index, '06:00:00', '18:00:00', True)
>>> dat = pd.Series(range(100), name='data', index=pd.date_range('2000', freq='4h', periods=100))
>>> qc = saqc.SaQC(dat)
>>> qc = qc.andGroup('data', group=[qc.flagRange('data', max=5), qc.flagGeneric('data', func=mask_func)])
>>> qc.flags['data'].head(20)
2000-01-01 00:00:00 -inf
2000-01-01 04:00:00 -inf
2000-01-01 08:00:00 -inf
2000-01-01 12:00:00 -inf
2000-01-01 16:00:00 -inf
2000-01-01 20:00:00 -inf
2000-01-02 00:00:00 -inf
2000-01-02 04:00:00 -inf
2000-01-02 08:00:00 255.0
2000-01-02 12:00:00 255.0
2000-01-02 16:00:00 255.0
2000-01-02 20:00:00 -inf
2000-01-03 00:00:00 -inf
2000-01-03 04:00:00 -inf
2000-01-03 08:00:00 255.0
2000-01-03 12:00:00 255.0
2000-01-03 16:00:00 255.0
2000-01-03 20:00:00 -inf
2000-01-04 00:00:00 -inf
2000-01-04 04:00:00 -inf
Freq: 4h, dtype: float64
"""
return _groupOperation(
saqc=self,
......@@ -666,7 +728,12 @@ class FlagtoolsMixin:
**kwargs,
) -> "SaQC":
"""
Flag all values, if at least one of the given ``field`` values is already flagged.
Logical OR operation for Flags.
Flag the variable(s) `field` at every period at which `field` is flagged in at least one of the SaQC objects
in `group`.
See the Examples section for examples.
Parameters
----------
......@@ -674,6 +741,32 @@ class FlagtoolsMixin:
A collection of ``SaQC`` objects. Flag checks are performed on all ``SaQC`` objects
based on the variables specified in :py:attr:`field`. Whenever any of monitored variables
is flagged, the associated timestamps will receive a flag.
Examples
--------
Flag data if the values are above a certain threshold (determined by :py:meth:`~saqc.SaQC.flagRange`) OR if the values are
constant for 3 periods (determined by :py:meth:`~saqc.SaQC.flagConstants`):
.. doctest:: orGroupExample
>>> dat = pd.Series([1,0,0,0,0,2,3,4,5,5,7,8], name='data', index=pd.date_range('2000', freq='10min', periods=12))
>>> qc = saqc.SaQC(dat)
>>> qc = qc.orGroup('data', group=[qc.flagRange('data', max=5), qc.flagConstants('data', thresh=0, window=3)])
>>> qc.flags['data']
2000-01-01 00:00:00 -inf
2000-01-01 00:10:00 255.0
2000-01-01 00:20:00 255.0
2000-01-01 00:30:00 255.0
2000-01-01 00:40:00 255.0
2000-01-01 00:50:00 -inf
2000-01-01 01:00:00 -inf
2000-01-01 01:10:00 -inf
2000-01-01 01:20:00 -inf
2000-01-01 01:30:00 -inf
2000-01-01 01:40:00 255.0
2000-01-01 01:50:00 255.0
Freq: 10min, dtype: float64
"""
return _groupOperation(
saqc=self,
......
......@@ -267,7 +267,7 @@ class GenericMixin:
)
if not result.empty and not isAllBoolean(result):
raise TypeError(f"generic expression does not return a boolean array")
raise TypeError("generic expression does not return a boolean array")
# update flags & data
for i, col in enumerate(targets):
......
......@@ -7,7 +7,6 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Callable, Tuple, Union
import numpy as np
......@@ -18,8 +17,6 @@ from saqc import UNFLAGGED
from saqc.core import register
from saqc.core.history import History
from saqc.lib.checking import (
isValidChoice,
validateCallable,
validateChoice,
validateFuncSelection,
validateMinPeriods,
......
......@@ -13,16 +13,10 @@ from typing import TYPE_CHECKING, Callable, Literal
import numpy as np
import pandas as pd
from scipy.stats import median_abs_deviation
from saqc.constants import BAD
from saqc.core.register import flagging
from saqc.lib.checking import (
isCallable,
validateFuncSelection,
validateMinPeriods,
validateWindow,
)
from saqc.lib.checking import validateFuncSelection, validateMinPeriods, validateWindow
from saqc.lib.tools import isunflagged, statPass
from saqc.parsing.environ import ENV_OPERATORS
......
......@@ -10,13 +10,16 @@ from __future__ import annotations
import uuid
import warnings
from typing import TYPE_CHECKING, Callable, List, Optional, Sequence, Tuple
from collections import Counter
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Tuple
import numpy as np
import numpy.polynomial.polynomial as poly
import pandas as pd
from outliers import smirnov_grubbs # noqa, on pypi as outlier-utils
from scipy.stats import median_abs_deviation
from sklearn.cluster import k_means
from sklearn.metrics import silhouette_samples, silhouette_score
from typing_extensions import Literal
from saqc import BAD, UNFLAGGED
......@@ -27,7 +30,6 @@ from saqc.lib.checking import (
validateChoice,
validateFraction,
validateFrequency,
validateFuncSelection,
validateMinPeriods,
validateValueBounds,
validateWindow,
......@@ -40,6 +42,86 @@ if TYPE_CHECKING:
from saqc import SaQC
def _estimateCutoffProbability(probabilities: np.ndarray, thresh: float):
"""
Estimate a cutoff level for the outlier probabilities passed to `probabilities` with an iterative k-means approach.
"""
if isinstance(thresh, float) and (thresh < 1):
sup_num = max(4, int(abs(thresh * probabilities.shape[0])))
else:
sup_num = max(4, int(abs(thresh)))
vals = (-probabilities).nlargest(sup_num).values
vals = np.unique(vals).reshape(-1, 1)
km = k_means(vals, 2)
thresh = 1.0
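# increase the cluster count until the k-means inertia stops improving notably, then, for the
# last accepted clustering, check whether its two largest-centroid clusters are well separated
# (mean silhouette > 0.6) and, if so, derive the cutoff from the top cluster's well-separated members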
for check in range(3, int(vals.shape[0]) + 1):
_km = k_means(vals, check)
if (_km[-1] - km[-1]) > -1:
cluster_hierarchy = km[0].argsort(axis=0).squeeze()
top_cluster = cluster_hierarchy[-1]
sec_cluster = cluster_hierarchy[-2]
top_select = km[1] == top_cluster
sec_select = km[1] == sec_cluster
topsec_select = top_select | sec_select
topsec_labels = km[1][topsec_select]
if topsec_select.sum() == 2:
continue
silhouette = silhouette_samples(vals[topsec_select], topsec_labels)
if top_select.sum() == 1:
silhouette[top_select] = 1.0
if sec_select.sum() == 1:
silhouette[sec_select] = 1.0
top_silhouette = silhouette[km[1][topsec_select] == top_cluster]
sil_mean = silhouette.mean()
min_select = top_silhouette >= sil_mean
if (min_select.sum() > 0) and (sil_mean > 0.6):
od_samples = vals[km[1] == top_cluster][min_select]
thresh = od_samples.min()
if od_samples.shape[0] > 10:
thresh = max(thresh, km[0].max() - 3 * od_samples.std())
break
km = _km
return thresh
def _stray(partition, min_periods, iter_start, alpha):
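"""
Gap-test helper: sort the scores in `partition`, compute a weighted mean of the trailing
gaps (`ghat`), and return the index of all values above the first gap exceeding
``log(1/alpha) * ghat``. Evaluation starts at the ``iter_start`` quantile of the sample;
partitions smaller than ``min_periods`` yield an empty index.
"""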
sample_size = len(partition)
flag_index = pd.Index([])
if len(partition) == 0 or sample_size < min_periods:
return flag_index
if isinstance(partition, np.ndarray):
idx = np.arange(len(partition))
else:
idx = partition.index
partition = partition.values
sorted_i = partition.argsort()
resids = partition[sorted_i]
gaps = np.append(0, np.diff(resids))
tail_size = int(max(min(np.floor(sample_size / 4), 50), 2))
tail_indices = np.arange(2, tail_size + 1)
i_start = int(max(np.floor(sample_size * iter_start), 1))
ghat = np.array([np.nan] * sample_size)
for i in range(i_start, sample_size):
ghat[i] = sum((tail_indices / (tail_size - 1)) * gaps[i - tail_indices + 1])
log_alpha = np.log(1 / alpha)
for iter_index in range(i_start, sample_size):
if gaps[iter_index] > log_alpha * ghat[iter_index]:
flag_index = idx[sorted_i[iter_index:]]
break
return flag_index
class OutliersMixin:
@staticmethod
def _validateLOF(algorithm, n, p, density):
......@@ -172,7 +254,9 @@ class OutliersMixin:
self: "SaQC",
field: str,
n: int = 20,
thresh: Literal["auto"] | float = 1.5,
thresh: float | Literal["auto"] | None = None,
probability: Optional[float] = None,
corruption: float | int | None = None,
algorithm: Literal["ball_tree", "kd_tree", "brute", "auto"] = "ball_tree",
p: int = 1,
density: Literal["auto"] | float = "auto",
......@@ -211,19 +295,23 @@ class OutliersMixin:
Higher values greatly increase numerical costs.
thresh :
The threshold for flagging the calculated LOF. A LOF of around
``1`` is considered normal and most likely corresponds to
inlier points. This parameter is considered the main calibration
parameter of the algorithm.
* The thresholding defaults to ``1.5``, which is the default value
found to be suitable in the literature.
* ``'auto'`` enables flagging the scores with a modified 3-sigma
rule, resulting in a thresh around ``4``, which usually
greatly mitigates overflagging compared to the literature
recommendation, but often is too high.
* sensitive range for the parameter may be ``[1,15]``, assuming
default settings for the other parameters.
Outlier factor cutoff.
* If given, local outlier factors are computed and cut off at the level
`thresh`, so that all values with a factor above the cutoff get flagged.
probability :
Outlier probability cutoff.
* If given, local outlier probabilities are computed and cut off at the level
`probability`, so that all values with a probability above the cutoff get flagged.
corruption :
Portion of the data that is assumed anomalous. Can be given:
* as a fraction of the data (`float` in [0, 1])
* as the total number of corrupted samples (`int` > 1)
algorithm :
Algorithm used for calculating the :py:attr:`n`-nearest neighbors
......@@ -355,8 +443,12 @@ class OutliersMixin:
:ref:`introduction to outlier detection with saqc <cookbooks/OutlierDetection:Outlier Detection>`
"""
self._validateLOF(algorithm, n, p, density)
if thresh != "auto" and not isFloatLike(thresh):
raise ValueError(f"'thresh' must be 'auto' or a float, not {thresh}")
para_check = (thresh is None) + (probability is None) + (corruption is None)
if para_check < 2:
raise ValueError(
f'Only one (or none) of "thresh", "probability", "corruption" must be given, got: \n thresh: {thresh} \n probability: {probability} \n corruption: {corruption}'
)
tmp_field = str(uuid.uuid4())
qc = self.assignUniLOF(
......@@ -367,13 +459,23 @@ class OutliersMixin:
p=p,
density=density,
fill_na=fill_na,
statistical_extent=0.997 if thresh is None else 1,
)
s = qc.data[tmp_field]
if thresh == "auto":
_s = pd.concat([s, (-s - 2)])
s_mask = ((_s - _s.mean()) / _s.std()).iloc[: int(s.shape[0])].abs() > 3
else:
s_mask = s < -abs(thresh)
if para_check == 3:
corruption = int(-(len(self.data[field]) / max(n, 4)))
if thresh is not None:
if isinstance(thresh, str) and (thresh == "auto"):
_s = pd.concat([s, (-s - 2)])
s_mask = ((_s - _s.mean()) / _s.std()).iloc[: int(s.shape[0])].abs() > 3
else:
s_mask = s < -abs(thresh)
elif corruption is not None:
cutoff = _estimateCutoffProbability(probabilities=s, thresh=corruption)
s_mask = s <= -abs(cutoff)
elif probability is not None:
s_mask = s <= -abs(probability)
s_mask = ~isflagged(qc._flags[field], kwargs["dfilter"]) & s_mask
if slope_correct:
......@@ -402,15 +504,7 @@ class OutliersMixin:
down_slopes = (max_vals - eps <= last_vals.shift(1)) & (
min_vals + eps >= first_vals.shift(-1)
)
slopes = up_slopes | down_slopes
odd_return_pred = (max_vals > last_vals.shift(1)) & (
min_vals < last_vals.shift(1)
)
odd_return_succ = (max_vals > first_vals.shift(-1)) & (
min_vals < first_vals.shift(-1)
)
returns = odd_return_succ | odd_return_pred
corrections = returns | slopes
corrections = up_slopes | down_slopes
for s_id in corrections[corrections].index:
correct_idx = od_groups.get_group(s_id).index
s_mask[correct_idx] = False
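As an aside, a hypothetical sketch of the three mutually exclusive cutoff modes added above (assuming the modified method is `flagUniLOF` and `qc` already holds a variable "data"; the values are illustrative):

qc = qc.flagUniLOF("data", thresh=1.5)        # cut the raw LOF scores off at 1.5
qc = qc.flagUniLOF("data", probability=0.99)  # cut the local outlier probabilities off at 0.99
qc = qc.flagUniLOF("data", corruption=0.02)   # estimate the cutoff from an assumed 2% corruption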
......@@ -517,32 +611,8 @@ class OutliersMixin:
# calculate flags for every window
for _, partition in partitions:
sample_size = len(partition)
if partition.empty or sample_size < min_periods:
continue
sorted_i = partition.values.argsort()
resids = partition.values[sorted_i]
gaps = np.append(0, np.diff(resids))
tail_size = int(max(min(np.floor(sample_size / 4), 50), 2))
tail_indices = np.arange(2, tail_size + 1)
i_start = int(max(np.floor(sample_size * iter_start), 1) + 1)
ghat = np.array([np.nan] * sample_size)
for i in range(i_start - 1, sample_size):
ghat[i] = sum(
(tail_indices / (tail_size - 1)) * gaps[i - tail_indices + 1]
)
log_alpha = np.log(1 / alpha)
for iter_index in range(i_start - 1, sample_size):
if gaps[iter_index] > log_alpha * ghat[iter_index]:
index = partition.index[sorted_i[iter_index:]]
self._flags[index, field] = flag
break
index = _stray(partition, min_periods, iter_start, alpha)
self._flags[index, field] = flag
return self
......
This diff is collapsed.
......@@ -7,11 +7,10 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import re
import typing
import uuid
import warnings
from typing import TYPE_CHECKING, Callable, Union
from typing import TYPE_CHECKING, Callable
import numpy as np
import pandas as pd
......@@ -21,10 +20,9 @@ import saqc.constants
from saqc.constants import UNFLAGGED
from saqc.core import register
from saqc.core.history import History
from saqc.funcs.interpolation import DATA_REINDEXER
from saqc.lib.checking import validateChoice, validateFuncSelection
from saqc.lib.docs import DOC_TEMPLATES
from saqc.lib.tools import filterKwargs, getFreqDelta
from saqc.lib.tools import getFreqDelta
from saqc.lib.ts_operators import isValid
if TYPE_CHECKING:
......@@ -288,10 +286,10 @@ class ResamplingMixin:
done = True
if len(stack) == 0:
raise ValueError(f"Could not find no last reindexing to invert")
raise ValueError("Could not find no last reindexing to invert")
reindex_method = METHODINVERTS.get(stack[-1], False)
if reindex_method == False:
if reindex_method is False:
raise ValueError(f"cant invert {stack[-1]}")
return reindex_method
......
......@@ -7,7 +7,7 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Optional, Union
from typing import TYPE_CHECKING, Callable
import numpy as np
import pandas as pd
......
......@@ -12,13 +12,9 @@ from typing import TYPE_CHECKING, Callable, Union
import numpy as np
import pandas as pd
import saqc
from saqc.core import DictOfSeries, Flags, register
from saqc.lib.checking import (
validateCallable,
validateFuncSelection,
validateMinPeriods,
validateWindow,
)
from saqc.lib.checking import validateFuncSelection, validateMinPeriods, validateWindow
from saqc.lib.tools import getFreqDelta
if TYPE_CHECKING:
......@@ -26,11 +22,14 @@ if TYPE_CHECKING:
class RollingMixin:
@register(mask=["field"], demask=[], squeeze=[])
@register(
mask=["field"], demask=[], squeeze=[], multivariate=True, handles_target=True
)
def rolling(
self: "SaQC",
field: str,
field: str | list[str],
window: str | int,
target: str | list[str] = None,
func: Callable[[pd.Series], np.ndarray] | str = "mean",
min_periods: int = 0,
center: bool = True,
......@@ -39,7 +38,9 @@ class RollingMixin:
"""
Calculate a rolling-window function on the data.
Note, that the data gets assigned the worst flag present in the original data.
Note that the new data gets assigned the worst flag present in the window it was aggregated from.
Note that you can also select multiple fields to get a rolling calculation over those (see the usage sketch after this function).
Parameters
----------
......@@ -58,21 +59,116 @@ class RollingMixin:
center :
If True, center the rolling window.
Notes
-----
.. figure:: /resources/images/horizontalAxisRollingExample.png
Rolling over multiple variables.
"""
# HINT: checking in _roll
self._data, self._flags = _roll(
data=self._data,
field=field,
flags=self._flags,
window=window,
func=func,
min_periods=min_periods,
center=center,
**kwargs,
)
if target and (len(target) > 1) and (len(field) != len(target)):
raise ValueError(
f"""If multiple targets are given, per-field application of rolling is conducted and the number of
fields has to equal the number of targets.\n Got: \n Fields={field} \n Targets={target}"""
)
if target and (len(field) > 1) and (len(target) == 1):
target = target[0]
if target not in self._data.columns:
self[target] = saqc.SaQC(
pd.Series(
np.nan, index=self.data[field].to_pandas().index, name=target
)
)
self._data, self._flags = _hroll(
data=self._data,
field=field,
flags=self._flags,
target=target,
window=window,
func=func,
min_periods=min_periods,
center=center,
)
else:
if target:
for ft in zip(field, target):
self = self.copyField(ft[0], target=ft[1], overwrite=True)
field = target
for f in field:
self._data, self._flags = _roll(
data=self._data,
field=f,
flags=self._flags,
window=window,
func=func,
min_periods=min_periods,
center=center,
**kwargs,
)
return self
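A minimal sketch of the new multivariate `rolling` call (a sketch only; variable names and values are illustrative, and the exact keyword handling by the register machinery may differ):

import pandas as pd
import saqc

idx = pd.date_range("2000", freq="10min", periods=6)
qc = saqc.SaQC(pd.DataFrame({"a": [1, 2, 3, 4, 5, 6], "b": [6, 5, 4, 3, 2, 1]}, index=idx))
# roll a 3-period mean over both columns at once and write the result to a new variable
qc = qc.rolling(["a", "b"], window=3, target=["ab_mean"], func="mean")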
def _hroll(
data: DictOfSeries,
field: str,
flags: Flags,
target: str,
window: str | int,
func: Callable[[pd.Series], np.ndarray] | str = "mean",
min_periods: int = 0,
center: bool = True,
**kwargs,
):
if isinstance(window, str):
freq = getFreqDelta(data[field].to_pandas().index)
if freq is None:
raise ValueError(
f"Rolling over more than one column is only supported if either the data has a unitary"
f'sampling rate, or window is an integer. "{window}" was passed and combined {field} '
f"index is not unitarily sampled"
)
else:
window = int(np.floor(pd.Timedelta(window) / freq))
views = np.lib.stride_tricks.sliding_window_view(
data[field].to_pandas(), (window, len(field))
)
f_views = np.lib.stride_tricks.sliding_window_view(
pd.DataFrame({f: flags[f] for f in field}), (window, len(field))
)
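# each row of `frame` holds one rolling window over all selected columns, flattened to 1-d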
frame = pd.DataFrame(
views.reshape(views.shape[0], views.shape[1] * views.shape[2] * views.shape[3])
)
if isinstance(func, str) and hasattr(pd.DataFrame, func):
result = getattr(frame, func)(axis=1)
else:
result = frame.apply(func, axis=1)
insuff_periods_mask = (~frame.isna()).sum(axis=1) < min_periods
result[insuff_periods_mask] = np.nan
f_result = f_views.max(axis=(2, 3)).squeeze()
d_out = pd.Series(np.nan, index=data[field].to_pandas().index)
d_out[window - 1 :] = result
if center:
d_out = d_out.shift(-int(np.floor(window / 2)))
f_out = pd.Series(np.nan, index=data[field].to_pandas().index)
f_out[window - 1 :] = f_result
if center:
f_out = f_out.shift(-int(np.floor(window / 2)))
data[target] = d_out
flags[target] = f_out
return data, flags
def _roll(
data: DictOfSeries,
field: str,
......
......@@ -7,14 +7,15 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Tuple, Union
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Tuple
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
from scipy.special import erf, erfinv
from sklearn.neighbors import LocalOutlierFactor
from typing_extensions import Literal
from saqc import UNFLAGGED
from saqc.core import register
from saqc.lib.checking import (
validateChoice,
......@@ -25,7 +26,7 @@ from saqc.lib.checking import (
from saqc.lib.docs import DOC_TEMPLATES
from saqc.lib.tools import getApply, toSequence
from saqc.lib.ts_operators import kNN
from saqc.parsing.environ import ENV_OPERATORS, ENV_TRAFOS
from saqc.parsing.environ import ENV_OPERATORS
if TYPE_CHECKING:
from saqc import SaQC
......@@ -50,6 +51,43 @@ def _LOFApply(vals, n_neighbors, ret="scores", **kwargs):
return labels == -1
def _LOPApply(
vals,
n_neighbors,
ret="scores",
statistical_extent=0.997,
use_centroids=False,
**kwargs,
):
"""
Apply Local Outlier Probability algorithm, as presented in
Kriegel, H.-P.; Kröger, P.; Schubert, E.; Zimek, A. LoOP: local outlier probabilities. Proceedings
of the 18th ACM conference on Information and knowledge management 2009, 1649–1652.
"""
d, members = kNN(vals, n_neighbors, **kwargs)
members = np.concatenate([np.array([range(vals.shape[0])]).T, members], axis=1)
d = np.concatenate([np.array([[0] * vals.shape[0]]).T, d], axis=1)
if use_centroids:
centroids = vals[members].sum(axis=1) / (n_neighbors + 1)
for k in range(vals.shape[0]):
d[k] = cdist(centroids[k : k + 1], vals[members[k]])
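# quantities of the LoOP reference above, as implemented below: probabilistic distance
# pdist = lambda * sigma (sigma: quadratic mean of the neighbor distances, lambda fixed by
# `statistical_extent`), PLOF = pdist / E[pdist over neighbors] - 1,
# nPLOF = lambda * sqrt(2 * E[PLOF^2]), and the final score LoOP = max(0, erf(PLOF / nPLOF))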
standard_dist = np.sqrt(np.sum(d**2, axis=1) / (n_neighbors + 1))
lambda_val = np.sqrt(2) * erfinv(statistical_extent)
p_dist = lambda_val * standard_dist
exp_p_dist = p_dist[members].sum(axis=1) / (n_neighbors + 1)
plof = (p_dist / exp_p_dist) - 1
nplof = np.sqrt(np.sum(plof**2) / len(plof)) * lambda_val * np.sqrt(2)
scores = erf(plof / nplof)
scores = np.where(scores > 0, scores, 0)
if ret == "scores":
return -scores
else:
return scores == 0
def _groupedScoring(
val_frame: pd.DataFrame,
n: int = 20,
......@@ -437,10 +475,11 @@ class ScoresMixin:
p: int = 1,
density: Literal["auto"] | float = "auto",
fill_na: bool = True,
statistical_extent=1,
**kwargs,
) -> "SaQC":
"""
Assign "univariate" Local Outlier Factor (LOF).
Assign "univariate" Local Outlier Factor (LOF) or "inivariate" Local Outlier Probability (LOP)
The function is a wrapper around a standard LOF implementation, aiming for an easy-to-use, minimal-parameter
outlier scoring function for single variables that does not require prior modelling of the variable.
......@@ -483,11 +522,16 @@ class ScoresMixin:
Notes
-----
LOP: Kriegel, H.-P.; Kröger, P.; Schubert, E.; Zimek, A. LoOP: local outlier probabilities. Proceedings
of the 18th ACM conference on Information and knowledge management 2009, 1649–1652.
Algorithm steps for uniLOF flagging of variable `x`:
1. The temporal density `dt(x)` is calculated according to the `density` parameter.
2. LOF scores `LOF(x)` are calculated for the concatenation [`x`, `dt(x)`]
3. `x` is flagged where `LOF(x)` exceeds the threshold determined by the parameter `thresh`.
2. LOF (or LOP) scores `L(x)` are calculated for the concatenation [`x`, `dt(x)`]
3. `x` is flagged where `L(x)` exceeds the threshold determined by the parameter `thresh`.
Examples
--------
......@@ -507,13 +551,13 @@ class ScoresMixin:
filled = pd.Series(False, index=vals.index)
if density == "auto":
v_diff = vals.diff()
v_diff = (vals**p).diff()
density = v_diff.abs().median()
if density == 0:
density = v_diff[v_diff != 0].abs().median()
elif isinstance(density, Callable):
density = density(vals)
if isinstance(density, pd.Series):
elif isinstance(density, pd.Series):
density = density.values
d_var = pd.Series(np.arange(len(vals)) * density, index=vals.index)
......@@ -528,17 +572,29 @@ class ScoresMixin:
d_var = d_var.drop(na_idx, axis=0).values
vals = vals.drop(na_idx, axis=0).values
vals_extended = np.array(
list(vals[::-1][-n:]) + list(vals) + list(vals[::-1][:n])
)
d_extended = np.array(
list(d_var[:n] + (d_var[0] - d_var[n + 1]))
+ list(d_var)
+ list(d_var[-n:] + (d_var[-1] - d_var[-n - 1]))
vals_extended = np.pad(vals, n, mode="reflect")
d_extension = n * density
d_extended = np.pad(
d_var,
n,
mode="linear_ramp",
end_values=(d_var[0] - d_extension, d_var[-1] + d_extension),
)
LOF_vars = np.array([vals_extended, d_extended]).T
scores = _LOFApply(LOF_vars, n, p=p, algorithm=algorithm, metric="minkowski")
if statistical_extent == 1:
scores = _LOFApply(
LOF_vars, n, p=p, algorithm=algorithm, metric="minkowski"
)
else:
scores = _LOPApply(
LOF_vars,
n,
p=p,
statistical_extent=statistical_extent,
algorithm=algorithm,
metric="minkowski",
)
scores = scores[n:-n]
score_ser = pd.Series(scores, index=na_bool_ser.index[~na_bool_ser.values])
score_ser = score_ser.reindex(na_bool_ser.index)
......
......@@ -9,7 +9,6 @@ from __future__ import annotations
import pickle
import tkinter as tk
import warnings
from typing import TYPE_CHECKING
import matplotlib as mpl
......
......@@ -7,7 +7,7 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Optional, Union
from typing import TYPE_CHECKING, Callable
import numpy as np
import pandas as pd
......
......@@ -9,7 +9,6 @@
from __future__ import annotations
import itertools
from collections import OrderedDict
import matplotlib as mpl
import matplotlib.pyplot as plt
......
......@@ -434,12 +434,15 @@ def getApply(in_obj, apply_obj, attr_access="__name__", attr_or="apply") -> pd.S
"""
try:
out = getattr(in_obj, getattr(apply_obj, attr_access))()
access = getattr(apply_obj, attr_access)
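# nan-aware numpy reducers (e.g. "nanmean") map onto the plain pandas method name,
# which skips NaN by default, so the "nan" prefix can safely be dropped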
if access.startswith("nan"):
access = access[3:]
out = getattr(in_obj, access)()
except AttributeError:
try:
# let's try to run it somewhat optimized
out = getattr(in_obj, attr_or)(apply_obj, raw=True)
except:
except Exception:
# didn't work out, fall back
out = getattr(in_obj, attr_or)(apply_obj)
......
......@@ -9,10 +9,7 @@
"""
The module gathers all kinds of timeseries transformations.
"""
import re
import sys
import warnings
from typing import Literal, Union
import numpy as np
import numpy.polynomial.polynomial as poly
......
......@@ -8,20 +8,12 @@
from __future__ import annotations
import abc
from typing import Any, Dict, TypeVar, Union
from typing import TypeVar, Union
import numpy as np
import pandas as pd
from typing_extensions import Protocol
__all__ = [
"T",
"ArrayLike",
"CurveFitter",
"ExternalFlag",
"OptionalNone",
]
T = TypeVar("T")
ArrayLike = TypeVar("ArrayLike", np.ndarray, pd.Series, pd.DataFrame)
......
......@@ -7,8 +7,6 @@
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import scipy.stats as st
import saqc.lib.ts_operators as ts_ops
from saqc import BAD, DOUBTFUL, FILTER_ALL, FILTER_NONE, GOOD, UNFLAGGED
......