@register(masking='all')
def breaks_flagRegimeAnomaly(data, field, flagger, cluster_field, norm_spread,
                             metric=lambda x, y: np.abs(np.nanmean(x) - np.nanmean(y)),
                             norm_frac=0.5, **kwargs):
    """
    Flag values belonging to anomalous regimes of ``field``.

    "Normality" is determined in terms of a maximum spreading distance: regimes must not exceed it with
    respect to a certain metric.

    In addition, a range of regimes is only considered "normal" if it models more than `norm_frac` percent of
    the valid samples in ``field``.

    Note that you must detect the regime changepoints prior to calling this function (the regime labels are
    read from ``cluster_field``).

    Note that it is possible to perform hypothesis tests for regime equality by passing the metric
    a function for p-value calculation.

    Parameters
    ----------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    field : str
        The fieldname of the column, holding the data-to-be-flagged.
    flagger : saqc.flagger
        A flagger object, holding flags and additional information related to `data`.
    cluster_field : str
        The name of the column in data, holding the cluster labels for the samples in field. (Has to be
        indexed equal to field.)
    norm_spread : float
        A threshold denoting the distance, members of the "normal" group must not exceed to each other (in
        terms of the metric passed) to qualify their group as the "normal" group.
    metric : Callable[[numpy.array, numpy.array], float], default lambda x, y: np.abs(np.nanmean(x) - np.nanmean(y))
        A metric function for calculating the dissimilarity between 2 regimes. Defaults to just the
        difference in mean.
    norm_frac : float, default 0.5
        Has to be in [0,1]. Determines the minimum percentage of samples the "normal" group has to comprise
        to actually be the normal group.
    kwargs

    Returns
    -------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    flagger : saqc.flagger
        The flagger object, holding flags and additional information related to `data`.
        Flags values may have changed, relatively to the flagger input.
    """
    clusterser = data[cluster_field]
    # Cluster labels are assumed to be consecutive integers starting at 0. Cast to int so that
    # `range` also works when the label column carries a float dtype.
    cluster_num = int(clusterser.max()) + 1
    # One dios column per regime, holding the field values belonging to that regime.
    cluster_dios = dios.DictOfSeries({i: data[field][clusterser == i] for i in range(cluster_num)})
    # Column positions (within cluster_dios) of the regimes deviating from the "normal" group.
    plateaus = detectDeviants(cluster_dios, metric, norm_spread, norm_frac, 'single', 'samples')

    for p in plateaus:
        # Flag `field` at every timestamp belonging to a deviant regime. The previous code passed
        # `data.iloc[:, p].index` as the field argument, which both addressed the wrong frame
        # (p indexes cluster_dios, not data) and filled the field-name parameter with an Index.
        # NOTE(review): `loc=` keyword assumed per the flagger.setFlags API — confirm.
        flagger = flagger.setFlags(field, loc=cluster_dios.iloc[:, p].index, **kwargs)

    return data, flagger
customRolling(detected, reduce_window, count, closed='both', min_periods=1, center=True, + index_only=True) detected = _reduceCPCluster(stat_arr[result_arr], thresh_arr[result_arr], start, end, reduce_func, detected.shape[0]) det_index = det_index[detected] @@ -532,5 +532,5 @@ def modelling_clusterByChangePoints(data, field, flagger, stat_func, thresh_func cluster[det_index] = True cluster = cluster.cumsum() data[field] = cluster - flagger = flagger.setFlags(field, flag=flagger.UNFLAGGED) + flagger = flagger.setFlags(field, flag=flagger.UNFLAGGED, force=True, **kwargs) return data, flagger \ No newline at end of file diff --git a/saqc/funcs/proc_functions.py b/saqc/funcs/proc_functions.py index 78e89f37ad325e6358ac5a645ae40e3e6f9dced2..e7885aa37cf59cd8bf7617c279309b192121dc9b 100644 --- a/saqc/funcs/proc_functions.py +++ b/saqc/funcs/proc_functions.py @@ -970,22 +970,6 @@ def proc_seefoExpDriftCorrecture(data, field, flagger, maint_data_field, cal_mea return data, flagger -@register(masking='all') -def proc_flagRegimeAnomaly(data, field, flagger, cluster_field, norm_spread, - metric=lambda x, y: np.abs(np.nanmean(x) - np.nanmean(y)), - norm_frac=0.5, **kwargs): - - clusterser = data[cluster_field] - cluster_num = clusterser.max() + 1 - cluster_dios = dios.DictOfSeries({i: data[field][clusterser == i] for i in range(cluster_num)}) - plateaus = detectDeviants(cluster_dios, metric, norm_spread, norm_frac, 'single', 'samples') - - for p in plateaus: - flagger = flagger.setFlags(data.iloc[:, p].index, **kwargs) - - return data, flagger - - @register def proc_seefoLinearDriftCorrecture(data, field, flagger, x_field, y_field, **kwargs): """ diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py index daf91464f28933d88679d4c6f9d831ebe0ed2961..350d935b7b38dad1fa5ae87237760566fac122bf 100644 --- a/saqc/lib/tools.py +++ b/saqc/lib/tools.py @@ -528,13 +528,39 @@ def customRolling(to_roll, winsz, func, roll_mask=None, min_periods=1, center=Fa def detectDeviants(data, metric, 
norm_spread, norm_frac, linkage_method='single', population='variables'):
-    """Helper function for carrying out the repeatedly upcoming task,
-    to detect variables that significantly differ from the 'Norm'.
+    """
+    Helper function for carrying out the repeatedly upcoming task
+    of detecting deviant variables in a group of variables.
 
     "Normality" is determined in terms of a maximum spreading distance, that members of a normal group must not exceed.
     In addition, only a group is considered "normal" if it contains more then `norm_frac` percent of the
     variables in "fields".
 
+    Note that the function can also be used to detect anomalous regimes in a variable, by assigning the different
+    regimes to dios.DictOfSeries columns and passing this dios.
+
+    Parameters
+    ----------
+    data : {pandas.DataFrame, dios.DictOfSeries}
+        Input data
+    metric : Callable[[numpy.array, numpy.array], float]
+        A metric function for calculating the dissimilarity between 2 variables.
+    norm_spread : float
+        A threshold denoting the distance, members of the "normal" group must not exceed to each other (in terms of the
+        metric passed) to qualify their group as the "normal" group.
+    norm_frac : float, default 0.5
+        Has to be in [0,1]. Determines the minimum percentage of variables or samples,
+        the "normal" group has to comprise to actually be the normal group.
+    linkage_method : {"single", "complete", "average", "weighted", "centroid", "median", "ward"}, default "single"
+        The linkage method used for hierarchical (agglomerative) clustering of the variables.
+    population : {"variables", "samples"}
+        Whether to relate the minimum percentage of values needed to form a normal group, to the total number of
+        variables or to the total number of samples.
+
+    Returns
+    -------
+    deviants : List
+        A list containing the column positions of deviant variables in the input frame/dios.
     """
 
     var_num = len(data.columns)