diff --git a/saqc/funcs/spikes_detection.py b/saqc/funcs/spikes_detection.py
index fbfb4289be2ae51c46ced5e6017526961da18ed4..65ddbd58cbebe3010af966b3a3bf9a9346bd969d 100644
--- a/saqc/funcs/spikes_detection.py
+++ b/saqc/funcs/spikes_detection.py
@@ -16,7 +16,8 @@ from saqc.lib.tools import (
     offset2seconds,
     slidingWindowIndices,
     findIndex,
-    toSequence
+    toSequence,
+    customRolling
 )
 
 from outliers import smirnov_grubbs
@@ -870,7 +871,7 @@ def spikes_flagMad(data, field, flagger, window, z=3.5, **kwargs):
 
 
 @register(masking='field')
-def spikes_flagBasic(data, field, flagger, thresh=7, tolerance=0, window="15min", **kwargs):
+def spikes_flagBasic(data, field, flagger, thresh, tolerance, window, numba_kickin=200000, **kwargs):
     """
     A basic outlier test that is designed to work for harmonized and non-harmonized data.
 
@@ -902,6 +903,11 @@ def spikes_flagBasic(data, field, flagger, thresh=7, tolerance=0, window="15min"
         Maximum difference between pre-spike and post-spike values. See condition (2)
     window : str, default '15min'
         Maximum length of "spiky" value courses. See condition (3)
+    numba_kickin : int, default 200000
+        When more than `numba_kickin` potential spikes are detected, the pandas.rolling
+        part of the computation gets jitted with numba.
+        The default value has proven to lie near the break-even point between jit gains
+        and jit overhead.
 
     Returns
     -------
@@ -922,54 +928,45 @@ def spikes_flagBasic(data, field, flagger, thresh=7, tolerance=0, window="15min"
     dataseries = data[field].dropna()
     # get all the entries directly succeeding a significant jump
-    pre_jumps = dataseries.diff(periods=-1).abs() > thresh
-    pre_jumps = pre_jumps[pre_jumps]
-    if pre_jumps.empty:
+    post_jumps = dataseries.diff().abs() > thresh
+    post_jumps = post_jumps[post_jumps]
+    if post_jumps.empty:
         return data, flagger
     # get all the entries preceding a significant jump within "window" range
-    to_roll = pre_jumps.reindex(dataseries.index, method="ffill", tolerance=window, fill_value=False).dropna()
+    to_roll = post_jumps.reindex(dataseries.index, method="bfill", tolerance=window, fill_value=False).dropna()
 
     # define spike testing function to roll with:
-    def spike_tester(chunk, pre_jumps_index, thresh, tol):
-        if not chunk.index[-1] in pre_jumps_index:
+    def spike_tester(chunk, thresh=thresh, tol=tolerance):
+        # note the sign flip: every value is compared against the chunk's last value
+        chunk_stair = (np.abs(chunk - chunk[-1]) < thresh)[::-1].cumsum()
+        initial = np.searchsorted(chunk_stair, 2)
+        if initial == len(chunk):
             return 0
+        if np.abs(chunk[-initial - 1] - chunk[-1]) < tol:
+            return initial - 1
         else:
-            # signum change!!!
-            chunk_stair = (abs(chunk - chunk[-1]) < thresh)[::-1].cumsum()
-            first_return = chunk_stair[(chunk_stair == 2)]
-            if first_return.sum() == 0:
-                return 0
-            if abs(chunk[first_return.index[0]] - chunk[-1]) < tol:
-                return (chunk_stair == 1).sum() - 1
-            else:
-                return 0
+            return 0
 
-    # since .rolling does neither support windows, defined by left starting points, nor rolling over monotonically
-    # decreasing indices, we have to trick the method by inverting the timeseries and transforming the resulting index
-    # to pseudo-increase.
     to_roll = dataseries[to_roll]
-    original_index = to_roll.index
-    to_roll = to_roll[::-1]
-    pre_jump_reversed_index = to_roll.index[0] - pre_jumps.index
-    to_roll.index = to_roll.index[0] - to_roll.index
-
-    # now lets roll:
-    to_roll = (
-        to_roll.rolling(window, closed="both")
-        .apply(spike_tester, args=(pre_jump_reversed_index, thresh, tolerance), raw=False)
-        .astype(int)
-    )
-    # reconstruct original index and sequence
-    to_roll = to_roll[::-1]
-    to_roll.index = original_index
-    to_write = to_roll[to_roll != 0]
-    to_flag = pd.Index([])
-    # here comes a loop...):
-    for row in to_write.iteritems():
-        loc = to_roll.index.get_loc(row[0])
-        to_flag = to_flag.append(to_roll.iloc[loc + 1 : loc + row[1] + 1].index)
-
-    to_flag = to_flag.drop_duplicates(keep="first")
+    roll_mask = pd.Series(False, index=to_roll.index)
+    roll_mask[post_jumps.index] = True
+    engine = None
+    if roll_mask.sum() > numba_kickin:
+        engine = 'numba'
+    result = customRolling(to_roll, window, spike_tester, roll_mask, closed='both', engine=engine)
+    group_col = np.nancumsum(result)
+    group_frame = pd.DataFrame({'group_col': group_col[:-1],
+                                'diff_col': np.diff(group_col).astype(int)},
+                               index=result.index[:-1])
+
+    groups = group_frame.groupby('group_col')
+
+    def g_func(x):
+        r = np.array([False] * x.shape[0])
+        r[-x[-1]:] = True
+        return r
+
+    to_flag = groups['diff_col'].transform(g_func)
 
     flagger = flagger.setFlags(field, to_flag, **kwargs)
     return data, flagger
diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py
index d8383172f5db724609445f0ad2b07ea55fae0a9d..8d21d8dcd95b2ebce770f2c329cdacb9211b5fa8 100644
--- a/saqc/lib/tools.py
+++ b/saqc/lib/tools.py
@@ -9,6 +9,8 @@ import numba as nb
 import pandas as pd
 import logging
 import dios
+from pandas.api.indexers import BaseIndexer
+from pandas._libs.window.indexers import calculate_variable_window_bounds
 
 # from saqc.flagger import BaseFlagger
 
@@ -357,3 +359,94 @@ def mutateIndex(index, old_name, new_name):
     index = index.insert(pos, new_name)
     return index
+
+class FreqIndexer(BaseIndexer):
+    def get_window_bounds(self, num_values, min_periods, center, closed):
+        start, end = calculate_variable_window_bounds(num_values, self.window_size, min_periods, center, closed,
+                                                      self.index_array)
+        end[~self.win_points] = 0
+        start[~self.win_points] = 0
+        return start, end
+
+
+class PeriodsIndexer(BaseIndexer):
+    def get_window_bounds(self, num_values, min_periods, center, closed):
+        start_s = np.zeros(self.window_size, dtype="int64")
+        start_e = (
+            np.arange(self.window_size, num_values, dtype="int64")
+            - self.window_size
+            + 1
+        )
+        start = np.concatenate([start_s, start_e])[:num_values]
+
+        end_s = np.arange(self.window_size, dtype="int64") + 1
+        end_e = start_e + self.window_size
+        end = np.concatenate([end_s, end_e])[:num_values]
+        start[~self.win_points] = 0
+        end[~self.win_points] = 0
+        return start, end
+
+
+def customRolling(to_roll, winsz, func, roll_mask, min_periods=1, center=False, closed=None, raw=True, engine=None):
+    """
+    A wrapper around pandas.rolling().apply() that allows skipping the application of `func`
+    on arbitrary selections of windows.
+
+    Parameters
+    ----------
+    to_roll : pandas.Series
+        Timeseries to be "rolled over".
+    winsz : {int, str}
+        Gets passed on to the window-size parameter of pandas.Rolling.
+    func : Callable
+        Function to be rolled with.
+    roll_mask : numpy.array[bool]
+        A mask indicating the rolling windows `func` shall be applied on.
+        Has to be of the same length as `to_roll`.
+        roll_mask[i] = False indicates that the window with right endpoint to_roll.index[i]
+        shall be skipped.
+    min_periods : int, default 1
+        Gets passed on to the min_periods parameter of pandas.Rolling.
+        (Note that rolling with a frequency-string window size and `min_periods=None`
+        results in nothing being computed, due to inconsistencies in the interplay of
+        pandas.rolling and its indexer.)
+    center : bool, default False
+        Gets passed on to the center parameter of pandas.Rolling.
+    closed : {None, 'left', 'right', 'both'}, default None
+        Gets passed on to the closed parameter of pandas.Rolling.
+    raw : bool, default True
+        Gets passed on to the raw parameter of pandas.Rolling.apply.
+    engine : {None, 'numba'}, default None
+        Gets passed on to the engine parameter of pandas.Rolling.apply.
+
+    Returns
+    -------
+    result : pandas.Series
+        The result of the rolling application.
+
+    """
+    i_roll = to_roll.copy()
+    i_roll.index = np.arange(to_roll.shape[0])
+    if isinstance(winsz, str):
+        winsz = int(pd.Timedelta(winsz).total_seconds() * 10 ** 9)
+        indexer = FreqIndexer(window_size=winsz,
+                              win_points=roll_mask,
+                              index_array=to_roll.index.to_numpy(int),
+                              center=center,
+                              closed=closed)
+
+    elif isinstance(winsz, int):
+        indexer = PeriodsIndexer(window_size=winsz,
+                                 win_points=roll_mask,
+                                 center=center,
+                                 closed=closed)
+
+    i_roll = i_roll.rolling(indexer,
+                            min_periods=min_periods,
+                            center=center,
+                            closed=closed).apply(func, raw=raw, engine=engine)
+
+    return pd.Series(i_roll.values, index=to_roll.index)
diff --git a/test/funcs/test_functions.py b/test/funcs/test_functions.py
index 711504822f0128a4d01bf41396594a83b21406af..17c646f63dde5d82b57f29558de6d30f2e13fb92 100644
--- a/test/funcs/test_functions.py
+++ b/test/funcs/test_functions.py
@@ -117,7 +117,7 @@ def test_flagIsolated(data, flagger):
 
     data, flagger_result = flagIsolated(
         data, field, flagger_result, group_window="2D", gap_window="2.1D", continuation_range="1.1D",
     )
 
     assert flagger_result.isFlagged(field)[[3, 5, 13, 14]].all()
diff --git a/test/funcs/test_spikes_detection.py b/test/funcs/test_spikes_detection.py
index 7487cc4a4c45338f82cb396f67820e4fe4ab4a6d..cfdeb79b0a6a5f612f3b2c5a88cdd1e8fdaa61c6 100644
--- a/test/funcs/test_spikes_detection.py
+++ b/test/funcs/test_spikes_detection.py
@@ -76,7 +76,7 @@ def test_flagSpikesBasic(spiky_data, flagger):
    data = spiky_data[0]
    field, *_ = data.columns
    flagger = flagger.initFlags(data)
-    data, flagger_result = spikes_flagBasic(data, field, flagger, thresh=60, tolerance=10, window_size="20min")
+    data, flagger_result = spikes_flagBasic(data, field, flagger, thresh=60, tolerance=10, window="20min")
     flag_result = flagger_result.getFlags(field)
     test_sum = (flag_result[spiky_data[1]] == flagger.BAD).sum()
     assert test_sum == len(spiky_data[1])
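
Usage sketch (illustrative only, not part of the patch): the snippet below shows how the new customRolling helper is meant to be driven with a roll_mask, mirroring the call in spikes_flagBasic. The toy series, threshold and window size are invented for this example; it assumes the patched saqc.lib.tools and the pandas 1.x BaseIndexer API the patch builds on.

import pandas as pd
from saqc.lib.tools import customRolling

# a toy 1-minute series with a single jump of 50 at 10:05
idx = pd.date_range("2020-01-01 10:00", periods=10, freq="1min")
s = pd.Series([0, 0, 0, 0, 0, 50, 0, 0, 0, 0], dtype=float, index=idx)

# only evaluate windows whose right endpoint directly follows a big difference
roll_mask = s.diff().abs() > 10

# with raw=True (the default), func receives each window as a numpy array;
# here it simply returns the value range of the window
result = customRolling(s, "3min", lambda chunk: chunk.max() - chunk.min(),
                       roll_mask, closed="both")

# masked-out windows get empty bounds from the indexer, so min_periods=1 is
# never met there and they show up as NaN in the result

Compared to the removed invert-the-timeseries workaround, the BaseIndexer route keeps the series in its natural order and skips unwanted windows by zeroing their bounds, which is also what makes the optional jitting via engine='numba' applicable.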