Commit dd90847b authored by Bert Palm 🎇

improved slidingOutlier formerly known as polyresmad/MAD

parent 2402e474
@@ -2,9 +2,12 @@
# -*- coding: utf-8 -*-
import logging
import time
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
from scipy.stats import zscore
from .register import register
import numpy.polynomial.polynomial as poly
@@ -14,57 +17,118 @@ from ..lib.tools import (
getPandasVarNames,
getPandasData,
offset2seconds,
checkQCParameters)
checkQCParameters,
slidingWindowIndices)
@register("sliding_outlier")
def slidingOutlier(data, flags, field, flagger, winsz='1h', dx='1h', count=1, deg=1, z=3.5, method='modZ', **kwargs):
""" A outlier detection in a sliding window. The method for detection can be a simple Z-score or the more robust
modified Z-score, as introduced here [1].
The steps are:
1. a window of size `winsz` is cut from the data
2. the data is fit by a polynomial of the given degree `deg`
3. the outlier `method` detect potential outlier
4. the window is continued by `dx` to the next data-slot.
5. processing continue at 1. until end of data.
5. all potential outlier, that are detected `count`-many times, are promoted to real outlier and flagged by the
`flagger`
:param data: pandas dataframe. holding the data
:param flags: pandas dataframe. holding the flags
:param field: fieldname in `data` and `flags`, which holds the relevant infos
:param flagger: flagger.
:param winsz: int or time-offset string (see [2]). The size of the window the outlier detection is run in. default: 1h
:param dx: int or time-offset string (see [2]). Stepsize the window is set further. default: 1h
:param method: str. `modZ` or `zscore`. see [1] at section `Z-Scores and Modified Z-Scores`
:param count: int. this many times, a datapoint needs to be detected in different windows, to be finally
flagged as outlier
:param deg: int. The degree for the polynomial fit, to calculate the residuum
:param z: float. the value the (mod.) Z-score is tested against. Defaulting to 3.5 (Recommendation of [1])
Links:
[1] https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm
[2] https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects
"""
@register("polyresmad")
def polyResMad(data, flags, field, flagger, winsz, count=1, deg=1, dx=1, z=3.5, **kwargs):
d = getPandasData(data, field).copy()
d = d[d.notna()]
x = (d.index - d.index.min()).total_seconds().values
y = d.values
use_offset = False
dx_s = dx
winsz_s = winsz
# check param consistency
if isinstance(winsz, str) or isinstance(dx, str):
if isinstance(winsz, str) and isinstance(dx, str):
use_offset = True
dx_s = offset2seconds(dx)
winsz_s = offset2seconds(winsz)
else:
raise TypeError("`winsz` and `dx` must both be an offset or both be numeric")
# checks
if len(x) < deg + 1:
raise ValueError(f'deg {deg} too low')
# check params
if deg < 0:
raise ValueError("deg must be positive")
if dx <= 0:
raise ValueError("step size `dx` must be positive and not zero")
if dx >= winsz and count > 1:
ValueError("If stepsize `dx` is bigger that the window every value is just seen once, so use count=1")
if winsz < count * dx:
if z < 0:
raise ValueError("z must be positive")
if count <= 0:
raise ValueError("count must be positive and not zero")
if dx_s >= winsz_s and count > 1:
ValueError("If stepsize `dx` is bigger that the window-size, every value is seen just once, so use count=1")
elif count > winsz_s // dx_s:
raise ValueError(f"Adjust `dx`, `stepsize` or `winsz`. A single data point is "
f"seen `winsz / dx = {winsz // dx}` times, but count is set to {count}")
f"seen `floor(winsz / dx) = {winsz_s // dx_s}` times, but count is set to {count}")
# prepare the method
if method == 'modZ':
def calc_(residual):
diff = np.abs(residual - np.median(residual))
mad = np.median(diff)
return (mad > 0) & (0.6745 * diff > z * mad)
elif method == 'zscore':
def calc_(residual):
score = zscore(residual, ddof=1)
return np.abs(score) > z
else:
raise NotImplementedError
method = calc_
# prepare data, work on numpy arrays for the fulfilling pleasure of performance
d = data[field].dropna()
all_indices = np.arange(len(d.index))
x = (d.index - d.index[0]).total_seconds().values
y = d.values
counters = np.full(len(d.index), count)
counters = np.ones(len(x)) * count
if use_offset:
loopfun = slidingWindowIndices
else:
def loopfun(arr, wsz, step):
for i in range(0, len(arr) - wsz + 1, step):
yield i, i + wsz
# sliding window loop
for i in range(0, len(x) - winsz + 1, dx):
# Indices of the chunk
chunk = np.arange(i, i + winsz)
# Exclude points that have been already discarded
indices = chunk[counters[chunk] > 0]
for start, end in loopfun(d.index, winsz, dx):
# mask points that have been already discarded
mask = counters[start:end] > 0
indices = all_indices[start:end][mask]
xchunk = x[indices]
ychunk = y[indices]
if xchunk.size == 0:
continue
# get residual
xx, yy = x[indices], y[indices]
coef = poly.polyfit(xx, yy, deg)
model = poly.polyval(xx, coef)
residual = yy - model
coef = poly.polyfit(xchunk, ychunk, deg)
model = poly.polyval(xchunk, coef)
residual = ychunk - model
# calc mad
diff = np.abs(residual - np.median(residual))
mad = np.median(diff)
zscore = (mad > 0) & (0.6745 * diff > z * mad)
score = method(residual)
# count`em in
goneMad = np.where(zscore)[0]
goneMad = score.nonzero()[0]
counters[indices[goneMad]] -= 1
outlier = np.where(counters <= 0)[0]
idx = d.iloc[outlier.tolist()].index
flags = flagger.setFlags(flags, field, idx, **kwargs)
loc = d[outlier].index
flags = flagger.setFlags(flags, field, loc=loc, **kwargs)
return data, flags
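For reference, the `modZ` branch marks a residual r as a potential outlier when 0.6745 * |r - median| exceeds z * MAD (the modified Z-score test from [1]). A self-contained sketch of that test, with made-up residual values, not part of the commit:

import numpy as np

def mod_z_outliers(residual, z=3.5):
    # modified Z-score: flag points whose scaled deviation from the median exceeds z * MAD
    diff = np.abs(residual - np.median(residual))
    mad = np.median(diff)  # median absolute deviation
    return (mad > 0) & (0.6745 * diff > z * mad)

residual = np.array([0.1, -0.2, 0.05, 4.8, -0.1])
print(mod_z_outliers(residual))  # only the 4.8 residual should be marked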
@@ -9,7 +9,7 @@ from saqc.flagger.baseflagger import BaseFlagger
from saqc.flagger.dmpflagger import DmpFlagger
from saqc.flagger.simpleflagger import SimpleFlagger
from saqc.funcs.spike_detection import flagSpikes_SpektrumBased, flagMad, polyResMad, flagSpikes_Basic
from saqc.funcs.spike_detection import flagSpikes_SpektrumBased, flagMad, slidingOutlier, flagSpikes_Basic
from saqc.lib.tools import getPandasData
@@ -50,14 +50,24 @@ def test_flagMad(spiky_data, flagger):
@pytest.mark.parametrize('flagger', TESTFLAGGERS)
def test_flagPolyResMad(spiky_data, flagger):
data = spiky_data[0]
flags = flagger.initFlags(data.to_frame())
data, flag_result = polyResMad(data, flags, 'spiky_data', flagger, winsz=300, dx=50)
@pytest.mark.parametrize('method', ['modZ', 'zscore'])
def test_slidingOutlier(spiky_data, flagger, method):
# test for numeric input
data = spiky_data[0].to_frame()
flags = flagger.initFlags(data)
_, flag_result = slidingOutlier(data, flags, 'spiky_data', flagger, winsz=300, dx=50, method=method)
flag_result = getPandasData(flag_result, 0)
test_sum = (flag_result[spiky_data[1]] == flagger.BAD).sum()
assert test_sum == len(spiky_data[1])
# test for offset input
_, flag_result = slidingOutlier(data, flags, 'spiky_data', flagger, winsz='1500min', dx='250min', method=method)
flag_result = getPandasData(flag_result, 0)
test_sum = (flag_result[spiky_data[1]] == flagger.BAD).sum()
assert test_sum == len(spiky_data[1])
@pytest.mark.parametrize('flagger', TESTFLAGGERS)
def test_flagSpikes_Basic(spiky_data, flagger):
data = spiky_data[0]
@@ -66,5 +76,3 @@ def test_flagSpikes_Basic(spiky_data, flagger):
flag_result = getPandasData(flag_result, 0)
test_sum = (flag_result[spiky_data[1]] == flagger.BAD).sum()
assert test_sum == len(spiky_data[1])