#! /usr/bin/env python

# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later

# -*- coding: utf-8 -*-

from __future__ import annotations

import uuid
import warnings
from typing import TYPE_CHECKING, Callable, Sequence, Tuple

import numpy as np
import numpy.polynomial.polynomial as poly
import pandas as pd
from outliers import smirnov_grubbs  # noqa, on pypi as outlier-utils
from scipy.stats import median_abs_deviation
from typing_extensions import Literal

from saqc import BAD, UNFLAGGED
from saqc.core import DictOfSeries, Flags, flagging, register
from saqc.lib.checking import (
    isCallable,
    isFloatLike,
    validateChoice,
    validateFraction,
    validateFrequency,
    validateMinPeriods,
    validateValueBounds,
    validateWindow,
)
from saqc.lib.docs import DOC_TEMPLATES
from saqc.lib.rolling import windowRoller
from saqc.lib.tools import getFreqDelta, isflagged, toSequence

if TYPE_CHECKING:
    from saqc import SaQC


class OutliersMixin:
    @staticmethod
    def _validateLOF(algorithm, n, p, density):
        """validate parameter for LOF and UniLOF"""
        validateValueBounds(n, "n", left=0, strict_int=True)
        validateValueBounds(p, "p", left=0, strict_int=True)
        validateChoice(
            algorithm, "algorithm", ["ball_tree", "kd_tree", "brute", "auto"]
        )
        if density != "auto" and not isFloatLike(density) and not isCallable(density):
            raise ValueError(
                f"'density' must be 'auto' or a float or a function, not {density}"
            )

    @register(
        mask=["field"],
        demask=["field"],
        squeeze=["field"],
        multivariate=True,
        handles_target=False,
    )
    def flagLOF(
        self: "SaQC",
        field: Sequence[str],
        n: int = 20,
        thresh: Literal["auto"] | float = 1.5,
        algorithm: Literal["ball_tree", "kd_tree", "brute", "auto"] = "ball_tree",
        p: int = 1,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        Flag values where the Local Outlier Factor (LOF) exceeds a cutoff.

        Parameters
        ----------
        n :
            Number of neighbors to be included in the LOF calculation.
            Defaults to ``20``, which is a value found to be suitable
            in the literature.

            * :py:attr:`n` determines the "locality" of an observation
              (its :py:attr:`n` nearest neighbors) and sets the upper
              limit on the number of values in outlier clusters (i.e.
              consecutive outliers). Outlier clusters of size greater
              than :py:attr:`n`/2 may not be detected reliably.
            * The larger :py:attr:`n`, the lower the algorithm's sensitivity
              to local outliers and small or singleton outlier points.
              Higher values greatly increase numerical costs.

        thresh :
            The threshold for flagging the calculated LOF. A LOF of around
            ``1`` is considered normal and most likely corresponds to
            inlier points.

            * The "automatic" threshing introduced with the publication
              of the algorithm defaults to ``1.5``.
            * In this implementation, :py:attr:`thresh` defaults (``'auto'``)
              to flagging the scores with a modified 3-sigma rule.

        algorithm :
            Algorithm used for calculating the :py:attr:`n`-nearest neighbors.

        p :
            Degree of the metric ("Minkowski"), according to which the
            distance to neighbors is determined. Most important values are:

            * ``1`` - Manhattan metric
            * ``2`` - Euclidean metric

        density :
            How to calculate the temporal distance/density for the variable to flag.

            * ``'auto'`` - introduces linear density with an increment
              equal to the median of the absolute diff of the variable to flag.
            * ``float`` - introduces linear density with an increment
              equal to :py:attr:`density`
            * Callable - calculates the density by applying the function
              passed onto the variable to flag (passed as Series).

        Notes
        -----
        * The :py:meth:`~saqc.SaQC.flagLOF` function calculates the Local
          Outlier Factor (LOF) for every point in the input timeseries.
          The *LOF* is a scalar value that roughly correlates to the
          *reachability*, or "outlierishness", of the evaluated datapoint.
          If a point is as reachable as all its :py:attr:`n`-nearest
          neighbors, the *LOF* score evaluates to around ``1``. If it
          is only half as reachable as its ``n``-nearest neighbors
          (that is, twice as "outlierish"), the score is about
          ``2``. So, the Local Outlier *Factor* relates a point's *reachability*
          to the *reachability* of its :py:attr:`n`-nearest neighbors
          in a multiplicative fashion (as a "factor").
        * The *reachability* of a point is determined as an aggregation
          of the point's distances to its :py:attr:`n`-nearest neighbors,
          measured with regard to the Minkowski metric of degree :py:attr:`p`
          (usually Euclidean).
        * To derive a binary label for every point (outlier: *yes*, or *no*),
          the scores are cut off at a level determined by :py:attr:`thresh`.
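
        Examples
        --------
        A minimal usage sketch; the field names are hypothetical and ``qc`` is
        assumed to be an existing :py:class:`~saqc.SaQC` instance holding them:

        .. code-block:: python

           # score the joint (multivariate) series of two variables and flag
           # where the LOF exceeds the literature default of 1.5
           qc = qc.flagLOF(['temperature', 'conductivity'], n=20, thresh=1.5)

           # alternatively, let the modified 3-sigma rule pick the cutoff
           qc = qc.flagLOF(['temperature', 'conductivity'], thresh='auto')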

        """
        self._validateLOF(algorithm, n, p, 1.0)
        if thresh != "auto" and not isFloatLike(thresh):
            raise ValueError(f"'thresh' must be 'auto' or a float, not {thresh}")

        fields = toSequence(field)
        field_ = str(uuid.uuid4())
        qc = self.assignLOF(
            field=fields,
            target=field_,
            n=n,
            algorithm=algorithm,
            p=p,
        )
        s = qc.data[field_]
        if thresh == "auto":
            s = pd.concat([s, (-s - 2)])
            s_mask = (s - s.mean() / s.std())[: len(s) // 2].abs() > 3
        else:
            s_mask = s < abs(thresh)

        for f in fields:
            mask = ~isflagged(qc._flags[f], kwargs["dfilter"]) & s_mask
            qc._flags[mask, f] = flag

        return qc.dropField(field_)

    @flagging()
    def flagUniLOF(
        self: "SaQC",
        field: str,
        n: int = 20,
        thresh: Literal["auto"] | float = 1.5,
        algorithm: Literal["ball_tree", "kd_tree", "brute", "auto"] = "ball_tree",
        p: int = 1,
        density: Literal["auto"] | float = "auto",
        fill_na: bool = True,
        slope_correct: bool = True,
        min_offset: float = None,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        Flag "univariate" Local Outlier Factor (LOF) exceeding cutoff.

        The function is a wrapper around a usual LOF implementation, aiming
        for an easy-to-use, parameter-minimal outlier detection function
        for single variables that does not necessitate prior modelling
        of the variable. LOF is applied onto a concatenation of the `field`
        variable and a "temporal density", or "penalty", variable that
        measures the temporal distance between data points. See the Notes
        section for more details on the algorithm.

        Parameters
        ----------
        n :
            Number of periods to be included in the LOF calculation.
            Defaults to ``20``, which is a value found to be suitable in
            the literature.

            * :py:attr:`n` determines the "locality" of an observation
              (its :py:attr:`n` nearest neighbors) and sets the upper
              limit on the number of values in an outlier cluster (i.e.
              consecutive outliers). Outlier clusters of size greater
              than :py:attr:`n`/2 may not be detected reliably.
            * The larger :py:attr:`n`, the lower the algorithm's sensitivity
              to local outliers and small or singleton outlier points.
              Higher values greatly increase numerical costs.

        thresh :
            The threshold for flagging the calculated LOF. A LOF of around
            ``1`` is considered normal and most likely corresponds to
            inlier points. This parameter is considered the main calibration
            parameter of the algorithm.

            * The threshold defaults to ``1.5``, which is the value
              found to be suitable in the literature.
            * ``'auto'`` enables flagging the scores with a modified 3-sigma
              rule, resulting in a threshold around ``4``, which usually
              greatly mitigates overflagging compared to the literature
              recommendation, but often is too high.
            * The sensitive range for the parameter is roughly ``[1, 15]``,
              assuming default settings for the other parameters.

        algorithm :
            Algorithm used for calculating the :py:attr:`n`-nearest neighbors
            needed for LOF calculation.

        p :
            Degree of the metric ("Minkowski"), according to which distance
            to neighbors is determined. Most important values are:

            * ``1`` - Manhattan metric
            * ``2`` - Euclidean metric

        density :
            How to calculate the temporal distance/density for the variable
            to flag.

            * ``'auto'`` - introduces linear density with an increment
              equal to the median of the absolute diff of the variable to flag.
            * ``float`` - introduces linear density with an increment
              equal to :py:attr:`density`

        fill_na :
            If True, NaNs in the data are filled with a linear interpolation.

        slope_correct :
            If ``True``, a correction is applied that removes outlier clusters that
            actually just seem to be steep slopes.

        min_offset :
            If set, only those outlier clusters will be flagged that are preceded and succeeded
            by sufficiently large value "jumps". Defaults to estimating the sufficient value jumps from
            the median over the absolute step sizes between data points.

        Notes
        -----

        * The :py:meth:`~saqc.SaQC.flagUniLOF` function calculates a
          univariate Local Outlier Factor (UniLOF) score for every
          point in the one-dimensional input data series. The *UniLOF*
          score of any data point is a scalar value that roughly correlates
          to its *reachability*, or "outlierishness", in the 2-dimensional
          space constituted by the data values and the time axis. So
          the algorithm basically operates on the "graph", or the "plot",
          of the input timeseries.

        * If a point in this "graph" is as reachable as all its :py:attr:`n`-nearest
          neighbors, its *UniLOF* score evaluates to around ``1``. If
          it is only half as reachable as its :py:attr:`n` neighbors
          (that is, twice as "outlierish"), its score evaluates
          to roughly ``2``. So, the Univariate Local Outlier *Factor*
          relates a point's *reachability* to the *reachability* of its
          :py:attr:`n`-nearest neighbors in a multiplicative fashion
          (as a "factor").

        * The *reachability* of a point is derived as an aggregation
          of the point's distances to its :py:attr:`n`-nearest neighbors,
          measured with regard to the Minkowski metric of degree :py:attr:`p`
          (usually Euclidean).

        * The parameter :py:attr:`density` determines how the dimensionality
          of the time axis is removed, to make it a dimensionless, real-valued
          coordinate.

        * To derive a binary label for every point (outlier: *yes*, or
          *no*), the scores are cut off at a level determined by :py:attr:`thresh`.

        Examples
        --------

        See the :ref:`outlier detection cookbook
        <cookbooks/OutlierDetection:Outlier Detection>` for a detailed
        introduction into the usage and tuning of the function.

        .. plot::
           :context: reset
           :include-source: False

           import matplotlib
           import saqc
           import pandas as pd
           data = pd.read_csv('../resources/data/hydro_data.csv')
           data = data.set_index('Timestamp')
           data.index = pd.DatetimeIndex(data.index)
           qc = saqc.SaQC(data)

        Example usage with default parameter configuration:

        Loading data via pandas csv file parser, casting index to DateTime,
        generating a :py:class:`~saqc.SaQC` instance from the data and
        plotting the variable representing light scattering at 254 nanometers
        wavelength.

        .. doctest:: flagUniLOFExample

           >>> import saqc
           >>> data = pd.read_csv('./resources/data/hydro_data.csv')
           >>> data = data.set_index('Timestamp')
           >>> data.index = pd.DatetimeIndex(data.index)
           >>> qc = saqc.SaQC(data)
           >>> qc.plot('sac254_raw') # doctest: +SKIP

        .. plot::
           :context:
           :include-source: False
           :class: center

           qc.plot('sac254_raw')

        We apply :py:meth:`~saqc.SaQC.flagUniLOF` with default parameter
        values, meaning that the main calibration parameters :py:attr:`n`
        and :py:attr:`thresh` evaluate to ``20`` and ``1.5``, respectively.

        .. doctest:: flagUniLOFExample

           >>> import saqc
           >>> qc = qc.flagUniLOF('sac254_raw')
           >>> qc.plot('sac254_raw') # doctest: +SKIP

        .. plot::
           :context: close-figs
           :include-source: False
           :class: center

           qc = qc.flagUniLOF('sac254_raw')
           qc.plot('sac254_raw')

        See also
        --------
        :ref:`introduction to outlier detection with saqc <cookbooks/OutlierDetection:Outlier Detection>`
        """
        self._validateLOF(algorithm, n, p, density)
        if thresh != "auto" and not isFloatLike(thresh):
            raise ValueError(f"'thresh' must be 'auto' or a float, not {thresh}")

        tmp_field = str(uuid.uuid4())
        qc = self.assignUniLOF(
            field=field,
            target=tmp_field,
            n=n,
            algorithm=algorithm,
            p=p,
            density=density,
            fill_na=fill_na,
        )
        s = qc.data[tmp_field]
        if thresh == "auto":
            _s = pd.concat([s, (-s - 2)])
            s_mask = ((_s - _s.mean()) / _s.std()).iloc[: int(s.shape[0])].abs() > 3
        else:
            s_mask = s < -abs(thresh)
        s_mask = ~isflagged(qc._flags[field], kwargs["dfilter"]) & s_mask

        if slope_correct:
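            # label stretches of consecutive (un)flagged values and compare each
            # flagged cluster's extremes with the neighboring groups: clusters
            # that merely continue a steep monotonic slope (within tolerance
            # eps) are unflagged again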
            g_mask = s_mask.diff()
            g_mask = g_mask.cumsum()
            dat = self._data[field]
            od_groups = dat.interpolate("linear").groupby(by=g_mask)
            first_vals = od_groups.first()
            last_vals = od_groups.last()
            max_vals = od_groups.max()
            min_vals = od_groups.min()
            if min_offset is None:
                if density == "auto":
                    d_diff = dat.diff()
                    eps = d_diff.abs().median()
                    if eps == 0:
                        eps = d_diff[d_diff != 0].abs().median()
                else:
                    eps = density
                eps = 3 * eps
            else:
                eps = min_offset
            up_slopes = (min_vals + eps >= last_vals.shift(1)) & (
                max_vals - eps <= first_vals.shift(-1)
            )
            down_slopes = (max_vals - eps <= last_vals.shift(1)) & (
                min_vals + eps >= first_vals.shift(-1)
            )
            corrections = up_slopes | down_slopes
            for s_id in corrections[corrections].index:
                correct_idx = od_groups.get_group(s_id).index
                s_mask[correct_idx] = False

        qc._flags[s_mask, field] = flag
        qc = qc.dropField(tmp_field)
        return qc

    @flagging()
    def flagRange(
        self: "SaQC",
        field: str,
        min: float = -np.inf,
        max: float = np.inf,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        Flag values outside the closed
        interval [:py:attr:`min`, :py:attr:`max`].

        Parameters
        ----------
        min :
            Lower bound for valid data.
        max :
            Upper bound for valid data.
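
        Examples
        --------
        A minimal sketch (``'temperature'`` is a hypothetical field name and
        ``qc`` an existing :py:class:`~saqc.SaQC` instance):

        .. code-block:: python

           # flag all values outside the plausible range [-40, 60]
           qc = qc.flagRange('temperature', min=-40, max=60)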
        """
        # using .values is much faster
        datacol = self._data[field].to_numpy()
        mask = (datacol < min) | (datacol > max)
        self._flags[mask, field] = flag
        return self

    @flagging()
    def flagByStray(
        self: "SaQC",
        field: str,
        window: int | str | None = None,
        min_periods: int = 11,
        iter_start: float = 0.5,
        alpha: float = 0.05,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        Flag outliers in 1-dimensional (score) data using the STRAY Algorithm.

        For more details about the algorithm please refer to [1].

        Parameters
        ----------

        window :
            Determines the segmentation of the data into partitions that the
            kNN algorithm is applied to individually.

            * ``None``: apply scoring on the whole data set at once
            * ``int``: apply scoring on successive data chunks of periods
              with the given length. Must be greater than 0.
            * offset string: apply scoring on successive partitions of
              temporal extension matching the passed offset string

        min_periods :
            Minimum number of periods per partition that have to be present
            for a valid outlier detection to be made in this partition

        iter_start :
            Float in ``[0, 1]`` that determines which percentage of data
            is considered "normal". ``0.5`` results in the STRAY algorithm
            searching only the upper 50% of the scores for the cut-off
            point. (See the References section for more information.)

        alpha :
            Level of significance at which it is tested whether a score might
            be drawn from a different distribution than the majority of the data.

        References
        ----------
        [1]  Priyanga Dilini Talagala, Rob J. Hyndman & Kate Smith-Miles (2021):
             Anomaly Detection in High-Dimensional Data,
             Journal of Computational and Graphical Statistics, 30:2, 360-374,
             DOI: 10.1080/10618600.2020.1807997
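
        Examples
        --------
        A minimal sketch. ``'knn_scores'`` is a hypothetical field holding
        precomputed outlier scores, e.g. obtained via
        :py:meth:`~saqc.SaQC.assignKNNScore`:

        .. code-block:: python

           # test monthly partitions, requiring at least 11 scores per partition
           qc = qc.flagByStray('knn_scores', window='30D', min_periods=11)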
        """
        scores = self._data[field].dropna()

        if window is None:
            window = len(scores)
        if not isinstance(window, int):
            validateFrequency(window, "window")

        validateMinPeriods(min_periods)
        validateValueBounds(iter_start, "iter_start", left=0, right=1, closed="both")

        if scores.empty:
            return self

        if isinstance(window, int):
            s = pd.Series(data=np.arange(0, len(scores)), index=scores.index)
            s = s.transform(lambda x: int(np.floor(x / window)))
            partitions = scores.groupby(s)
        else:  # pd.Timedelta pd.DateOffset or str
            partitions = scores.groupby(pd.Grouper(freq=window))

        # calculate flags for every window
        for _, partition in partitions:
            sample_size = len(partition)

            if partition.empty or sample_size < min_periods:
                continue

            sorted_i = partition.values.argsort()
            resids = partition.values[sorted_i]
            gaps = np.append(0, np.diff(resids))
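            # STRAY cut-off search: smooth the upper-tail gaps with a weighted
            # sum of the preceding gaps (ghat); the first sorted score whose gap
            # exceeds log(1/alpha) * ghat marks the cut-off, and it and every
            # larger score get flagged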

            tail_size = int(max(min(np.floor(sample_size / 4), 50), 2))
            tail_indices = np.arange(2, tail_size + 1)

            i_start = int(max(np.floor(sample_size * iter_start), 1) + 1)
            ghat = np.array([np.nan] * sample_size)

            for i in range(i_start - 1, sample_size):
                ghat[i] = sum(
                    (tail_indices / (tail_size - 1)) * gaps[i - tail_indices + 1]
                )

            log_alpha = np.log(1 / alpha)
            for iter_index in range(i_start - 1, sample_size):
                if gaps[iter_index] > log_alpha * ghat[iter_index]:
                    index = partition.index[sorted_i[iter_index:]]
                    self._flags[index, field] = flag
                    break

        return self

    @register(
        mask=["field"],
        demask=["field"],
        squeeze=["field"],
        multivariate=True,
        handles_target=False,
        docstring={"field": DOC_TEMPLATES["field"]},
    )
    def flagMVScores(
        self: "SaQC",
        field: Sequence[str],
        trafo: Callable[[pd.Series], pd.Series] = lambda x: x,
        alpha: float = 0.05,
        n: int = 10,
        func: Callable[[pd.Series], float] | str = "sum",
        iter_start: float = 0.5,
        window: int | str | None = None,
        min_periods: int = 11,
        stray_range: str | None = None,
        drop_flagged: bool = False,  # TODO: still a case ?
        thresh: float = 3.5,
        min_periods_r: int = 1,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        The algorithm implements a 3-step outlier detection procedure for
        the simultaneous flagging of higher dimensional data (dimensions > 3).

        In [1], the procedure is introduced and exemplified with an application on
        hydrological data. See the Notes section for an overview of the algorithm's
        basic steps.

            .. deprecated:: 2.6.0
               Deprecated. Please refer to :py:meth:`~saqc.SaQC.flagByStray`.

        Parameters
        ----------
        trafo :
            Transformation to be applied onto every column before scoring. For more
            fine-grained control, the data could also be transformed before
            :py:meth:`~saqc.SaQC.flagMVScores` is called.

        alpha :
            Level of significance at which it is tested whether an observation's score might
            be drawn from a different distribution than the majority of the data.

        n :
            Number of neighbors included in the scoring process for every datapoint.

        func :
            Function that aggregates a value's k-smallest distances, returning a
            scalar score.

        iter_start :
            Value in ``[0, 1]`` that determines which percentage of data is considered
            "normal". ``0.5`` results in the threshing algorithm searching only the upper
            50% of the scores for the cut-off point. (See the References section for more
            information.)

        window :
            Only effective if :py:attr:`threshing` is set to ``'stray'``. Determines
            the size of the data partitions the data is decomposed into. Each
            partition is checked separately for outliers. Either given as an offset
            string, denoting the window's temporal extension, or as an integer,
            denoting the window's number of periods. ``NaN`` also counts as a period. If
            ``None``, all data points share the same scoring window, which then
            equals the whole data.

        min_periods :
            Only effective if :py:attr:`threshing` is set to ``'stray'`` and
            :py:attr:`partition` is an integer. Minimum number of periods per
            :py:attr:`partition` that have to be present for a valid outlier
            detection to be made in this partition.

        stray_range :
            If not ``None``, an attempt is made to reduce the STRAY result to single
            outlier components of the input :py:attr:`field`. The offset string
            denotes the range of the temporal surroundings to include in the MAD
            testing while trying to reduce flags.

        drop_flagged :
            Only effective when :py:attr:`stray_range` is not ``None``. Whether or
            not to drop flagged values from the temporal surroundings.

        thresh :
            Only effective when :py:attr:`stray_range` is not ``None``. The
            'critical' value, controlling whether the MAD score is considered
            to refer to an outlier or not. Higher values result in less rigid
            flagging. The default value is widely considered appropriate in the
            literature.

        min_periods_r :
            Only effective when :py:attr:`stray_range` is not ``None``. Minimum
            number of measurements necessary in an interval to actually perform the
            reduction step.

        Notes
        -----
        The basic steps are:

        1. transforming

        The different data columns are transformed via timeseries transformations to
        (a) make them comparable and
        (b) make outliers stand out more.

        This step is usually subject to a phase of research/trial and error. See [1]
        for more details.

        Note that the data transformation, as a built-in step of the algorithm,
        will likely get deprecated in the future. It's better to transform the data in
        a processing step preceding the multivariate flagging process. Also,
        by doing so, one gets much more control and variety in the transformation
        applied, since the `trafo` parameter only allows for application of the same
        transformation to all the variables involved.

        2. scoring

        Every observation gets assigned a score depending on its k nearest neighbors.
        See the `scoring_method` parameter description for details on the different
        scoring methods. Furthermore, [1] may give some insight into the pros and cons of
        the different methods.

        3. threshing

        The gaps between the (greatest) scores are tested for being drawn from the
        same distribution as the majority of the scores. If a gap is encountered
        that, with sufficient significance, can be said to not be drawn from the same
        distribution as the one all the smaller gaps are drawn from, then the
        observation belonging to this gap, and all the observations belonging to gaps
        larger than this gap, get flagged as outliers. See the description of the
        `threshing` parameter for more details; [1] gives a fully detailed
        overview of the `stray` algorithm.

        References
        ----------
        [1]  Priyanga Dilini Talagala, Rob J. Hyndman & Kate Smith-Miles (2021):
             Anomaly Detection in High-Dimensional Data,
             Journal of Computational and Graphical Statistics, 30:2, 360-374,
             DOI: 10.1080/10618600.2020.1807997
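
        Examples
        --------
        Since the function is deprecated, the sketch below outlines the
        recommended replacement pipeline. Field names are hypothetical, the
        z-scaling is only an illustrative choice of transformation, and ``qc``
        is assumed to be an existing :py:class:`~saqc.SaQC` instance:

        .. code-block:: python

           # 1. copy and transform every variable explicitly
           for f in ['level', 'discharge', 'temperature']:
               qc = qc.copyField(field=f, target=f + '_trafo')
               qc = qc.transform(
                   field=f + '_trafo', func=lambda x: (x - x.mean()) / x.std()
               )

           # 2. aggregate kNN distances of the transformed fields into one score
           qc = qc.assignKNNScore(
               field=[f + '_trafo' for f in ['level', 'discharge', 'temperature']],
               target='knn_scores', n=10, func='sum',
           )

           # 3. flag the scores with the STRAY algorithm
           qc = qc.flagByStray('knn_scores', alpha=0.05)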
        """

        # parameter deprecations

        if "partition" in kwargs:
            warnings.warn(
                """
                The parameter `partition` is deprecated and will be removed in version
                2.7 of saqc. Please use the parameter `window` instead.
                """,
                DeprecationWarning,
            )
            window = kwargs["partition"]

        if "partition_min" in kwargs:
            warnings.warn(
                """
                The parameter `partition_min` is deprecated and will be removed in
                version 2.7 of saqc. Please use the parameter `min_periods` instead.
                """,
                DeprecationWarning,
            )
            min_periods = kwargs["partition_min"]

        if min_periods != 11:
            warnings.warn(
                """
                You are setting a custom value for the `min_periods` parameter:
                note that this parameter no longer refers to the reduction interval
                length, but now controls the number of periods that have to be present in
                an interval of size `window` (deprecated: `partition`) for the algorithm
                to be performed in that interval.
                To alter the size of the reduction window, use the parameter
                `min_periods_r`. Changes apply immediately.
                This warning will be removed in saqc version 2.7.
                """,
                DeprecationWarning,
            )

        warnings.warn(
            """
                flagMVScores is deprecated and will be removed with Version 2.8.
                To replicate the function, transform the different fields involved
                via explicit applications of some transformations, then calculate the
                kNN scores via `saqc.SaQC.assignKNNScore` and finally apply the STRAY
                algorithm via `saqc.SaQC.flagByStray`.
                """,
            DeprecationWarning,
        )

        # Hint: checking is delegated to the called functions

        fields = toSequence(field)

        qc = self
        fields_ = []
        for f in fields:
            field_ = str(uuid.uuid4())
            qc = qc.copyField(field=f, target=field_)
            qc = qc.transform(field=field_, func=trafo, freq=window)
            fields_.append(field_)

        knn_field = str(uuid.uuid4())
        qc = qc.assignKNNScore(
            field=fields_,
            target=knn_field,
            n=n,
            func=func,
            freq=window,
            algorithm="ball_tree",
            min_periods=min_periods,
            **kwargs,
        )
        for field_ in fields_:
            qc = qc.dropField(field_)

        qc = qc.flagByStray(
            field=knn_field,
            freq=window,
            min_periods=min_periods,
            iter_start=iter_start,
            alpha=alpha,
            flag=flag,
            **kwargs,
        )

        qc._data, qc._flags = _evalStrayLabels(
            data=qc._data,
            field=knn_field,
            target=fields,
            flags=qc._flags,
            reduction_range=stray_range,
            reduction_drop_flagged=drop_flagged,
            reduction_thresh=thresh,
            reduction_min_periods=min_periods_r,
            flag=flag,
            **kwargs,
        )
        return qc.dropField(knn_field)

    @flagging()
    def flagRaise(
        self: "SaQC",
        field: str,
        thresh: float,
        raise_window: str,
        freq: str,
        average_window: str | None = None,
        raise_factor: float = 2.0,
        slope: float | None = None,
        weight: float = 0.8,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        Flag raises and drops in value courses that exceed a certain threshold within a certain timespan.

        .. deprecated:: 2.6.0
           The function is deprecated since it is not feasibly parameterisable by hand. More suitable alternatives are available. Depending on the use case, use :py:meth:`~saqc.SaQC.flagUniLOF`, :py:meth:`~saqc.SaQC.flagZScore` or :py:meth:`~saqc.SaQC.flagJumps` instead.

        Parameters
        ----------
        thresh :
            The threshold, for the total rise (:py:attr:`thresh` ``> 0``),
            or total drop (:py:attr:`thresh` ``< 0``), value courses must
            not exceed within a timespan of length :py:attr:`raise_window`.

        raise_window :
            An offset string, determining the timespan, the rise/drop
            thresholding refers to. Window is inclusively defined.

        freq :
            An offset string, determining the frequency, the timeseries
            to flag is supposed to be sampled at. The window is inclusively
            defined.

        average_window :
            See condition (2) of the description given in the Notes. Window
            is inclusively defined, defaults to 1.5 times the size of
            :py:attr:`raise_window`.

        raise_factor :
            See condition (2).

        slope :
            See condition (3).

        weight :
            See condition (3).

        Notes
        -----
        The dataset does NOT need to be harmonized to a time series with an
        equidistant frequency grid.

        The value :math:`x_{k}` of a time series :math:`x` with associated
        timestamps :math:`t_i`, is flagged a raise, if:

        1. There is any value :math:`x_{s}`, preceding :math:`x_{k}` within
           :py:attr:`raise_window` range, so that
           :math:`M = |x_k - x_s| >` :py:attr:`thresh` :math:`> 0`

        2. The weighted average :math:`\\mu^{*}` of the values preceding
           :math:`x_{k}` within :py:attr:`average_window` range indicates
           that :math:`x_{k}` does not return from an "outlierish" value
           course, meaning that
           :math:`x_k > \\mu^* + ( M` / :py:attr:`raise_factor` :math:`)`

        3. Additionally, if :py:attr:`slope` is not ``None``, :math:`x_{k}`
           is checked for being sufficiently divergent from its predecessor
           :math:`x_{k-1}`, meaning that it is additionally checked whether:

           * :math:`x_k - x_{k-1} >` :py:attr:`slope`
           * :math:`t_k - t_{k-1} >` :py:attr:`weight` :math:`\\times` :py:attr:`freq`
        """

        warnings.warn(
            "The function flagRaise is deprecated with no 100% exact replacement function."
            "When looking for changes in the value course, the use of flagRaise can be replicated and more "
            "easily aimed for, via the method flagJump.\n"
            "When looking for raises to outliers or plateaus, use one of: "
            "flagZScore (outliers), flagUniLOF (outliers and small plateaus) or flagOffset (plateaus)",
            DeprecationWarning,
        )

        validateWindow(raise_window, "raise_window", allow_int=False)
        validateWindow(freq, "freq", allow_int=False)
        validateWindow(average_window, "average_window", allow_int=False, optional=True)

        # prepare input args
        dataseries = self._data[field].dropna()
        raise_window_td = pd.Timedelta(raise_window)
        freq_dt = pd.Timedelta(freq)
        if slope is not None:
            slope = np.abs(slope)

        if average_window is None:
            average_window = 1.5 * raise_window_td

        if thresh < 0:
            dataseries *= -1
            thresh *= -1

        def raise_check(x, thresh):
            test_set = x[-1] - x[0:-1]
            max_val = np.max(test_set)
            if max_val >= thresh:
                return max_val
            else:
                return np.nan

        def custom_rolling_mean(x):
            return np.sum(x[:-1])

        # get invalid-raise/drop mask:
        raise_series = dataseries.rolling(raise_window_td, min_periods=2, closed="both")

        raise_series = raise_series.apply(raise_check, args=(thresh,), raw=True)

        if raise_series.isna().all():
            return self

        # "unflag" values of insufficient deviation to their predecessors
        if slope is not None:
            w_mask = (
                pd.Series(dataseries.index).diff().dt.total_seconds()
                / freq_dt.total_seconds()
            ) > weight
            slope_mask = np.abs(dataseries.diff()) < slope
            to_unflag = raise_series.notna() & w_mask.values & slope_mask
            raise_series[to_unflag] = np.nan

        # calculate and apply the weighted mean weights (pseudo-harmonization):
        weights = (
            pd.Series(dataseries.index).diff(periods=2).shift(-1).dt.total_seconds()
            / freq_dt.total_seconds()
            / 2
        )

        weights.iloc[0] = 0.5 + (
            pd.Timestamp(dataseries.index[1]) - pd.Timestamp(dataseries.index[0])
        ).total_seconds() / (freq_dt.total_seconds() * 2)

        weights.iloc[-1] = 0.5 + (
            pd.Timestamp(dataseries.index[-1]) - pd.Timestamp(dataseries.index[-2])
        ).total_seconds() / (freq_dt.total_seconds() * 2)

        weights[weights > 1.5] = 1.5
        weights.index = dataseries.index
        weighted_data = dataseries.mul(weights)

        # rolling weighted mean calculation
        weighted_rolling_mean = weighted_data.rolling(
            average_window, min_periods=2, closed="both"
        )
        weights_rolling_sum = weights.rolling(
            average_window, min_periods=2, closed="both"
        )
        weighted_rolling_mean = weighted_rolling_mean.apply(
            custom_rolling_mean, raw=True
        )
        weights_rolling_sum = weights_rolling_sum.apply(custom_rolling_mean, raw=True)

        weighted_rolling_mean = weighted_rolling_mean / weights_rolling_sum
        # check means against critical raise value:
        to_flag = dataseries >= weighted_rolling_mean + (raise_series / raise_factor)
        to_flag &= raise_series.notna()
        self._flags[to_flag[to_flag].index, field] = flag

        return self

    @flagging()
    def flagMAD(
        self: "SaQC",
        field: str,
        window: str | int | None = None,
        z: float = 3.5,
        min_residuals: int | None = None,
        min_periods: int | None = None,
        center: bool = False,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        Flag outliers using the modified Z-score outlier detection method.

        See reference [1] for more details on the algorithm.

            .. deprecated:: 2.6.0
               Deprecated Function. Please refer to :py:meth:`~saqc.SaQC.flagZScore`.


        Note
        ----
        Data needs to be sampled at a regular equidistant time grid.

        Parameters
        ----------
        window :
            Size of the window. Either given as an offset string, denoting the window's temporal extension, or
            as an integer, denoting the window's number of periods. ``NaN`` also counts as a period.
            If ``None``, all data points share the same scoring window, which then equals the whole data.
        z :
            The value the Z-score is tested against. Defaults to ``3.5`` (recommendation of [1]).
        min_periods :
            Minimum number of valid measurements in a scoring window to consider the resulting score valid.
        center :
            Whether or not to center the target value in the scoring window. If ``False``, the
            target value is the last value in the window.

        References
        ----------
        [1] https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm
        """
        warnings.warn(
            f"The method `flagMAD` is deprecated and will be removed in "
            "version 2.7 of saqc. To achieve the same behavior use:"
            f"`qc.flagZScore(field={field}, window={window}, method='modified', "
            f"thresh={z}, min_residuals={min_residuals}, min_periods={min_periods}, "
            f"center={center})`",
            DeprecationWarning,
        )
        return self.flagZScore(
            field,
            window=window,
            thresh=z,
            min_residuals=min_residuals,
            model_func=np.median,
            norm_func=lambda x: median_abs_deviation(
                x, scale="normal", nan_policy="omit"
            ),
            center=center,
            min_periods=min_periods,
            flag=flag,
        )

    @flagging()
    def flagOffset(
        self: "SaQC",
        field: str,
        tolerance: float,
        window: int | str,
        thresh: float | None = None,
        thresh_relative: float | None = None,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        A basic outlier test that works on regularly and irregularly sampled data.

        The test classifies values/value courses as outliers by detecting not only a rise
        in value, but also a return to the initial value level.

        Notes
        -----
        This definition of a "spike" not only includes one-value outliers, but also plateau-ish value courses.

        Values :math:`x_n, x_{n+1}, .... , x_{n+k}` of a timeseries :math:`x` with
        associated timestamps :math:`t_n, t_{n+1}, .... , t_{n+k}` are considered spikes, if:

        1. :math:`|x_{n-1} - x_{n + s}| >` :py:attr:`thresh`, for all :math:`s \\in [0,1,2,...,k]`
        2. if :py:attr:`thresh_relative` > 0, :math:`x_{n + s} > x_{n - 1}*(1+` :py:attr:`thresh_relative` :math:`)`
        3. if :py:attr:`thresh_relative` < 0, :math:`x_{n + s} < x_{n - 1}*(1+` :py:attr:`thresh_relative` :math:`)`
        4. :math:`|x_{n-1} - x_{n+k+1}| <` :py:attr:`tolerance`
        5. :math:`|t_{n-1} - t_{n+k+1}| <` :py:attr:`window`


        Parameters
        ----------
        tolerance :
            Maximum difference allowed between the value directly preceding and the value directly
            succeeding an offset, to trigger flagging of the offsetting values. See condition (4).
        window :
            Maximum length allowed for offset value courses, to trigger flagging of the offsetting values.
            See condition (5). Integer-defined window lengths are only allowed for regularly sampled
            timeseries.
        thresh :
            Minimum difference between a value and its successors, to consider the successors an anomalous
            offset group. See condition (1). If ``None``, condition (1) is not tested.
        thresh_relative :
            Minimum relative change between a value and its successors, to consider the successors an anomalous
            offset group. See condition (2). If ``None``, condition (2) is not tested.

        Examples
        --------
        The picture below gives an abstract interpretation of the parameter interplay in case of a positive
        value jump initialising an offset course.

        .. figure:: /resources/images/flagOffsetPic.png

           The four values marked red are flagged, because (1) the initial value jump *exceeds* the value
           given by :py:attr:`thresh`, (2) the temporal extension of the group does *not exceed* the range
           given by :py:attr:`window` and (3) the returning value after the group lies *within* the value range
           determined by :py:attr:`tolerance`.


        .. plot::
           :context:
           :include-source: False

           import matplotlib
           import saqc
           import pandas as pd
           data = pd.DataFrame({'data':np.array([5,5,8,16,17,7,4,4,4,1,1,4])}, index=pd.date_range('2000',freq='1h', periods=12))


        Let's generate a simple, regularly sampled timeseries with an hourly sampling rate and generate an
        :py:class:`saqc.SaQC` instance from it.

        .. doctest:: flagOffsetExample

           >>> import saqc
           >>> data = pd.DataFrame({'data':np.array([5,5,8,16,17,7,4,4,4,1,1,4])}, index=pd.date_range('2000',freq='1h', periods=12))
           >>> data
                                data
           2000-01-01 00:00:00     5
           2000-01-01 01:00:00     5
           2000-01-01 02:00:00     8
           2000-01-01 03:00:00    16
           2000-01-01 04:00:00    17
           2000-01-01 05:00:00     7
           2000-01-01 06:00:00     4
           2000-01-01 07:00:00     4
           2000-01-01 08:00:00     4
           2000-01-01 09:00:00     1
           2000-01-01 10:00:00     1
           2000-01-01 11:00:00     4
           >>> qc = saqc.SaQC(data)

        Now we apply :py:meth:`~saqc.SaQC.flagOffset` to flag offset courses that don't extend
        longer than *6 hours* in time (:py:attr:`window`), that have an initial value jump higher than ``2``
        (:py:attr:`thresh`), and that return to the initial value level within a tolerance of ``1.5``
        (:py:attr:`tolerance`).

        .. doctest:: flagOffsetExample

           >>> qc = qc.flagOffset("data", thresh=2, tolerance=1.5, window='6h')
           >>> qc.plot('data')  # doctest: +SKIP

        .. plot::
           :context: close-figs
           :include-source: False

           >>> qc = saqc.SaQC(data)
           >>> qc = qc.flagOffset("data", thresh=2, tolerance=1.5, window='6h')
           >>> qc.plot('data')  # doctest: +SKIP

        Note that both negative and positive jumps are considered starting points of negative or positive
        offsets. If you want to impose the additional condition that the initial jump must exceed
        *+90%* of the value level, you can additionally set the :py:attr:`thresh_relative` parameter:

        .. doctest:: flagOffsetExample

           >>> qc = qc.flagOffset("data", thresh=2, thresh_relative=.9, tolerance=1.5, window='6h')
           >>> qc.plot('data') # doctest:+SKIP

        .. plot::
           :context: close-figs
           :include-source: False

           >>> qc = saqc.SaQC(data)
           >>> qc = qc.flagOffset("data", thresh=2, thresh_relative=.9, tolerance=1.5, window='6h')
           >>> qc.plot('data')  # doctest: +SKIP

        Now, only positive jumps that exceed a value gain of *+90%* are considered starting points of offsets.

        In the same way, you can aim for only negative offsets by setting a negative relative threshold.
        The example below only flags offsets that fall off by at least *50%* in value, with an absolute
        value drop of at least ``2``.

        .. doctest:: flagOffsetExample

           >>> qc = qc.flagOffset("data", thresh=2, thresh_relative=-.5, tolerance=1.5, window='6h')
           >>> qc.plot('data') # doctest:+SKIP

        .. plot::
           :context: close-figs
           :include-source: False

           >>> qc = saqc.SaQC(data)
           >>> qc = qc.flagOffset("data", thresh=2, thresh_relative=-.5, tolerance=1.5, window='6h')
           >>> qc.plot('data')  # doctest: +SKIP
        """
        validateWindow(window)
        if thresh is None and thresh_relative is None:
            raise ValueError(
                "At least one of parameters 'thresh' and 'thresh_relative' "
                "has to be given. Got 'thresh'=None, 'thresh_relative'=None "
                "instead."
            )
        if thresh is None:
            thresh = 0

        dat = self._data[field].dropna()
        if thresh_relative is not None:
            rel_jumps = np.sign(thresh_relative) * dat > np.sign(
                thresh_relative
            ) * dat.shift(+1) * (1 + thresh_relative)

        data_diff = dat.diff()
        initial_jumps = data_diff.abs() > thresh
        if thresh_relative:
            initial_jumps &= rel_jumps
        return_in_time = (
            dat[::-1]
            .rolling(window, min_periods=2)
            .apply(lambda x: np.abs(x[-1] - x[:-1]).min() < tolerance, raw=True)[::-1]
            .astype(bool)
        )
        return_in_time = return_in_time & initial_jumps.reindex(
            dat.index, fill_value=False
        ).shift(-1, fill_value=False)
        offset_start_candidates = dat[return_in_time]
        win_delta = pd.Timedelta(window)
        corners = pd.Series(False, index=dat.index)
        to_flag = pd.Series(False, index=dat.index)
        ns = pd.Timedelta("1ns")
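        # for each candidate start (the last value before a sufficient jump that
        # returns within `window`), find the first value returning to within
        # `tolerance`; all values strictly in between are flagged, provided they
        # all satisfy the (relative) jump condition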
        for c in zip(offset_start_candidates.index, offset_start_candidates.values):
            ret = (dat[c[0]] - dat[c[0] + ns : c[0] + win_delta]).abs()[1:] < tolerance
            if not ret.empty:
                r = ret.idxmax()
                chunk = dat[c[0] : r]
                sgn = np.sign(chunk.iloc[1] - c[1])
                t_val = ((chunk[1:-1] - c[1]) * sgn > thresh).all()
                r_val = True
                if thresh_relative:
                    r_val = (
                        np.sign(thresh_relative) * chunk[1:-1]
                        > np.sign(thresh_relative) * c[1] * (1 + thresh_relative)
                    ).all()
                if t_val and r_val and (not corners[c[0]]):
                    flag_i = dat[c[0] + ns : chunk.index[-1] - ns].index
                    to_flag[flag_i] = True
                    corners.loc[flag_i[-1]] = True
        to_flag = to_flag.reindex(self._data[field].index, fill_value=False)

        self._flags[to_flag, field] = flag
        return self

    @flagging()
    def flagByGrubbs(
        self: "SaQC",
        field: str,
        window: str | int,
        alpha: float = 0.05,
        min_periods: int = 8,
        pedantic: bool = False,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        Flag outliers using the Grubbs algorithm.

        .. deprecated:: 2.6.0
           Use :py:meth:`~saqc.SaQC.flagUniLOF` or :py:meth:`~saqc.SaQC.flagZScore` instead.

        Parameters
        ----------
        window :
            Size of the testing window.
            If an integer, the fixed number of observations used for each window.
            If an offset string the time period of each window.

        alpha :
            Level of significance the Grubbs test is to be performed at. Must be between 0 and 1.

        min_periods :
            Minimum number of values needed in a :py:attr:`window` in order to perform the Grubbs test.
            Ignored if :py:attr:`window` is an integer.

        pedantic :
            If ``True``, every value gets checked twice. First in the initial rolling :py:attr:`window`
            and second in a rolling window that is lagging by :py:attr:`window` / 2. Recommended to avoid
            false positives at the window edges. Ignored if :py:attr:`window` is an offset string.

        References
        ----------
        Introduction to the Grubbs test:

        [1] https://en.wikipedia.org/wiki/Grubbs%27s_test_for_outliers
        """

        warnings.warn(
            "The function flagByGrubbs is deprecated due to its inferior performance, with "
            "no 100% exact replacement function. When looking for outliers use one of: "
            "flagZScore, flagUniLOF",
            DeprecationWarning,
        )

        validateWindow(window)
        validateFraction(alpha, "alpha")
        validateMinPeriods(min_periods, optional=False)

        datcol = self._data[field].copy()
        rate = getFreqDelta(datcol.index)

        # if timeseries that is analyzed, is regular,
        # window size can be transformed to a number of periods:
        if rate and isinstance(window, str):
            window = pd.Timedelta(window) // rate

        to_group = pd.DataFrame(data={"ts": datcol.index, "data": datcol})
        to_flag = pd.Series(False, index=datcol.index)

        # period number defined test intervals
        if isinstance(window, int):
            grouper_series = pd.Series(
                data=np.arange(0, len(datcol)), index=datcol.index
            )
            grouper_series_lagged = grouper_series + (window / 2)
            grouper_series = grouper_series.transform(lambda x: x // window)
            grouper_series_lagged = grouper_series_lagged.transform(
                lambda x: x // window
            )
            partitions = to_group.groupby(grouper_series)
            partitions_lagged = to_group.groupby(grouper_series_lagged)

        # offset defined test intervals:
        else:
            partitions = to_group.groupby(pd.Grouper(freq=window))
            partitions_lagged = []

        for _, partition in partitions:
            if partition.shape[0] > min_periods:
                detected = smirnov_grubbs.two_sided_test_indices(
                    partition["data"].values, alpha=alpha
                )
                detected = partition["ts"].iloc[detected]
                to_flag[detected.index] = True

        if isinstance(window, int) and pedantic:
            to_flag_lagged = pd.Series(False, index=datcol.index)

            for _, partition in partitions_lagged:
                if partition.shape[0] > min_periods:
                    detected = smirnov_grubbs.two_sided_test_indices(
                        partition["data"].values, alpha=alpha
                    )
                    detected = partition["ts"].iloc[detected]
                    to_flag_lagged[detected.index] = True

            to_flag &= to_flag_lagged

        self._flags[to_flag, field] = flag
        return self

    @register(
        mask=["field"],
        demask=["field"],
        squeeze=["field"],
        multivariate=True,
        docstring={"field": DOC_TEMPLATES["field"]},
    )
    def flagZScore(
        self: "SaQC",
        field: Sequence[str],
        method: Literal["standard", "modified"] = "standard",
        window: str | int | None = None,
        thresh: float = 3,
        min_residuals: int | None = None,
        min_periods: int | None = None,
        center: bool = True,
        axis: int = 0,
        flag: float = BAD,
        **kwargs,
    ) -> "SaQC":
        """
        Flag data where its (rolling) Zscore exceeds a threshold.

        The function implements flagging derived from a standard or modified Zscore calculation. To handle
        non-stationary data, the Zscoring can be applied with a rolling window. Additionally, the function
        allows for a minimum residual to be specified in order to mitigate overflagging in local
        regimes of low variance.

        See the Notes section for a detailed overview of the calculation.

        Parameters
        ----------
        window :
            Size of the window. Either determined via an offset string, denoting the window's temporal
            extension, or by an integer, denoting the window's number of periods. ``NaN`` also counts as
            a period. If ``None`` is passed, all data points share the same scoring window, which then
            equals the whole data.
        method :
            Which method to use for ZScoring:

            * `"standard"`: standard Zscoring, using *mean* for the expectation and *standard deviation (std)* as scaling factor
            * `"modified"`: modified Zscoring, using *median* as the expectation and *median absolute deviation (MAD)* as the scaling Factor

            See notes section for detailed scoring formula
        thresh :
            Cutoff level for the Zscores, above which associated points are marked as outliers.
        min_residuals :
            Minimum residual value points must have to be considered outliers.
        min_periods :
            Minimum number of valid measurements in a scoring window to consider the resulting score valid.
        center :
            Whether or not to center the target value in the scoring window. If ``False``, the
            target value is the last value in the window.
        axis :
            Along which axis to calculate the scoring statistics:

            * `0` (default) - calculate statistics along time axis
            * `1` - calculate statistics over multiple variables

            See Notes section for a visual clarification of the workings
            of `axis` and `window`.

        Notes
        -----

        The flag for :math:`x` is determined as follows:

        1. Depending on ``window`` and ``axis``, the context population :math:`X` is collected (see pictures below)

           * If ``axis=0``, any value is flagged in the context of those values of the same variable (``field``) that are
             in ``window`` range.
           * If ``axis=1``, any value is flagged in the context of all values of all variables (``fields``) that are
             in ``window`` range.
           * If ``axis=1`` and ``window=1``, any value is flagged in the context of all values of all variables (``fields``)
             that share the same timestamp.

        .. figure:: /resources/images/ZscorePopulation.png
           :class: with-border




        2. Depending on ``method``, a score :math:`Z` is calculated for :math:`x` via :math:`Z = \\frac{|x - E(X)|}{S(X)}`

           * ``method="standard"``: :math:`E(X)=mean(X)`, :math:`S(X)=std(X)`
           * ``method="modified"``: :math:`E(X)=median(X)`, :math:`S(X)=MAD(X)`

        3. :math:`x` is flagged, if :math:`Z >` ``thresh``
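
        Examples
        --------
        A minimal sketch (``'temperature'`` is a hypothetical field name and
        ``qc`` an existing :py:class:`~saqc.SaQC` instance):

        .. code-block:: python

           # rolling modified Zscore over a one-day window: flag scores above
           # 3.5, but only where the residual also exceeds 0.2
           qc = qc.flagZScore(
               'temperature', method='modified', window='1D',
               thresh=3.5, min_residuals=0.2,
           )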
        """

        if "norm_func" in kwargs or "model_func" in kwargs:
            warnings.warn(
                "Parameters norm_func and model_func are deprecated, use parameter method instead.\n"
                'To model with mean and scale with standard deviation, use method="standard".\n'
                'To model with median and scale with median absolute deviation (MAD) use method="modified".\n'
                "Other/Custom model and scaling functions are not supported any more"
            )
            # map the deprecated function arguments onto the new `method` parameter;
            # `getattr` keeps this from raising if only one of the two was passed
            model_name = getattr(kwargs.get("model_func"), "__name__", "")
            norm_name = getattr(kwargs.get("norm_func"), "__name__", "")
            if "mean" in model_name or "std" in norm_name:
                method = "standard"
            elif "median" in model_name or "median" in norm_name:
                method = "modified"
            else:
                raise ValueError(
                    "Scoring with functions that do not correspond to either the "
                    "standard or the modified Zscore is no longer supported"
                )

        validateChoice(method, "method", ["standard", "modified"])
        validateWindow(window, optional=True)
        validateMinPeriods(min_periods)

        min_residuals = min_residuals or 0
        min_periods = min_periods or 0

        dat = self._data[field].to_pandas(how="outer")
        if dat.empty:
            return self

        if window is None:  # every value is scored against global statistics
            if dat.notna().sum().sum() >= min_periods:
                if method == "standard":
                    mod = pd.DataFrame(
                        {f: dat[f].mean() for f in dat.columns}, index=dat.index
                    )
                    norm = pd.DataFrame(
                        {f: dat[f].std() for f in dat.columns}, index=dat.index
                    )

                else:
                    mod = pd.DataFrame(
                        {f: dat[f].median() for f in dat.columns}, index=dat.index
                    )
                    norm = pd.DataFrame(
                        {f: (dat[f] - mod[f]).abs().median() for f in dat.columns},
                        index=dat.index,
                    )
            else:
                return self

        else:  # window is not None
            if axis == 0:  # roll along the time axis, each variable separately
                if method == "standard":
                    mod = dat.rolling(
                        window, center=center, min_periods=min_periods
                    ).mean()
                    norm = dat.rolling(
                        window, center=center, min_periods=min_periods
                    ).std()
                else:
                    mod = dat.rolling(
                        window, center=center, min_periods=min_periods
                    ).median()
                    norm = (
                        (mod - dat)
                        .abs()
                        .rolling(window, center=center, min_periods=min_periods)
                        .median()
                    )

            else:  # axis == 1:
                if window == 1:
                    if method == "standard":
                        mod = dat.mean(axis=1)
                        norm = dat.std(axis=1)
                    else:  # method == 'modified'
                        mod = dat.median(axis=1)
                        norm = (dat.subtract(mod, axis=0)).abs().median(axis=1)
                else:  # window > 1
                    if method == "standard":
                        mod = windowRoller(dat, window, "mean", min_periods, center)
                        norm = windowRoller(dat, window, "std", min_periods, center)
                    else:  # method == 'modified'
                        mod = windowRoller(dat, window, "median", min_periods, center)
                        norm = windowRoller(
                            dat.subtract(mod, axis=0).abs(),
                            window,
                            "median",
                            min_periods,
                            center,
                        )
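
        # absolute deviation of every value from its expectation, scaled by the
        # corresponding dispersion measure, yields the (modified) Z-scores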
        residuals = dat.subtract(mod, axis=0).abs()
        score = residuals.divide(norm, axis=0)

        to_flag = (score.abs() > thresh) & (residuals >= min_residuals)
        for f in field:
            self._flags[to_flag[f], f] = flag
        return self


def _evalStrayLabels(
    data: DictOfSeries,
    field: str,
    flags: Flags,
    target: Sequence[str],
    reduction_range: str | None = None,
    reduction_drop_flagged: bool = False,  # TODO: still a case ?
    reduction_thresh: float = 3.5,
    reduction_min_periods: int = 1,
    at_least_one: bool = True,
    flag: float = BAD,
    **kwargs,
) -> Tuple[DictOfSeries, Flags]:
    """
    The function "reduces" an observation's flag to its components, by applying a MAD
    test (see references) to the temporal surrounding of every component.

    Parameters
    ----------
    data :
        A dictionary of pandas.Series, holding all the data.

    field :
        The fieldname of the column, holding the labels to be evaluated.

    flags :
        Container to store quality flags to data.

    target :
        A list of strings, holding the column names of the variables, the stray labels
        shall be projected onto.

    reduction_range :
        An offset string, denoting the range of the temporal surrounding to include
        in the MAD testing. If ``None`` is passed, no testing will be performed and
        all targets will have the stray flag projected.

    reduction_drop_flagged :
        Whether or not to drop flagged values, other than the value under test, from the
        temporal surrounding before checking the value with MAD.

    reduction_thresh :
        The `critical` value, controlling whether the MAD score is considered to indicate
        an outlier or not. Higher values result in less rigid flagging. The default value
        is widely used in the literature. See the references section for more details ([1]).

    reduction_min_periods :
        Minimum number of valid (non-NaN) values required in the temporal surrounding for
        the MAD test to be performed. If fewer values are present, the stray flag is
        projected onto the variable without testing.

    at_least_one :
        If none of the variables the outlier label shall be reduced to qualifies as an
        outlier with regard to the test, either all (``True``) or none (``False``) of the
        variables get flagged.

    flag : float, default BAD
        flag to set.

    References
    ----------
    [1] https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm
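
    Examples
    --------
    A minimal sketch of the modified Z-score criterion the reduction is based on
    (numbers are illustrative; the function itself scores only the value under test
    against the residuals of a local polynomial fit):

    .. code-block:: python

        import numpy as np

        resids = np.array([0.1, -0.2, 0.0, 0.3, 5.0])  # residuals around a local fit
        med = np.median(resids)                        # 0.1
        mad = np.median(np.abs(resids - med))          # 0.2
        scores = 0.6745 * np.abs(resids - med) / mad   # largest score is ~16.5
        outliers = scores > 3.5                        # only the 5.0 exceeds reduction_thresh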
    """
    # data of the target variables and the timestamps at which the stray variable
    # (``field``) carries a flag; the latter are the candidate positions for projection
    val_frame = data[target].to_pandas()
    stray_detects = flags[field] > UNFLAGGED
    stray_detects = stray_detects[stray_detects]
    to_flag_frame = pd.DataFrame(False, columns=target, index=stray_detects.index)

    # no reduction range given: project the stray flag onto all targets unconditionally
    if reduction_range is None:
        for field in to_flag_frame.columns:
            flags[to_flag_frame.index, field] = flag
        return data, flags

    for var in target:
        for index in enumerate(to_flag_frame.index):
            index_slice = slice(
                index[1] - pd.Timedelta(reduction_range),
                index[1] + pd.Timedelta(reduction_range),
            )
            test_slice = val_frame[var][index_slice].dropna()

            # check whether the value under test is sufficiently centered within its
            # temporal surrounding; if not, fall back to a constant (degree 0) fit:
            first = test_slice.first_valid_index()
            last = test_slice.last_valid_index()
            min_range = pd.Timedelta(reduction_range) / 4

            if (
                pd.Timedelta(index[1] - first) < min_range
                or pd.Timedelta(last - index[1]) < min_range
            ):
                polydeg = 0
            else:
                polydeg = 2

            if reduction_drop_flagged:
                test_slice = test_slice.drop(to_flag_frame.index, errors="ignore")

            if len(test_slice) < reduction_min_periods:
                to_flag_frame.loc[index[1], var] = True
                continue

            # rescale the nanosecond timestamps relative to the window start, so the
            # polynomial fit stays numerically well-conditioned
            x = test_slice.index.values.astype(float)
            x_0 = x[0]
            x = (x - x_0) / 10**12

            polyfitted = poly.polyfit(y=test_slice.values, x=x, deg=polydeg)

            testval = poly.polyval(
                (float(index[1].to_numpy()) - x_0) / 10**12, polyfitted
            )
            testval = val_frame[var][index[1]] - testval

            # modified Z-score of the value under test with respect to the residuals of
            # the local fit; 0.6745 (the 0.75 quantile of the standard normal) rescales
            # the MAD to be comparable to a standard deviation
            resids = test_slice.values - poly.polyval(x, polyfitted)
            med_resids = np.median(resids)
            MAD = np.median(np.abs(resids - med_resids))
            crit_val = 0.6745 * (abs(med_resids - testval)) / MAD

            if crit_val > reduction_thresh:
                to_flag_frame.loc[index[1], var] = True

    if at_least_one:
        # if the test hits none of the target variables, fall back to flagging all of them
        to_flag_frame[~to_flag_frame.any(axis=1)] = True

    for field in to_flag_frame.columns:
        col = to_flag_frame[field]
        flags[col[col].index, field] = flag

    return data, flags