diff --git a/README.md b/README.md index f73e9422dc338d02203f0b50cf1e07c95fb123d0..596306f74b97fa0dd2675a120bf3eefefd85b92a 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ SM2 ; shift(freq="15Min") 'SM(1|2)+' ; flagMissing() SM1 ; flagRange(min=10, max=60) SM2 ; flagRange(min=10, max=40) -SM2 ; flagMAD(window="30d", z=3.5) +SM2 ; flagZScore(window="30d", thresh=3.5, method='modified', center=False) Dummy ; flagGeneric(field=["SM1", "SM2"], func=(isflagged(x) | isflagged(y))) ``` @@ -98,7 +98,7 @@ saqc = (saqc .flagMissing("SM(1|2)+", regex=True) .flagRange("SM1", min=10, max=60) .flagRange("SM2", min=10, max=40) - .flagMAD("SM2", window="30d", z=3.5) + .flagZScore("SM2", window="30d", thresh=3.5, method='modified', center=False) .flagGeneric(field=["SM1", "SM2"], target="Dummy", func=lambda x, y: (isflagged(x) | isflagged(y)))) ``` diff --git a/docs/resources/data/config.csv b/docs/resources/data/config.csv index 77914d8035af74daa7e2669378c1ebfd6bfa8819..e1220b67326a3ed7c180f88afeca089bacddea55 100644 --- a/docs/resources/data/config.csv +++ b/docs/resources/data/config.csv @@ -3,4 +3,4 @@ varname ; test SM2 ; align(freq="15Min", method="nshift") SM2 ; flagMissing() 'SM(1|2)+' ; flagRange(min=10, max=60) -SM2 ; flagMAD(window="30d", z=3.5) +SM2 ; flagZScore(window="30d", thresh=3.5, method='modified', center=False) diff --git a/docs/resources/data/config_ci.csv b/docs/resources/data/config_ci.csv index 11e366b59c66d2859e31ece42454ddc89edad620..0b55296f828f9c8d08ad0bd653f50b4c78bc0fb5 100644 --- a/docs/resources/data/config_ci.csv +++ b/docs/resources/data/config_ci.csv @@ -3,5 +3,5 @@ SM2;align(freq="15Min", method="nshift");False '.*';flagRange(min=10, max=60);False SM2;flagMissing();False SM2;flagRange(min=10, max=60);False -SM2;flagMAD(window="30d", z=3.5);False +SM2;flagZScore(window="30d", thresh=3.5, method='modified', center=False);False Dummy;flag(func=(isflagged(SM1) | isflagged(SM2))) diff --git a/docs/resources/data/myconfig.csv b/docs/resources/data/myconfig.csv index ea8d6c09063b95789adb7d2244ad7ff939222f00..cb5092a185eb21196bdf3fa2a47991350c88c169 100644 --- a/docs/resources/data/myconfig.csv +++ b/docs/resources/data/myconfig.csv @@ -1,5 +1,5 @@ varname;test #------;-------------------------- SM2 ;flagRange(min=10, max=60) -SM2 ;flagMAD(window="30d", z=3.5) +SM2 ;flagZScore(window="30d", thresh=3.5, method="modified", center=False) SM2 ;plot() \ No newline at end of file diff --git a/docs/resources/data/myconfig2.csv b/docs/resources/data/myconfig2.csv index 26f29d7d928a97629c5abe38080f88e303bc38c7..8628fc34588cd9dd7161a97ab30296bc3b2f8ed8 100644 --- a/docs/resources/data/myconfig2.csv +++ b/docs/resources/data/myconfig2.csv @@ -1,5 +1,5 @@ varname;test #------;-------------------------- SM2 ;flagRange(min=-20, max=60) -SM2 ;flagMAD(window="30d", z=3.5) +SM2 ;flagZScore(window="30d", thresh=3.5, method='modified', center=False) SM2 ;plot() \ No newline at end of file diff --git a/docs/resources/data/myconfig4.csv b/docs/resources/data/myconfig4.csv index 5977ac246ae962366e9c8fbfa87b0bcd13ad840c..311c2ee20c96a10ae869bbcf0c276956cfc04d31 100644 --- a/docs/resources/data/myconfig4.csv +++ b/docs/resources/data/myconfig4.csv @@ -2,8 +2,8 @@ varname;test #------;-------------------------- SM1;flagRange(min=10, max=60) SM2;flagRange(min=10, max=60) -SM1;flagMAD(window="15d", z=3.5) -SM2;flagMAD(window="30d", z=3.5) +SM1;flagZScore(window="15d", thresh=3.5, method='modified') +SM2;flagZScore(window="30d", thresh=3.5, method='modified') SM1;plot(path='../resources/temp/SM1processingResults') SM2;plot(path='../resources/temp/SM2processingResults') diff --git a/docs/resources/images/ZscorePopulation.png b/docs/resources/images/ZscorePopulation.png new file mode 100644 index 0000000000000000000000000000000000000000..2caca34053f41b0b2dae43d887b3649c90f4b53a Binary files /dev/null and b/docs/resources/images/ZscorePopulation.png differ diff --git a/docs/resources/images/ZscorePopulation.png.license b/docs/resources/images/ZscorePopulation.png.license new file mode 100644 index 0000000000000000000000000000000000000000..f8c6bf8cd36fb9a9a0a0dd474407f40908bf5d1f --- /dev/null +++ b/docs/resources/images/ZscorePopulation.png.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ + +SPDX-License-Identifier: GPL-3.0-or-later \ No newline at end of file diff --git a/saqc/funcs/outliers.py b/saqc/funcs/outliers.py index 86b3a2c55bdd0e3a3a4f7784648bd883d6c5dc0c..9be06ce2d1fba8a1a051de380a525ffea14962e0 100644 --- a/saqc/funcs/outliers.py +++ b/saqc/funcs/outliers.py @@ -24,6 +24,7 @@ from saqc import BAD, UNFLAGGED from saqc.core import DictOfSeries, Flags, flagging, register from saqc.funcs.scores import _univarScoring from saqc.lib.docs import DOC_TEMPLATES +from saqc.lib.rolling import windowRoller from saqc.lib.tools import getFreqDelta, isflagged, toSequence if TYPE_CHECKING: @@ -852,6 +853,13 @@ class OutliersMixin: ---------- [1] https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm """ + msg = """ + The method `flagMAD` is deprecated and will be removed in verion 3.0 of saqc. + To achieve the same behavior use: + """ + call = f"qc.flagZScore(field={field}, window={window}, method='modified', thresh={z}, min_residuals={min_residuals}, min_periods={min_periods}, center={center})" + + warnings.warn(f"{msg}`{call}`", DeprecationWarning) self = self.flagZScore( field, @@ -1226,68 +1234,53 @@ class OutliersMixin: ---------- [1] https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm """ - warnings.warn( - "The method `flagCrossStatistics` will be deprecated in a future version of saqc", - PendingDeprecationWarning, + msg = """ + The method `flagCrossStatistics` is deprecated and will be removed in verion 3.0 of saqc. + To achieve the same behavior use: + """ + new_method_string = { + "modZscore": "modified", + "Zscore": "standard", + np.mean: "standard", + np.median: "modified", + } + call = f"qc.flagZScore(field={field}, window=1, method={new_method_string[method]}, thresh={thresh}, axis=1)" + + warnings.warn(f"{msg}`{call}`", DeprecationWarning) + + return self.flagZScore( + field={field}, + window=1, + method={new_method_string[method]}, + thresh={thresh}, + axis=1, + flag=flag, ) - fields = toSequence(field) - - df = self._data[fields].to_pandas(how="inner") - - if isinstance(method, str): - if method == "modZscore": - MAD_series = df.subtract(df.median(axis=1), axis=0).abs().median(axis=1) - diff_scores = ( - (0.6745 * (df.subtract(df.median(axis=1), axis=0))) - .divide(MAD_series, axis=0) - .abs() - ) - - elif method == "Zscore": - diff_scores = ( - df.subtract(df.mean(axis=1), axis=0) - .divide(df.std(axis=1), axis=0) - .abs() - ) - - else: - raise ValueError(method) - - else: - try: - stat = getattr(df, method.__name__)(axis=1) - except AttributeError: - stat = df.aggregate(method, axis=1) - - diff_scores = df.subtract(stat, axis=0).abs() - - mask = diff_scores > thresh - if not mask.empty: - for f in fields: - m = mask[f].reindex(index=self._flags[f].index, fill_value=False) - self._flags[m, f] = flag - - return self - - @flagging() + @register( + mask=["field"], + demask=["field"], + squeeze=["field"], + multivariate=True, + docstring={"field": DOC_TEMPLATES["field"]}, + ) def flagZScore( self: "SaQC", - field: str, + field: Sequence[str], + method: Literal["standard", "modified"] = "standard", window: str | int | None = None, thresh: float = 3, min_residuals: int | None = None, min_periods: int | None = None, - model_func: Callable[[np.ndarray | pd.Series], float] = np.nanmean, - norm_func: Callable[[np.ndarray | pd.Series], float] = np.nanstd, center: bool = True, + axis: int = 0, flag: float = BAD, **kwargs, ) -> "SaQC": """ Flag data where its (rolling) Zscore exceeds a threshold. - The function implements flagging derived from a basic Zscore calculation. To handle non + The function implements flagging derived from standard or modified Zscore calculation. To handle non stationary data, the Zscoring can be applied with a rolling window. Therefor, the function allows for a minimum residual to be specified in order to mitigate overflagging in local regimes of low variance. @@ -1297,51 +1290,164 @@ class OutliersMixin: Parameters ---------- window : - Size of the window. Either determined via an Offset String, denoting the windows temporal + Size of the window. Either determined via an offset string, denoting the windows temporal extension or by an integer, denoting the windows number of periods. ``NaN`` also count as periods. If ``None`` is passed, all data points share the same scoring window, which than equals the whole data. + method : + Which method to use for ZScoring: + + * `"standard"`: standard Zscoring, using *mean* for the expectation and *standard deviation (std)* as scaling factor + * `"modified"`: modified Zscoring, using *median* as the expectation and *median absolute deviation (MAD)* as the scaling Factor + + See notes section for detailed scoring formula thresh : Cutoff level for the Zscores, above which associated points are marked as outliers. min_residuals : Minimum residual value points must have to be considered outliers. min_periods : Minimum number of valid meassurements in a scoring window, to consider the resulting score valid. - model_func : default mean - Function to calculate the center moment in every window. - norm_func : default std - Function to calculate the scaling for every window. center : Weather or not to center the target value in the scoring window. If ``False``, the target value is the last value in the window. + axis : + Along which axis to calculate the scoring statistics: + + * `0` (default) - calculate statistics along time axis + * `1` - calculate statistics over multiple variables + + See Notes section for a visual clarification of the workings + of `axis` and `window`. Notes ----- - Steps of calculation: - - 1. Consider a window :math:`W` of successive points :math:`W = x_{1},...x_{w}` - containing the value :math:`y_{K}` which is to be checked. - (The index of :math:`K` depends on the selection of the parameter :py:attr:`center`.) - 2. The "moment" :math:`M` for the window gets calculated via :math:`M=` :py:attr:`model_func` :math:`(W)`. - 3. The "scaling" :math:`N` for the window gets calculated via :math:`N=` :py:attr:`norm_func` :math:`(W)`. - 4. The "score" :math:`S` for the point :math:`x_{k}` gets calculated via :math:`S=(x_{k} - M) / N`. - 5. Finally, :math:`x_{k}` gets flagged, if :math:`|S| >` :py:attr:`thresh` and - :math:`|M - x_{k}| >=` :py:attr:`min_residuals`. + + The flag for :math:`x` is determined as follows: + + 1. Depending on ``window`` and ``axis``, the context population :math:`X` is collected (see pictures below) + + * If ``axis=0``, any value is flagged in the context of those values of the same variable (``field``), that are + in `window` range. + * If ``axis=1``, any value is flagged in the context of all values of all variables (``fields``), that are + in `window` range. + * If ``axis=0`` and ``window=1``, any value is flagged in the context of all values of all variables (``fields``), + that share the same timestamp. + + .. figure:: /resources/images/ZscorePopulation.png + :class: with-border + + + + + 2. Depending on ``method``, a score :math:`Z` is calculated for :math:`x` via :math:`Z = \\frac{|E(X) - X|}{S(X)}` + + * ``method="standard"``: :math:`E(X)=mean(X)`, :math:`S(X)=std(X)` + * ``method="modified"``: :math:`E(X)=median(X)`, :math:`S(X)=MAD(X)` + + 3. :math:`x` is flagged, if :math:`Z >` ``thresh`` """ - datser = self._data[field] + + if "norm_func" in kwargs or "model_func" in kwargs: + warnings.warn( + "Parameters norm_func and model_func are deprecated, use parameter method instead.\n" + 'To model with mean and scale with standard deviation, use method="standard".\n' + 'To model with median and scale with median absolute deviation (MAD) use method="modified".\n' + "Other/Custom model and scaling functions are not supported any more" + ) + if ( + "mean" in kwargs.get("model_func", "").__name__ + or "std" in kwargs.get("norm_func", "").__name__ + ): + method = "standard" + elif ( + "median" in kwargs.get("model_func", lambda x: x).__name__ + or "median" in kwargs.get("norm_func", lambda x: x).__name__ + ): + method = "modified" + else: + raise ValueError( + "Support for scoring with functions not similar to either Zscore or modified Zscore is " + "not supported anymore" + ) + + dat = self._data[field].to_pandas(how="outer") + if min_residuals is None: min_residuals = 0 - score, model, _ = _univarScoring( - datser, - window=window, - norm_func=norm_func, - model_func=model_func, - center=center, - min_periods=min_periods, - ) - to_flag = (score.abs() > thresh) & ((model - datser).abs() >= min_residuals) - self._flags[to_flag, field] = flag + if dat.empty: + return self + + if min_periods is None: + min_periods = 0 + + if window is None: + if dat.notna().sum().sum() >= min_periods: + if method == "standard": + mod = pd.DataFrame( + {f: dat[f].mean() for f in dat.columns}, index=dat.index + ) + norm = pd.DataFrame( + {f: dat[f].std() for f in dat.columns}, index=dat.index + ) + + else: + mod = pd.DataFrame( + {f: dat[f].median() for f in dat.columns}, index=dat.index + ) + norm = pd.DataFrame( + {f: (dat[f] - mod[f]).abs().median() for f in dat.columns}, + index=dat.index, + ) + else: + return self + else: # window is not None + if axis == 0: + if method == "standard": + mod = dat.rolling( + window, center=center, min_periods=min_periods + ).mean() + norm = dat.rolling( + window, center=center, min_periods=min_periods + ).std() + else: + mod = dat.rolling( + window, center=center, min_periods=min_periods + ).median() + norm = ( + (mod - dat) + .abs() + .rolling(window, center=center, min_periods=min_periods) + .median() + ) + + else: # axis == 1: + if window == 1: + if method == "standard": + mod = dat.mean(axis=1) + norm = dat.std(axis=1) + else: # method == 'modified' + mod = dat.median(axis=1) + norm = (dat.subtract(mod, axis=0)).abs().median(axis=1) + else: # window > 1 + if method == "standard": + mod = windowRoller(dat, window, "mean", min_periods, center) + norm = windowRoller(dat, window, "std", min_periods, center) + else: # method == 'modified' + mod = windowRoller(dat, window, "median", min_periods, center) + norm = windowRoller( + dat.subtract(mod, axis=0).abs(), + window, + "median", + min_periods, + center, + ) + residuals = dat.subtract(mod, axis=0).abs() + score = residuals.divide(norm, axis=0) + + to_flag = (score.abs() > thresh) & (residuals >= min_residuals) + for f in field: + self._flags[to_flag[f], f] = flag return self diff --git a/saqc/lib/rolling.py b/saqc/lib/rolling.py index d8c7a4d7d400e003bfe54ff2f3ce2b179bd61343..6016d29d6f12b59fd29af060b5a5363ccb8df73b 100644 --- a/saqc/lib/rolling.py +++ b/saqc/lib/rolling.py @@ -6,10 +6,73 @@ from __future__ import annotations import functools import math +from typing import Literal import numpy as np import pandas as pd +from saqc.lib.tools import getFreqDelta + + +def windowRoller( + data: pd.DataFrame, + window, + func: Literal["mean", "median", "std", "var", "sum"], + min_periods: int = 0, + center=True, +): + """ + pandas-rolling style computation with 2 dimensional windows, ranging over all df columns. + * implements efficient 2d rolling in case of regular timestamps or integer defined window + * else: dispatches to not optimized (no-numba) version in case of irregular timestamp + """ + supportedFuncs = ["mean", "median", "std", "var", "sum"] + if func not in supportedFuncs: + raise ValueError(f'"func" has to be one of {supportedFuncs}. Got {func}.') + func_kwargs = {} + if func in ["std", "var"]: + func_kwargs.update({"ddof": 1}) + roll_func = getattr(np, "nan" + func) + regularFreq = getFreqDelta(data.index) + vals = data.values + if regularFreq is not None: + window = ( + int(pd.Timedelta(window) / pd.Timedelta(regularFreq)) + if isinstance(window, str) + else window + ) + ramp = np.empty(((window - 1), vals.shape[1])) + ramp.fill(np.nan) + vals = np.concatenate([ramp, vals]) + if center: + vals = np.roll(vals, axis=0, shift=-int(window / 2)) + + views = np.lib.stride_tricks.sliding_window_view( + vals, (window, vals.shape[1]) + ).squeeze() + result = roll_func(views, axis=(1, 2), **func_kwargs) + if min_periods > 0: + invalid_wins = (~np.isnan(views)).sum(axis=(1, 2)) < min_periods + result[invalid_wins] = np.nan + out = pd.Series(result, index=data.index, name="result") + else: # regularFreq is None + i_ser = pd.Series(range(data.shape[0]), index=data.index, name="result") + result = i_ser.rolling(window=window, center=center).apply( + raw=True, + func=lambda x: roll_func( + data.values[x.astype(int), :], axis=(0, 1), **func_kwargs + ), + ) + if min_periods > 0: + invalid_wins = ( + i_ser.rolling(window=window, center=center).apply( + lambda x: (~np.isnan(data.values[x.astype(int), :])).sum() + ) + ) < min_periods + result[invalid_wins] = np.nan + out = result + return out + def removeRollingRamps( data: pd.Series, diff --git a/tests/cli/test_integration.py b/tests/cli/test_integration.py index db64b427bcc2823ff0fe79e6ee8587e009b0cabc..c8694d1bd3cee152e6db937c187552d49a59d9ad 100644 --- a/tests/cli/test_integration.py +++ b/tests/cli/test_integration.py @@ -55,7 +55,7 @@ DMP = [ "2016-04-01 00:05:48,3573.0,NIL,,,32.685,NIL,,,nan,nan,nan,nan\n", "2016-04-01 00:15:00,nan,nan,nan,nan,nan,nan,nan,nan,29.3157,NIL,,\n", "2016-04-01 00:20:42,3572.0,NIL,,,32.7428,NIL,,,nan,nan,nan,nan\n", - '2016-04-01 00:30:00,nan,nan,nan,nan,nan,nan,nan,nan,29.3679,BAD,OTHER,"{""test"": ""flagMAD"", ""comment"": """"}"\n', + '2016-04-01 00:30:00,nan,nan,nan,nan,nan,nan,nan,nan,29.3679,BAD,OTHER,"{""test"": ""flagZScore"", ""comment"": """"}"\n', "2016-04-01 00:35:37,3572.0,NIL,,,32.6186,NIL,,,nan,nan,nan,nan\n", "2016-04-01 00:45:00,nan,nan,nan,nan,nan,nan,nan,nan,29.3679,NIL,,\n", ] diff --git a/tests/funcs/test_outlier_detection.py b/tests/funcs/test_outlier_detection.py index 80cdfd0451a95f974b1244f51840d84e78667842..0541525865c734382a3abddd27a481e0259c4543 100644 --- a/tests/funcs/test_outlier_detection.py +++ b/tests/funcs/test_outlier_detection.py @@ -32,7 +32,9 @@ def test_flagMad(spiky_data): data = spiky_data[0] field, *_ = data.columns flags = initFlagsLike(data) - qc = SaQC(data, flags).flagMAD(field, "1H", flag=BAD) + qc = SaQC(data, flags).flagZScore( + field, window="1H", method="modified", thresh=3.5, flag=BAD + ) flag_result = qc.flags[field] test_sum = (flag_result[spiky_data[1]] == BAD).sum() assert test_sum == len(spiky_data[1]) @@ -127,34 +129,35 @@ def test_grubbs(dat): @pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_2")]) -def test_flagCrossStatistics(dat): - data1, characteristics = dat(initial_level=0, final_level=0, out_val=0) - data2, characteristics = dat(initial_level=0, final_level=0, out_val=10) - fields = ["field1", "field2"] - s1, s2 = data1["data"], data2["data"] - s1 = pd.Series(data=s1.values, index=s1.index) - s2 = pd.Series(data=s2.values, index=s1.index) - data = DictOfSeries(field1=s1, field2=s2) +@pytest.mark.parametrize( + "parameters", + [("standard", 1), ("modified", 1), ("modified", 3), ("standard", "3h")], +) +def test_flagCrossStatistics(dat, parameters): + fields = [f"data{i}" for i in range(6)] + data = pd.DataFrame( + 0, columns=fields, index=pd.date_range("2000", freq="1h", periods=10) + ) + bad_idx = (np.random.randint(0, 10), np.random.randint(0, 6)) + data.iloc[bad_idx[0], bad_idx[1]] = 10 flags = initFlagsLike(data) + qc = SaQC(data, flags).flagZScore( + fields, thresh=2, method=parameters[0], flag=BAD, axis=1, window=parameters[1] + ) - with pytest.deprecated_call(): - qc = SaQC(data, flags).flagCrossStatistics( - fields, thresh=3, method=np.mean, flag=BAD - ) - for field in fields: - isflagged = qc.flags[field] > UNFLAGGED - assert isflagged[characteristics["raise"]].all() + isflagged = qc.flags.to_pandas() > UNFLAGGED + assert isflagged.iloc[bad_idx[0], bad_idx[1]] + assert isflagged.sum().sum() == 1 -def test_flagZScores(): +def test_flagZScoresUV(): np.random.seed(seed=1) - data = pd.Series( - [np.random.normal() for k in range(100)], + data = pd.DataFrame( + {"data": [np.random.normal() for k in range(100)]}, index=pd.date_range("2000", freq="1D", periods=100), - name="data", ) - data.iloc[[5, 80]] = 5 - data.iloc[[40]] = -6 + data.iloc[[5, 80], 0] = 5 + data.iloc[[40], 0] = -6 qc = saqc.SaQC(data) qc = qc.flagZScore("data", window=None) @@ -176,6 +179,39 @@ def test_flagZScores(): assert (qc.flags.to_pandas().iloc[[40, 80], 0] > 0).all() +def test_flagZScoresMV(): + np.random.seed(seed=1) + data = pd.DataFrame( + { + "data": [np.random.normal() for k in range(100)], + "data2": [np.random.normal() for k in range(100)], + }, + index=pd.date_range("2000", freq="1D", periods=100), + ) + data.iloc[[5, 80], 0] = 5 + data.iloc[[40], 0] = -6 + data.iloc[[60], 1] = 10 + qc = saqc.SaQC(data) + qc = qc.flagZScore(["data", "data2"], window=None) + assert (qc.flags.to_pandas().iloc[[5, 40, 80], 0] > 0).all() + assert (qc.flags.to_pandas().iloc[[60], 1] > 0).all() + + qc = saqc.SaQC(data) + qc = qc.flagZScore("data", window=None, min_residuals=10) + + assert (qc.flags.to_pandas()[["data", "data2"]] < 0).all().all() + + qc = saqc.SaQC(data) + qc = qc.flagZScore(["data", "data2"], window="20D") + + assert (qc.flags.to_pandas().iloc[[40, 80], 0] > 0).all() + + qc = saqc.SaQC(data) + qc = qc.flagZScore("data", window=20) + + assert (qc.flags.to_pandas().iloc[[40, 80], 0] > 0).all() + + @pytest.mark.parametrize("n", [1, 10]) @pytest.mark.parametrize("p", [1, 2]) @pytest.mark.parametrize("thresh", ["auto", 2])