Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • berntm/saqc
  • rdm-software/saqc
  • schueler/saqc
3 results
Show changes
Showing 1479 additions and 463 deletions
varname;test
#------;--------------------------
SM2 ;flagRange(min=-20, max=60)
SM2 ;flagMAD(window="30d", z=3.5)
SM2 ;flagZScore(window="30d", thresh=3.5, method='modified', center=False)
SM2 ;plot()
\ No newline at end of file
......@@ -2,8 +2,8 @@ varname;test
#------;--------------------------
SM1;flagRange(min=10, max=60)
SM2;flagRange(min=10, max=60)
SM1;flagMAD(window="15d", z=3.5)
SM2;flagMAD(window="30d", z=3.5)
SM1;flagZScore(window="15d", thresh=3.5, method='modified')
SM2;flagZScore(window="30d", thresh=3.5, method='modified')
SM1;plot(path='../resources/temp/SM1processingResults')
SM2;plot(path='../resources/temp/SM2processingResults')
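For orientation, the tests listed in the configs above map directly onto the Python API; a minimal sketch of the SM2 pipeline (the input frame below is made up):

import pandas as pd
import saqc

# hypothetical soil moisture readings standing in for the real input data
data = pd.DataFrame(
    {"SM2": [12.3, 12.1, 80.0, 11.9, 12.2]},
    index=pd.date_range("2021-01-01", periods=5, freq="10min"),
)

qc = saqc.SaQC(data)
qc = qc.flagRange("SM2", min=-20, max=60)
qc = qc.flagZScore("SM2", window="30d", thresh=3.5, method="modified", center=False)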
docs/resources/images/ZscorePopulation.png

793 KiB

SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
SPDX-License-Identifier: GPL-3.0-or-later
\ No newline at end of file
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
[tool.isort]
......@@ -11,3 +10,14 @@ addopts = "--strict-markers"
testpaths = "tests"
markers = "slow: marks tests as slow (deselect with '-m \"not slow\"')"
[tool.versioneer]
VCS = "git"
style = "pep440-pre"
versionfile_source = "saqc/_version.py"
versionfile_build = "saqc/_version.py"
tag_prefix = "v"
parentdir_prefix = "saqc-"
[build-system]
requires = ["setuptools", "versioneer[toml]==0.29"]
build-backend = "setuptools.build_meta"
\ No newline at end of file
......@@ -2,16 +2,15 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
Click==8.1.3
Click==8.1.7
docstring_parser==0.15
dtw==1.4.0
matplotlib==3.7.1
numba==0.57.0
numpy==1.24.3
outlier-utils==0.0.3
pyarrow==11.0.0
pandas==2.0.1
scikit-learn==1.2.2
scipy==1.10.1
matplotlib==3.7.2
numpy==1.25.2
outlier-utils==0.0.5
pyarrow==13.0.0
pandas==2.1.0
scikit-learn==1.3.0
scipy==1.11.2
typing_extensions==4.5.0
fancy-collections==0.2.1
......@@ -8,7 +8,12 @@
"""The System for automated Quality Control package."""
from saqc.constants import BAD, DOUBTFUL, FILTER_ALL, FILTER_NONE, GOOD, UNFLAGGED
from saqc.exceptions import ParsingError
from saqc.core import Flags, DictOfSeries, SaQC
from saqc.core.translation import DmpScheme, FloatScheme, PositionalScheme, SimpleScheme
from saqc.parsing.reader import fromConfig
from saqc.version import __version__
from . import _version
__version__ = _version.get_versions()["version"]
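With versioneer wired in this way, the package version is derived from git tags at build time; a quick check against an installed package:

import saqc

print(saqc.__version__)  # e.g. "2.5.0" -- the exact string depends on the installed tag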
......@@ -6,6 +6,9 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
import logging
from functools import partial
from pathlib import Path
......@@ -17,17 +20,18 @@ import pyarrow as pa
from saqc.core import DictOfSeries
from saqc.core.core import TRANSLATION_SCHEMES
from saqc.parsing.reader import fromConfig
from saqc.parsing.reader import _ConfigReader
from saqc.version import __version__
logger = logging.getLogger("SaQC")
LOG_FORMAT = "[%(asctime)s][%(name)s][%(levelname)s]: %(message)s"
def _setupLogging(loglvl):
logger.setLevel(loglvl)
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s]: %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logging.basicConfig(level=loglvl, format=LOG_FORMAT)
def setupIO(nodata):
......@@ -66,12 +70,14 @@ def writeData(writer_dict, df, fname):
@click.command()
@click.version_option(__version__)
@click.option(
"-c",
"--config",
type=click.Path(),
required=True,
help="path to the configuration file",
help="Path to a configuration file. Use a '.json' extension to provide a JSON-"
"configuration. Otherwise files are treated as CSV.",
)
@click.option(
"-d",
......@@ -79,37 +85,66 @@ def writeData(writer_dict, df, fname):
type=click.Path(),
multiple=True,
required=True,
help="path to the data file",
help="Path to a data file.",
)
@click.option(
"-o", "--outfile", type=click.Path(exists=False), help="path to the output file"
"-o",
"--outfile",
type=click.Path(exists=False),
required=False,
help="Path to a output file.",
)
@click.option(
"--scheme",
default=None,
default="simple",
show_default=True,
type=click.Choice(tuple(TRANSLATION_SCHEMES.keys())),
help="the flagging scheme to use",
help="A flagging scheme to use.",
)
@click.option(
"--nodata", default=np.nan, help="Set a custom nodata value.", show_default=True
)
@click.option("--nodata", default=np.nan, help="nodata value")
@click.option(
"--log-level",
"-ll",
default="INFO",
show_default=True,
type=click.Choice(["DEBUG", "INFO", "WARNING"]),
help="set output verbosity",
help="Set log verbosity.",
)
def main(config, data, scheme, outfile, nodata, log_level):
@click.option(
"--json-field",
default=None,
help="Use the value from the given FIELD from the root object of a json file. The "
"value must hold a array of saqc tests. If the option is not given, a passed "
"JSON config is assumed to have an array of saqc tests as root element.",
)
def main(
config: str,
data: str,
scheme: str,
outfile: str,
nodata: str | float,
log_level: str,
json_field: str | None,
):
# data is always a list of data files
_setupLogging(log_level)
reader, writer = setupIO(nodata)
data = [readData(reader, f) for f in data]
saqc = fromConfig(
config,
data=data,
scheme=TRANSLATION_SCHEMES[scheme or "simple"](),
)
config = str(config)
cr = _ConfigReader(data=data, scheme=scheme)
if config.endswith("json"):
f = None
if json_field is not None:
f = lambda j: j[str(json_field)]
cr = cr.readJson(config, unpack=f)
else:
cr = cr.readCsv(config)
saqc = cr.run()
data_result = saqc.data.to_pandas()
flags_result = saqc.flags
......
This diff is collapsed.
......@@ -35,8 +35,8 @@ np.seterr(invalid="ignore")
TRANSLATION_SCHEMES = {
"float": FloatScheme,
"simple": SimpleScheme,
"float": FloatScheme,
"dmp": DmpScheme,
"positional": PositionalScheme,
}
......
......@@ -11,7 +11,7 @@ from typing import Any, Callable, Dict, List, Tuple
import numpy as np
import pandas as pd
from pandas.api.types import is_categorical_dtype, is_float_dtype
from pandas.api.types import is_float_dtype
from saqc import UNFLAGGED
......@@ -526,7 +526,9 @@ class History:
raise TypeError(
f"value must be of type pd.Series, got type {type(obj).__name__}"
)
if not is_float_dtype(obj.dtype) and not is_categorical_dtype(obj.dtype):
if not is_float_dtype(obj.dtype) and not isinstance(
obj.dtype, pd.CategoricalDtype
):
raise ValueError("dtype must be float or categorical")
return obj
......
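Replacing the deprecated pandas helper is_categorical_dtype with an isinstance check follows the pattern pandas itself recommends; a small sketch of the accepted dtypes:

import pandas as pd
from pandas.api.types import is_float_dtype

s = pd.Series(["a", "b", "a"], dtype="category")
# a History column must be float or categorical
ok = is_float_dtype(s.dtype) or isinstance(s.dtype, pd.CategoricalDtype)  # True here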
#! /usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-
from __future__ import annotations
class ParsingError(RuntimeError):
pass
......@@ -25,6 +25,7 @@ import pandas as pd
from saqc import BAD, FILTER_ALL
from saqc.core import flagging, register
from saqc.funcs.changepoints import _getChangePoints
from saqc.lib.checking import validateMinPeriods, validateWindow
from saqc.lib.tools import isunflagged
if TYPE_CHECKING:
......@@ -82,7 +83,7 @@ class BreaksMixin:
group_window :
Maximum size of a data chunk to consider it a candidate for an isolated group.
Data chunks that are bigger than the ``group_window`` are ignored.
Data chunks that are bigger than the :py:attr:`group_window` are ignored.
This does not include the possible gaps surrounding it.
See condition (1).
......@@ -97,16 +98,18 @@ class BreaksMixin:
3. None of the :math:`x_j` with :math:`0 < t_j - t_(k+n) <` `gap_window`,
is valid (succeeding gap).
"""
validateWindow(gap_window, name="gap_window", allow_int=False)
validateWindow(group_window, name="group_window", allow_int=False)
dat = self._data[field].dropna()
if dat.empty:
return self
gap_ends = dat.rolling(gap_window).count() == 1
gap_ends[0] = False
gap_ends.iloc[0] = False
gap_ends = gap_ends[gap_ends]
gap_starts = dat[::-1].rolling(gap_window).count()[::-1] == 1
gap_starts[-1] = False
gap_starts.iloc[-1] = False
gap_starts = gap_starts[gap_starts]
if gap_starts.empty:
return self
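The rolling-count trick above marks values that directly follow a data gap: a point is a gap end if the trailing ``gap_window`` contains only that single observation. A small illustration with an assumed one-hour gap window:

import pandas as pd

s = pd.Series(
    [1.0, 2.0, 3.0, 4.0],
    index=pd.to_datetime(
        ["2021-01-01 00:00", "2021-01-01 00:10", "2021-01-01 03:00", "2021-01-01 03:10"]
    ),
)
gap_ends = s.rolling("1h").count() == 1
# True at 00:00 (series start, later masked via gap_ends.iloc[0] = False)
# and at 03:00 (first value after the nearly three-hour gap)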
......@@ -140,11 +143,11 @@ class BreaksMixin:
"""
Flag jumps and drops in data.
Flag data where the mean of its values significantly changes (, where the data "jumps" from one value level to
another).
The changes in value level are detected by comparing the mean for two adjacently rolling windows.
Whenever the difference between the mean in the two windows exceeds `thresh`, the value between the windows
is flagged a jump.
Flag data where the mean of its values significantly changes (where the data "jumps" from one
value level to another).
Value changes are detected by comparing the mean for two adjacent rolling windows. Whenever
the difference between the mean in the two windows exceeds :py:attr:`thresh`, the value between
the windows is flagged.
Parameters
----------
......@@ -152,22 +155,21 @@ class BreaksMixin:
Threshold value by which the mean of data has to jump, to trigger flagging.
window :
Size of the two moving windows. This determines the number of observations used
for calculating the mean in every window.
The window size should be big enough to yield enough samples for a reliable mean calculation,
but it should also not be arbitrarily big, since it also limits the density of jumps that can be detected.
More precisely: Jumps that are not distanced to each other by more than three fourth (3/4) of the
selected window size, will not be detected reliably.
Size of the two moving windows. This determines the number of observations used for
calculating the mean in every window. The window size should be big enough to yield enough
samples for a reliable mean calculation, but it should also not be arbitrarily big, since
it also limits the density of jumps that can be detected.
More precisely: Jumps that are closer to each other than three quarters (3/4)
of the selected :py:attr:`window` size will not be detected reliably.
min_periods :
The minimum number of observations in window required to calculate a valid
mean value.
The minimum number of observations in py:attr:`window` required to calculate a valid mean value.
Examples
--------
Below picture gives an abstract interpretation of the parameter interplay in case of a positive value jump,
initialising a new mean level.
The figure below gives an abstract interpretation of the parameter interplay in case of a positive
value jump initialising a new mean level.
.. figure:: /resources/images/flagJumpsPic.png
......@@ -180,6 +182,9 @@ class BreaksMixin:
Jumps that are closer to each other than three quarters (3/4) of the
selected window size will not be detected reliably.
"""
validateWindow(window, allow_int=False)
validateMinPeriods(min_periods)
mask = _getChangePoints(
data=self._data[field],
stat_func=lambda x, y: np.abs(np.mean(x) - np.mean(y)),
......
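The statistic evaluated by ``flagJumps`` at each candidate point is just the absolute difference of the two window means, as the ``stat_func`` above shows; a toy calculation with made-up windows:

import numpy as np

left = np.array([10.0, 10.2, 9.9])    # backward window, before the candidate point
right = np.array([15.1, 14.8, 15.0])  # forward window, after the candidate point
stat = np.abs(np.mean(left) - np.mean(right))  # ~4.93, flagged if it exceeds `thresh`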
......@@ -7,15 +7,21 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import typing
from typing import TYPE_CHECKING, Callable, Tuple
from typing import TYPE_CHECKING, Callable, Literal, Tuple
import numba
import numpy as np
import pandas as pd
from saqc import BAD, UNFLAGGED
from saqc.core import DictOfSeries, Flags, flagging, register
from saqc.lib.checking import (
isInBounds,
validateCallable,
validateChoice,
validateMinPeriods,
validateValueBounds,
validateWindow,
)
if TYPE_CHECKING:
from saqc import SaQC
......@@ -87,6 +93,11 @@ class ChangepointsMixin:
The default reduction function just selects the value that maximizes the
`stat_func`.
"""
validateCallable(stat_func, "stat_func")
validateCallable(thresh_func, "thresh_func")
validateCallable(reduce_func, "reduce_func")
# Hint: windows are checked in _getChangePoints
mask = _getChangePoints(
data=self._data[field],
stat_func=stat_func,
......@@ -168,6 +179,11 @@ class ChangepointsMixin:
model_by_resids :
If True, the results of `stat_funcs` are written, otherwise the regime labels.
"""
validateCallable(stat_func, "stat_func")
validateCallable(thresh_func, "thresh_func")
validateCallable(reduce_func, "reduce_func")
# Hint: windows are checked in _getChangePoints
rtyp = "residual" if model_by_resids else "cluster"
cluster = _getChangePoints(
data=self._data[field],
......@@ -194,19 +210,44 @@ def _getChangePoints(
min_periods: int | Tuple[int, int],
reduce_window: str | None = None,
reduce_func: Callable[[np.ndarray, np.ndarray], float] = lambda x, _: x.argmax(),
result: typing.Literal["cluster", "residual", "mask"] = "mask",
result: Literal["cluster", "residual", "mask"] = "mask",
) -> pd.Series:
"""
TODO: missing docstring
Parameters
----------
data :
stat_func :
thresh_func :
window :
min_periods :
reduce_window :
reduce_func :
result :
Returns
-------
"""
validateChoice(result, "result", ["cluster", "residual", "mask"])
orig_index = data.index
data = data.dropna() # implicit copy
if isinstance(window, (list, tuple)):
bwd_window, fwd_window = window
validateWindow(bwd_window, name="window[0]", allow_int=False)
validateWindow(fwd_window, name="window[1]", allow_int=False)
else:
validateWindow(window, name="window", allow_int=False)
bwd_window = fwd_window = window
if isinstance(window, (list, tuple)):
if isinstance(min_periods, (list, tuple)):
bwd_min_periods, fwd_min_periods = min_periods
validateMinPeriods(bwd_min_periods, "min_periods[0]")
validateMinPeriods(fwd_min_periods, "min_periods[1]")
else:
validateMinPeriods(min_periods)
bwd_min_periods = fwd_min_periods = min_periods
if reduce_window is None:
......@@ -215,13 +256,7 @@ def _getChangePoints(
+ pd.Timedelta(fwd_window).total_seconds()
)
reduce_window = f"{s}s"
for window in [fwd_window, bwd_window, reduce_window]:
if isinstance(window, int):
raise TypeError(
"all parameter defining a size of a window "
"must be time-offsets, not integer."
)
validateWindow(reduce_window, name="reduce_window", allow_int=False)
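# e.g. with bwd_window == fwd_window == "5min" the fallback above evaluates to
# s == 600.0 and hence reduce_window == "600.0s", i.e. the combined span of both windows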
# find window bounds arrays..
num_index = pd.Series(range(len(data)), index=data.index, dtype=int)
......@@ -245,31 +280,8 @@ def _getChangePoints(
check_len = len(fwd_end)
data_arr = data.values
# Please keep this as I sometimes need to disable jitting manually
# to make it work with my debugger :/
# --palmb
try_to_jit = True
if try_to_jit:
jit_sf = numba.jit(stat_func, nopython=True)
jit_tf = numba.jit(thresh_func, nopython=True)
try:
jit_sf(
data_arr[bwd_start[0] : bwd_end[0]], data_arr[fwd_start[0] : fwd_end[0]]
)
jit_tf(
data_arr[bwd_start[0] : bwd_end[0]], data_arr[fwd_start[0] : fwd_end[0]]
)
stat_func = jit_sf
thresh_func = jit_tf
except (numba.TypingError, numba.UnsupportedError, IndexError):
try_to_jit = False
args = data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, check_len
if try_to_jit:
stat_arr, thresh_arr = _slidingWindowSearchNumba(*args)
else:
stat_arr, thresh_arr = _slidingWindowSearch(*args)
stat_arr, thresh_arr = _slidingWindowSearch(*args)
result_arr = stat_arr > thresh_arr
......@@ -280,24 +292,23 @@ def _getChangePoints(
det_index = masked_index[result_arr]
detected = pd.Series(True, index=det_index)
if reduce_window:
length = len(detected)
# find window bounds arrays
num_index = pd.Series(range(length), index=detected.index, dtype=int)
rolling = num_index.rolling(window=reduce_window, closed="both", center=True)
start = rolling.min().to_numpy(dtype=int)
end = (rolling.max() + 1).to_numpy(dtype=int)
detected = _reduceCPCluster(
stat_arr[result_arr],
thresh_arr[result_arr],
start,
end,
reduce_func,
length,
)
det_index = det_index[detected]
length = len(detected)
# find window bounds arrays
num_index = pd.Series(range(length), index=detected.index, dtype=int)
rolling = num_index.rolling(window=reduce_window, closed="both", center=True)
start = rolling.min().to_numpy(dtype=int)
end = (rolling.max() + 1).to_numpy(dtype=int)
detected = _reduceCPCluster(
stat_arr[result_arr],
thresh_arr[result_arr],
start,
end,
reduce_func,
length,
)
det_index = det_index[detected]
# The changepoint is the point "after" the change.
# So the detected index has to be shifted by one
......@@ -324,20 +335,6 @@ def _getChangePoints(
)
@numba.jit(parallel=True, nopython=True)
def _slidingWindowSearchNumba(
data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, num_val
):
stat_arr = np.zeros(num_val)
thresh_arr = np.zeros(num_val)
for win_i in numba.prange(0, num_val - 1):
x = data_arr[bwd_start[win_i] : split[win_i]]
y = data_arr[split[win_i] : fwd_end[win_i]]
stat_arr[win_i] = stat_func(x, y)
thresh_arr[win_i] = thresh_func(x, y)
return stat_arr, thresh_arr
def _slidingWindowSearch(
data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, num_val
):
......@@ -353,7 +350,7 @@ def _slidingWindowSearch(
def _reduceCPCluster(stat_arr, thresh_arr, start, end, obj_func, num_val):
out_arr = np.zeros(shape=num_val, dtype=bool)
for win_i in numba.prange(0, num_val):
for win_i in range(num_val):
s, e = start[win_i], end[win_i]
x = stat_arr[s:e]
y = thresh_arr[s:e]
......
......@@ -16,6 +16,7 @@ import pandas as pd
from saqc import BAD
from saqc.core import flagging
from saqc.lib.checking import validateMinPeriods, validateValueBounds, validateWindow
from saqc.lib.rolling import removeRollingRamps
from saqc.lib.tools import getFreqDelta, statPass
from saqc.lib.ts_operators import varQC
......@@ -50,30 +51,22 @@ class ConstantsMixin:
thresh :
Maximum total change allowed per window.
min_periods :
Minimum number of observations in window required to generate
a flag. Must be an integer greater than or equal to `2`, because a
single value would always be considered constant.
Defaults to `2`.
window :
Size of the moving window. This is the number of observations used
for calculating the statistic. Each window will be a fixed size.
If its an offset then this will be the time period of each window.
If it is an offset then this will be the time period of each window.
Each window will be a variable sized based on the observations included
in the time-period.
"""
if not isinstance(window, (str, int)):
raise TypeError("window must be offset string or int.")
d: pd.Series = self._data[field]
if not isinstance(window, int) and not pd.api.types.is_datetime64_any_dtype(
d.index
):
raise ValueError(
f"A time based value for 'window' is only possible for variables "
f"with a datetime based index, but variable '{field}' has an index "
f"of dtype {d.index.dtype}. Use an integer window instead."
)
# min_periods=2 ensures that at least two non-nan values are present
# in each window and also min() == max() == d[i] is not possible.
min_periods = max(min_periods, 2)
validateWindow(window, index=d.index)
validateMinPeriods(min_periods, minimum=2, optional=False)
# 1. find starting points of consecutive constant values as a boolean mask
# 2. fill the whole window with True's
......@@ -133,21 +126,27 @@ class ConstantsMixin:
maxna_group :
Same as `maxna` but for consecutive NaNs.
"""
dataseries = self._data[field]
delta = getFreqDelta(dataseries.index)
d: pd.Series = self._data[field]
validateWindow(window, allow_int=False, index=d.index)
window = pd.Timedelta(window)
delta = getFreqDelta(d.index)
if not delta:
raise IndexError("Timeseries irregularly sampled!")
if maxna is None:
maxna = np.inf
if maxna_group is None:
maxna_group = np.inf
validateValueBounds(maxna, "maxna", 0, closed="both", strict_int=True)
validateValueBounds(
maxna_group, "maxna_group", 0, closed="both", strict_int=True
)
min_periods = int(np.ceil(pd.Timedelta(window) / pd.Timedelta(delta)))
window = pd.Timedelta(window)
to_set = statPass(
dataseries,
d,
lambda x: varQC(x, maxna, maxna_group),
window,
thresh,
......
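The ``min_periods`` derived above is simply the number of regular sampling intervals fitting into one window; for instance, assuming a 10-minute sampling rate and a one-hour window:

import numpy as np
import pandas as pd

window = pd.Timedelta("1h")
delta = pd.Timedelta("10min")               # regular sampling interval of the series
min_periods = int(np.ceil(window / delta))  # -> 6 observations per window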
......@@ -7,21 +7,24 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, Tuple, Union
from typing import TYPE_CHECKING, Literal, Tuple, Union
import numpy as np
import pandas as pd
from typing_extensions import Literal
from saqc.core import DictOfSeries, Flags, register
from saqc.lib.tools import getFreqDelta
from saqc.lib.checking import (
validateChoice,
validateMinPeriods,
validateValueBounds,
validateWindow,
)
from saqc.lib.tools import extractLiteral, getFreqDelta
from saqc.lib.ts_operators import (
butterFilter,
polyRoller,
polyRollerIrregular,
polyRollerNoMissing,
polyRollerNoMissingNumba,
polyRollerNumba,
)
if TYPE_CHECKING:
......@@ -91,6 +94,9 @@ class CurvefitMixin:
Passing 0, disables the feature and will result in over-fitting for too
sparse windows.
"""
validateWindow(window)
validateMinPeriods(min_periods)
validateValueBounds(order, "order", left=0, strict_int=True)
self._data, self._flags = _fitPolynomial(
data=self._data,
field=field,
......@@ -131,10 +137,10 @@ class CurvefitMixin:
Fill method to be applied on the data before filtering (``butterFilter`` can't
handle ``np.nan``). See the documentation of the pandas.Series.interpolate method for
details on the methods associated with the different keywords.
filter_type :
The type of filter. Default is ‘lowpass’.
"""
validateValueBounds(filter_order, "filter_order", strict_int=True)
validateChoice(fill_method, "fill_method", FILL_METHODS)
self._data[field] = butterFilter(
self._data[field],
cutoff=cutoff,
......@@ -156,6 +162,11 @@ def _fitPolynomial(
**kwargs,
) -> Tuple[DictOfSeries, Flags]:
# TODO: some (rather large) parts are functional similar to saqc.funcs.rolling.roll
validateWindow(window)
validateValueBounds(order, "order", 0, strict_int=True)
validateMinPeriods(min_periods)
if data[field].empty:
return data, flags
......@@ -176,17 +187,13 @@ def _fitPolynomial(
fitted = to_fit.rolling(
pd.Timedelta(window), closed="both", min_periods=min_periods, center=True
).apply(polyRollerIrregular, args=(centers, order))
else:
else: # if regular
if isinstance(window, str):
window = pd.Timedelta(window) // regular
if window % 2 == 0:
window = int(window - 1)
if min_periods is None:
min_periods = window
if len(to_fit) < 200000:
numba = False
else:
numba = True
val_range = np.arange(0, window)
center_index = window // 2
......@@ -202,43 +209,20 @@ def _fitPolynomial(
miss_marker = np.floor(miss_marker - 1)
na_mask = to_fit.isna()
to_fit[na_mask] = miss_marker
if numba:
fitted = to_fit.rolling(window).apply(
polyRollerNumba,
args=(miss_marker, val_range, center_index, order),
raw=True,
engine="numba",
engine_kwargs={"no_python": True},
)
# due to a tiny bug - rolling with center=True doesnt work
# when using numba engine.
fitted = fitted.shift(-int(center_index))
else:
fitted = to_fit.rolling(window, center=True).apply(
polyRoller,
args=(miss_marker, val_range, center_index, order),
raw=True,
)
fitted = to_fit.rolling(window, center=True).apply(
polyRoller,
args=(miss_marker, val_range, center_index, order),
raw=True,
)
fitted[na_mask] = np.nan
else:
# we only fit fully populated intervals:
if numba:
fitted = to_fit.rolling(window).apply(
polyRollerNoMissingNumba,
args=(val_range, center_index, order),
engine="numba",
engine_kwargs={"no_python": True},
raw=True,
)
# due to a tiny bug - rolling with center=True doesnt work
# when using numba engine.
fitted = fitted.shift(-int(center_index))
else:
fitted = to_fit.rolling(window, center=True).apply(
polyRollerNoMissing,
args=(val_range, center_index, order),
raw=True,
)
fitted = to_fit.rolling(window, center=True).apply(
polyRollerNoMissing,
args=(val_range, center_index, order),
raw=True,
)
data[field] = fitted
worst = flags[field].rolling(window, center=True, min_periods=min_periods).max()
......
......@@ -22,9 +22,15 @@ from typing_extensions import Literal
from saqc import BAD
from saqc.core import DictOfSeries, Flags, flagging, register
from saqc.funcs.changepoints import _getChangePoints
from saqc.lib.checking import (
validateCallable,
validateChoice,
validateFrequency,
validateValueBounds,
validateWindow,
)
from saqc.lib.docs import DOC_TEMPLATES
from saqc.lib.exceptions import ParameterOutOfBounds
from saqc.lib.tools import detectDeviants, filterKwargs, isInBounds, toSequence
from saqc.lib.tools import detectDeviants, filterKwargs, toSequence
from saqc.lib.ts_operators import expDriftModel, linearDriftModel
from saqc.lib.types import CurveFitter
......@@ -36,7 +42,7 @@ LinkageString = Literal[
"single", "complete", "average", "weighted", "centroid", "median", "ward"
]
MODELDICT = {"linear": linearDriftModel, "exponential": expDriftModel}
DRIFT_MODELS = {"linear": linearDriftModel, "exponential": expDriftModel}
def cityblock(x: np.ndarray | pd.Series, y: np.ndarray | pd.Series) -> np.ndarray:
......@@ -55,7 +61,7 @@ class DriftMixin:
def flagDriftFromNorm(
self: "SaQC",
field: Sequence[str],
window: str,
window: str, # TODO: this should be named 'freq'
spread: float,
frac: float = 0.5,
metric: Callable[
......@@ -132,19 +138,20 @@ class DriftMixin:
Introduction to Hierarchical clustering:
[2] https://en.wikipedia.org/wiki/Hierarchical_clustering
"""
if not isInBounds(frac, (0.5, 1), closed="both"):
raise ParameterOutOfBounds(frac, "frac", (0.5, 1), "both")
validateValueBounds(frac, "frac", left=0, right=1, closed="both")
validateCallable(metric, "metric")
validateChoice(method, "method", LinkageString)
if "freq" in kwargs:
warnings.warn(
"""
The parameter `freq` is deprecated and will be removed in version 3.0 of saqc.
Please us the parameter `window` instead.'
""",
"The parameter `freq` is deprecated and will be "
"removed in version 2.7 of saqc. Please us the "
"parameter `window` instead.'",
DeprecationWarning,
)
window = kwargs["freq"]
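# e.g. a call like flagDriftFromNorm(..., freq="1D") is still accepted for now,
# but should be migrated to flagDriftFromNorm(..., window="1D")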
validateFrequency(window, "window")
fields = toSequence(field)
data = self._data[fields].to_pandas()
......@@ -211,9 +218,10 @@ class DriftMixin:
default, since it corresponds to the averaged value distance two data sets have (as opposed
to euclidean, for example).
"""
validateFrequency(freq, "freq")
validateCallable(metric, "metric")
fields = toSequence(field)
if reference not in fields:
fields.append(reference)
......@@ -314,13 +322,14 @@ class DriftMixin:
``expDriftModel`` and ``linearDriftModel``.
"""
# extract model func:
if isinstance(model, str):
if model not in MODELDICT:
model = DRIFT_MODELS.get(model, None)
if model is None:
raise ValueError(
f"invalid model '{model}', choose one of '{MODELDICT.keys()}'"
f"unknown model {model!r}, available models: {list(DRIFT_MODELS)}"
)
model = MODELDICT[model]
validateCallable(model, "model")
validateValueBounds(cal_range, "cal_range", left=0, strict_int=True)
# 1: extract fit intervals:
if self._data[maintenance_field].empty:
......@@ -403,14 +412,19 @@ class DriftMixin:
tolerance :
If an offset string is passed, a data chunk of length `offset` right at the
start and right at the end is ignored when fitting the model. This is to account
for the unreliability of data near the changepoints of regimes.
start and right at the end is ignored when fitting the model. This is to
account for the unreliability of data near the changepoints of regimes.
Defaults to None.
epoch :
If True, use "seconds from epoch" as x input to the model func, instead of
"seconds from regime start".
"""
validateCallable(model, "model")
if tolerance is not None:
validateWindow(tolerance, name="tolerance", allow_int=False)
cluster_ser = self._data[cluster_field]
unique_successive = pd.unique(cluster_ser.values)
data_ser = self._data[field]
......@@ -419,7 +433,6 @@ class DriftMixin:
x_dict = {}
x_mask = {}
if tolerance is not None:
# get seconds
tolerance = pd.Timedelta(tolerance).total_seconds()
for label, regime in regimes:
if epoch is False:
......@@ -512,33 +525,34 @@ class DriftMixin:
This is to account for the unreliability of data near the changepoints of regimes.
"""
# Hint: This whole function does not set any flags
# Hint:
# - This whole function does not set any flags
# - Checking is delegated to the called functions
cluster_field = field + "_CPcluster"
self = self.copyField(field, cluster_field)
self.data[cluster_field] = _getChangePoints(
data=self._data[cluster_field],
qc = self.copyField(field, cluster_field)
qc.data[cluster_field] = _getChangePoints(
data=qc._data[cluster_field],
stat_func=lambda x, y: np.abs(np.mean(x) - np.mean(y)),
thresh_func=lambda x, y: max_jump,
window=window,
min_periods=min_periods,
result="cluster",
)
self._data, self._flags = _assignRegimeAnomaly(
data=self._data,
qc._data, qc._flags = _assignRegimeAnomaly(
data=qc._data,
field=field,
flags=self._flags,
flags=qc._flags,
cluster_field=cluster_field,
spread=spread,
)
self = self.correctRegimeAnomaly(
qc = qc.correctRegimeAnomaly(
field,
cluster_field,
lambda x, p1: np.array([p1] * x.shape[0]),
tolerance=tolerance,
)
self = self.dropField(cluster_field)
return self
qc = qc.dropField(cluster_field)
return qc
@flagging()
def flagRegimeAnomaly(
......@@ -581,16 +595,21 @@ class DriftMixin:
method :
The linkage method for hierarchical (agglomerative) clustering of the variables.
metric : default absolute difference of means
metric :
A metric function for calculating the dissimilarity between 2 regimes.
Defaults to the difference in mean.
Defaults to the absolute difference in mean.
frac :
Has to be in [0,1]. Determines the minimum percentage of samples,
the "normal" group has to comprise to be the normal group actually.
The minimum percentage of samples the "normal" group has to comprise to
actually be the normal group. Must be in the closed interval `[0,1]`,
otherwise a ValueError is raised.
"""
reserverd = ["set_cluster", "set_flags"]
kwargs = filterKwargs(kwargs, reserverd)
validateChoice(method, "method", LinkageString)
validateCallable(metric, "metric")
validateValueBounds(frac, "frac", left=0, right=1, closed="both")
self._data, self._flags = _assignRegimeAnomaly(
data=self._data,
field=field,
......@@ -648,16 +667,21 @@ class DriftMixin:
method :
The linkage method for hierarchical (agglomerative) clustering of the variables.
metric : default absolute difference of means
metric :
A metric function for calculating the dissimilarity between 2 regimes.
Defaults to the difference in mean.
Defaults to the absolute difference in mean.
frac :
Has to be in [0,1]. Determines the minimum percentage of samples,
the "normal" group has to comprise to be the normal group actually.
The minimum percentage of samples the "normal" group has to comprise to
actually be the normal group. Must be in the closed interval `[0,1]`,
otherwise a ValueError is raised.
"""
reserverd = ["set_cluster", "set_flags", "flag"]
kwargs = filterKwargs(kwargs, reserverd)
validateChoice(method, "method", LinkageString)
validateCallable(metric, "metric")
validateValueBounds(frac, "frac", left=0, right=1, closed="both")
self._data, self._flags = _assignRegimeAnomaly(
data=self._data,
field=field,
......@@ -675,7 +699,10 @@ class DriftMixin:
return self
def _driftFit(x, shift_target, cal_mean, driftModel):
def _driftFit(
x: pd.Series, shift_target: pd.Series, cal_mean: int, drift_model: callable
):
"""TODO: Docstring"""
x_index = x.index - x.index[0]
x_data = x_index.total_seconds().values
x_data = x_data / x_data[-1] if len(x_data) > 1 else x_data
......@@ -683,14 +710,14 @@ def _driftFit(x, shift_target, cal_mean, driftModel):
origin_mean = np.mean(y_data[:cal_mean])
target_mean = np.mean(y_data[-cal_mean:])
dataFitFunc = functools.partial(driftModel, origin=origin_mean, target=target_mean)
dataFitFunc = functools.partial(drift_model, origin=origin_mean, target=target_mean)
# if drift model has free parameters:
if len(inspect.getfullargspec(dataFitFunc).args) > 1:
try:
# try fitting free parameters
fit_paras, *_ = curve_fit(dataFitFunc, x_data, y_data)
data_fit = dataFitFunc(x_data, *fit_paras)
data_shift = driftModel(
data_shift = drift_model(
x_data, *fit_paras, origin=origin_mean, target=shift_target
)
except RuntimeError:
......@@ -700,7 +727,7 @@ def _driftFit(x, shift_target, cal_mean, driftModel):
# when there are no free parameters in the model:
else:
data_fit = dataFitFunc(x_data)
data_shift = driftModel(x_data, origin=origin_mean, target=shift_target)
data_shift = drift_model(x_data, origin=origin_mean, target=shift_target)
return data_fit, data_shift
......@@ -721,14 +748,15 @@ def _assignRegimeAnomaly(
flag: float = BAD,
**kwargs,
) -> Tuple[DictOfSeries, Flags]:
"""TODO: Docstring."""
series = data[cluster_field]
cluster = np.unique(series)
cluster_dios = DictOfSeries({str(i): data[field][series == i] for i in cluster})
plateaus = detectDeviants(cluster_dios, metric, spread, frac, method, "samples")
cluster_frame = DictOfSeries({str(i): data[field][series == i] for i in cluster})
plateaus = detectDeviants(cluster_frame, metric, spread, frac, method, "samples")
if set_flags:
for p, cols in zip(plateaus, cluster_dios.columns[plateaus]):
flags[cluster_dios[cols].index, field] = flag
for p, cols in zip(plateaus, cluster_frame.columns[plateaus]):
flags[cluster_frame[cols].index, field] = flag
if set_cluster:
for p in plateaus:
......
This diff is collapsed.
......@@ -17,6 +17,14 @@ from typing_extensions import Literal
from saqc import UNFLAGGED
from saqc.core import register
from saqc.core.history import History
from saqc.lib.checking import (
isValidChoice,
validateCallable,
validateChoice,
validateMinPeriods,
validateValueBounds,
validateWindow,
)
from saqc.lib.tools import isflagged
from saqc.lib.ts_operators import interpolateNANs, shift2Freq
......@@ -64,7 +72,7 @@ class InterpolationMixin:
def interpolateByRolling(
self: "SaQC",
field: str,
window: Union[str, int],
window: str | int,
func: Callable[[pd.Series], float] = np.median,
center: bool = True,
min_periods: int = 0,
......@@ -72,25 +80,33 @@ class InterpolationMixin:
**kwargs,
) -> "SaQC":
"""
Interpolates nan-values in the data by assigning them the aggregation result of the window surrounding them.
Replace NaN by the aggregation result of the surrounding window.
Parameters
----------
window :
The size of the window, the aggregation is computed from. An integer define the number of periods to be used,
an string is interpreted as an offset. ( see `pandas.rolling` for more information).
Integer windows may result in screwed aggregations if called on none-harmonized or irregular data.
The size of the window the aggregation is computed from.
An integer defines the number of periods to be used, a string
is interpreted as an offset (see `pandas.rolling` for more
information). Integer windows may result in skewed aggregations
if called on non-harmonized or irregular data.
func : default median
The function used for aggregation.
center :
Center the window around the value. Can only be used with integer windows, otherwise it is silently ignored.
Center the window around the value. Can only be used with
integer windows, otherwise it is silently ignored.
min_periods :
Minimum number of valid (not np.nan) values that have to be available in a window for its aggregation to be
Minimum number of valid (not np.nan) values that have to be
available in a window for its aggregation to be
computed.
"""
validateWindow(window)
validateCallable(func, "func")
validateMinPeriods(min_periods)
datcol = self._data[field]
roller = datcol.rolling(window=window, center=center, min_periods=min_periods)
try:
......@@ -109,7 +125,6 @@ class InterpolationMixin:
flagcol = pd.Series(np.nan, index=self._flags[field].index)
flagcol.loc[interpolated] = np.nan if flag is None else flag
# todo kwargs must have all passed args except data,field,flags
meta = {
"func": "interpolateByRolling",
"args": (field,),
......@@ -165,16 +180,19 @@ class InterpolationMixin:
* ‘from_derivatives’: Refers to scipy.interpolate.BPoly.from_derivatives
order :
Order of the interpolation method, ignored if not supported by the chosen ``method``
Order of the interpolation method, ignored if not supported
by the chosen ``method``
limit :
Maximum number of missing values to interpolate. Only gaps smaller than ``limit`` will be filled.
The gap size can be given as a number of values (integer) or a temporal extensions (offset string).
With ``None``, all missing values will be interpolated.
Maximum number of missing values to interpolate. Only gaps
smaller than ``limit`` will be filled. The gap size can be
given as a number of values (integer) or a temporal extension
(offset string). With ``None``, all missing values will be
interpolated.
extrapolate :
Use parameter to perform extrapolation instead of interpolation onto the trailing and/or leading chunks of
NaN values in data series.
Use this parameter to perform extrapolation instead of interpolation
onto the trailing and/or leading chunks of NaN values in the data series.
* 'None' (default) - perform interpolation
* 'forward'/'backward' - perform forward/backward extrapolation
......@@ -205,7 +223,8 @@ class InterpolationMixin:
2000-01-01 11:00:00 NaN
2000-01-01 12:00:00 NaN
Use :py:meth:`~saqc.SaQC.interpolate` to do linear interpolation of up to 2 consecutive missing values:
Use :py:meth:`~saqc.SaQC.interpolate` to do linear interpolation
of up to 2 consecutive missing values:
.. doctest:: interpolate
......@@ -230,7 +249,8 @@ class InterpolationMixin:
<BLANKLINE>
Use :py:meth:`~saqc.SaQC.interpolate` to do linear extrapolaiton of up to 1 consecutive missing values:
Use :py:meth:`~saqc.SaQC.interpolate` to do linear extrapolation
of up to 1 consecutive missing value:
.. doctest:: interpolate
......@@ -254,15 +274,21 @@ class InterpolationMixin:
2000-01-01 12:00:00 NaN |
<BLANKLINE>
"""
if limit is not None:
validateWindow(limit, "limit")
validateValueBounds(order, "order", left=0, strict_int=True)
validateChoice(
extrapolate, "extrapolate", ["forward", "backward", "both", None]
)
if "freq" in kwargs:
# the old interpolate version
warnings.warn(
f"""
The method `intepolate` is deprecated and will be removed in version 3.0 of saqc.
To achieve the same behaviour please use:
`qc.align(field={field}, freq={kwargs["freq"]}, method={method}, order={order}, flag={flag})`
""",
f"The method `interpolate` is deprecated and will be removed "
f"in version 2.7 of saqc. To achieve the same behaviour "
f"please use: `qc.align(field={field}, freq={kwargs['freq']}, "
f"method={method}, order={order}, flag={flag})`",
DeprecationWarning,
)
return self.align(
......@@ -291,7 +317,6 @@ class InterpolationMixin:
self._flags.history[field].append(
new_col, {"func": "interpolateInvalid", "args": (), "kwargs": kwargs}
)
return self
@register(mask=["field"], demask=[], squeeze=[])
......@@ -306,8 +331,8 @@ class InterpolationMixin:
**kwargs,
) -> "SaQC":
"""
Convert time series to specified frequency. Values affected by frequency
changes will be inteprolated using the given method.
Convert time series to specified frequency. Values affected by
frequency changes will be interpolated using the given method.
Parameters
----------
......@@ -317,25 +342,36 @@ class InterpolationMixin:
method :
Interpolation technique to use. One of:
* ``'nshift'``: shift grid points to the nearest time stamp in the range = +/- 0.5 * ``freq``
* ``'bshift'``: shift grid points to the first succeeding time stamp (if any)
* ``'fshift'``: shift grid points to the last preceeding time stamp (if any)
* ``'linear'``: Ignore the index and treat the values as equally spaced.
* ``'time'``, ``'index'``, 'values': Use the actual numerical values of the index.
* ``'nshift'``: shift grid points to the nearest time stamp
in the range = +/- 0.5 * ``freq``
* ``'bshift'``: shift grid points to the first succeeding
time stamp (if any)
* ``'fshift'``: shift grid points to the last preceding time
stamp (if any)
* ``'linear'``: Ignore the index and treat the values as equally
spaced.
* ``'time'``, ``'index'``, 'values': Use the actual numerical
values of the index.
* ``'pad'``: Fill in NaNs using existing values.
* ``'nearest'``, ``'zero'``, ``'slinear'``, ``'quadratic'``, ``'cubic'``, ``'spline'``, ``'barycentric'``, ``'polynomial'``:
Passed to ``scipy.interpolate.interp1d``. These methods use the numerical values of the index. Both ``'polynomial'`` and
``'spline'`` require that you also specify an ``order``, e.g. ``qc.interpolate(method='polynomial', order=5)``.
* ``'spline'``, ``'polynomial'``:
Passed to ``scipy.interpolate.interp1d``. These methods
use the numerical values of the index. An ``order`` must be
specified, e.g. ``qc.interpolate(method='polynomial', order=5)``.
* ``'nearest'``, ``'zero'``, ``'slinear'``, ``'quadratic'``, ``'cubic'``, ``'barycentric'``:
Passed to ``scipy.interpolate.interp1d``. These methods use
the numerical values of the index.
* ``'krogh'``, ``'spline'``, ``'pchip'``, ``'akima'``, ``'cubicspline'``:
Wrappers around the SciPy interpolation methods of similar names.
Wrappers around the SciPy interpolation methods of similar
names.
* ``'from_derivatives'``: Refers to ``scipy.interpolate.BPoly.from_derivatives``
order :
Order of the interpolation method, ignored if not supported by the chosen ``method``
Order of the interpolation method, ignored if not supported
by the chosen ``method``
extrapolate :
Use parameter to perform extrapolation instead of interpolation onto the trailing and/or leading chunks of
NaN values in data series.
Use this parameter to perform extrapolation instead of interpolation
onto the trailing and/or leading chunks of NaN values in the data series.
* ``None`` (default) - perform interpolation
* ``'forward'``/``'backward'`` - perform forward/backward extrapolation
......@@ -348,6 +384,12 @@ class InterpolationMixin:
# TODO:
# - should we keep `extrapolate`
validateWindow(freq, "freq", allow_int=False)
validateValueBounds(order, "order", left=0, strict_int=True)
validateChoice(
extrapolate, "extrapolate", ["forward", "backward", "both", None]
)
if self._data[field].empty:
return self
......@@ -377,16 +419,15 @@ class InterpolationMixin:
**kwargs,
},
}
flagcol = pd.Series(UNFLAGGED if overwrite else np.nan, index=history.index)
history.append(flagcol, meta)
self._data[field] = datacol
self._flags.history[field] = history
return self
# ============================================================
### Deprecated functions
# ============================================================
@register(mask=["field"], demask=[], squeeze=[])
def interpolateIndex(
......@@ -400,10 +441,11 @@ class InterpolationMixin:
**kwargs,
) -> "SaQC":
"""
Function to interpolate the data at regular (äquidistant) timestamps (or Grid points).
Function to interpolate the data at regular (equidistant)
timestamps, also known as grid points.
.. deprecated:: 2.4.0
Use :py:meth:`~saqc.SaQC.align` instead.
.. deprecated:: 2.4.0
Use :py:meth:`~saqc.SaQC.align` instead.
Parameters
----------
......@@ -415,33 +457,41 @@ class InterpolationMixin:
The interpolation method you want to apply.
order :
If your selected interpolation method can be performed at different 'orders' - here you pass the desired
order.
If your selected interpolation method can be performed at
different 'orders' - here you pass the desired order.
limit :
Upper limit of missing index values (with respect to ``freq``) to fill. The limit can either be expressed
as the number of consecutive missing values (integer) or temporal extension of the gaps to be filled
(Offset String).
If ``None`` is passed, no limit is set.
Upper limit of missing index values (with respect to ``freq``)
to fill. The limit can either be expressed as the number of
consecutive missing values (integer) or temporal extension
of the gaps to be filled (Offset String). If ``None`` is passed,
no limit is set.
extraplate :
Use parameter to perform extrapolation instead of interpolation onto the trailing and/or leading chunks of
NaN values in data series.
extrapolate :
Use this parameter to perform extrapolation instead of interpolation
onto the trailing and/or leading chunks of NaN values in the data
series.
* ``None`` (default) - perform interpolation
* ``'forward'``/``'backward'`` - perform forward/backward extrapolation
* ``'both'`` - perform forward and backward extrapolation
"""
msg = """
The method `interpolateIndex` is deprecated and will be removed in verion 3.0 of saqc.
To achieve the same behavior use:
"""
call = "qc.align(field={field}, freq={freq}, method={method}, order={order}, extrapolate={extrapolate})"
call = (
f'qc.align(field="{field}", freq="{freq}", method="{method}", '
f'order={order}, extrapolate="{extrapolate}")'
)
if limit != 2:
call = f"{call}.interpolate(field={field}, method={method}, order={order}, limit={limit}, extrapolate={extrapolate})"
call = (
f'qc.interpolate(field="{field}", method="{method}", '
f'order="{order}", limit="{limit}", extrapolate="{extrapolate}")'
)
warnings.warn(
f"The method interpolateIndex is deprectated and will be removed with SaQC==3.0. Use `{call}` instead",
DeprecationWarning,
)
# HINT: checking is delegated to called functions
warnings.warn(f"{msg}`{call}`", DeprecationWarning)
out = self.align(
field=field,
freq=freq,
......@@ -482,16 +532,14 @@ class InterpolationMixin:
Use :py:meth:`~saqc.SaQC.interpolate` instead.
"""
warnings.warn(
f"""
The method `intepolateInvalid` is deprecated and will be removed
with version 3.0 of saqc. To achieve the same behavior, please use
`qc.interpolate(
field={field}, method={method}, order={order},
limit={limit}, extrapolate={extrapolate}, flag={flag}
)`
"""
"The method `intepolateInvalid` is deprecated and will be removed "
"with version 2.7 of saqc. To achieve the same behavior, please "
f"use `qc.interpolate(field={field}, method={method}, order={order}, "
f"limit={limit}, extrapolate={extrapolate}, flag={flag})`",
DeprecationWarning,
)
# HINT: checking is delegated to called function
return self.interpolate(
field=field,
method=method,
......@@ -526,19 +574,14 @@ def _shift(
* 'nshift' : shift grid points to the nearest time stamp in the range = +/- 0.5 * ``freq``
* 'bshift' : shift grid points to the first succeeding time stamp (if any)
* 'fshift' : shift grid points to the last preceeding time stamp (if any)
freq_check :
* ``None`` : do not validate the ``freq`` string.
* 'check' : check ``freq`` against an frequency estimation, produces a warning in case of miss matches.
* 'auto' : estimate frequency, `freq` is ignored.
* 'fshift' : shift grid points to the last preceding time stamp (if any)
Returns
-------
saqc.SaQC
"""
# TODO
# - Do we need `freq_check`? If so could we move it to `align`?
validateChoice(method, "method", ["fshift", "bshift", "nshift"])
validateWindow(freq, "freq", allow_int=False)
datcol = saqc._data[field]
if datcol.empty:
......@@ -567,8 +610,15 @@ def _interpolate(
method: str,
order: int | None,
dfilter: float,
extrapolate: Literal["forward", "backward", "both"] | None = None,
extrapolate: Literal["forward", "backward", "both", None] = None,
) -> Tuple[pd.Series, History]:
"""TODO: Docstring"""
validateChoice(extrapolate, "extrapolate", ["forward", "backward", "both", None])
validateWindow(freq, "freq", allow_int=False)
if order is not None:
validateValueBounds(order, "order", 0, strict_int=True)
datcol = saqc._data[field].copy()
start, end = datcol.index[0].floor(freq), datcol.index[-1].ceil(freq)
......
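The grid bounds are obtained by flooring and ceiling the first and last timestamp to the target frequency; for example:

import pandas as pd

ts = pd.Timestamp("2021-01-01 00:07:13")
ts.floor("15min")  # Timestamp('2021-01-01 00:00:00')
ts.ceil("15min")   # Timestamp('2021-01-01 00:15:00')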
......@@ -8,27 +8,37 @@
from __future__ import annotations
import operator
from typing import TYPE_CHECKING, Callable
import warnings
from typing import TYPE_CHECKING, Callable, Literal
import numpy as np
import pandas as pd
from scipy.stats import median_abs_deviation
from saqc.constants import BAD
from saqc.core.register import flagging
from saqc.lib.checking import (
isCallable,
validateChoice,
validateMinPeriods,
validateWindow,
)
from saqc.lib.tools import isunflagged, statPass
STATS_DICT = {"std": np.std, "var": np.var, "mad": median_abs_deviation}
if TYPE_CHECKING:
from saqc import SaQC
class NoiseMixin:
@flagging()
def flagByStatLowPass(
self: "SaQC",
field: str,
func: Callable[[np.ndarray, pd.Series], float],
window: str | pd.Timedelta,
thresh: float,
func: Literal["std", "var", "mad"]
| Callable[[np.ndarray, pd.Series], float] = "std",
sub_window: str | pd.Timedelta | None = None,
sub_thresh: float | None = None,
min_periods: int | None = None,
......@@ -36,7 +46,9 @@ class NoiseMixin:
**kwargs,
) -> "SaQC":
"""
Flag data chunks of length ``window``, if:
Flag data chunks of length ``window`` dependent on the data deviation.
Flag data chunks of length ``window`` if
1. they exceed ``thresh`` with regard to ``func`` and
2. all (maybe overlapping) sub-chunks of the data chunks with length ``sub_window``,
......@@ -45,7 +57,11 @@ class NoiseMixin:
Parameters
----------
func :
Aggregation function applied on every chunk.
Either a string determining the aggregation function applied on every chunk:
* 'std': standard deviation
* 'var': variance
* 'mad': median absolute deviation
Or a callable mapping 1-dimensional array-likes onto scalars.
window :
Window (i.e. chunk) size.
......@@ -64,26 +80,95 @@ class NoiseMixin:
Minimum number of values needed in a chunk to perform the test.
Ignored if ``window`` is an integer.
"""
warnings.warn(
"function 'flagByStatLowPass' is deprecated and will be removed in a future release, "
"use 'flagByScatterLowpass' instead.",
DeprecationWarning,
)
return self.flagByScatterLowpass(
field=field,
window=window,
thresh=thresh,
func=func,
sub_window=sub_window,
sub_thresh=sub_thresh,
min_periods=min_periods,
flag=flag,
)
@flagging()
def flagByScatterLowpass(
self: "SaQC",
field: str,
window: str | pd.Timedelta,
thresh: float,
func: Literal["std", "var", "mad"]
| Callable[[np.ndarray, pd.Series], float] = "std",
sub_window: str | pd.Timedelta | None = None,
sub_thresh: float | None = None,
min_periods: int | None = None,
flag: float = BAD,
**kwargs,
) -> "SaQC":
"""
Flag data chunks of length ``window`` dependent on the data deviation.
Flag data chunks of length ``window`` if
1. they exceed ``thresh`` with regard to ``func`` and
2. all (maybe overlapping) sub-chunks of the data chunks with length ``sub_window``,
exceed ``sub_thresh`` with regard to ``func``
Parameters
----------
func :
Either a string, determining the aggregation function applied on every chunk
* 'std': standard deviation
* 'var': variance
* 'mad': median absolute deviation
Or a callable mapping 1-dimensional array-likes onto scalars.
window :
Window (i.e. chunk) size.
datcol = self._data[field]
if not min_periods:
min_periods = 0
if not sub_thresh:
sub_thresh = thresh
window = pd.Timedelta(window)
thresh :
Threshold. A given chunk is flagged if the return value of ``func`` exceeds ``thresh``.
sub_window :
Window size of the sub-chunks that are additionally tested for exceeding ``sub_thresh``
with respect to ``func``.
sub_thresh :
Threshold. A given sub-chunk is flagged if the return value of ``func`` exceeds ``sub_thresh``.
min_periods :
Minimum number of values needed in a chunk to perform the test.
Ignored if ``window`` is an integer.
"""
if (not isCallable(func)) and (func not in ["std", "var", "mad"]):
raise TypeError(
f"Parameter 'func' must either be of type 'Callable' or one out of ['std', 'var', 'mad']. Got {func}."
)
validateWindow(window, allow_int=False)
validateMinPeriods(min_periods)
if sub_window is not None:
validateWindow(sub_window, "sub_window", allow_int=False)
sub_window = pd.Timedelta(sub_window)
if not isCallable(func):
func = STATS_DICT[func]
to_set = statPass(
datcol,
func,
window,
thresh,
operator.gt,
sub_window,
sub_thresh,
min_periods,
datcol=self._data[field],
stat=func,
winsz=pd.Timedelta(window),
thresh=thresh,
comparator=operator.gt,
sub_winsz=sub_window,
sub_thresh=sub_thresh or thresh,
min_periods=min_periods or 0,
)
mask = isunflagged(self._flags[field], kwargs["dfilter"]) & to_set
self._flags[mask, field] = flag
......
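How a string ``func`` is resolved can be read off ``STATS_DICT`` above; a short sketch of the per-chunk scatter measure with made-up numbers:

import numpy as np
from scipy.stats import median_abs_deviation

chunk = np.array([1.0, 1.1, 0.9, 1.0, 5.0])
# func="mad" maps to scipy's median_abs_deviation, func="std"/"var" to the numpy functions
scatter = median_abs_deviation(chunk)  # the chunk is flagged if this exceeds `thresh`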