Commit 246df649 authored by David Schäfer

Merge branch 'develop' into seasonalMasker

parents 3eb79f2e 5970ee03
3 merge requests: !193 Release 1.4, !188 Release 1.4, !85 Seasonal and custom data masking
Pipeline #8058 passed in 16 minutes and 14 seconds
@@ -6,9 +6,14 @@ from inspect import signature
import numpy as np
import pandas as pd
import scipy.spatial.distance
import dtw
import pywt
import itertools
import collections
from mlxtend.evaluate import permutation_test
from scipy.cluster.hierarchy import linkage, fcluster
from saqc.lib.tools import groupConsecutives, seasonalMask
from saqc.funcs.proc_functions import proc_fork, proc_drop, proc_projectFlags
@@ -852,3 +857,191 @@ def flagCrossScoring(data, field, flagger, fields, thresh, cross_stat='modZscore
        flagger = flagger.setFlags(var, mask[var], **kwargs)
    return data, flagger

def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, norm_frac=0.5,
                      metric=lambda x, y: scipy.spatial.distance.pdist(np.array([x, y]),
                                                                       metric='cityblock') / len(x),
                      linkage_method='single', **kwargs):
    """
    The function flags value courses that significantly deviate from a group of normal value courses.

    "Normality" is determined in terms of a maximum spreading distance that members of a normal group must not
    exceed. In addition, a group is only considered "normal" if it contains more than `norm_frac` percent of the
    variables in `fields`.

    See the Notes section for a more detailed presentation of the algorithm.
    Parameters
    ----------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    field : str
        A dummy parameter.
    flagger : saqc.flagger
        A flagger object, holding flags and additional information related to `data`.
    fields : list of str
        List of fieldnames in `data`, determining which variables are to be included in the flagging process.
    segment_freq : str
        An offset string, determining the size of the separate data chunks the algorithm is piecewise applied to.
    norm_spread : float
        A parameter limiting the maximum "spread" allowed in the "normal" group of timeseries. See the Notes
        section for more details.
    norm_frac : float, default 0.5
        Has to be in [0, 1]. Determines the minimum fraction of variables the "normal" group has to comprise to
        actually be the normal group. The higher the value, the more stable the algorithm will be with respect
        to false positives. Also, nobody knows what happens if this value is below 0.5.
    metric : Callable[[numpy.array, numpy.array], float]
        A distance function. It should take two 1-dimensional arrays and return a float scalar, which is
        interpreted as the distance of the two input arrays. The default is the averaged Manhattan metric.
        See the Notes section to get an idea of why this could be a good choice.
    linkage_method : {"single", "complete", "average", "weighted", "centroid", "median", "ward"}, default "single"
        The linkage method used for hierarchical (agglomerative) clustering of the timeseries.
        The keyword gets passed on to scipy.cluster.hierarchy.linkage; see its documentation to learn more about
        the different methods (References [1]), and see Wikipedia for an introduction to hierarchical clustering
        (References [2]). Also see the Notes section for more details.
    kwargs

    Returns
    -------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    flagger : saqc.flagger
        The flagger object, holding flags and additional information related to `data`.
        Flags values may have changed relative to the input flagger.
    Notes
    -----
    The following steps are performed for every data "segment" of length `segment_freq` in order to find the
    "abnormal" data (a toy walk-through of these steps is sketched after the function below):

    1. Calculate the distances d(x_i, x_j) for all x_i in parameter `fields`, with "d" denoting the distance
       function passed to the parameter `metric`.
    2. Calculate a dendrogram with a hierarchical linkage algorithm, specified by the parameter `linkage_method`.
    3. Flatten the dendrogram at the level where the agglomeration costs exceed the value given by the parameter
       `norm_spread`.
    4. Check if there is a cluster containing more than `norm_frac` percent of the variables in `fields`.
       If yes: flag all the variables that are not in that cluster (inside the segment).
       If no: flag nothing.

    The main parameter giving control over the algorithm's behavior is `norm_spread`, which determines the
    maximum spread of a normal group by limiting the costs a cluster agglomeration must not exceed in every
    linkage step. For singleton clusters, these costs just equal half the distance the timeseries in the
    clusters have to each other. So no two timeseries can be clustered together that are more than
    2 * `norm_spread` distant from each other.

    When timeseries get clustered together, the new cluster's distance to all the other timeseries/clusters is
    calculated according to the linkage method specified by `linkage_method`. By default, it is the minimum
    distance the members of the two clusters have to each other.

    Having that in mind, it is advisable to choose a distance function that can be well interpreted in the unit
    dimension of the measurement and whose interpretation is invariant over the length of the timeseries. That
    is why the "averaged Manhattan metric" is set as the metric default, since it corresponds to the averaged
    value distance two timeseries have (as opposed to the euclidean, for example).
    References
    ----------
    Documentation of the underlying hierarchical clustering algorithm:
        [1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.cluster.hierarchy.linkage.html
    Introduction to hierarchical clustering:
        [2] https://en.wikipedia.org/wiki/Hierarchical_clustering
    """
    data_to_flag = data[fields].to_df()
    data_to_flag.dropna(inplace=True)
    var_num = len(fields)
    dist_mat = np.zeros((var_num, var_num))
    combs = list(itertools.combinations(range(0, var_num), 2))
    segments = data_to_flag.groupby(pd.Grouper(freq=segment_freq))
    for segment in segments:
        if segment[1].shape[0] <= 1:
            continue
        # pairwise distances between all variables within the current segment
        for i, j in combs:
            dist = metric(segment[1].iloc[:, i].values, segment[1].iloc[:, j].values)
            dist_mat[i, j] = dist
        # condensed distance vector, ordered as scipy.cluster.hierarchy expects it
        condensed = np.abs(dist_mat[tuple(zip(*combs))])
        Z = linkage(condensed, method=linkage_method)
        # flatten the dendrogram at agglomeration cost norm_spread
        cluster = fcluster(Z, norm_spread, criterion='distance')
        counts = collections.Counter(cluster)
        # the "normal" group is the first cluster comprising more than norm_frac of all variables
        norm_cluster = -1
        for item in counts.items():
            if item[1] > norm_frac * var_num:
                norm_cluster = item[0]
                break
        if norm_cluster == -1 or counts[norm_cluster] == var_num:
            continue
        # flag all variables outside the normal cluster for this segment
        drifters = [i for i, x in enumerate(cluster) if x != norm_cluster]
        for var in drifters:
            flagger = flagger.setFlags(fields[var], loc=segment[1].index, **kwargs)
    return data, flagger

def flagDriftFromReference(data, field, flagger, fields, segment_freq, thresh,
                           metric=lambda x, y: scipy.spatial.distance.pdist(np.array([x, y]),
                                                                            metric='cityblock') / len(x),
                           **kwargs):
    """
    The function flags value courses that deviate from a reference course by a margin exceeding a certain
    threshold. The deviation is measured by the distance function passed to the parameter `metric`.
    Parameters
    ----------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    field : str
        The reference variable; the deviation from it determines the flagging.
    flagger : saqc.flagger
        A flagger object, holding flags and additional information related to `data`.
    fields : list of str
        List of fieldnames in `data`, determining which variables are to be included in the flagging process.
    segment_freq : str
        An offset string, determining the size of the separate data chunks the algorithm is piecewise applied to.
    thresh : float
        The maximum distance by which normal variables may deviate from the reference variable.
    metric : Callable[[numpy.array, numpy.array], float]
        A distance function. It should take two 1-dimensional arrays and return a float scalar, which is
        interpreted as the distance of the two input arrays. The default is the averaged Manhattan metric.
        See the Notes section to get an idea of why this could be a good choice.
    kwargs

    Returns
    -------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    flagger : saqc.flagger
        The flagger object, holding flags and additional information related to `data`.
        Flags values may have changed relative to the input flagger.

    Notes
    -----
    It is advisable to choose a distance function that can be well interpreted in the unit dimension of the
    measurement and whose interpretation is invariant over the length of the timeseries. That is why the
    "averaged Manhattan metric" is set as the metric default, since it corresponds to the averaged value
    distance two timeseries have (as opposed to the euclidean, for example).
    """
    # make sure the reference variable itself is part of the working frame
    if field not in fields:
        fields = fields + [field]
    data_to_flag = data[fields].to_df()
    data_to_flag.dropna(inplace=True)
    var_num = len(fields)
    segments = data_to_flag.groupby(pd.Grouper(freq=segment_freq))
    for segment in segments:
        if segment[1].shape[0] <= 1:
            continue
        # flag every variable whose distance to the reference course exceeds thresh
        for i in range(var_num):
            dist = metric(segment[1].iloc[:, i].values, segment[1].loc[:, field].values)
            if dist > thresh:
                flagger = flagger.setFlags(fields[i], loc=segment[1].index, **kwargs)
    return data, flagger
\ No newline at end of file
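A side note on the default metric shared by both functions: the 'cityblock' pdist of two stacked series, divided by their length, is exactly the mean absolute difference, which is what makes it read as an averaged value distance. A minimal check (the x/y values are made up):

# Editor's sketch: the default metric equals the mean absolute difference.
import numpy as np
from scipy.spatial.distance import pdist

x = np.array([0.0, 1.0, 4.0])
y = np.array([1.0, 3.0, 0.0])
d = pdist(np.array([x, y]), metric='cityblock')[0] / len(x)
assert d == np.abs(x - y).mean()  # (1 + 2 + 4) / 3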
@@ -30,6 +30,7 @@ def course_1(char_dict):
        peak_level=10,
        initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0),
        char_dict=char_dict,
        name='data'
    ):
        t_index = pd.date_range(initial_index, freq=freq, periods=periods)
@@ -41,7 +42,7 @@ def course_1(char_dict):
        char_dict["drop"] = s.index[int(np.floor(len(t_index) / 2) + 1) :]
        char_dict["peak"] = s.index[int(np.floor(len(t_index) / 2)) - 1 : int(np.floor(len(t_index) / 2)) + 1]
-       data = DictOfSeries(data=s, columns=["data"])
+       data = DictOfSeries(data=s, columns=[name])
        return data, char_dict
    return fix_funk
@@ -117,7 +117,7 @@ def test_flagIsolated(data, flagger):
    data, flagger_result = flagIsolated(
        data, field, flagger_result, group_window="2D", gap_window="2.1D", continuation_range="1.1D",
-   )/home/luenensc
+   )
    assert flagger_result.isFlagged(field)[[3, 5, 13, 14]].all()
@@ -219,3 +219,18 @@ def test_flagManual(data, flagger):
        chunk = isflagged.loc[i]
        assert (chunk == expected_value).all()
        last = curr

@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_1")])
def test_flagDriftFromNormal(dat, flagger):
    data = dat(periods=200, peak_level=5, name='d1')[0]
    data['d2'] = dat(periods=200, peak_level=10, name='d2')[0]['d2']
    data['d3'] = dat(periods=200, peak_level=100, name='d3')[0]['d3']
    flagger = flagger.initFlags(data)
    data_norm, flagger_norm = flagDriftFromNorm(data, 'dummy', flagger, ['d1', 'd2', 'd3'],
                                                segment_freq="200min", norm_spread=5)
    data_ref, flagger_ref = flagDriftFromReference(data, 'd1', flagger, ['d1', 'd2', 'd3'],
                                                   segment_freq="3D", thresh=20)
    assert flagger_norm.isFlagged()['d3'].all()
    assert flagger_ref.isFlagged()['d3'].all()
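For the record, the test above implicitly relies on the default norm_frac of 0.5: with three variables, only a cluster of more than 1.5 members can be the normal group, so the close pair d1/d2 qualifies and the far-off d3 is flagged. A tiny check of that arithmetic:

# Editor's sketch: the norm_frac threshold the test above relies on.
var_num, norm_frac = 3, 0.5               # ['d1', 'd2', 'd3'], default norm_frac
print(2 > norm_frac * var_num)            # True  -> {d1, d2} can be the normal group
print(1 > norm_frac * var_num)            # False -> {d3} alone cannot, so it is flagged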
@@ -48,16 +48,13 @@ def test_harmSingleVarIntermediateFlagging(data, flagger, reshaper):
    pre_data = data.copy()
    pre_flags = flagger.getFlags()
    freq = "15min"
    assert len(data.columns) == 1
    field = data.columns[0]
    data, flagger = harm_linear2Grid(data, "data", flagger, freq)
    # flag something bad
    flagger = flagger.setFlags("data", loc=data[field].index[3:4])
    data, flagger = harm_deharmonize(data, "data", flagger, method="inverse_" + reshaper)
    d = data[field]
    if reshaper == "nagg":
        assert flagger.isFlagged(loc=d.index[3:7]).squeeze().all()
        assert (~flagger.isFlagged(loc=d.index[0:3]).squeeze()).all()