diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index 91050977a7a231aea840dbd680e6990ab768b949..1bac19c57ca43dc6f00c91952ebb4e3070bcde5a 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -6,9 +6,14 @@ from inspect import signature
 
 import numpy as np
 import pandas as pd
+import scipy.spatial
 import dtw
 import pywt
+import itertools
+import collections
 from mlxtend.evaluate import permutation_test
+from scipy.cluster.hierarchy import linkage, fcluster
+
 
 from saqc.lib.tools import groupConsecutives, sesonalMask
 
@@ -853,3 +858,126 @@ def flagCrossScoring(data, field, flagger, fields, thresh, cross_stat='modZscore
 
         flagger = flagger.setFlags(var, mask[var], **kwargs)
     return data, flagger
+
+def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, norm_frac=0.5,
+                      metric=lambda x, y: scipy.spatial.distance.pdist(np.array([x, y]),
+                                                                       metric='cityblock')/len(x),
+                      linkage_method='single', **kwargs):
+    """
+    The function flags value courses that significantly deviate from a group of normal value courses.
+
+    "Normality" is determined in terms of a maximum spreading distance that members of a normal group must not exceed.
+    In addition, a group is only considered "normal" if it contains more than a fraction of `norm_frac` of the
+    variables in `fields`.
+
+    See the Notes section for a more detailed presentation of the algorithm.
+
+    Parameters
+    ----------
+    data : dios.DictOfSeries
+        A dictionary of pandas.Series, holding all the data.
+    field : str
+        A dummy parameter.
+    flagger : saqc.flagger
+        A flagger object, holding flags and additional information related to `data`.
+    fields : list of str
+        List of fieldnames in `data`, determining which variables are to be included in the flagging process.
+    segment_freq : str
+        An offset string, determining the size of the separate data chunks the algorithm is piecewise
+        applied to.
+    norm_spread : float
+        A parameter limiting the maximum "spread" allowed for the timeseries in the "normal" group. See the Notes
+        section for more details.
+    norm_frac : float, default 0.5
+        Has to be in [0, 1]. Determines the minimum fraction of variables the "normal" group has to comprise in order
+        to actually be the normal group. The higher the value, the more stable the algorithm will be with respect to
+        false positives. Also, nobody knows what happens if this value is below 0.5.
+    metric : Callable[[numpy.ndarray, numpy.ndarray], float]
+        A distance function. It should take two 1-dimensional arrays and return a scalar float.
+        This value is interpreted as the distance of the two input arrays. The default is the averaged Manhattan
+        metric. See the Notes section to get an idea of why this could be a good choice.
+    linkage_method : {"single", "complete", "average", "weighted", "centroid", "median", "ward"}, default "single"
+        The linkage method used for hierarchical (agglomerative) clustering of the timeseries.
+        See the Notes section for more details.
+        The keyword gets passed on to scipy.cluster.hierarchy.linkage. See its documentation to learn more about the
+        different keywords (References [1]).
+        See wikipedia for an introduction to hierarchical clustering (References [2]).
+    kwargs
+
+    Returns
+    -------
+    data : dios.DictOfSeries
+        A dictionary of pandas.Series, holding all the data.
+    flagger : saqc.flagger
+        The flagger object, holding flags and additional information related to `data`.
+        Flag values may have changed relative to the input flagger.
+
+    Notes
+    -----
+    The following steps are performed for every data "segment" of length `segment_freq` in order to find the
+    "abnormal" data:
+
+    1. Calculate the distances d(x_i, x_j) for all x_i, x_j in the parameter `fields`, with "d" denoting the
+       distance function passed to the parameter `metric`.
+    2. Calculate a dendrogram with a hierarchical linkage algorithm, specified by the parameter `linkage_method`.
+    3. Flatten the dendrogram at the level where the agglomeration costs exceed the value given by the parameter `norm_spread`.
+    4. Check if there is a cluster containing more than a fraction of `norm_frac` of the variables in `fields`.
+       If yes: flag all the variables that are not in that cluster (inside the segment).
+       If no: flag nothing.
+
+    The main parameter giving control over the algorithm's behavior is the `norm_spread` parameter, which determines
+    the maximum spread of a normal group by limiting the costs a cluster agglomeration must not exceed in any
+    linkage step.
+    For singleton clusters, this cost just equals half the distance the timeseries in the clusters have to
+    each other. So no two timeseries can be clustered together that are more than
+    2 * `norm_spread` distant from each other.
+    When timeseries get clustered together, the distance of the new cluster to all the other timeseries/clusters is
+    calculated according to the linkage method specified by `linkage_method`. By default, it is the minimum distance
+    the members of the clusters have to each other.
+    With that in mind, it is advisable to choose a distance function that can be well interpreted in the unit
+    dimension of the measurement and whose interpretation is invariant over the length of the timeseries.
+    That is why the "averaged Manhattan metric" is set as the metric default, since it corresponds to the
+    averaged value distance two timeseries have (as opposed to euclidean, for example).
+
+    References
+    ----------
+    Documentation of the underlying hierarchical clustering algorithm:
+    [1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.cluster.hierarchy.linkage.html
+    Introduction to Hierarchical clustering:
+    [2] https://en.wikipedia.org/wiki/Hierarchical_clustering
+    """
+
+    data_to_flag = data[fields].to_df()
+    data_to_flag.dropna(inplace=True)
+    var_num = len(fields)
+    dist_mat = np.zeros((var_num, var_num))
+    segments = data_to_flag.groupby(pd.Grouper(freq=segment_freq))
+
+    for segment in segments:
+        combs = list(itertools.combinations(range(0, var_num), 2))
+        if segment[1].shape[0] <= 1:
+            continue
+        for i, j in combs:
+            dist = metric(segment[1].iloc[:, i].values, segment[1].iloc[:, j].values)
+            dist_mat[i, j] = dist
+
+        condensed = np.abs(dist_mat[tuple(zip(*combs))])
+        Z = linkage(condensed, method=linkage_method)
+        cluster = fcluster(Z, norm_spread, criterion='distance')
+        counts = collections.Counter(cluster)
+        norm_cluster = -1
+
+        for item in counts.items():
+            if item[1] > norm_frac*var_num:
+                norm_cluster = item[0]
+                break
+
+        if norm_cluster == -1 or counts[norm_cluster] == var_num:
+            continue
+
+        drifters = [i for i, x in enumerate(cluster) if x != norm_cluster]
+
+        for var in drifters:
+            flagger = flagger.setFlags(fields[var], loc=segment[1].index, **kwargs)
+
+    return data, flagger
\ No newline at end of file
diff --git a/test/funcs/conftest.py b/test/funcs/conftest.py
index af99fd0df10b0169f72f574af355e23c811aaa21..32017f6b002e30cd3c3bd50ab74854d026b56d4e 100644
--- a/test/funcs/conftest.py
+++ b/test/funcs/conftest.py
@@ -30,6 +30,7 @@ def course_1(char_dict):
         peak_level=10,
         initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0),
         char_dict=char_dict,
+        name='data'
     ):
 
         t_index = pd.date_range(initial_index, freq=freq, periods=periods)
@@ -41,7 +42,7 @@ def course_1(char_dict):
            char_dict["drop"] = s.index[int(np.floor(len(t_index) / 2) + 1) :]
            char_dict["peak"] = s.index[int(np.floor(len(t_index) / 2)) - 1 : int(np.floor(len(t_index) / 2)) + 1]
 
-        data = DictOfSeries(data=s, columns=["data"])
+        data = DictOfSeries(data=s, columns=[name])
         return data, char_dict
 
     return fix_funk
diff --git a/test/funcs/test_functions.py b/test/funcs/test_functions.py
index 711504822f0128a4d01bf41396594a83b21406af..777a60425f64d7e3414fc4605404d8fcb0de4ce2 100644
--- a/test/funcs/test_functions.py
+++ b/test/funcs/test_functions.py
@@ -219,3 +219,14 @@ def test_flagManual(data, flagger):
         chunk = isflagged.loc[i]
         assert (chunk == expected_value).all()
         last = curr
+
+@pytest.mark.parametrize("flagger", TESTFLAGGER)
+@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_1")])
+def test_flagDriftFromNormal(dat, flagger):
+    data = dat(periods=200, peak_level=5, name='d1')[0]
+    data['d2'] = dat(periods=200, peak_level=10, name='d2')[0]['d2']
+    data['d3'] = dat(periods=200, peak_level=100, name='d3')[0]['d3']
+    flagger = flagger.initFlags(data)
+    data, flagger = flagDriftFromNorm(data, 'dummy', flagger, ['d1', 'd2', 'd3'], segment_freq="200min",
+                                      norm_spread=5)
+    assert flagger.isFlagged()['d3'].all()
diff --git a/test/funcs/test_harm_funcs.py b/test/funcs/test_harm_funcs.py
index 2717e3e3b9d2e100017f708fe6e0cbdd8576cfd9..d8825f9689c4c7108b53e9dc22772d203449ab04 100644
--- a/test/funcs/test_harm_funcs.py
+++ b/test/funcs/test_harm_funcs.py
@@ -48,16 +48,13 @@ def test_harmSingleVarIntermediateFlagging(data, flagger, reshaper):
     pre_data = data.copy()
     pre_flags = flagger.getFlags()
     freq = "15min"
-    assert len(data.columns) == 1
     field = data.columns[0]
-
     data, flagger = harm_linear2Grid(data, "data", flagger, freq)
     # flag something bad
     flagger = flagger.setFlags("data", loc=data[field].index[3:4])
     data, flagger = harm_deharmonize(data, "data", flagger, method="inverse_" + reshaper)
     d = data[field]
-
     if reshaper == "nagg":
         assert flagger.isFlagged(loc=d.index[3:7]).squeeze().all()
         assert (~flagger.isFlagged(loc=d.index[0:3]).squeeze()).all()
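The default `metric` in the new signature wraps the cityblock distance returned by scipy.spatial.distance.pdist and divides it by the series length, which is simply the mean absolute difference between the two series. A quick standalone check (the numbers are made up for illustration and are not part of the patch):

import numpy as np
import scipy.spatial.distance

x = np.array([0.0, 1.0, 2.0, 3.0])
y = np.array([1.0, 1.0, 4.0, 3.0])

# the patch's default metric: cityblock distance of the two rows, averaged over the series length
d_default = scipy.spatial.distance.pdist(np.array([x, y]), metric='cityblock') / len(x)

# equivalent formulation: mean absolute difference per time step
d_manual = np.mean(np.abs(x - y))

print(d_default[0], d_manual)  # both 0.75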
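The per-segment clustering step that flagDriftFromNorm performs can also be reproduced in isolation. The following minimal sketch uses toy arrays and illustrative values for norm_spread (1.0) and norm_frac (0.5); none of these values are prescribed by the patch. It computes the pairwise averaged Manhattan distances, builds a single-linkage dendrogram, flattens it at norm_spread, and reports every series outside the majority cluster:

import collections
import itertools

import numpy as np
from scipy.cluster.hierarchy import fcluster, linkage

# toy "segment": three series, the third one clearly drifting away from the others
series = [np.array([1.0, 1.1, 0.9, 1.0]),
          np.array([1.2, 1.0, 1.1, 0.9]),
          np.array([5.0, 5.2, 5.1, 4.9])]

norm_spread = 1.0  # illustrative threshold, not a recommended default
norm_frac = 0.5


def metric(x, y):
    # averaged Manhattan distance, equivalent to the function's default metric
    return np.sum(np.abs(x - y)) / len(x)


# condensed distance vector in the pair order scipy's linkage expects
combs = list(itertools.combinations(range(len(series)), 2))
condensed = np.array([metric(series[i], series[j]) for i, j in combs])

Z = linkage(condensed, method="single")                   # step 2: dendrogram
cluster = fcluster(Z, norm_spread, criterion="distance")  # step 3: flatten at norm_spread
counts = collections.Counter(cluster)

# step 4: the "normal" group must contain more than norm_frac of all series
norm_cluster = next((c for c, n in counts.items() if n > norm_frac * len(series)), -1)
drifters = [i for i, c in enumerate(cluster) if c != norm_cluster]
print(drifters)  # [2] -> only the third series would be flagged

With single linkage and criterion="distance", two series land in the same flat cluster as soon as a chain of pairwise distances below the threshold connects them, which matches the behaviour described in the Notes section of the docstring.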