From 9f3601d1b1f5fb45eccc2651787acbfa7afdfaf3 Mon Sep 17 00:00:00 2001
From: Juliane Geller <gellerj>
Date: Thu, 28 May 2020 09:34:29 +0200
Subject: [PATCH 1/2] Added Pattern rec

---
 saqc/funcs/functions.py | 111 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 111 insertions(+)

diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index 9209e9ae8..16f31f5e0 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -3,6 +3,10 @@
 
 import numpy as np
 import pandas as pd
+import dtw
+import pywt
+from mlxtend.evaluate import permutation_test
+import datetime
 
 from saqc.lib.tools import groupConsecutives, sesonalMask
 
@@ -51,6 +55,113 @@ def flagRange(data, field, flagger, min, max, **kwargs):
     return data, flagger
 
 
+@register()
+def flagPattern(data, field, flagger, ref_datafield, sample_freq='15 Min', method='dtw', min_distance=None, **kwargs):
+    # TODO: So far the algorithm assumes that the pattern spans one day -> extend to patterns
+    # longer or shorter than one day (e.g. a pattern every two hours)
+
+    test_field = data[field].copy()
+    ref_field = data[ref_datafield].copy()
+    # Test: the reference "hat" is June 30, 2019
+    ref_field = ref_field['2019-06-30 11:00:00':'2019-06-30 18:00:00']
+
+    # Get start and end time from reference field
+    Start_date = ref_field.head(1).index.item()
+    Start_time = datetime.datetime.time(Start_date)
+    End_date = ref_field.tail(1).index.item()
+    End_time = datetime.datetime.time(End_date)
+
+    # Count length of partition
+    partition_count = End_date.day - Start_date.day + 1
+    partition_freq = "%d D" % partition_count
+
+    # Harmonization
+    test_field = test_field.resample(sample_freq).first().interpolate('time')
+    ref_field = ref_field.resample(sample_freq).first().interpolate('time')
+
+    # Calculate Partition
+#    if not partition_freq:
+#        partition_freq = test_field.shape[0]
+#    if isinstance(partition_freq, str):
+    partitions = test_field.groupby(pd.Grouper(freq=partition_freq))
+#    else:
+#        grouper_series = pd.Series(data=np.arange(0, test_field.shape[0]), index=test_field.index)
+#        grouper_series = grouper_series.transform(lambda x: int(np.floor(x / partition_freq)))
+#        partitions = test_field.groupby(grouper_series)
+
+    # Initialize the chosen pattern method
+    # DTW as standard
+    if (not method) or (method != 'dtw' and method != 'wavelet'):
+        method = 'dtw'
+    # Initializing DTW
+    if method == 'dtw':
+        # Set minimal distance
+        if not min_distance:
+            min_distance = 4.5
+    # Initializing Wavelets
+    if method == 'wavelet':
+        # calculate reference wavelet transform
+        ref_field_wl = ref_field.values.ravel()
+        # Widths lambda as in Ann Maharaj
+        widths = [1, 2, 4, 8]
+        cwtmat_ref, freqs = pywt.cwt(ref_field_wl, widths, 'mexh')
+        # Square of matrix elements as Power sum of the matrix
+        wavepower_ref = np.power(cwtmat_ref, 2)
+
+    flags = pd.Series(data=0, index=test_field.index)
+    # calculate flags for every partition
+    partition_min = 0
+    # Set time frames for partition
+    if not Start_time:
+        Start_time = '00:00'
+    if not End_time:
+        End_time = '23:59'
+    for _, partition in partitions:
+        if partition.empty | (partition.shape[0] < partition_min):
+            continue
+        # Use only the given time frame
+        pattern = partition.between_time(Start_time, End_time)
+        # Choose method
+        if method == 'dtw':
+            distance = dtw.dtw(pattern, ref_field, open_end=True, distance_only=True).normalizedDistance
+            # Partition labeled as pattern by dtw
+            if distance < min_distance:
+                flags[partition.index] = 1
+        elif method == 'wavelet':
+            # calculate wavelet transform of the test partition
+            test_field_wl = pattern.values.ravel()
+            cwtmat_test, freqs = pywt.cwt(test_field_wl, widths, 'mexh')
+            # Square of matrix elements as Power sum of the matrix
+            wavepower_test = np.power(cwtmat_test, 2)
+
+            # Permutation test on Powersum of matrix
+            p_value = []
+            for i in range(len(widths)):
+                x = wavepower_ref[i]
+                y = wavepower_test[i]
+                pval = permutation_test(x, y, method='approximate', num_rounds=200,
+                                        func=lambda x, y: x.sum() / y.sum(), seed=0)
+                p_value.append(min(pval, 1 - pval))
+            # Partition labeled as pattern by wavelet
+            if min(p_value) >= 0.01:
+                flags[partition.index] = 1
+
+    mask = (flags == 1)
+
+    flagger = flagger.setFlags(field, mask, **kwargs)
+    return data, flagger
+
+
 @register()
 def flagMissing(data, field, flagger, nodata=np.nan, **kwargs):
     datacol = data[field]
--
GitLab
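For orientation, the wavelet branch added above boils down to: take the continuous wavelet transform of the reference pattern and of each candidate window, square the coefficients to get per-scale power, and run a permutation test on the ratio of power sums per scale; the window is flagged as an occurrence of the pattern when no scale rejects similarity. Below is a minimal, self-contained sketch of that decision rule using the same libraries (pywt, mlxtend). The toy arrays, their length, and the variable names are made up for illustration; only the widths, the num_rounds/seed settings, and the 0.01 threshold mirror the patch.

    import numpy as np
    import pywt
    from mlxtend.evaluate import permutation_test

    # Toy reference pattern ("hat") and a candidate window of the same length.
    ref = np.array([0.0, 0.0, 5.0, 5.0, 0.0, 0.0])
    candidate = np.array([0.0, 0.2, 4.8, 5.1, 0.1, 0.0])

    widths = [1, 2, 4, 8]                         # scales, as in the patch
    cwt_ref, _ = pywt.cwt(ref, widths, 'mexh')    # shape: (len(widths), len(ref))
    cwt_cand, _ = pywt.cwt(candidate, widths, 'mexh')
    power_ref, power_cand = cwt_ref ** 2, cwt_cand ** 2

    # One permutation test per scale on the ratio of power sums; keep the two-sided p-value.
    p_values = []
    for i in range(len(widths)):
        p = permutation_test(power_ref[i], power_cand[i], method='approximate',
                             num_rounds=200, func=lambda x, y: x.sum() / y.sum(), seed=0)
        p_values.append(min(p, 1 - p))

    # Flag the candidate as a pattern occurrence if no scale rejects similarity.
    is_pattern = min(p_values) >= 0.01
    print(p_values, is_pattern)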
From 6486433b200cb572e74755292951da90c0bf4ea3 Mon Sep 17 00:00:00 2001
From: gellerj <juliane.geller@ufz.de>
Date: Mon, 6 Jul 2020 14:06:23 +0200
Subject: [PATCH 2/2] Pattern recognition wavelets/dtw

---
 dios                                |   2 +-
 requirements.txt                    |   4 +
 ressources/data/config.csv          |   6 --
 ressources/data/config_ci.csv       |   7 --
 saqc/funcs/breaks_detection.py      |   2 +-
 saqc/funcs/functions.py             | 114 +++++++++++-----------------
 saqc/funcs/spikes_detection.py      |   4 +-
 test/funcs/conftest.py              |  94 +++++++++++++++++++++++
 test/funcs/test_functions.py        |  37 +++++++++
 test/funcs/test_spikes_detection.py |   5 +-
 10 files changed, 188 insertions(+), 87 deletions(-)
 delete mode 100644 ressources/data/config.csv
 delete mode 100644 ressources/data/config_ci.csv

diff --git a/dios b/dios
index 8264b2540..ac9f446e7 160000
--- a/dios
+++ b/dios
@@ -1 +1 @@
-Subproject commit 8264b2540025709465b2db0c23a074d97e31182c
+Subproject commit ac9f446e70ab06d3a6bd3b56a8b6789e56f2e861
diff --git a/requirements.txt b/requirements.txt
index 654dadb61..c20b21134 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,14 @@
 attrs==19.3.0
 Click==7.0
 cycler==0.10.0
+datetime==4.3
+dtw==1.4.0
 importlib-metadata==1.5.0
 joblib==0.14.1
 kiwisolver==1.1.0
 llvmlite==0.31.0
 matplotlib==3.1.3
+mlxtend==0.17.2
 more-itertools==8.2.0
 numba==0.48.0
 numpy==1.18.1
@@ -20,6 +23,7 @@ pytest==5.3.5
 python-dateutil==2.8.1
 python-intervals==1.10.0
 pytz==2019.3
+PyWavelets==1.0.0
 scikit-learn==0.22.1
 scipy==1.4.1
 six==1.14.0
diff --git a/ressources/data/config.csv b/ressources/data/config.csv
deleted file mode 100644
index e1e00c54e..000000000
--- a/ressources/data/config.csv
+++ /dev/null
@@ -1,6 +0,0 @@
-varname    ; test                                ; plot
-#----------;-------------------------------------;------
-SM2        ; harm_shift2Grid(freq="15Min")       ; False
-SM2        ; flagMissing(nodata=NAN)             ; False
-'SM(1|2)+' ; flagRange(min=10, max=60)           ; False
-SM2        ; spikes_flagMad(window="30d", z=3.5) ; True
diff --git a/ressources/data/config_ci.csv b/ressources/data/config_ci.csv
deleted file mode 100644
index f613a9385..000000000
--- a/ressources/data/config_ci.csv
+++ /dev/null
@@ -1,7 +0,0 @@
-varname;test;plot
-SM2;harm_shift2Grid(freq="15Min");False
-SM1;flagRange(min=10, max=60);False
-SM2;flagMissing(nodata=NAN);False
-SM2;flagRange(min=10, max=60);False
-SM2;spikes_flagMad(window="30d", z=3.5);False
-Dummy;flagGeneric(func=(isflagged(SM1) | isflagged(SM2)))
diff --git a/saqc/funcs/breaks_detection.py b/saqc/funcs/breaks_detection.py
index e8b5d1a79..7bc7823da 100644
--- a/saqc/funcs/breaks_detection.py
+++ b/saqc/funcs/breaks_detection.py
@@ -27,7 +27,7 @@ def breaks_flagSpektrumBased(
     **kwargs
 ):
 
-    """ This Function is an generalization of the Spectrum based break flagging mechanism as presented in:
+    """ This Function is a generalization of the Spectrum based break flagging mechanism as presented in:
 
     Dorigo,W,.... Global Automated Quality Control of In Situ Soil Moisture Data from the international
     Soil Moisture Network. 2013. Vadoze Zone J. doi:10.2136/vzj2012.0097.
diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index 16f31f5e0..2a33e2d59 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -58,91 +58,69 @@ def flagRange(data, field, flagger, min, max, **kwargs):
 
 
 @register()
-def flagPattern(data, field, flagger, ref_datafield, sample_freq='15 Min', method='dtw', min_distance=None, **kwargs):
-    # TODO: So far the algorithm assumes that the pattern spans one day -> extend to patterns
-    # longer or shorter than one day (e.g. a pattern every two hours)
-
-    test_field = data[field].copy()
-    ref_field = data[ref_datafield].copy()
-    # Test: the reference "hat" is June 30, 2019
-    ref_field = ref_field['2019-06-30 11:00:00':'2019-06-30 18:00:00']
-
-    # Get start and end time from reference field
-    Start_date = ref_field.head(1).index.item()
-    Start_time = datetime.datetime.time(Start_date)
-    End_date = ref_field.tail(1).index.item()
-    End_time = datetime.datetime.time(End_date)
-
-    # Count length of partition
-    partition_count = End_date.day - Start_date.day + 1
-    partition_freq = "%d D" % partition_count
-
-    # Harmonization
-    test_field = test_field.resample(sample_freq).first().interpolate('time')
-    ref_field = ref_field.resample(sample_freq).first().interpolate('time')
-
-    # Calculate Partition
-#    if not partition_freq:
-#        partition_freq = test_field.shape[0]
-#    if isinstance(partition_freq, str):
-    partitions = test_field.groupby(pd.Grouper(freq=partition_freq))
-#    else:
-#        grouper_series = pd.Series(data=np.arange(0, test_field.shape[0]), index=test_field.index)
-#        grouper_series = grouper_series.transform(lambda x: int(np.floor(x / partition_freq)))
-#        partitions = test_field.groupby(grouper_series)
+def flagPattern(data, field, flagger, reference_field, method='dtw', partition_freq="days", partition_offset=0, max_distance=0.03, normalized_distance=True, widths=[1, 2, 4, 8], waveform='mexh', **kwargs):
+
+    test = data[field].copy()
+    ref = data[reference_field].copy()
+    # ref = ref['2019-06-29 11:00:00':'2019-06-29 18:00:00']
+    Pattern_start_date = ref.index[0]
+    Pattern_start_time = datetime.datetime.time(Pattern_start_date)
+    Pattern_end_date = ref.index[-1]
+    Pattern_end_time = datetime.datetime.time(Pattern_end_date)
+
+    ### Extract partition frequency from pattern if needed
+    if not isinstance(partition_freq, str):
+        raise ValueError('Partition frequency has to be given in string format.')
+    elif partition_freq == "days" or partition_freq == "months":
+        # Get partition frequency from reference field
+        partition_count = (Pattern_end_date - Pattern_start_date).days
+        partitions = test.groupby(pd.Grouper(freq="%d D" % (partition_count + 1)))
+    else:
+        partitions = test.groupby(pd.Grouper(freq=partition_freq))
 
-    # Initialize the chosen pattern method
-    # DTW as standard
-    if (not method) or (method != 'dtw' and method != 'wavelet'):
-        method = 'dtw'
-    # Initializing DTW
-    if method == 'dtw':
-        # Set minimal distance
-        if not min_distance:
-            min_distance = 4.5
     # Initializing Wavelets
     if method == 'wavelet':
         # calculate reference wavelet transform
-        ref_field_wl = ref_field.values.ravel()
+        ref_wl = ref.values.ravel()
         # Widths lambda as in Ann Maharaj
-        widths = [1, 2, 4, 8]
-        cwtmat_ref, freqs = pywt.cwt(ref_field_wl, widths, 'mexh')
+        cwtmat_ref, freqs = pywt.cwt(ref_wl, widths, waveform)
         # Square of matrix elements as Power sum of the matrix
         wavepower_ref = np.power(cwtmat_ref, 2)
+    elif not method == 'dtw':
+        # No valid method given
+        raise ValueError('Unable to interpret {} as method.'.format(method))
 
-    flags = pd.Series(data=0, index=test_field.index)
-    # calculate flags for every partition
-    partition_min = 0
-    # Set time frames for partition
-    if not Start_time:
-        Start_time = '00:00'
-    if not End_time:
-        End_time = '23:59'
+    flags = pd.Series(data=0, index=test.index)
+    ### Calculate flags for every partition
+    partition_min = ref.shape[0]
     for _, partition in partitions:
-        if partition.empty | (partition.shape[0] < partition_min):
+        # Ensure that the partition is at least as long as the reference pattern
+        if partition.empty or (partition.shape[0] < partition_min):
             continue
-        # Use only the given time frame
-        pattern = partition.between_time(Start_time, End_time)
-        # Choose method
+        if partition_freq == "days" or partition_freq == "months":
+            # Use only the time frame given by the pattern
+            test = partition.between_time(Pattern_start_time, Pattern_end_time)
+            mask = (partition.index >= test.index[0]) & (partition.index <= test.index[-1])
+            test = partition.loc[mask]
+        else:
+            # Cut the partition according to the pattern and the offset
+            start_time = pd.Timedelta(partition_offset) + partition.index[0]
+            end_time = start_time + pd.Timedelta(Pattern_end_date - Pattern_start_date)
+            test = partition[start_time:end_time]
+        ### Switch method
         if method == 'dtw':
-            distance = dtw.dtw(pattern, ref_field, open_end=True, distance_only=True).normalizedDistance
+            distance = dtw.dtw(test, ref, open_end=True, distance_only=True).normalizedDistance
+            if normalized_distance:
+                distance = distance / abs(ref.mean())
             # Partition labeled as pattern by dtw
-            if distance < min_distance:
+            if distance < max_distance:
                 flags[partition.index] = 1
         elif method == 'wavelet':
             # calculate wavelet transform of the test partition
-            test_field_wl = pattern.values.ravel()
-            cwtmat_test, freqs = pywt.cwt(test_field_wl, widths, 'mexh')
+            test_wl = test.values.ravel()
+            cwtmat_test, freqs = pywt.cwt(test_wl, widths, waveform)
             # Square of matrix elements as Power sum of the matrix
             wavepower_test = np.power(cwtmat_test, 2)
-
             # Permutation test on Powersum of matrix
             p_value = []
             for i in range(len(widths)):
diff --git a/saqc/funcs/spikes_detection.py b/saqc/funcs/spikes_detection.py
index 2e4c12c39..3cf93fdae 100644
--- a/saqc/funcs/spikes_detection.py
+++ b/saqc/funcs/spikes_detection.py
@@ -126,8 +126,8 @@ def _expFit(val_frame, scoring_method='kNNMaxGap', n_neighbors=10, iter_start=0.
         # fitting
         lambdA, _ = curve_fit(fit_function, xdata=binzenters[upper_tail_index:iter_max_bin_index],
-                              ydata=upper_tail_hist,
-                              p0=[-np.log(alpha/resids[iter_index])])
+                              ydata=upper_tail_hist,
+                              p0=[-np.log(alpha / resids[iter_index])])
 
         crit_val = neg_log_alpha / lambdA
         test_val = resids[iter_index]
diff --git a/test/funcs/conftest.py b/test/funcs/conftest.py
index 933e4539d..d913a2944 100644
--- a/test/funcs/conftest.py
+++ b/test/funcs/conftest.py
@@ -13,6 +13,7 @@ def char_dict():
         "return": pd.DatetimeIndex([])}
 
 
+
 @pytest.fixture
 def course_1(char_dict):
     # MONOTONOUSLY ASCENDING/DESCENDING
@@ -65,6 +66,99 @@ def course_2(char_dict):
     return fix_funk
 
 
+@pytest.fixture
+def course_2(char_dict):
+    # SINGLE_SPIKE
+    # values that develop linearly over the whole timeseries, from "initial_level" to "final_level", exhibiting
+    # one "anomalous" or "outlierish" value of magnitude "out_val" at position "periods/2"
+    # number of periods should be even!
+    def fix_funk(freq='10min', periods=10, initial_level=0, final_level=0, out_val=5,
+                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), char_dict=char_dict):
+
+        t_index = pd.date_range(initial_index, freq=freq, periods=periods)
+        data = np.linspace(initial_level, final_level, int(np.floor(len(t_index))))
+
+        data = pd.Series(data=data, index=t_index)
+        data.iloc[int(np.floor(periods / 2))] = out_val
+
+        if out_val > data.iloc[int(np.floor(periods / 2) - 1)]:
+            kind = 'raise'
+        else:
+            kind = 'drop'
+
+        char_dict[kind] = data.index[int(np.floor(periods / 2))]
+        char_dict['return'] = data.index[int(np.floor(len(t_index) / 2)) + 1]
+
+        data = DictOfSeries(data=data, columns=['data'])
+        return data, char_dict
+
+    return fix_funk
+
+
+@pytest.fixture
+def course_pattern_1(char_dict):
+    # Test spike pattern
+    def fix_funk(freq='10min',
+                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
+
+        t_index = pd.date_range(initial_index, freq=freq, periods=6)
+
+        data = pd.Series(data=0, index=t_index)
+        data.iloc[2] = out_val
+        data.iloc[3] = out_val
+        char_dict['pattern_1'] = data.index
+
+        data = DictOfSeries(data=data, columns=['pattern_1'])
+        return data, char_dict
+
+    return fix_funk
+
+
+@pytest.fixture
+def course_pattern_2(char_dict):
+    # Test spike pattern
+    def fix_funk(freq='1 H',
+                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
+
+        t_index = pd.date_range(initial_index, freq=freq, periods=4)
+
+        data = pd.Series(data=0, index=t_index)
+        data.iloc[2] = out_val
+        data.iloc[3] = out_val
+        char_dict['pattern_2'] = data.index
+
+        data = DictOfSeries(data=data, columns=['pattern_2'])
+        return data, char_dict
+
+    return fix_funk
+
+
+@pytest.fixture
+def course_test(char_dict):
+    # Test function for pattern detection - same as the test pattern for the first values, then a constant function
+    def fix_funk(freq='1 D',
+                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
+
+        t_index = pd.date_range(initial_index, freq=freq, periods=100)
+
+        data = pd.Series(data=0, index=t_index)
+        data.iloc[2] = out_val
+        data.iloc[3] = out_val
+
+        data = DictOfSeries(data=data, columns=['data'])
+        return data, char_dict
+
+    return fix_funk
+
+
 @pytest.fixture
 def course_3(char_dict):
diff --git a/test/funcs/test_functions.py b/test/funcs/test_functions.py
index 429e8ea36..c34d7ca6d 100644
--- a/test/funcs/test_functions.py
+++ b/test/funcs/test_functions.py
@@ -4,15 +4,22 @@
 
 import pytest
 import numpy as np
+
 from saqc.funcs.functions import (
     flagRange,
     flagSesonalRange,
     forceFlags,
     clearFlags,
     flagIsolated,
+    flagPattern
 )
+
 from test.common import initData, TESTFLAGGER
 
+import pandas as pd
+import dios
+
 
 @pytest.fixture
 def data():
@@ -34,6 +41,36 @@ def test_flagRange(data, field, flagger):
     assert (flagged == expected).all()
 
 
+@pytest.mark.parametrize("flagger", TESTFLAGGER)
+@pytest.mark.parametrize("method", ['wavelet', 'dtw'])
+@pytest.mark.parametrize("pattern", [pytest.lazy_fixture("course_pattern_1"),
+                                     pytest.lazy_fixture("course_pattern_2")],)
+def test_flagPattern(course_test, flagger, method, pattern):
+    pattern_data, dict_pattern = pattern()
+
+    # Testing the same pattern sampled at different frequencies
+    if pattern_data.columns[0] == "pattern_1":
+        test_data, *_ = course_test(freq="10 min")
+        test_data['pattern_data'] = pattern_data.to_df()
+        flagger = flagger.initFlags(test_data)
+        data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="1 H", method=method)
+        assert flagger.isFlagged("data")[dict_pattern["pattern_1"]].all()
+    if pattern_data.columns[0] == "pattern_2":
+        test_data, *_ = course_test(freq="1 H")
+        test_data['pattern_data'] = pattern_data.to_df()
+        flagger = flagger.initFlags(test_data)
+        data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="days", method=method)
+        assert flagger.isFlagged("data")[dict_pattern["pattern_2"]].all()
+
+
 @pytest.mark.parametrize("flagger", TESTFLAGGER)
 def test_flagSesonalRange(data, field, flagger):
     # prepare
diff --git a/test/funcs/test_spikes_detection.py b/test/funcs/test_spikes_detection.py
index 77f729c2a..daac97d2c 100644
--- a/test/funcs/test_spikes_detection.py
+++ b/test/funcs/test_spikes_detection.py
@@ -100,9 +100,10 @@ def test_flagSpikesLimitRaise(dat, flagger):
 
 # see test/functs/conftest.py for the 'course_N'
 @pytest.mark.parametrize("flagger", TESTFLAGGER)
-@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_3")])
+@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_pattern")])
 def test_flagSpikesOddWater(dat, flagger):
-    data1, characteristics = dat(periods=1000, initial_level=5, final_level=15, out_val=50)
+    # data1, characteristics = dat(periods=100, initial_level=5, final_level=15, out_val=50)
+    data1, characteristics = dat()
     data2, characteristics = dat(periods=1000, initial_level=20, final_level=1, out_val=30)
     field = "dummy"
     fields = ["data1", "data2"]
--
GitLab
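The DTW branch, by contrast, thresholds a normalized alignment distance rather than running a statistical test. The sketch below isolates that check, assuming a dtw module that exposes dtw(query, template, open_end=..., distance_only=...) with a normalizedDistance attribute on the result, i.e. the interface flagPattern itself calls above (the requirements pin dtw==1.4.0, so whether the installed package provides exactly this interface is an assumption here). The toy arrays and variable names are illustrative; the 0.03 threshold and the division by |mean(ref)| mirror the new defaults.

    import numpy as np
    import dtw  # assumes the dtw flavour used by flagPattern above

    ref = np.array([0.0, 0.0, 5.0, 5.0, 0.0, 0.0])
    candidate = np.array([0.0, 0.1, 4.9, 5.2, 0.2, 0.0])

    # Open-ended, distance-only alignment, as in flagPattern.
    alignment = dtw.dtw(candidate, ref, open_end=True, distance_only=True)
    distance = alignment.normalizedDistance

    # With normalized_distance=True the patch additionally divides by |mean(ref)|.
    distance = distance / abs(ref.mean())

    max_distance = 0.03  # default threshold from the new signature
    print(distance, distance < max_distance)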