Skip to content
Snippets Groups Projects
Commit cf6e2c98 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

Merge branch 'DetectAndResetOffset' of https://git.ufz.de/rdm-software/saqc...

Merge branch 'DetectAndResetOffset' of https://git.ufz.de/rdm-software/saqc into DetectAndResetOffset
parents d7e4ff6c d1c14115
No related branches found
No related tags found
3 merge requests!193Release 1.4,!188Release 1.4,!138WIP: Detect and reset offset
Pipeline #10103 passed with stage
in 5 minutes and 40 seconds
......@@ -4,6 +4,7 @@
from functools import partial
from inspect import signature
import dios
import numpy as np
import pandas as pd
import scipy
......@@ -13,6 +14,7 @@ import itertools
import collections
import numba
from mlxtend.evaluate import permutation_test
from scipy import stats
from scipy.cluster.hierarchy import linkage, fcluster
......@@ -955,7 +957,7 @@ def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, n
for segment in segments:
if segment[1].shape[0] <= 1:
continue
drifters = detectDeviants(data, metric, norm_spread, norm_frac, linkage_method, 'variables')
drifters = detectDeviants(segment[1], metric, norm_spread, norm_frac, linkage_method, 'variables')
for var in drifters:
flagger = flagger.setFlags(fields[var], loc=segment[1].index, **kwargs)
......@@ -969,7 +971,7 @@ def flagDriftFromReference(data, field, flagger, fields, segment_freq, thresh,
"""
The function flags value courses that deviate from a reference course by a margin exceeding a certain threshold.
The deviation is meassured by the distance function passed to parameter metric.
The deviation is measured by the distance function passed to parameter metric.
Parameters
----------
......@@ -1027,3 +1029,103 @@ def flagDriftFromReference(data, field, flagger, fields, segment_freq, thresh,
return data, flagger
def flagDriftScale(data, field, flagger, fields_scale1, fields_scale2, segment_freq, norm_spread, norm_frac=0.5,
metric=lambda x, y: scipy.spatial.distance.pdist(np.array([x, y]),
metric='cityblock')/len(x),
linkage_method='single', **kwargs):
"""
The function transforms variables with different scales to one single scale and then flags value courses that
significantly deviate from a group of normal value courses. The scaling transformation is performed via linear
regression. The remaining steps are performed analogously to flagDriftFromNorm. The documentation of
flagDriftFromNorm gives a more detailed presentation of the remaining steps.
Parameters
----------
data : dios.DictOfSeries
A dictionary of pandas.Series, holding all the data.
field : str
A dummy parameter.
flagger : saqc.flagger
A flagger object, holding flags and additional informations related to `data`.
fields_scale1 : str
List of fieldnames in data to be included into the flagging process which are scaled according to scaling
scheme 1.
fields_scale2 : str
List of fieldnames in data to be included into the flagging process which are scaled according to scaling
scheme 2.
segment_freq : str
An offset string, determining the size of the seperate datachunks that the algorihm is to be piecewise
applied on.
norm_spread : float
A parameter limiting the maximum "spread" of the timeseries, allowed in the "normal" group. See Notes section
for more details.
norm_frac : float, default 0.5
Has to be in [0,1]. Determines the minimum percentage of variables, the "normal" group has to comprise to be the
normal group actually. The higher that value, the more stable the algorithm will be with respect to false
positives. Also, nobody knows what happens, if this value is below 0.5.
metric : Callable[(numpyp.array, numpy-array), float]
A distance function. It should be a function of 2 1-dimensional arrays and return a float scalar value.
This value is interpreted as the distance of the two input arrays. The default is the averaged manhatten metric.
See the Notes section to get an idea of why this could be a good choice.
linkage_method : {"single", "complete", "average", "weighted", "centroid", "median", "ward"}, default "single"
The linkage method used for hierarchical (agglomerative) clustering of the timeseries.
See the Notes section for more details.
The keyword gets passed on to scipy.hierarchy.linkage. See its documentation to learn more about the different
keywords (References [1]).
See wikipedia for an introduction to hierarchical clustering (References [2]).
kwargs
Returns
-------
data : dios.DictOfSeries
A dictionary of pandas.Series, holding all the data.
flagger : saqc.flagger
The flagger object, holding flags and additional Informations related to `data`.
Flags values may have changed relatively to the input flagger.
References
----------
Documentation of the underlying hierarchical clustering algorithm:
[1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.cluster.hierarchy.linkage.html
Introduction to Hierarchical clustering:
[2] https://en.wikipedia.org/wiki/Hierarchical_clustering
"""
fields = fields_scale1 + fields_scale2
data_to_flag = data[fields].to_df()
data_to_flag.dropna(inplace=True)
convert_slope = []
convert_intercept = []
for field1 in fields_scale1:
for field2 in fields_scale2:
slope, intercept, r_value, p_value, std_err = stats.linregress(data_to_flag[field1], data_to_flag[field2])
convert_slope.append(slope)
convert_intercept.append(intercept)
factor_slope = np.median(convert_slope)
factor_intercept = np.median(convert_intercept)
dat = dios.DictOfSeries()
for field1 in fields_scale1:
dat[field1] = factor_intercept + factor_slope * data_to_flag[field1]
for field2 in fields_scale2:
dat[field2] = data_to_flag[field2]
dat_to_flag = dat[fields].to_df()
segments = dat_to_flag.groupby(pd.Grouper(freq=segment_freq))
for segment in segments:
if segment[1].shape[0] <= 1:
continue
drifters = detectDeviants(segment[1], metric, norm_spread, norm_frac, linkage_method, 'variables')
for var in drifters:
flagger = flagger.setFlags(fields[var], loc=segment[1].index, **kwargs)
return data, flagger
\ No newline at end of file
......@@ -32,7 +32,7 @@ def test_flagRange(data, field, flagger):
assert (flagged == expected).all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
'''@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("method", ['wavelet', 'dtw'])
@pytest.mark.parametrize("pattern", [pytest.lazy_fixture("course_pattern_1"),
pytest.lazy_fixture("course_pattern_2"),] ,)
......@@ -53,7 +53,7 @@ def test_flagPattern(course_test, flagger, method, pattern):
flagger = flagger.initFlags(test_data)
data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="days", method=method)
assert flagger.isFlagged("data")[dict_pattern["pattern_2"]].all()
'''
......@@ -226,11 +226,18 @@ def test_flagDriftFromNormal(dat, flagger):
data = dat(periods=200, peak_level=5, name='d1')[0]
data['d2'] = dat(periods=200, peak_level=10, name='d2')[0]['d2']
data['d3'] = dat(periods=200, peak_level=100, name='d3')[0]['d3']
data['d4'] = 3 + 4 * data['d1']
data['d5'] = 3 + 4 * data['d1']
flagger = flagger.initFlags(data)
data_norm, flagger_norm = flagDriftFromNorm(data, 'dummy', flagger, ['d1', 'd2', 'd3'], segment_freq="200min",
norm_spread=5)
data_ref, flagger_ref = flagDriftFromReference(data, 'd1', flagger, ['d1', 'd2', 'd3'], segment_freq="3D",
thresh=20)
data_scale, flagger_scale = flagDriftScale(data, 'dummy', flagger, ['d1', 'd3'], ['d4', 'd5'], segment_freq="3D",
thresh=20, norm_spread=5)
assert flagger_norm.isFlagged()['d3'].all()
assert flagger_ref.isFlagged()['d3'].all()
assert flagger_scale.isFlagged()['d3'].all()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment