diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index db0e5f7348dce9ce0a1d97e92f9cbd6f1d2c6b83..63c30a3be29213abe57002bbf04e17219c026b52 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -15,7 +15,7 @@
 from mlxtend.evaluate import permutation_test
 from scipy.cluster.hierarchy import linkage, fcluster
 
-from saqc.lib.tools import groupConsecutives, sesonalMask
+from saqc.lib.tools import groupConsecutives, sesonalMask, FreqIndexer, customRolling
 from saqc.core.register import register
 from saqc.core.visitor import ENVIRONMENT
 
@@ -1045,4 +1045,91 @@ def flagDriftFromReference(data, field, flagger, fields, segment_freq, thresh,
         if dist > thresh:
             flagger = flagger.setFlags(fields[i], loc=segment[1].index, **kwargs)
 
+    return data, flagger
+
+
+@register(masking='field')
+def flagChangePoints(data, field, flagger, stat_func, thresh_func, bwd_window, min_periods_bwd,
+                     fwd_window=None, min_periods_fwd=None, closed='both', **kwargs):
+    """
+    Function for changepoint detection based on a sliding window search.
+
+    The function provides a general basic architecture for applying two-sided t-tests,
+    maximum likelihood modelling or piecewise regression modelling in order to detect
+    changepoints via a sliding "twin window" search.
+
+    See the examples section to get an idea of the interface and functionality.
+
+    Parameters
+    ----------
+    data : dios.DictOfSeries
+        A dictionary of pandas.Series, holding all the data.
+    field : str
+        The fieldname of the column, holding the data-to-be-flagged.
+    flagger : saqc.flagger
+        A flagger object, holding flags and additional information related to `data`.
+    stat_func : {Callable[numpy.array], Callable[numpy.array, numpy.array]}
+        A function assigning a scalar test statistic to every twin window. It gets passed
+        the backward-facing window content as its first and the forward-facing window
+        content as its second argument.
+    thresh_func : {float, Callable[numpy.array, numpy.array]}
+        The threshold the `stat_func` result gets checked against. It is passed the same
+        window contents as `stat_func`. A changepoint is detected wherever the statistic
+        exceeds the threshold.
+    bwd_window : str
+        The (temporal) extension of the backward-facing window.
+    min_periods_bwd : {str, int}
+        Minimum number of periods that have to be present in a backward-facing window
+        for a changepoint test to be performed.
+    fwd_window : {None, str}, default None
+        The (temporal) extension of the forward-facing window. Defaults to `bwd_window`
+        if None is passed.
+    min_periods_fwd : {None, str, int}, default None
+        Minimum number of periods that have to be present in a forward-facing window
+        for a changepoint test to be performed. Defaults to `min_periods_bwd` if None
+        is passed.
+    closed : {'right', 'left', 'both', 'neither'}, default 'both'
+        Determines which window borders are included in the window content.
+
+    Returns
+    -------
+    data : dios.DictOfSeries
+        A dictionary of pandas.Series, holding all the data.
+    flagger : saqc.flagger
+        The flagger object, holding flags and additional information related to `data`.
+    """
+
+    if fwd_window is None:
+        fwd_window = bwd_window
+    if min_periods_fwd is None:
+        min_periods_fwd = min_periods_bwd
+
+    data_ser = data[field]
+    center = False
+    var_len = data_ser.shape[0]
+
+    # bounds of the backward-facing windows
+    FI = FreqIndexer()
+    FI.index_array = data_ser.index.to_numpy(int)
+    FI.win_points = np.array([True] * var_len)
+    FI.window_size = int(pd.Timedelta(bwd_window).total_seconds() * 10 ** 9)
+    FI.forward = False
+    bwd_start, bwd_end = FI.get_window_bounds(var_len, min_periods_bwd, center, closed)
+
+    # bounds of the forward-facing windows, shifted so that window i starts right after index i
+    FI.window_size = int(pd.Timedelta(fwd_window).total_seconds() * 10 ** 9)
+    FI.forward = True
+    fwd_start, fwd_end = FI.get_window_bounds(var_len, min_periods_fwd, center, closed)
+    fwd_start, fwd_end = np.roll(fwd_start, -1), np.roll(fwd_end, -1)
+
+    # compare the test statistic against the threshold for every twin window
+    data_arr = data_ser.values
+    result_arr = np.zeros(len(data_arr) - 1, dtype=bool)
+    for win_i in range(len(data_arr) - 1):
+        x = data_arr[bwd_start[win_i]:bwd_end[win_i]]
+        y = data_arr[fwd_start[win_i]:fwd_end[win_i]]
+        result_arr[win_i] = stat_func(x, y) > thresh_func(x, y)
+
+    # flag the timestamps right after a detected changepoint
+    flagger = flagger.setFlags(field, loc=data_ser.index[1:][result_arr], **kwargs)
     return data, flagger
\ No newline at end of file
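
For orientation, here is a minimal usage sketch of the new flagChangePoints function, assuming a dios.DictOfSeries `data` with a field "sensor_01" and a matching `flagger` already exist; the mean-difference statistic and the spread-based threshold are illustrative choices, not part of the patch:

    import numpy as np

    from saqc.funcs.functions import flagChangePoints


    def mean_diff(x, y):
        # test statistic: absolute difference of the two window means
        return np.abs(np.mean(x) - np.mean(y))


    def spread_thresh(x, y):
        # threshold: three times the pooled standard deviation of both windows
        return 3 * np.std(np.concatenate([x, y]))


    # `data` and `flagger` are assumed to exist already
    data, flagger = flagChangePoints(
        data, "sensor_01", flagger,
        stat_func=mean_diff,
        thresh_func=spread_thresh,
        bwd_window="30min", min_periods_bwd=5,
    )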