diff --git a/saqc/funcs/spikes_detection.py b/saqc/funcs/spikes_detection.py index 7fafc0ab932f805a8e12a255b4bd8307e4598a36..6bad83f8abf05fba4f734fafb6e44d5c0ade809b 100644 --- a/saqc/funcs/spikes_detection.py +++ b/saqc/funcs/spikes_detection.py @@ -10,6 +10,7 @@ from scipy.optimize import curve_fit from saqc.funcs.register import register import numpy.polynomial.polynomial as poly import numba +import dios import saqc.lib.ts_operators as ts_ops from saqc.lib.tools import ( retrieveTrustworthyOriginal, @@ -134,14 +135,45 @@ def _expFit(val_frame, scoring_method='kNNMaxGap', n_neighbors=10, iter_start=0. return val_frame.index[sorted_i[iter_index:]] -def _reduceMVflags(to_flag_index, val_frame): - return 0 + +def _reduceMVflags(val_frame, fields, flagger, to_flag_frame, reduction_range): + to_flag_frame[:] = False + to_flag_index = to_flag_frame.index + for var in fields: + for index in enumerate(to_flag_index): + index_slice = slice(index[1] - pd.Timedelta(reduction_range), + index[1] + pd.Timedelta(reduction_range)) + + #test_slice = val_frame[var][index_slice].drop(np.delete(to_flag_index, index[0]), errors='ignore') + test_slice = val_frame[var][index_slice].drop(to_flag_index, errors='ignore') + if not test_slice.empty: + x = (test_slice.index.values.astype(float)) + x_0 = x[0] + x = (x - x_0)/10**12 + polyfitted = poly.polyfit(y=test_slice.values, x=x, deg=2) + testval = poly.polyval((float(index[1].to_numpy()) - x_0)/10**12, polyfitted) + testval = val_frame[var][index[1]] - testval + resids = test_slice.values -poly.polyval(x, polyfitted) + med_resids = np.median(resids) + MAD = np.median(np.abs(resids - med_resids)) + crit_val = 0.6745*(abs(med_resids - testval)) / MAD + + + test_slice = dios.DictOfSeries(test_slice) + test_flags = flagger.initFlags(test_slice) + test_slice, test_flags = spikes_flagSlidingZscore(test_slice, var, test_flags, window=reduction_range, + offset='15min', count=1, + polydeg=1, z=3.5, method="modZ") + if test_flags.isFlagged(field=var)[index[1]]: + to_flag_frame.loc[index[1], var] = True + return to_flag_frame + @register() def spikes_flagMultivarScores(data, field, flagger, fields, trafo='normScale', alpha=0.05, n_neighbors=10, scoring_method='kNNMaxGap', iter_start=0.5, threshing='stray', expfit_binning='auto', stray_partition=None, stray_partition_min=0, - post_reduction=None, **kwargs): + post_reduction=None, reduction_range=None, **kwargs): trafo_list = trafo.split(',') if len(trafo_list) == 1: @@ -180,8 +212,14 @@ def spikes_flagMultivarScores(data, field, flagger, fields, trafo='normScale', a alpha=alpha, bin_frac=expfit_binning) + to_flag_frame = pd.DataFrame({var_name: True for var_name in fields}, index=to_flag_index) + if post_reduction: + to_flag_frame = _reduceMVflags(val_frame, fields, flagger, to_flag_frame, reduction_range) + for var in fields: - flagger = flagger.setFlags(var, to_flag_index, **kwargs) + to_flag_ind = to_flag_frame.loc[: ,var] + to_flag_ind = to_flag_ind[to_flag_ind].index + flagger = flagger.setFlags(var, to_flag_ind, **kwargs) return data, flagger