diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index d23bb34ed9b0f6c1e2a3264a4fab640269eb51ae..c8588f6407d488bb691110aae29d6891faa0de05 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -986,7 +986,7 @@ def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, n
 def flagDriftFromReference(data, field, flagger, fields, segment_freq, thresh,
                            metric=lambda x, y: scipy.spatial.distance.pdist(np.array([x, y]),
-                                                                            metric='cityblock')/len(x),
+                                                                           metric='cityblock')/len(x),
                            **kwargs):
     """
     The function flags value courses that deviate from a reference course by a margin exceeding a certain threshold.
 
@@ -1049,126 +1049,3 @@ def flagDriftFromReference(data, field, flagger, fields, segment_freq, thresh,
 
     return data, flagger
 
-@numba.jit(parallel=True)
-def _slidingWindowSearch(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, num_val):
-    stat_arr = np.zeros(num_val)
-    thresh_arr = np.zeros(num_val)
-    for win_i in numba.prange(0, num_val-1):
-        x = data_arr[bwd_start[win_i]:split[win_i]]
-        y = data_arr[split[win_i]:fwd_end[win_i]]
-        stat_arr[win_i] = stat_func(x, y)
-        thresh_arr[win_i] = thresh_func(x, y)
-    return stat_arr, thresh_arr
-
-
-def _reduceCPCluster(stat_arr, thresh_arr, start, end, obj_func, num_val):
-    out_arr = np.zeros(shape=num_val, dtype=bool)
-    for win_i in numba.prange(0, num_val):
-        s, e = start[win_i], end[win_i]
-        x = stat_arr[s:e]
-        y = thresh_arr[s:e]
-        pos = s + obj_func(x, y)
-        out_arr[pos] = True
-    return out_arr
-
-
-
-@register(masking='field')
-def flagChangePoints(data, field, flagger, stat_func, thresh_func, bwd_window, min_periods_bwd,
-                     fwd_window=None, min_periods_fwd=None, closed='both', try_to_jit=True,
-                     reduce_window=None, reduce_func=lambda x, y: x.argmax(), **kwargs):
-    """
-    Function for change point detection based on sliding window search.
-
-    The function provides basic architecture for basic sliding window changepoint detection.
-
-    Parameters
-    ----------
-    data : dios.DictOfSeries
-        A dictionary of pandas.Series, holding all the data.
-    field : str
-        The reference variable, the deviation from wich determines the flagging.
-    flagger : saqc.flagger
-        A flagger object, holding flags and additional informations related to `data`.
-    stat_func : Callable[numpy.array, numpy.array]
-        A function that assigns a value to every twin window. Left window content will be passed to first variable,
-        right window content will be passed to the second.
-    thresh_func : Callable[numpy.array, numpy.array]
-        A function that determines the value level, exceeding wich qualifies a timestamps stat func value as denoting a
-        changepoint.
-    bwd_window : str
-        The left (backwards facing) windows temporal extension (freq-string).
-    min_periods_bwd : {str, int}
-        Minimum number of periods that have to be present in a backwards facing window, for a changepoint test to be
-        performed.
-    fwd_window : {None, str}, default None
-        The right (forward facing) windows temporal extension (freq-string).
-    min_periods_fwd : {None, str, int}, default None
-        Minimum number of periods that have to be present in a forward facing window, for a changepoint test to be
-        performed.
-    closed : {'right', 'left', 'both', 'neither'}, default 'both'
-        Determines the closure of the sliding windows.
-    reduce_window : {None, False, str}, default None
-        The sliding window search method is not an exact CP search method and usually there wont be
-        detected a single changepoint, but a "region" of change around a changepoint.
-        If `agg_range` is not False, for every window of size `agg_range`, there
-        will be selected the value with index `reduce_func(x, y)` and the others will be dropped.
-        If `reduce_window` is None, the reduction window size equals the
-        twin window size the changepoints have been detected with.
-    reduce_func : Callable[numpy.array, numpy.array], default lambda x, y: x.argmax()
-        A function that must return an index value upon input of two arrays x and y.
-        First input parameter will hold the result from the stat_func evaluation for every
-        reduction window. Second input parameter holds the result from the thresh_func evaluation.
-        The default reduction function just selects the value that maximizes the stat_func.
-
-    Returns
-    -------
-
-    """
-
-    data_ser = data[field].dropna()
-    center = False
-    var_len = data_ser.shape[0]
-    if fwd_window is None:
-        fwd_window = bwd_window
-    if min_periods_fwd is None:
-        min_periods_fwd = min_periods_bwd
-    if reduce_window is None:
-        reduce_window = f"{int(pd.Timedelta(bwd_window).total_seconds() + pd.Timedelta(fwd_window).total_seconds())}s"
-    if try_to_jit:
-        stat_func = numba.jit(stat_func)
-        thresh_func = numba.jit(thresh_func)
-
-    indexer = FreqIndexer()
-    indexer.index_array = data_ser.index.to_numpy(int)
-    indexer.win_points = np.array([True]*var_len)
-    indexer.window_size = int(pd.Timedelta(bwd_window).total_seconds() * 10 ** 9)
-    indexer.forward = False
-    indexer.center = False
-    bwd_start, bwd_end = indexer.get_window_bounds(var_len, min_periods_bwd, center, closed)
-
-    indexer.window_size = int(pd.Timedelta(fwd_window).total_seconds() * 10 ** 9)
-    indexer.forward = True
-    fwd_start, fwd_end = indexer.get_window_bounds(var_len, min_periods_fwd, center, closed)
-    fwd_start, fwd_end = np.roll(fwd_start, -1), np.roll(fwd_end, -1)
-
-    min_mask = ~((fwd_end - fwd_start <= min_periods_fwd) | (bwd_end - bwd_start <= min_periods_bwd))
-    fwd_end = fwd_end[min_mask]
-    split = bwd_end[min_mask]
-    bwd_start = bwd_start[min_mask]
-    masked_index = data_ser.index[min_mask]
-    check_len = len(fwd_end)
-
-    data_arr = data_ser.values
-    stat_arr, thresh_arr = _slidingWindowSearch(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, check_len)
-
-    result_arr = stat_arr > thresh_arr
-    det_index = masked_index[result_arr]
-    detected = pd.Series(True, index=det_index)
-    if reduce_window is not False:
-        start, end = customRolling(detected, reduce_window, count, closed='both', min_periods=1, center=True, index_only=True)
-        detected = _reduceCPCluster(stat_arr[result_arr], thresh_arr[result_arr], start, end, reduce_func,
-                                    detected.shape[0])
-        det_index = det_index[detected]
-    flagger = flagger.setFlags(field, loc=det_index, **kwargs)
-    return data, flagger
\ No newline at end of file
diff --git a/saqc/funcs/modelling.py b/saqc/funcs/modelling.py
index 2b3ceee8c6aaaa2925c696bbfeb00dbc42d17649..0b7151e0d337654f3f8d8fc62b3e9a11369c57f7 100644
--- a/saqc/funcs/modelling.py
+++ b/saqc/funcs/modelling.py
@@ -3,6 +3,7 @@
 import pandas as pd
 import numpy as np
+import numba
 from saqc.core.register import register
 from saqc.lib.ts_operators import (
     polyRoller,
@@ -10,8 +11,9 @@ from saqc.lib.ts_operators import (
     polyRollerNumba,
     polyRollerNoMissingNumba,
     polyRollerIrregular,
+    count
 )
-from saqc.lib.tools import seasonalMask
+from saqc.lib.tools import seasonalMask, customRolling, FreqIndexer
 
 
 @register(masking='field')
@@ -401,4 +403,136 @@ def modelling_mask(data, field, flagger, mode, mask_var=None, season_start=None,
 
     data[field] = datcol
     flagger = flagger.setFlags(field, loc=datcol.index[to_mask], flag=flags_to_block, force=True)
+    return data, flagger
+
+
+@numba.jit(parallel=True)
+def _slidingWindowSearch(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, num_val):
+    stat_arr = np.zeros(num_val)
+    thresh_arr = np.zeros(num_val)
+    for win_i in numba.prange(0, num_val-1):
+        x = data_arr[bwd_start[win_i]:split[win_i]]
+        y = data_arr[split[win_i]:fwd_end[win_i]]
+        stat_arr[win_i] = stat_func(x, y)
+        thresh_arr[win_i] = thresh_func(x, y)
+    return stat_arr, thresh_arr
+
+
+def _reduceCPCluster(stat_arr, thresh_arr, start, end, obj_func, num_val):
+    out_arr = np.zeros(shape=num_val, dtype=bool)
+    for win_i in numba.prange(0, num_val):
+        s, e = start[win_i], end[win_i]
+        x = stat_arr[s:e]
+        y = thresh_arr[s:e]
+        pos = s + obj_func(x, y)
+        out_arr[pos] = True
+    return out_arr
+
+
+
+@register(masking='field')
+def modelling_clusterByChangePoints(data, field, flagger, stat_func, thresh_func, bwd_window, min_periods_bwd,
+                                    fwd_window=None, min_periods_fwd=None, closed='both', try_to_jit=True,
+                                    reduce_window=None, reduce_func=lambda x, y: x.argmax(), **kwargs):
+    """
+    Assigns labels to the data, aiming to reflect continuous regimes of the process the data is assumed to be
+    generated by.
+    The regime changepoint detection is based on a sliding window search.
+
+    Parameters
+    ----------
+    data : dios.DictOfSeries
+        A dictionary of pandas.Series, holding all the data.
+    field : str
+        The fieldname of the data column that is to be clustered; the resulting cluster labels overwrite it.
+    flagger : saqc.flagger
+        A flagger object, holding flags and additional information related to `data`.
+    stat_func : Callable[numpy.array, numpy.array]
+        A function that assigns a value to every twin window. The left window's content is passed as the first
+        argument, the right window's content as the second.
+    thresh_func : Callable[numpy.array, numpy.array]
+        A function that determines the threshold level; a timestamp whose stat_func value exceeds that level is
+        considered a changepoint.
+    bwd_window : str
+        The temporal extension of the left (backwards facing) window (freq string).
+    min_periods_bwd : {str, int}
+        Minimum number of periods that have to be present in a backwards facing window, for a changepoint test to be
+        performed.
+    fwd_window : {None, str}, default None
+        The temporal extension of the right (forward facing) window (freq string). Defaults to `bwd_window` if None.
+    min_periods_fwd : {None, str, int}, default None
+        Minimum number of periods that have to be present in a forward facing window, for a changepoint test to be
+        performed. Defaults to `min_periods_bwd` if None.
+    closed : {'right', 'left', 'both', 'neither'}, default 'both'
+        Determines the closure of the sliding windows.
+    try_to_jit : bool, default True
+        If True, `stat_func` and `thresh_func` are compiled with numba.jit to speed up the window search.
+    reduce_window : {None, False, str}, default None
+        The sliding window search is not an exact changepoint search method; usually a "region" of change
+        around a changepoint gets detected, rather than a single changepoint.
+        If `reduce_window` is not False, for every window of size `reduce_window`, only the value at
+        index `reduce_func(x, y)` is kept and the others are dropped.
+        If `reduce_window` is None, the reduction window size equals the
+        twin window size the changepoints have been detected with.
+    reduce_func : Callable[numpy.array, numpy.array], default lambda x, y: x.argmax()
+        A function that must return an index value upon input of two arrays x and y.
+        The first input parameter holds the result of the stat_func evaluation for every
+        reduction window, the second holds the result of the thresh_func evaluation.
+        The default reduction function just selects the value that maximizes the stat_func.
+
+    Returns
+    -------
+
+    """
+
+    data_ser = data[field].dropna()
+    center = False
+    var_len = data_ser.shape[0]
+    if fwd_window is None:
+        fwd_window = bwd_window
+    if min_periods_fwd is None:
+        min_periods_fwd = min_periods_bwd
+    if reduce_window is None:
+        reduce_window = f"{int(pd.Timedelta(bwd_window).total_seconds() + pd.Timedelta(fwd_window).total_seconds())}s"
+    if try_to_jit:
+        stat_func = numba.jit(stat_func)
+        thresh_func = numba.jit(thresh_func)
+
+    indexer = FreqIndexer()
+    indexer.index_array = data_ser.index.to_numpy(int)
+    indexer.win_points = np.array([True]*var_len)
+    indexer.window_size = int(pd.Timedelta(bwd_window).total_seconds() * 10 ** 9)
+    indexer.forward = False
+    indexer.center = False
+    bwd_start, bwd_end = indexer.get_window_bounds(var_len, min_periods_bwd, center, closed)
+
+    indexer.window_size = int(pd.Timedelta(fwd_window).total_seconds() * 10 ** 9)
+    indexer.forward = True
+    fwd_start, fwd_end = indexer.get_window_bounds(var_len, min_periods_fwd, center, closed)
+    fwd_start, fwd_end = np.roll(fwd_start, -1), np.roll(fwd_end, -1)
+
+    min_mask = ~((fwd_end - fwd_start <= min_periods_fwd) | (bwd_end - bwd_start <= min_periods_bwd))
+    fwd_end = fwd_end[min_mask]
+    split = bwd_end[min_mask]
+    bwd_start = bwd_start[min_mask]
+    masked_index = data_ser.index[min_mask]
+    check_len = len(fwd_end)
+
+    data_arr = data_ser.values
+    stat_arr, thresh_arr = _slidingWindowSearch(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, check_len)
+
+    result_arr = stat_arr > thresh_arr
+    det_index = masked_index[result_arr]
+    detected = pd.Series(True, index=det_index)
+    if reduce_window is not False:
+        start, end = customRolling(detected, reduce_window, count, closed='both', min_periods=1, center=True, index_only=True)
+        detected = _reduceCPCluster(stat_arr[result_arr], thresh_arr[result_arr], start, end, reduce_func,
+                                    detected.shape[0])
+        det_index = det_index[detected]
+
+    cluster = pd.Series(False, index=data_ser.index)
+    cluster[det_index] = True
+    cluster = cluster.cumsum()
+    data[field] = cluster
+    flagger = flagger.setFlags(field, flag=flagger.UNFLAGGED)
     return data, flagger
\ No newline at end of file
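
For orientation, the following is a minimal, self-contained sketch of the twin-window search and the cluster reduction implemented above, using count-based windows in plain numpy/pandas instead of saqc's FreqIndexer/customRolling machinery. The toy data, window size and threshold level are illustrative assumptions, not part of the patch.

import numpy as np
import pandas as pd

# Toy series with a mean shift after the 50th observation.
idx = pd.date_range("2020-01-01", periods=100, freq="10min")
values = np.concatenate([np.random.normal(0, 0.1, 50),
                         np.random.normal(5, 0.1, 50)])
data_ser = pd.Series(values, index=idx)

# Twin-window functions in the shape the new method expects: a statistic
# over (left, right) window contents, and a threshold level for it.
def stat_func(x, y):
    return np.abs(np.mean(x) - np.mean(y))

def thresh_func(x, y):
    return 1.0

# Count-based twin windows of 5 samples each; every index is a candidate split.
w, n = 5, len(values)
stat = np.zeros(n)
thresh = np.full(n, np.inf)
for split in range(w, n - w):
    x = values[split - w:split]   # backwards facing window
    y = values[split:split + w]   # forward facing window
    stat[split] = stat_func(x, y)
    thresh[split] = thresh_func(x, y)

detected = stat > thresh

# Reduction step: within every run of consecutive detections, keep only the
# position that maximizes the statistic (the default reduce_func behaviour).
edges = np.flatnonzero(np.diff(np.r_[0, detected.astype(int), 0]))
change_points = [s + np.argmax(stat[s:e]) for s, e in zip(edges[::2], edges[1::2])]
print(data_ser.index[change_points])  # should single out the shift near index 50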
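
Against the new function itself, a call could look like the following sketch. It assumes `data` (a dios.DictOfSeries) and a `flagger` are already set up as usual in saqc; the field name "sensor", the window parameters and the threshold level are placeholders, and try_to_jit is disabled because arbitrary Python callables are not guaranteed to compile under numba.jit.

import numpy as np
from saqc.funcs.modelling import modelling_clusterByChangePoints

# Statistic: absolute difference of the twin windows' means;
# threshold: a constant level. Both are user-supplied by design.
def stat_func(x, y):
    return np.abs(np.mean(x) - np.mean(y))

def thresh_func(x, y):
    return 0.5

# `data` and `flagger` are assumed to exist; "sensor" is a placeholder field.
data, flagger = modelling_clusterByChangePoints(
    data, "sensor", flagger,
    stat_func=stat_func,
    thresh_func=thresh_func,
    bwd_window="20min",   # backwards facing window extension
    min_periods_bwd=5,    # minimum samples per backwards window
    try_to_jit=False,     # skip numba compilation of the plain-python callables
)
# Afterwards data["sensor"] holds integer cluster labels (the cumulative sum
# over detected changepoints), and the function sets the field's flags to the
# UNFLAGGED level.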