diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py index c8588f6407d488bb691110aae29d6891faa0de05..855e423ed580f32b92499972762220c44237bc72 100644 --- a/saqc/funcs/functions.py +++ b/saqc/funcs/functions.py @@ -16,7 +16,7 @@ from mlxtend.evaluate import permutation_test from scipy.cluster.hierarchy import linkage, fcluster -from saqc.lib.tools import groupConsecutives, seasonalMask, FreqIndexer, customRolling +from saqc.lib.tools import groupConsecutives, detectDeviants from saqc.lib.ts_operators import count from saqc.funcs.proc_functions import proc_fork, proc_drop, proc_projectFlags from saqc.funcs.modelling import modelling_mask @@ -860,6 +860,7 @@ def flagCrossScoring(data, field, flagger, fields, thresh, cross_stat='modZscore return data, flagger +@register(masking='all') def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, norm_frac=0.5, metric=lambda x, y: scipy.spatial.distance.pdist(np.array([x, y]), metric='cityblock')/len(x), @@ -950,34 +951,11 @@ def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, n data_to_flag = data[fields].to_df() data_to_flag.dropna(inplace=True) - var_num = len(fields) - dist_mat = np.zeros((var_num, var_num)) segments = data_to_flag.groupby(pd.Grouper(freq=segment_freq)) - for segment in segments: - combs = list(itertools.combinations(range(0, var_num), 2)) if segment[1].shape[0] <= 1: continue - for i, j in combs: - dist = metric(segment[1].iloc[:, i].values, segment[1].iloc[:, j].values) - dist_mat[i, j] = dist - - condensed = np.abs(dist_mat[tuple(zip(*combs))]) - Z = linkage(condensed, method=linkage_method) - cluster = fcluster(Z, norm_spread, criterion='distance') - counts = collections.Counter(cluster) - norm_cluster = -1 - - for item in counts.items(): - if item[1] > norm_frac*var_num: - norm_cluster = item[0] - break - - if norm_cluster == -1 or counts[norm_cluster] == var_num: - continue - - drifters = [i for i, x in enumerate(cluster) if x != 
norm_cluster] - + drifters = detectDeviants(data, metric, norm_spread, norm_frac, linkage_method) for var in drifters: flagger = flagger.setFlags(fields[var], loc=segment[1].index, **kwargs) diff --git a/saqc/funcs/modelling.py b/saqc/funcs/modelling.py index 0b7151e0d337654f3f8d8fc62b3e9a11369c57f7..5bdf5db0aa322f8cf5d28256ae2cc88b67ac62bd 100644 --- a/saqc/funcs/modelling.py +++ b/saqc/funcs/modelling.py @@ -528,7 +528,7 @@ def modelling_clusterByChangePoints(data, field, flagger, stat_func, thresh_func detected.shape[0]) det_index = det_index[detected] - cluster = pd.Series(False, index=data_ser.index) + cluster = pd.Series(False, index=data[field].index) cluster[det_index] = True cluster = cluster.cumsum() data[field] = cluster diff --git a/saqc/funcs/proc_functions.py b/saqc/funcs/proc_functions.py index ac3b19499ca458d918b334fa5fd8a89e6c58d4b4..3b09242742a980151ae324f1bc1c3fbac1b1e589 100644 --- a/saqc/funcs/proc_functions.py +++ b/saqc/funcs/proc_functions.py @@ -10,6 +10,7 @@ import dios import functools from scipy.optimize import curve_fit from sklearn.linear_model import LinearRegression +from sklearn.utils import resample ORIGINAL_SUFFIX = "_original" @@ -968,6 +969,12 @@ def proc_seefoExpDriftCorrecture(data, field, flagger, maint_data_field, cal_mea return data, flagger +@register(masking='all') +def proc_flagOffsets(data, field, flagger, stat, regime_cluster): + pass + + + @register def proc_seefoLinearDriftCorrecture(data, field, flagger, x_field, y_field, **kwargs): """ diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py index fb6e4bffc995c08582db328a32a6a55fef996f5a..b0d58f17a1140cacd67a7eadf739312ae14fbc47 100644 --- a/saqc/lib/tools.py +++ b/saqc/lib/tools.py @@ -4,13 +4,16 @@ import re from typing import Sequence, Union, Any, Iterator +import itertools import numpy as np import numba as nb import pandas as pd import logging import dios +import collections from pandas.api.indexers import BaseIndexer from pandas._libs.window.indexers import 
def detectDeviants(data, metric, norm_spread, norm_frac, linkage_method='single'):
    """
    Detect variables (columns of ``data``) that significantly differ from the "norm".

    All pairwise distances between the variables are computed with ``metric``
    and hierarchically clustered. "Normality" is determined in terms of a
    maximum spreading distance (``norm_spread``) that members of a normal
    group must not exceed. In addition, a group is only considered "normal"
    if it contains more than ``norm_frac`` (a fraction in [0, 1]) of the
    variables in ``data``.

    Parameters
    ----------
    data : pandas.DataFrame
        One column per variable to be checked against the others.
    metric : callable
        Distance function; called with two 1-d numpy arrays, must return a scalar.
    norm_spread : float
        Maximum cophenetic distance for members of the normal cluster (passed to
        ``scipy.cluster.hierarchy.fcluster`` with ``criterion='distance'``).
    norm_frac : float
        Minimum fraction of all variables the normal cluster must contain.
    linkage_method : str, default 'single'
        Linkage method handed to ``scipy.cluster.hierarchy.linkage``.

    Returns
    -------
    list of int
        Column positions of the deviating ("drifting") variables. Empty if no
        normal cluster could be identified, if every variable is normal, or if
        there are fewer than two variables to compare.
    """
    var_num = len(data.columns)
    # clustering needs at least two variables; linkage() raises on an empty
    # condensed distance matrix otherwise
    if var_num < 2:
        return []

    # build the condensed pairwise distance vector directly, in the
    # combination order scipy's linkage expects
    combs = list(itertools.combinations(range(var_num), 2))
    condensed = np.abs(
        np.array([metric(data.iloc[:, i].values, data.iloc[:, j].values) for i, j in combs])
    )

    Z = linkage(condensed, method=linkage_method)
    cluster = fcluster(Z, norm_spread, criterion='distance')
    counts = collections.Counter(cluster)

    # the first cluster holding more than norm_frac of all variables is "the norm"
    norm_cluster = -1
    for label, size in counts.items():
        if size > norm_frac * var_num:
            norm_cluster = label
            break

    # no sufficiently large group found, or everything is normal -> nothing deviates
    if norm_cluster == -1 or counts[norm_cluster] == var_num:
        return []
    return [i for i, x in enumerate(cluster) if x != norm_cluster]