Skip to content
Snippets Groups Projects
Commit f12fb74c authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

flagging plateaus

parent be356a77
No related branches found
No related tags found
1 merge request!854flagging plateaus
......@@ -8,6 +8,7 @@ SPDX-License-Identifier: GPL-3.0-or-later
## Unreleased
[List of commits](https://git.ufz.de/rdm-software/saqc/-/compare/v2.6.0...develop)
### Added
- `flagPlateau`: added function to detect and flag anomalous value plateaus of a certain temporal extension
### Changed
### Removed
### Fixed
......
......@@ -16,5 +16,6 @@ Basic Anomalies
~SaQC.flagRaise
~SaQC.flagConstants
~SaQC.flagByVariance
~SaQC.flagPlateau
......@@ -19,4 +19,5 @@ Univariate Outlier Detection
~SaQC.flagRange
~SaQC.flagLOF
~SaQC.flagZScore
~SaQC.flagPlateau
This diff is collapsed.
SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
SPDX-License-Identifier: GPL-3.0-or-later
\ No newline at end of file
......@@ -39,6 +39,37 @@ if TYPE_CHECKING:
from saqc import SaQC
def _stray(partition, min_periods, iter_start, alpha):
sample_size = len(partition)
flag_index = pd.Index([])
if len(partition) == 0 or sample_size < min_periods:
return flag_index
if isinstance(partition, np.ndarray):
idx = np.arange(len(partition))
else:
idx = partition.index
partition = partition.values
sorted_i = partition.argsort()
resids = partition[sorted_i]
gaps = np.append(0, np.diff(resids))
tail_size = int(max(min(np.floor(sample_size / 4), 50), 2))
tail_indices = np.arange(2, tail_size + 1)
i_start = int(max(np.floor(sample_size * iter_start), 1))
ghat = np.array([np.nan] * sample_size)
for i in range(i_start, sample_size):
ghat[i] = sum((tail_indices / (tail_size - 1)) * gaps[i - tail_indices + 1])
log_alpha = np.log(1 / alpha)
for iter_index in range(i_start, sample_size):
if gaps[iter_index] > log_alpha * ghat[iter_index]:
flag_index = idx[sorted_i[iter_index:]]
break
return flag_index
class OutliersMixin:
@staticmethod
def _validateLOF(algorithm, n, p, density):
......@@ -508,32 +539,8 @@ class OutliersMixin:
# calculate flags for every window
for _, partition in partitions:
sample_size = len(partition)
if partition.empty or sample_size < min_periods:
continue
sorted_i = partition.values.argsort()
resids = partition.values[sorted_i]
gaps = np.append(0, np.diff(resids))
tail_size = int(max(min(np.floor(sample_size / 4), 50), 2))
tail_indices = np.arange(2, tail_size + 1)
i_start = int(max(np.floor(sample_size * iter_start), 1) + 1)
ghat = np.array([np.nan] * sample_size)
for i in range(i_start - 1, sample_size):
ghat[i] = sum(
(tail_indices / (tail_size - 1)) * gaps[i - tail_indices + 1]
)
log_alpha = np.log(1 / alpha)
for iter_index in range(i_start - 1, sample_size):
if gaps[iter_index] > log_alpha * ghat[iter_index]:
index = partition.index[sorted_i[iter_index:]]
self._flags[index, field] = flag
break
index = _stray(partition, min_periods, iter_start, alpha)
self._flags[index, field] = flag
return self
......
This diff is collapsed.
......@@ -6,6 +6,9 @@
# -*- coding: utf-8 -*-
import os
import numpy as np
import pandas as pd
import pytest
......@@ -24,6 +27,69 @@ def field(data):
return data.columns[0]
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_flagPlateau():
    """flagPlateau marks the known plateau regions of the turbidity fixture.

    Checks both directions: every known anomalous span is fully flagged and
    the stretch between consecutive anomalies is entirely unflagged.
    """
    csv_path = os.path.join(
        os.path.abspath(""), "docs/resources/data/turbidity_plateaus.csv"
    )
    frame = pd.read_csv(csv_path, parse_dates=[0], index_col=0)
    frame = frame.interpolate("linear").ffill().bfill()

    qc = SaQC(frame).flagPlateau(
        "base3", min_length="10min", max_length="7d", granularity="20min"
    )

    # (start, stop) positions of the known plateaus; (0, 0) is a sentinel so
    # the leading gap before the first real anomaly is checked as well.
    bounds = [
        (0, 0),
        (5313, 5540),
        (10000, 10200),
        (15000, 15500),
        (17000, 17114),
        (17790, 17810),
    ]
    flagged = qc["base3"].flags.to_pandas().squeeze() > 0
    for (_, prev_stop), (start, stop) in zip(bounds, bounds[1:]):
        assert flagged.iloc[start:stop].all()
        assert not flagged.iloc[prev_stop:start].any()
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_flagPlateau_long():
    """flagPlateau still finds the plateaus when the series is tiled 11x.

    The fixture is repeated to form a much longer series (re-indexed on a
    fresh 10min grid); only the plateau positions of the first copy are
    asserted, by position.
    """
    csv_path = os.path.join(
        os.path.abspath(""), "docs/resources/data/turbidity_plateaus.csv"
    )
    frame = pd.read_csv(csv_path, parse_dates=[0], index_col=0)
    frame = frame.interpolate("linear").ffill().bfill()

    # np.append without axis flattens the (n, 1) frame into 1-D — required
    # for the Series constructor below.
    tiled = np.append(frame.values, [frame.values] * 10)
    series = pd.Series(
        tiled,
        index=pd.date_range("2000", freq="10min", periods=len(tiled)),
        name="base3",
    )

    qc = SaQC(series).flagPlateau(
        "base3", min_length="10min", max_length="7d", granularity="20min"
    )

    # (start, stop) positions of the known plateaus; (0, 0) is a sentinel so
    # the leading gap before the first real anomaly is checked as well.
    bounds = [
        (0, 0),
        (5313, 5540),
        (10000, 10200),
        (15000, 15500),
        (17000, 17114),
        (17790, 17810),
    ]
    flagged = qc["base3"].flags.to_pandas().squeeze() > 0
    for (_, prev_stop), (start, stop) in zip(bounds, bounds[1:]):
        assert flagged.iloc[start:stop].all()
        assert not flagged.iloc[prev_stop:start].any()
@pytest.mark.parametrize("plot", [True, False])
@pytest.mark.parametrize("normalize", [True, False])
def test_flagPattern_dtw(plot, normalize):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment