diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py index 699f91bef47e62591c20a2f71e88965e2f1cd050..d23bb34ed9b0f6c1e2a3264a4fab640269eb51ae 100644 --- a/saqc/funcs/functions.py +++ b/saqc/funcs/functions.py @@ -1049,34 +1049,34 @@ def flagDriftFromReference(data, field, flagger, fields, segment_freq, thresh, return data, flagger -#@numba.jit(parallel=True) -def _slidingWindowSearch(data_arr, bwd_start, fwd_end, stat_func, thresh_func, num_val): +@numba.jit(parallel=True) +def _slidingWindowSearch(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, num_val): stat_arr = np.zeros(num_val) thresh_arr = np.zeros(num_val) - for win_i in numba.prange(1, num_val): - x = data_arr[bwd_start[win_i - 1]:win_i] - y = data_arr[win_i:fwd_end[win_i - 1]] - stat_arr[win_i - 1] = stat_func(x, y) - thresh_arr[win_i - 1] = thresh_func(x, y) + for win_i in numba.prange(0, num_val-1): + x = data_arr[bwd_start[win_i]:split[win_i]] + y = data_arr[split[win_i]:fwd_end[win_i]] + stat_arr[win_i] = stat_func(x, y) + thresh_arr[win_i] = thresh_func(x, y) return stat_arr, thresh_arr -@numba.jit(nopython=True, parallel=True) -def _reduceCPCluster(stat_arr, thresh_arr, start, end, obj_func, num_val, out_arr): +def _reduceCPCluster(stat_arr, thresh_arr, start, end, obj_func, num_val): + out_arr = np.zeros(shape=num_val, dtype=bool) for win_i in numba.prange(0, num_val): s, e = start[win_i], end[win_i] x = stat_arr[s:e] y = thresh_arr[s:e] pos = s + obj_func(x, y) out_arr[pos] = True - return out_arr + @register(masking='field') def flagChangePoints(data, field, flagger, stat_func, thresh_func, bwd_window, min_periods_bwd, fwd_window=None, min_periods_fwd=None, closed='both', try_to_jit=True, - agg_range=None, reduce_func=lambda x, y: x.argmax(), **kwargs): + reduce_window=None, reduce_func=lambda x, y: x.argmax(), **kwargs): """ Function for change point detection based on sliding window search. @@ -1093,7 +1093,7 @@ def flagChangePoints(data, field, flagger, stat_func, thresh_func, bwd_window, m stat_func : Callable[numpy.array, numpy.array] A function that assigns a value to every twin window. Left window content will be passed to first variable, right window content will be passed to the second. - thresh_func : {float, Callable[numpy.array, numpy.array]} + thresh_func : Callable[numpy.array, numpy.array] A function that determines the value level, exceeding wich qualifies a timestamps stat func value as denoting a changepoint. bwd_window : str @@ -1107,6 +1107,19 @@ def flagChangePoints(data, field, flagger, stat_func, thresh_func, bwd_window, m Minimum number of periods that have to be present in a forward facing window, for a changepoint test to be performed. closed : {'right', 'left', 'both', 'neither'}, default 'both' + Determines the closure of the sliding windows. + reduce_window : {None, False, str}, default None + The sliding window search method is not an exact CP search method and usually there wont be + detected a single changepoint, but a "region" of change around a changepoint. + If `agg_range` is not False, for every window of size `agg_range`, there + will be selected the value with index `reduce_func(x, y)` and the others will be dropped. + If `reduce_window` is None, the reduction window size equals the + twin window size the changepoints have been detected with. + reduce_func : Callable[numpy.array, numpy.array], default lambda x, y: x.argmax() + A function that must return an index value upon input of two arrays x and y. + First input parameter will hold the result from the stat_func evaluation for every + reduction window. Second input parameter holds the result from the thresh_func evaluation. + The default reduction function just selects the value that maximizes the stat_func. Returns ------- @@ -1120,12 +1133,11 @@ def flagChangePoints(data, field, flagger, stat_func, thresh_func, bwd_window, m fwd_window = bwd_window if min_periods_fwd is None: min_periods_fwd = min_periods_bwd - if agg_range is None: - agg_range = f"{int(pd.Timedelta(bwd_window).total_seconds() + pd.Timedelta(fwd_window).total_seconds())}s" + if reduce_window is None: + reduce_window = f"{int(pd.Timedelta(bwd_window).total_seconds() + pd.Timedelta(fwd_window).total_seconds())}s" if try_to_jit: stat_func = numba.jit(stat_func) thresh_func = numba.jit(thresh_func) - reduce_func = numba.jit(reduce_func) indexer = FreqIndexer() indexer.index_array = data_ser.index.to_numpy(int) @@ -1140,17 +1152,23 @@ def flagChangePoints(data, field, flagger, stat_func, thresh_func, bwd_window, m fwd_start, fwd_end = indexer.get_window_bounds(var_len, min_periods_fwd, center, closed) fwd_start, fwd_end = np.roll(fwd_start, -1), np.roll(fwd_end, -1) + min_mask = ~((fwd_end - fwd_start <= min_periods_fwd) | (bwd_end - bwd_start <= min_periods_bwd)) + fwd_end = fwd_end[min_mask] + split = bwd_end[min_mask] + bwd_start = bwd_start[min_mask] + masked_index = data_ser.index[min_mask] + check_len = len(fwd_end) + data_arr = data_ser.values - stat_arr, thresh_arr = _slidingWindowSearch(data_arr, bwd_start, fwd_end, stat_func, thresh_func, var_len) + stat_arr, thresh_arr = _slidingWindowSearch(data_arr, bwd_start, fwd_end, split, stat_func, thresh_func, check_len) result_arr = stat_arr > thresh_arr - det_index = data_ser[result_arr].index + det_index = masked_index[result_arr] detected = pd.Series(True, index=det_index) - start, end = customRolling(detected, agg_range, count, closed='both', min_periods=1, center=True, index_only=True) - out_arr = np.zeros(shape=detected.shape[0], dtype=bool) - detected = _reduceCPCluster(stat_arr[result_arr], thresh_arr[result_arr], start, end, reduce_func, - detected.shape[0], out_arr) - - det_index = det_index[detected] + if reduce_window is not False: + start, end = customRolling(detected, reduce_window, count, closed='both', min_periods=1, center=True, index_only=True) + detected = _reduceCPCluster(stat_arr[result_arr], thresh_arr[result_arr], start, end, reduce_func, + detected.shape[0]) + det_index = det_index[detected] flagger = flagger.setFlags(field, loc=det_index, **kwargs) return data, flagger \ No newline at end of file diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py index 4987dae119b784f0d2a3dfabd036d4281bc5c82c..fb6e4bffc995c08582db328a32a6a55fef996f5a 100644 --- a/saqc/lib/tools.py +++ b/saqc/lib/tools.py @@ -506,7 +506,10 @@ def customRolling(to_roll, winsz, func, roll_mask=None, min_periods=1, center=Fa if index_only: num_values = to_roll.shape[0] - return indexer.get_window_bounds(num_values, min_periods, center, closed) + if num_values == 0: + return np.array([]), np.array([]) + else: + return indexer.get_window_bounds(num_values, min_periods, center, closed) i_roller = i_roll.rolling(indexer, min_periods=min_periods,