Compare revisions

Showing 516 additions and 277 deletions
......@@ -20,8 +20,6 @@ from saqc.lib.ts_operators import (
polyRoller,
polyRollerIrregular,
polyRollerNoMissing,
polyRollerNoMissingNumba,
polyRollerNumba,
)
if TYPE_CHECKING:
......@@ -176,17 +174,13 @@ def _fitPolynomial(
fitted = to_fit.rolling(
pd.Timedelta(window), closed="both", min_periods=min_periods, center=True
).apply(polyRollerIrregular, args=(centers, order))
else:
else: # if regular
if isinstance(window, str):
window = pd.Timedelta(window) // regular
if window % 2 == 0:
window = int(window - 1)
if min_periods is None:
min_periods = window
if len(to_fit) < 200000:
numba = False
else:
numba = True
val_range = np.arange(0, window)
center_index = window // 2
......@@ -202,43 +196,20 @@ def _fitPolynomial(
miss_marker = np.floor(miss_marker - 1)
na_mask = to_fit.isna()
to_fit[na_mask] = miss_marker
if numba:
fitted = to_fit.rolling(window).apply(
polyRollerNumba,
args=(miss_marker, val_range, center_index, order),
raw=True,
engine="numba",
engine_kwargs={"no_python": True},
)
# due to a tiny bug - rolling with center=True doesnt work
# when using numba engine.
fitted = fitted.shift(-int(center_index))
else:
fitted = to_fit.rolling(window, center=True).apply(
polyRoller,
args=(miss_marker, val_range, center_index, order),
raw=True,
)
fitted = to_fit.rolling(window, center=True).apply(
polyRoller,
args=(miss_marker, val_range, center_index, order),
raw=True,
)
fitted[na_mask] = np.nan
else:
# we only fit fully populated intervals:
if numba:
fitted = to_fit.rolling(window).apply(
polyRollerNoMissingNumba,
args=(val_range, center_index, order),
engine="numba",
engine_kwargs={"no_python": True},
raw=True,
)
# due to a tiny bug - rolling with center=True doesnt work
# when using numba engine.
fitted = fitted.shift(-int(center_index))
else:
fitted = to_fit.rolling(window, center=True).apply(
polyRollerNoMissing,
args=(val_range, center_index, order),
raw=True,
)
fitted = to_fit.rolling(window, center=True).apply(
polyRollerNoMissing,
args=(val_range, center_index, order),
raw=True,
)
data[field] = fitted
worst = flags[field].rolling(window, center=True, min_periods=min_periods).max()
......
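With the numba path removed, the polynomial fitting always goes through pandas' plain centered ``rolling(...).apply``. A minimal standalone sketch of that pattern, where ``poly_center_fit`` is a hypothetical stand-in for saqc's ``polyRoller`` (assumed semantics: fit a polynomial per window, evaluate it at the window center)::

    import numpy as np
    import pandas as pd

    def poly_center_fit(win: np.ndarray, order: int) -> float:
        # fit a polynomial of degree `order` to the window values
        # and evaluate it at the window center
        x = np.arange(win.size)
        coeffs = np.polynomial.polynomial.polyfit(x, win, deg=order)
        return np.polynomial.polynomial.polyval(win.size // 2, coeffs)

    s = pd.Series(
        np.sin(np.linspace(0, 10, 101)),
        index=pd.date_range("2020", periods=101, freq="10min"),
    )
    fitted = s.rolling(11, center=True).apply(poly_center_fit, args=(2,), raw=True)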
......@@ -10,6 +10,7 @@ from __future__ import annotations
import functools
import inspect
import warnings
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Tuple
import numpy as np
......@@ -22,7 +23,8 @@ from saqc import BAD
from saqc.core import DictOfSeries, Flags, flagging, register
from saqc.funcs.changepoints import _getChangePoints
from saqc.lib.docs import DOC_TEMPLATES
from saqc.lib.tools import detectDeviants, filterKwargs, toSequence
from saqc.lib.exceptions import ParameterOutOfBounds
from saqc.lib.tools import detectDeviants, filterKwargs, isInBounds, toSequence
from saqc.lib.ts_operators import expDriftModel, linearDriftModel
from saqc.lib.types import CurveFitter
......@@ -53,7 +55,7 @@ class DriftMixin:
def flagDriftFromNorm(
self: "SaQC",
field: Sequence[str],
freq: str,
window: str,
spread: float,
frac: float = 0.5,
metric: Callable[
......@@ -74,7 +76,7 @@ class DriftMixin:
Parameters
----------
freq :
window :
Frequency that splits the data into chunks.
spread :
......@@ -130,12 +132,25 @@ class DriftMixin:
Introduction to Hierarchical clustering:
[2] https://en.wikipedia.org/wiki/Hierarchical_clustering
"""
if not isInBounds(frac, (0.5, 1), closed="both"):
raise ParameterOutOfBounds(frac, "frac", (0.5, 1), "both")
if "freq" in kwargs:
warnings.warn(
"""
The parameter `freq` is deprecated and will be removed in version 3.0 of saqc.
Please use the parameter `window` instead.
""",
DeprecationWarning,
)
window = kwargs["freq"]
fields = toSequence(field)
data = self._data[fields].to_pandas()
data.dropna(inplace=True)
segments = data.groupby(pd.Grouper(freq=freq))
segments = data.groupby(pd.Grouper(freq=window))
for segment in segments:
if len(segment[1]) <= 1:
continue
......
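A usage sketch for the renamed parameter, on synthetic data (assumes the ``SaQC`` API as shown in this diff)::

    import numpy as np
    import pandas as pd
    import saqc

    idx = pd.date_range("2021-01-01", periods=1000, freq="10min")
    data = pd.DataFrame(
        {f"sensor{i}": np.random.normal(size=1000) for i in range(4)}, index=idx
    )
    data["sensor3"] += np.linspace(0, 10, 1000)  # one drifting variable

    qc = saqc.SaQC(data)
    # `window` replaces the deprecated `freq`; `frac` must lie in [0.5, 1]
    qc = qc.flagDriftFromNorm(field=list(data.columns), window="1D", spread=3, frac=0.5)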
......@@ -277,6 +277,9 @@ class FlagtoolsMixin:
"""
Transfer Flags of one variable to another.
.. deprecated:: 2.4.0
Use :py:meth:`~saqc.SaQC.concatFlags` with ``method="match"`` and ``squeeze=False`` instead.
See Also
--------
* :py:meth:`saqc.SaQC.flagGeneric`
......
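The replacement named in the deprecation note, spelled out (``var_a`` and ``var_b`` are hypothetical variable names)::

    # deprecated since 2.4.0:
    # qc = qc.transferFlags("var_a", target="var_b")
    # suggested replacement:
    qc = qc.concatFlags("var_a", target="var_b", method="match", squeeze=False)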
......@@ -144,6 +144,9 @@ class InterpolationMixin:
"""
Fill NaN and flagged values using an interpolation method.
.. deprecated:: 2.4.0
Use :py:meth:`~saqc.SaQC.align` instead.
Parameters
----------
method :
......@@ -399,6 +402,9 @@ class InterpolationMixin:
"""
Function to interpolate the data at regular (equidistant) timestamps (grid points).
.. deprecated:: 2.4.0
Use :py:meth:`~saqc.SaQC.align` instead.
Parameters
----------
freq :
......@@ -471,6 +477,10 @@ class InterpolationMixin:
flag: float = UNFLAGGED,
**kwargs,
) -> "SaQC":
"""
.. deprecated:: 2.4.0
Use :py:meth:`~saqc.SaQC.interpolate` instead.
"""
warnings.warn(
f"""
The method `interpolateInvalid` is deprecated and will be removed
......
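A migration sketch for the deprecated interpolation entry points (parameter names are taken from this diff; check the ``align`` signature in the saqc docs before relying on it)::

    # deprecated since 2.4.0:
    # qc = qc.interpolateIndex("var", freq="10min", method="time")
    # replacement for regridding to equidistant timestamps:
    qc = qc.align("var", freq="10min", method="time")
    # `interpolateInvalid` is superseded by `interpolate` for filling NaN/flagged values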
......@@ -12,7 +12,6 @@ import uuid
import warnings
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Tuple
import numba
import numpy as np
import numpy.polynomial.polynomial as poly
import pandas as pd
......@@ -24,6 +23,7 @@ from saqc import BAD, UNFLAGGED
from saqc.core import DictOfSeries, Flags, flagging, register
from saqc.funcs.scores import _univarScoring
from saqc.lib.docs import DOC_TEMPLATES
from saqc.lib.rolling import windowRoller
from saqc.lib.tools import getFreqDelta, isflagged, toSequence
if TYPE_CHECKING:
......@@ -741,14 +741,7 @@ class OutliersMixin:
# get invalid-raise/drop mask:
raise_series = dataseries.rolling(raise_window_td, min_periods=2, closed="both")
numba_boost = True
if numba_boost:
raise_check_boosted = numba.jit(raise_check, nopython=True)
raise_series = raise_series.apply(
raise_check_boosted, args=(thresh,), raw=True, engine="numba"
)
else:
raise_series = raise_series.apply(raise_check, args=(thresh,), raw=True)
raise_series = raise_series.apply(raise_check, args=(thresh,), raw=True)
if raise_series.isna().all():
return self
......@@ -789,21 +782,10 @@ class OutliersMixin:
weights_rolling_sum = weights.rolling(
average_window, min_periods=2, closed="both"
)
if numba_boost:
custom_rolling_mean_boosted = numba.jit(custom_rolling_mean, nopython=True)
weighted_rolling_mean = weighted_rolling_mean.apply(
custom_rolling_mean_boosted, raw=True, engine="numba"
)
weights_rolling_sum = weights_rolling_sum.apply(
custom_rolling_mean_boosted, raw=True, engine="numba"
)
else:
weighted_rolling_mean = weighted_rolling_mean.apply(
custom_rolling_mean, raw=True
)
weights_rolling_sum = weights_rolling_sum.apply(
custom_rolling_mean, raw=True, engine="numba"
)
weighted_rolling_mean = weighted_rolling_mean.apply(
custom_rolling_mean, raw=True
)
weights_rolling_sum = weights_rolling_sum.apply(custom_rolling_mean, raw=True)
weighted_rolling_mean = weighted_rolling_mean / weights_rolling_sum
# check means against critical raise value:
......@@ -852,6 +834,13 @@ class OutliersMixin:
----------
[1] https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm
"""
msg = """
The method `flagMAD` is deprecated and will be removed in version 3.0 of saqc.
To achieve the same behavior use:
"""
call = f"qc.flagZScore(field={field}, window={window}, method='modified', thresh={z}, min_residuals={min_residuals}, min_periods={min_periods}, center={center})"
warnings.warn(f"{msg}`{call}`", DeprecationWarning)
self = self.flagZScore(
field,
......@@ -1226,68 +1215,53 @@ class OutliersMixin:
----------
[1] https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm
"""
warnings.warn(
"The method `flagCrossStatistics` will be deprecated in a future version of saqc",
PendingDeprecationWarning,
msg = """
The method `flagCrossStatistics` is deprecated and will be removed in version 3.0 of saqc.
To achieve the same behavior use:
"""
new_method_string = {
"modZscore": "modified",
"Zscore": "standard",
np.mean: "standard",
np.median: "modified",
}
call = f"qc.flagZScore(field={field}, window=1, method={new_method_string[method]}, thresh={thresh}, axis=1)"
warnings.warn(f"{msg}`{call}`", DeprecationWarning)
return self.flagZScore(
field=field,
window=1,
method=new_method_string[method],
thresh=thresh,
axis=1,
flag=flag,
)
fields = toSequence(field)
df = self._data[fields].to_pandas(how="inner")
if isinstance(method, str):
if method == "modZscore":
MAD_series = df.subtract(df.median(axis=1), axis=0).abs().median(axis=1)
diff_scores = (
(0.6745 * (df.subtract(df.median(axis=1), axis=0)))
.divide(MAD_series, axis=0)
.abs()
)
elif method == "Zscore":
diff_scores = (
df.subtract(df.mean(axis=1), axis=0)
.divide(df.std(axis=1), axis=0)
.abs()
)
else:
raise ValueError(method)
else:
try:
stat = getattr(df, method.__name__)(axis=1)
except AttributeError:
stat = df.aggregate(method, axis=1)
diff_scores = df.subtract(stat, axis=0).abs()
mask = diff_scores > thresh
if not mask.empty:
for f in fields:
m = mask[f].reindex(index=self._flags[f].index, fill_value=False)
self._flags[m, f] = flag
return self
@flagging()
@register(
mask=["field"],
demask=["field"],
squeeze=["field"],
multivariate=True,
docstring={"field": DOC_TEMPLATES["field"]},
)
def flagZScore(
self: "SaQC",
field: str,
field: Sequence[str],
method: Literal["standard", "modified"] = "standard",
window: str | int | None = None,
thresh: float = 3,
min_residuals: int | None = None,
min_periods: int | None = None,
model_func: Callable[[np.ndarray | pd.Series], float] = np.nanmean,
norm_func: Callable[[np.ndarray | pd.Series], float] = np.nanstd,
center: bool = True,
axis: int = 0,
flag: float = BAD,
**kwargs,
) -> "SaQC":
"""
Flag data where its (rolling) Zscore exceeds a threshold.
The function implements flagging derived from a basic Zscore calculation. To handle non
The function implements flagging derived from a standard or modified Zscore calculation. To handle non-
stationary data, the Zscoring can be applied with a rolling window. Therefore, the function
allows for a minimum residual to be specified in order to mitigate overflagging in local
regimes of low variance.
......@@ -1297,51 +1271,164 @@ class OutliersMixin:
Parameters
----------
window :
Size of the window. Either determined via an Offset String, denoting the windows temporal
Size of the window. Either determined via an offset string, denoting the window's temporal
extension, or by an integer, denoting the window's number of periods. ``NaN`` values also count as
periods. If ``None`` is passed, all data points share the same scoring window, which then
equals the whole data.
method :
Which method to use for ZScoring:
* `"standard"`: standard Zscoring, using *mean* for the expectation and *standard deviation (std)* as scaling factor
* `"modified"`: modified Zscoring, using *median* as the expectation and *median absolute deviation (MAD)* as the scaling Factor
See notes section for detailed scoring formula
thresh :
Cutoff level for the Zscores, above which associated points are marked as outliers.
min_residuals :
Minimum residual value points must have to be considered outliers.
min_periods :
Minimum number of valid measurements in a scoring window, for the resulting score to be considered valid.
model_func : default mean
Function to calculate the center moment in every window.
norm_func : default std
Function to calculate the scaling for every window.
center :
Whether or not to center the target value in the scoring window. If ``False``, the
target value is the last value in the window.
axis :
Along which axis to calculate the scoring statistics:
* `0` (default) - calculate statistics along time axis
* `1` - calculate statistics over multiple variables
See Notes section for a visual clarification of the workings
of `axis` and `window`.
Notes
-----
Steps of calculation:
1. Consider a window :math:`W` of successive points :math:`W = x_{1},...x_{w}`
containing the value :math:`y_{K}` which is to be checked.
(The index of :math:`K` depends on the selection of the parameter :py:attr:`center`.)
2. The "moment" :math:`M` for the window gets calculated via :math:`M=` :py:attr:`model_func` :math:`(W)`.
3. The "scaling" :math:`N` for the window gets calculated via :math:`N=` :py:attr:`norm_func` :math:`(W)`.
4. The "score" :math:`S` for the point :math:`x_{k}` gets calculated via :math:`S=(x_{k} - M) / N`.
5. Finally, :math:`x_{k}` gets flagged, if :math:`|S| >` :py:attr:`thresh` and
:math:`|M - x_{k}| >=` :py:attr:`min_residuals`.
The flag for :math:`x` is determined as follows:
1. Depending on ``window`` and ``axis``, the context population :math:`X` is collected (see pictures below)
* If ``axis=0``, any value is flagged in the context of those values of the same variable (``field``), that are
in `window` range.
* If ``axis=1``, any value is flagged in the context of all values of all variables (``fields``), that are
in `window` range.
* If ``axis=1`` and ``window=1``, any value is flagged in the context of all values of all variables (``fields``),
that share the same timestamp.
.. figure:: /resources/images/ZscorePopulation.png
:class: with-border
2. Depending on ``method``, a score :math:`Z` is calculated for :math:`x` via :math:`Z = \\frac{|x - E(X)|}{S(X)}`
* ``method="standard"``: :math:`E(X)=mean(X)`, :math:`S(X)=std(X)`
* ``method="modified"``: :math:`E(X)=median(X)`, :math:`S(X)=MAD(X)`
3. :math:`x` is flagged, if :math:`Z >` ``thresh``
"""
datser = self._data[field]
if "norm_func" in kwargs or "model_func" in kwargs:
warnings.warn(
"Parameters norm_func and model_func are deprecated, use parameter method instead.\n"
'To model with mean and scale with standard deviation, use method="standard".\n'
'To model with median and scale with median absolute deviation (MAD) use method="modified".\n'
"Other/Custom model and scaling functions are not supported any more"
)
if (
"mean" in kwargs.get("model_func", "").__name__
or "std" in kwargs.get("norm_func", "").__name__
):
method = "standard"
elif (
"median" in kwargs.get("model_func", lambda x: x).__name__
or "median" in kwargs.get("norm_func", lambda x: x).__name__
):
method = "modified"
else:
raise ValueError(
"Support for scoring with functions not similar to either Zscore or modified Zscore is "
"not supported anymore"
)
dat = self._data[field].to_pandas(how="outer")
if min_residuals is None:
min_residuals = 0
score, model, _ = _univarScoring(
datser,
window=window,
norm_func=norm_func,
model_func=model_func,
center=center,
min_periods=min_periods,
)
to_flag = (score.abs() > thresh) & ((model - datser).abs() >= min_residuals)
self._flags[to_flag, field] = flag
if dat.empty:
return self
if min_periods is None:
min_periods = 0
if window is None:
if dat.notna().sum().sum() >= min_periods:
if method == "standard":
mod = pd.DataFrame(
{f: dat[f].mean() for f in dat.columns}, index=dat.index
)
norm = pd.DataFrame(
{f: dat[f].std() for f in dat.columns}, index=dat.index
)
else:
mod = pd.DataFrame(
{f: dat[f].median() for f in dat.columns}, index=dat.index
)
norm = pd.DataFrame(
{f: (dat[f] - mod[f]).abs().median() for f in dat.columns},
index=dat.index,
)
else:
return self
else: # window is not None
if axis == 0:
if method == "standard":
mod = dat.rolling(
window, center=center, min_periods=min_periods
).mean()
norm = dat.rolling(
window, center=center, min_periods=min_periods
).std()
else:
mod = dat.rolling(
window, center=center, min_periods=min_periods
).median()
norm = (
(mod - dat)
.abs()
.rolling(window, center=center, min_periods=min_periods)
.median()
)
else: # axis == 1:
if window == 1:
if method == "standard":
mod = dat.mean(axis=1)
norm = dat.std(axis=1)
else: # method == 'modified'
mod = dat.median(axis=1)
norm = (dat.subtract(mod, axis=0)).abs().median(axis=1)
else: # window > 1
if method == "standard":
mod = windowRoller(dat, window, "mean", min_periods, center)
norm = windowRoller(dat, window, "std", min_periods, center)
else: # method == 'modified'
mod = windowRoller(dat, window, "median", min_periods, center)
norm = windowRoller(
dat.subtract(mod, axis=0).abs(),
window,
"median",
min_periods,
center,
)
residuals = dat.subtract(mod, axis=0).abs()
score = residuals.divide(norm, axis=0)
to_flag = (score.abs() > thresh) & (residuals >= min_residuals)
for f in field:
self._flags[to_flag[f], f] = flag
return self
......
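A sketch tying the ``window``/``axis`` combinations described above to concrete calls, on synthetic data (assumes the 2.4.1 signature shown in this diff)::

    import numpy as np
    import pandas as pd
    import saqc

    idx = pd.date_range("2000", periods=100, freq="1h")
    df = pd.DataFrame(
        {"a": np.random.normal(size=100), "b": np.random.normal(size=100)}, index=idx
    )
    df.iloc[50, 0] = 12  # outlier in "a"

    qc = saqc.SaQC(df)
    # axis=0: score each value against a rolling window over its own variable
    qc = qc.flagZScore(["a", "b"], window="1D", method="modified", thresh=3.5)
    # axis=1, window=1: score each value against all variables at the same timestamp
    qc = qc.flagZScore(["a", "b"], window=1, axis=1, method="standard", thresh=3)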
......@@ -47,10 +47,11 @@ class ResamplingMixin:
"""
A method to "regularize" data by interpolating linearly the data at regular timestamp.
A series of data is considered "regular", if it is sampled regularly (= having uniform sampling rate).
.. deprecated:: 2.4.0
Use :py:meth:`~saqc.SaQC.align` with ``method="linear"`` instead.
A series of data is considered "regular", if it is sampled regularly (= having uniform sampling rate).
Interpolated values will get assigned the worst flag within freq-range.
Note that the data only gets interpolated at those (regular) timestamps that have a valid (existing and
not-NaN) datapoint preceding them and one succeeding them within freq range.
Regular timestamps that do not satisfy this condition get NaN assigned, and the associated flag will be of value
......@@ -61,6 +62,15 @@ class ResamplingMixin:
freq :
An offset string. The frequency of the grid you want to interpolate your data at.
"""
warnings.warn(
f"""
The method `linear` is deprecated and will be removed with version 2.6 of saqc.
To achieve the same behavior please use:
`qc.align(field={field}, freq={freq}, method="linear")`
""",
DeprecationWarning,
)
reserved = ["method", "order", "limit", "downgrade"]
kwargs = filterKwargs(kwargs, reserved)
return self.interpolateIndex(field, freq, "time", **kwargs)
......@@ -76,6 +86,9 @@ class ResamplingMixin:
"""
Shift data points and flags to a regular frequency grid.
.. deprecated:: 2.4.0
Use :py:meth:`~saqc.SaQC.align` instead.
Parameters
----------
freq :
......
......@@ -78,6 +78,9 @@ class RollingMixin:
"""
Calculate a rolling-window function on the data.
.. deprecated:: 2.4.0
Use :py:meth:`~saqc.SaQC.rolling` instead.
Note that the data gets assigned the worst flag present in the original data.
Parameters
......
......@@ -198,21 +198,21 @@ class ToolsMixin:
def plot(
self: "SaQC",
field: str,
path: Optional[str] = None,
max_gap: Optional[str] = None,
history: Optional[Literal["valid", "complete"] | list] = "valid",
xscope: Optional[slice] = None,
phaseplot: Optional[str] = None,
store_kwargs: Optional[dict] = None,
path: str | None = None,
max_gap: str | None = None,
history: Literal["valid", "complete"] | list[str] | None = "valid",
xscope: slice | None = None,
phaseplot: str | None = None,
store_kwargs: dict | None = None,
ax: mpl.axes.Axes | None = None,
ax_kwargs: Optional[dict] = None,
ax_kwargs: dict | None = None,
dfilter: float = FILTER_NONE,
**kwargs,
) -> "SaQC":
"""
Plot data and flags or store plot to file.
There are two modes, 'interactive' and 'store', which are determind through the
There are two modes, 'interactive' and 'store', which are determined through the
``save_path`` keyword. In interactive mode (default) the plot is shown at runtime
and the program execution stops until the plot window is closed manually. In
store mode the generated plot is stored to disk and no manually interaction is
......@@ -227,36 +227,45 @@ class ToolsMixin:
the plot is stored under the passed location.
max_gap :
If None, all the points in the data will be connected, resulting in long linear
lines, where continous chunks of data is missing. Nans in the data get dropped
before plotting. If an offset string is passed, only points that have a distance
below `max_gap` get connected via the plotting line.
If ``None``, all data points will be connected, resulting in long linear
lines across large data gaps. ``NaN`` values will be removed before
plotting. If an offset string is passed, only points that have a distance
below ``max_gap`` are connected via the plotting line.
history :
Discriminate the plotted flags with respect to the tests they originate from.
* "valid" - Only plot those flags, that do not get altered or "unflagged" by subsequent tests. Only list tests
in the legend, that actually contributed flags to the overall resault.
* "complete" - plot all the flags set and list all the tests ran on a variable. Suitable for debugging/tracking.
* None - just plot the resulting flags for one variable, without any historical meta information.
* list of strings - plot only flags set by those tests listed.
* ``"valid"``: Only plot flags, that are not overwritten by subsequent tests.
Only list tests in the legend, that actually contributed flags to the overall
result.
* ``"complete"``: Plot all flags set and list all the tests executed on a variable.
Suitable for debugging/tracking.
* ``None``: Just plot the resulting flags for one variable, without any historical
and/or meta information.
* list of strings: Plot only flags set by the listed tests.
xscope :
Parameter, that determines a chunk of the data to be plotted
processed. `xscope` can be anything, that is a valid argument to the ``pandas.Series.__getitem__`` method.
Determine a chunk of the data to be plotted and processed. ``xscope`` can be anything
that is a valid argument to the ``pandas.Series.__getitem__`` method.
phaseplot :
If a string is passed, plot ``field`` in the phase space it forms together with the Variable ``phaseplot``.
If a string is passed, plot ``field`` in the phase space it forms together with the
variable ``phaseplot``.
ax :
If not ``None``, plot into the given ``matplotlib.Axes`` instance, instead of a
newly created ``matplotlib.Figure``. This option offers a possibility to integrate
``SaQC`` plots into custom figure layouts.
store_kwargs :
Keywords to be passed on to the ``matplotlib.pyplot.savefig`` method, handling
the figure storing. To store a pickle object of the figure, use the option
``{'pickle': True}``, but note that all other store_kwargs are ignored then.
Reopen with: ``pickle.load(open(savepath,'w')).show()``
``{"pickle": True}``, but note that all other ``store_kwargs`` are ignored then.
To reopen a pickled figure execute: ``pickle.load(open(savepath, "rb")).show()``
ax_kwargs :
Axis keywords. Change the axis labeling defaults. Most important keywords:
'x_label', 'y_label', 'title', 'fontsize', 'cycleskip'.
``"xlabel"``, ``"ylabel"``, ``"title"``, ``"fontsize"``, ``"cycleskip"``.
"""
data, flags = self._data.copy(), self._flags.copy()
......
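The new ``ax`` parameter in action, as a sketch (``flagRange`` is used here only to produce some flags to draw)::

    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    import saqc

    df = pd.DataFrame(
        {"data": np.random.normal(size=200)},
        index=pd.date_range("2020", periods=200, freq="1h"),
    )
    qc = saqc.SaQC(df).flagRange("data", min=-2, max=2)

    # plot into a pre-existing Axes instead of a fresh figure
    fig, ax = plt.subplots()
    qc.plot("data", ax=ax, history="valid", ax_kwargs={"ylabel": "data [a.u.]"})
    plt.show()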
#! /usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import (
Any,
Callable,
Collection,
List,
Literal,
Sequence,
Tuple,
TypeVar,
Union,
)
CLOSURE_TO_NOTION = {
None: "interval ({}, {})",
"left": "right-open interval [{}, {})]",
"right": "left-open interval ({}, {}]",
"both": "closed interval [{}, {}]",
}
class ParameterOutOfBounds(Exception):
def __init__(
self,
value: int | float,
para_name: str,
bounds: Tuple[int | float, int | float],
closed: Literal["right", "left", "both"] | None = None,
):
Exception.__init__(self)
self.value = value
self.para_name = para_name
self.bounds = bounds
self.closed = closed
self.msg = "Parameter '{}' has to be in the {}, but {} was passed."
def __str__(self):
return self.msg.format(
self.para_name,
CLOSURE_TO_NOTION[self.closed].format(self.bounds[0], self.bounds[1]),
self.value,
)
......@@ -6,10 +6,73 @@ from __future__ import annotations
import functools
import math
from typing import Literal
import numpy as np
import pandas as pd
from saqc.lib.tools import getFreqDelta
def windowRoller(
data: pd.DataFrame,
window: str | int,
func: Literal["mean", "median", "std", "var", "sum"],
min_periods: int = 0,
center: bool = True,
):
"""
pandas-rolling-style computation with 2-dimensional windows, ranging over all columns of ``data``.
* implements an efficient 2d rolling in case of regular timestamps or an integer-defined window
* otherwise dispatches to a slower, non-optimized version in case of irregular timestamps
"""
supportedFuncs = ["mean", "median", "std", "var", "sum"]
if func not in supportedFuncs:
raise ValueError(f'"func" has to be one of {supportedFuncs}. Got {func}.')
func_kwargs = {}
if func in ["std", "var"]:
func_kwargs.update({"ddof": 1})
roll_func = getattr(np, "nan" + func)
regularFreq = getFreqDelta(data.index)
vals = data.values
if regularFreq is not None:
window = (
int(pd.Timedelta(window) / pd.Timedelta(regularFreq))
if isinstance(window, str)
else window
)
ramp = np.empty(((window - 1), vals.shape[1]))
ramp.fill(np.nan)
vals = np.concatenate([ramp, vals])
if center:
vals = np.roll(vals, axis=0, shift=-int(window / 2))
views = np.lib.stride_tricks.sliding_window_view(
vals, (window, vals.shape[1])
).squeeze()
result = roll_func(views, axis=(1, 2), **func_kwargs)
if min_periods > 0:
invalid_wins = (~np.isnan(views)).sum(axis=(1, 2)) < min_periods
result[invalid_wins] = np.nan
out = pd.Series(result, index=data.index, name="result")
else: # regularFreq is None
i_ser = pd.Series(range(data.shape[0]), index=data.index, name="result")
result = i_ser.rolling(window=window, center=center).apply(
raw=True,
func=lambda x: roll_func(
data.values[x.astype(int), :], axis=(0, 1), **func_kwargs
),
)
if min_periods > 0:
invalid_wins = (
i_ser.rolling(window=window, center=center).apply(
lambda x: (~np.isnan(data.values[x.astype(int), :])).sum()
)
) < min_periods
result[invalid_wins] = np.nan
out = result
return out
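A usage sketch for the new helper; with a regular index, the vectorized ``sliding_window_view`` path is taken::

    import numpy as np
    import pandas as pd
    from saqc.lib.rolling import windowRoller

    idx = pd.date_range("2000", periods=8, freq="1h")
    df = pd.DataFrame({"a": np.arange(8.0), "b": 2 * np.arange(8.0)}, index=idx)

    # rolling median over 3-period windows spanning *both* columns at once
    res = windowRoller(df, window=3, func="median", min_periods=1, center=True)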
def removeRollingRamps(
data: pd.Series,
......
......@@ -10,9 +10,20 @@ from __future__ import annotations
import collections
import functools
import itertools
import operator as op
import re
import warnings
from typing import Any, Callable, Collection, List, Sequence, TypeVar, Union
from typing import (
Any,
Callable,
Collection,
List,
Literal,
Sequence,
Tuple,
TypeVar,
Union,
)
import numpy as np
import pandas as pd
......@@ -22,6 +33,40 @@ from scipy.cluster.hierarchy import fcluster, linkage
from saqc.lib.types import CompT
T = TypeVar("T", str, float, int)
BOUND_OPERATORS = {
None: (op.le, op.ge),
"both": (op.lt, op.gt),
"right": (op.le, op.gt),
"left": (op.gt, op.le),
}
def isInBounds(
val: int | float,
bounds: Tuple[int | float, int | float],
closed: Literal["left", "right", "both"] | None = None,
):
"""
Check whether ``val`` falls into the interval defined by ``bounds`` and return a boolean accordingly.
val :
value to check
bounds :
Tuple containing the left and right interval bounds. Pass ``(a, b)`` to define the interval from ``a`` to ``b``.
Set ``a=-inf`` or ``b=+inf`` for a one-sided restriction.
closed :
Whether to include the interval bounds in the constraint interval. By default, the bounds
are not included. Pass:
* `"both"`: to include both sides of the interval
* `"left"`: to include left bound
* `"right"`: to include right bound
"""
ops = BOUND_OPERATORS[closed]
if ops[0](val, bounds[0]) or ops[1](val, bounds[1]):
return False
return True
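How the two new pieces compose, mirroring the check added to ``flagDriftFromNorm`` above::

    from saqc.lib.exceptions import ParameterOutOfBounds
    from saqc.lib.tools import isInBounds

    frac = 0.3
    if not isInBounds(frac, (0.5, 1), closed="both"):
        raise ParameterOutOfBounds(frac, "frac", (0.5, 1), "both")
    # -> Parameter 'frac' has to be in the closed interval [0.5, 1],
    #    but 0.3 was passed.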
def assertScalar(name, value, optional=False):
......@@ -253,7 +298,6 @@ def estimateFrequency(
len_f = len(delta_f) * 2
min_energy = delta_f[0] * min_energy
# calc/assign low/high freq cut offs (makes life easier):
min_rate_i = int(
len_f / (pd.Timedelta(min_rate).total_seconds() * (10**delta_precision))
)
......@@ -383,7 +427,7 @@ def getFreqDelta(index: pd.Index) -> None | pd.Timedelta:
(``None`` will also be returned for pd.RangeIndex type.)
"""
delta = getattr(index, "freq", None)
delta = getattr(index, "window", None)
if delta is None and not index.empty:
i = pd.date_range(index[0], index[-1], len(index))
if i.equals(index):
......
......@@ -14,7 +14,6 @@ import sys
import warnings
from typing import Union
import numba as nb
import numpy as np
import numpy.polynomial.polynomial as poly
import pandas as pd
......@@ -209,7 +208,6 @@ def maxGap(in_arr):
return max(in_arr[0], max(np.diff(in_arr)))
@nb.njit
def _exceedConsecutiveNanLimit(arr, max_consec):
"""
Check if array has more consecutive NaNs than allowed.
......@@ -226,17 +224,13 @@ def _exceedConsecutiveNanLimit(arr, max_consec):
exceeded: bool
True if more than allowed consecutive NaNs appear, False otherwise.
"""
current = 0
idx = 0
while idx < arr.size:
while idx < arr.size and arr[idx]:
current += 1
idx += 1
if current > max_consec:
return True
current = 0
idx += 1
return False
s = arr.shape[0]
if s <= max_consec:
return False
views = np.lib.stride_tricks.sliding_window_view(
arr, window_shape=min(s, max_consec + 1)
)
return bool(views.all(axis=1).any())
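The numba loop is replaced by a vectorized equivalent; the idea, restated standalone (hypothetical function name)::

    import numpy as np

    def exceeds_consecutive_limit(mask: np.ndarray, max_consec: int) -> bool:
        # mask: boolean array, True where a value is NaN
        if mask.shape[0] <= max_consec:
            return False
        # a run longer than `max_consec` exists iff some window of
        # length max_consec + 1 is entirely True
        views = np.lib.stride_tricks.sliding_window_view(mask, max_consec + 1)
        return bool(views.all(axis=1).any())

    assert exceeds_consecutive_limit(np.array([0, 1, 1, 1, 0], dtype=bool), 2)
    assert not exceeds_consecutive_limit(np.array([0, 1, 1, 0], dtype=bool), 2)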
def validationTrafo(data, max_nan_total, max_nan_consec):
......@@ -337,7 +331,7 @@ def interpolateNANs(data, method, order=2, gap_limit=2, extrapolate=None):
# helper variable for checking numerical value of gap limit, if its a numeric value (to avoid comparison to str)
gap_check = np.nan if isinstance(gap_limit, str) else gap_limit
data = pd.Series(data, copy=True)
limit_area = "inside" if not extrapolate else "outside"
limit_area = None if extrapolate else "inside"
if gap_check is None:
# if there is actually no limit set to the gaps to-be interpolated, generate a dummy mask for the gaps
gap_mask = pd.Series(True, index=data.index, name=data.name)
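The switch from ``limit_area="outside"`` to ``limit_area=None`` when ``extrapolate`` is set means that inner gaps and edge gaps can now both be filled (cf. the updated test expectation further below). The underlying pandas semantics, sketched::

    import numpy as np
    import pandas as pd

    s = pd.Series([np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan])
    # limit_area="inside": only gaps strictly between valid points are filled
    s.interpolate("linear", limit_area="inside")     # [nan, 0, 1, 2, 3, 4, nan]
    # limit_area=None with limit_direction="both": edges get filled as well
    s.interpolate("linear", limit_direction="both")  # [0, 0, 1, 2, 3, 4, 4]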
......@@ -538,61 +532,6 @@ def butterFilter(
return y
@nb.njit
def _coeffMat(x, deg):
# helper function to construct numba-compatible polynomial fit function
mat_ = np.zeros(shape=(x.shape[0], deg + 1))
const = np.ones_like(x)
mat_[:, 0] = const
mat_[:, 1] = x
if deg > 1:
for n in range(2, deg + 1):
mat_[:, n] = x**n
return mat_
@nb.jit(nopython=True)
def _fitX(a, b):
# helper function to construct numba-compatible polynomial fit function
# linalg solves ax = b
det_ = np.linalg.lstsq(a, b)[0]
return det_
@nb.jit(nopython=True)
def _fitPoly(x, y, deg):
# a numba compatible polynomial fit function
a = _coeffMat(x, deg)
p = _fitX(a, y)
# Reverse order so p[0] is coefficient of highest order
return p[::-1]
@nb.jit(nopython=True)
def evalPolynomial(P, x):
# a numba compatible polynomial evaluator
result = 0
for coeff in P:
result = x * result + coeff
return result
def polyRollerNumba(in_slice, miss_marker, val_range, center_index, poly_deg):
# numba compatible function to roll with when modelling data with polynomial model
miss_mask = in_slice == miss_marker
x_data = val_range[~miss_mask]
y_data = in_slice[~miss_mask]
fitted = _fitPoly(x_data, y_data, deg=poly_deg)
return evalPolynomial(fitted, center_index)
def polyRollerNoMissingNumba(in_slice, val_range, center_index, poly_deg):
# numba compatible function to roll with when modelling data with polynomial model -
# it is assumed, that in slice is an equidistant sample
fitted = _fitPoly(val_range, in_slice, deg=poly_deg)
return evalPolynomial(fitted, center_index)
def polyRoller(in_slice, miss_marker, val_range, center_index, poly_deg):
# function to roll with when modelling data with polynomial model
miss_mask = in_slice == miss_marker
......
......@@ -4,4 +4,4 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
__version__ = "2.4.0"
__version__ = "2.4.1"
......@@ -39,7 +39,7 @@ setup(
"numpy",
"outlier-utils",
"pyarrow",
"pandas",
"pandas>=2.0.0",
"scikit-learn",
"scipy",
"typing_extensions",
......
......@@ -55,7 +55,7 @@ DMP = [
"2016-04-01 00:05:48,3573.0,NIL,,,32.685,NIL,,,nan,nan,nan,nan\n",
"2016-04-01 00:15:00,nan,nan,nan,nan,nan,nan,nan,nan,29.3157,NIL,,\n",
"2016-04-01 00:20:42,3572.0,NIL,,,32.7428,NIL,,,nan,nan,nan,nan\n",
'2016-04-01 00:30:00,nan,nan,nan,nan,nan,nan,nan,nan,29.3679,BAD,OTHER,"{""test"": ""flagMAD"", ""comment"": """"}"\n',
'2016-04-01 00:30:00,nan,nan,nan,nan,nan,nan,nan,nan,29.3679,BAD,OTHER,"{""test"": ""flagZScore"", ""comment"": """"}"\n',
"2016-04-01 00:35:37,3572.0,NIL,,,32.6186,NIL,,,nan,nan,nan,nan\n",
"2016-04-01 00:45:00,nan,nan,nan,nan,nan,nan,nan,nan,29.3679,NIL,,\n",
]
......
......@@ -88,7 +88,7 @@ def checkInvariants(data, flags, field, identical=True):
assert flags[field].dtype == float
# `pd.Index.identical` also check index attributes like `freq`
# `pd.Index.identical` also checks index attributes like `freq`
if identical:
assert data[field].index.identical(flags[field].index)
else:
......
......@@ -253,7 +253,7 @@ def test_flagDriftFromNorm(dat):
flags = initFlagsLike(data)
qc = SaQC(data, flags).flagDriftFromNorm(
field=fields,
freq="200min",
window="200min",
spread=5,
flag=BAD,
)
......
......@@ -32,7 +32,9 @@ def test_flagMad(spiky_data):
data = spiky_data[0]
field, *_ = data.columns
flags = initFlagsLike(data)
qc = SaQC(data, flags).flagMAD(field, "1H", flag=BAD)
qc = SaQC(data, flags).flagZScore(
field, window="1H", method="modified", thresh=3.5, flag=BAD
)
flag_result = qc.flags[field]
test_sum = (flag_result[spiky_data[1]] == BAD).sum()
assert test_sum == len(spiky_data[1])
......@@ -70,7 +72,6 @@ def test_flagSpikesLimitRaise(dat):
thresh=2,
freq="10min",
raise_window="20min",
numba_boost=False,
flag=BAD,
)
assert np.all(qc.flags[field][characteristics["raise"]] > UNFLAGGED)
......@@ -127,34 +128,35 @@ def test_grubbs(dat):
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_2")])
def test_flagCrossStatistics(dat):
data1, characteristics = dat(initial_level=0, final_level=0, out_val=0)
data2, characteristics = dat(initial_level=0, final_level=0, out_val=10)
fields = ["field1", "field2"]
s1, s2 = data1["data"], data2["data"]
s1 = pd.Series(data=s1.values, index=s1.index)
s2 = pd.Series(data=s2.values, index=s1.index)
data = DictOfSeries(field1=s1, field2=s2)
@pytest.mark.parametrize(
"parameters",
[("standard", 1), ("modified", 1), ("modified", 3), ("standard", "3h")],
)
def test_flagCrossStatistics(dat, parameters):
fields = [f"data{i}" for i in range(6)]
data = pd.DataFrame(
0, columns=fields, index=pd.date_range("2000", freq="1h", periods=10)
)
bad_idx = (np.random.randint(0, 10), np.random.randint(0, 6))
data.iloc[bad_idx[0], bad_idx[1]] = 10
flags = initFlagsLike(data)
qc = SaQC(data, flags).flagZScore(
fields, thresh=2, method=parameters[0], flag=BAD, axis=1, window=parameters[1]
)
with pytest.deprecated_call():
qc = SaQC(data, flags).flagCrossStatistics(
fields, thresh=3, method=np.mean, flag=BAD
)
for field in fields:
isflagged = qc.flags[field] > UNFLAGGED
assert isflagged[characteristics["raise"]].all()
isflagged = qc.flags.to_pandas() > UNFLAGGED
assert isflagged.iloc[bad_idx[0], bad_idx[1]]
assert isflagged.sum().sum() == 1
def test_flagZScores():
def test_flagZScoresUV():
np.random.seed(seed=1)
data = pd.Series(
[np.random.normal() for k in range(100)],
data = pd.DataFrame(
{"data": [np.random.normal() for k in range(100)]},
index=pd.date_range("2000", freq="1D", periods=100),
name="data",
)
data.iloc[[5, 80]] = 5
data.iloc[[40]] = -6
data.iloc[[5, 80], 0] = 5
data.iloc[[40], 0] = -6
qc = saqc.SaQC(data)
qc = qc.flagZScore("data", window=None)
......@@ -176,6 +178,39 @@ def test_flagZScores():
assert (qc.flags.to_pandas().iloc[[40, 80], 0] > 0).all()
def test_flagZScoresMV():
np.random.seed(seed=1)
data = pd.DataFrame(
{
"data": [np.random.normal() for k in range(100)],
"data2": [np.random.normal() for k in range(100)],
},
index=pd.date_range("2000", freq="1D", periods=100),
)
data.iloc[[5, 80], 0] = 5
data.iloc[[40], 0] = -6
data.iloc[[60], 1] = 10
qc = saqc.SaQC(data)
qc = qc.flagZScore(["data", "data2"], window=None)
assert (qc.flags.to_pandas().iloc[[5, 40, 80], 0] > 0).all()
assert (qc.flags.to_pandas().iloc[[60], 1] > 0).all()
qc = saqc.SaQC(data)
qc = qc.flagZScore("data", window=None, min_residuals=10)
assert (qc.flags.to_pandas()[["data", "data2"]] < 0).all().all()
qc = saqc.SaQC(data)
qc = qc.flagZScore(["data", "data2"], window="20D")
assert (qc.flags.to_pandas().iloc[[40, 80], 0] > 0).all()
qc = saqc.SaQC(data)
qc = qc.flagZScore("data", window=20)
assert (qc.flags.to_pandas().iloc[[40, 80], 0] > 0).all()
@pytest.mark.parametrize("n", [1, 10])
@pytest.mark.parametrize("p", [1, 2])
@pytest.mark.parametrize("thresh", ["auto", 2])
......@@ -185,10 +220,7 @@ def test_flagUniLOF(spiky_data, n, p, thresh):
qc = SaQC(data).flagUniLOF(field, n=n, p=p, thresh=thresh)
flag_result = qc.flags[field]
test_sum = (flag_result[spiky_data[1]] == BAD).sum()
try:
assert test_sum == len(spiky_data[1])
except AssertionError:
print("stop")
assert test_sum == len(spiky_data[1])
@pytest.mark.parametrize("vars", [1, 2, 3])
......
......@@ -45,7 +45,7 @@ F = False
(np.array([F, T, T, F, T, T, F]), 2, False),
],
)
def test__exceedConsecutiveNanLimit(arr, maxc, expected):
def test_exceedConsecutiveNanLimit(arr, maxc, expected):
result = tsops._exceedConsecutiveNanLimit(arr, maxc)
assert result is expected
......@@ -239,7 +239,7 @@ def test_rateOfChange(data, expected):
4,
"both",
[np.nan, 0, np.nan, np.nan, np.nan, 4, np.nan],
[np.nan, 0, 1, 2, 3, 4, np.nan],
[0, 0, 1, 2, 3, 4, 4],
),
(
None,
......@@ -253,7 +253,4 @@ def test_interpolatNANs(limit, extrapolate, data, expected):
got = tsops.interpolateNANs(
pd.Series(data), gap_limit=limit, method="linear", extrapolate=extrapolate
)
try:
assert got.equals(pd.Series(expected, dtype=float))
except AssertionError:
print("stop")
assert got.equals(pd.Series(expected, dtype=float))
#!/usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
# SPDX-License-Identifier: GPL-3.0-or-later