From e94027fdfa94ba976b540da44e8be9e992aea656 Mon Sep 17 00:00:00 2001 From: Peter Luenenschloss <peter.luenenschloss@ufz.de> Date: Mon, 28 Sep 2020 08:48:09 +0200 Subject: [PATCH] CPD inner loop runs runs with parallel=True, refactored to match saqc formatting contributions --- saqc/funcs/functions.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py index 88120b4c3..cffb380bd 100644 --- a/saqc/funcs/functions.py +++ b/saqc/funcs/functions.py @@ -1048,14 +1048,16 @@ def flagDriftFromReference(data, field, flagger, fields, segment_freq, thresh, return data, flagger -@numba.jit(nopython=True) -def _slidingWindowSearch(data_arr, bwd_start, fwd_end, stat_func, thresh_func, result_arr): - for win_i in range(1, len(data_arr)): +@numba.jit(nopython=True, parallel=True) +def _slidingWindowSearch(data_arr, bwd_start, fwd_end, stat_func, thresh_func, num_val): + stat_arr = np.zeros(num_val) + thresh_arr = np.zeros(num_val) + for win_i in numba.prange(1, len(data_arr)): x = data_arr[bwd_start[win_i - 1]:win_i] y = data_arr[win_i:fwd_end[win_i - 1]] - result_arr[win_i - 1] = stat_func(x, y) > thresh_func(x, y) - - return result_arr + stat_arr[win_i - 1] = stat_func(x, y) + thresh_arr[win_i - 1] = thresh_func(x, y) + return stat_arr, thresh_arr @register(masking='field') @@ -1098,20 +1100,20 @@ def flagChangePoints(data, field, flagger, stat_func, thresh_func, bwd_window, m stat_func = numba.jit(stat_func) thresh_func = numba.jit(thresh_func) - FI = FreqIndexer() - FI.index_array = data_ser.index.to_numpy(int) - FI.win_points = np.array([True]*var_len) - FI.window_size = int(pd.Timedelta(bwd_window).total_seconds() * 10 ** 9) - FI.forward = False - bwd_start, bwd_end = FI.get_window_bounds(var_len, min_periods_bwd, center, closed) + indexer = FreqIndexer() + indexer.index_array = data_ser.index.to_numpy(int) + indexer.win_points = np.array([True]*var_len) + indexer.window_size = int(pd.Timedelta(bwd_window).total_seconds() * 10 ** 9) + indexer.forward = False + bwd_start, bwd_end = indexer.get_window_bounds(var_len, min_periods_bwd, center, closed) - FI.window_size = int(pd.Timedelta(fwd_window).total_seconds() * 10 ** 9) - FI.forward = True - fwd_start, fwd_end = FI.get_window_bounds(var_len, min_periods_fwd, center, closed) + indexer.window_size = int(pd.Timedelta(fwd_window).total_seconds() * 10 ** 9) + indexer.forward = True + fwd_start, fwd_end = indexer.get_window_bounds(var_len, min_periods_fwd, center, closed) fwd_start, fwd_end = np.roll(fwd_start, -1), np.roll(fwd_end, -1) data_arr = data_ser.values - result_arr = np.array([False] * var_len, dtype=bool) - result_arr = _slidingWindowSearch(data_arr, bwd_start, fwd_end, stat_func, thresh_func, result_arr) + stat_arr, thresh_arr = _slidingWindowSearch(data_arr, bwd_start, fwd_end, stat_func, thresh_func, var_len) + flagger = flagger.setFlags(field, loc=result_arr) return data, flagger \ No newline at end of file -- GitLab