diff --git a/saqc/funcs/spikes_detection.py b/saqc/funcs/spikes_detection.py
index 3f5f714a5d09ed0d54bd34be226c1f15e7897dc3..6d766466b499eb56654121515f14a2d474ff49c3 100644
--- a/saqc/funcs/spikes_detection.py
+++ b/saqc/funcs/spikes_detection.py
@@ -695,7 +695,8 @@ def spikes_flagSpektrumBased(
     return data, flagger
 
 
-def spikes_flagGrubbs(data, field, flagger, winsz, alpha=0.05, min_periods=8, **kwargs):
+def spikes_flagGrubbs(data, field, flagger, winsz, alpha=0.05, min_periods=8, check_lagged=False,
+                      **kwargs):
     """
     The function flags values that are regarded outliers due to the grubbs test.
 
@@ -722,25 +723,44 @@ def spikes_flagGrubbs(data, field, flagger, winsz, alpha=0.05, min_periods=8, **
     min_periods : Integer
         The minimum number of values present in a testing interval for a grubbs test result to be accepted. Only makes
         sence in case "winsz" is an offset string.
-
-    Returns
-    -------
+    check_lagged : boolean, default False
+        If True, every value gets checked twice for being an outlier: once in the initial test intervals and once more
+        in test intervals that are lagged by half the window size (winsz/2). A value is only flagged if both checks
+        detect it, which avoids false positives at the interval edges. Only available if "winsz" is an integer.
     """
     data = data.copy()
     datcol = data[field]
     to_group = pd.DataFrame(data={'ts': datcol.index, 'data': datcol})
+    to_flag = pd.Series(False, index=datcol.index)
     if isinstance(winsz, int):
         # period number defined test intervals
         grouper_series = pd.Series(data=np.arange(0, datcol.shape[0]), index=datcol.index)
+        grouper_series_lagged = grouper_series + (winsz / 2)
         grouper_series = grouper_series.transform(lambda x: int(np.floor(x / winsz)))
+        grouper_series_lagged = grouper_series_lagged.transform(lambda x: int(np.floor(x / winsz)))
         partitions = to_group.groupby(grouper_series)
+        partitions_lagged = to_group.groupby(grouper_series_lagged)
     else:
         # offset defined test intervals:
         partitions = to_group.groupby(pd.Grouper(freq=winsz))
     for _, partition in partitions:
         if partition.shape[0] > min_periods:
-            to_flag = smirnov_grubbs.two_sided_test_indices(partition['data'].values, alpha=alpha)
-            to_flag = partition['ts'].iloc[to_flag]
-            flagger = flagger.setFlags(field, loc=to_flag, **kwargs)
-    return data, flagger
+            detected = smirnov_grubbs.two_sided_test_indices(partition['data'].values, alpha=alpha)
+            detected = partition['ts'].iloc[detected]
+            to_flag[detected.index] = True
+
+    if check_lagged and isinstance(winsz, int):
+        # second pass over the test intervals lagged by half a window; a value is only
+        # flagged if it was detected in the initial pass as well as in the lagged one
+        to_flag_lagged = pd.Series(False, index=datcol.index)
+        for _, partition in partitions_lagged:
+            if partition.shape[0] > min_periods:
+                detected = smirnov_grubbs.two_sided_test_indices(partition['data'].values, alpha=alpha)
+                detected = partition['ts'].iloc[detected]
+                to_flag_lagged[detected.index] = True
+        to_flag = to_flag & to_flag_lagged
+
+    flagger = flagger.setFlags(field, loc=to_flag, **kwargs)
+    return data, flagger
\ No newline at end of file
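
For context, the sketch below illustrates the double-check idea behind `check_lagged` outside of saqc: the Grubbs test runs once on consecutive blocks of `winsz` values and once on blocks shifted by `winsz/2`, and only values detected in both passes are kept. This is a minimal, illustrative example, not part of the patch; it assumes the `outlier_utils` package (imported as `outliers`), which provides the `smirnov_grubbs.two_sided_test_indices` function used in the diff, and the helper name `grubbs_two_pass` as well as the toy data are made up for the demonstration.

```python
import numpy as np
import pandas as pd
from outliers import smirnov_grubbs  # pip install outlier_utils


def grubbs_two_pass(series, winsz=20, alpha=0.05, min_periods=8):
    """Flag values that a two-sided Grubbs test detects in two partitionings of the data."""
    positions = np.arange(series.shape[0])
    passes = []
    for offset in (0, winsz / 2):  # second partitioning is lagged by half a window
        groups = np.floor((positions + offset) / winsz).astype(int)
        detected = pd.Series(False, index=series.index)
        for _, chunk in series.groupby(groups):
            if chunk.shape[0] > min_periods:
                idx = smirnov_grubbs.two_sided_test_indices(chunk.values, alpha=alpha)
                detected[chunk.index[idx]] = True
        passes.append(detected)
    # keep only values both partitionings agree on -> fewer artefacts at interval edges
    return passes[0] & passes[1]


# toy data: a smooth signal with one injected spike
index = pd.date_range("2020-01-01", periods=100, freq="10min")
values = pd.Series(np.sin(np.linspace(0, 10, 100)), index=index)
values.iloc[50] += 5
print(values[grubbs_two_pass(values)])
```

Intersecting the two passes means a detection has to hold up regardless of where the interval boundaries happen to fall, which is what the added `check_lagged` branch in the patch does via `to_flag & to_flag_lagged`.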