From 9f3601d1b1f5fb45eccc2651787acbfa7afdfaf3 Mon Sep 17 00:00:00 2001
From: Juliane Geller <gellerj>
Date: Thu, 28 May 2020 09:34:29 +0200
Subject: [PATCH 1/2] Added Pattern rec

---
 saqc/funcs/functions.py | 111 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 111 insertions(+)

diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index 9209e9ae8..16f31f5e0 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -3,6 +3,10 @@
 
 import numpy as np
 import pandas as pd
+import dtw
+import pywt
+from mlxtend.evaluate import permutation_test
+import datetime
 
 from saqc.lib.tools import groupConsecutives, sesonalMask
 
@@ -51,6 +55,113 @@ def flagRange(data, field, flagger, min, max, **kwargs):
     return data, flagger
 
 
+
+
+@register()
+def flagPattern(data, field, flagger, ref_datafield, sample_freq = '15 Min', method = 'dtw', min_distance = None, **kwargs):
+    # To Do: Bisher geht der Algo davon aus, dass sich das Pattern über einen Tag erstreckt -> noch erweitern auf > 1 Tag bzw. < 1 Tag (z.B. alle zwei Stunden ein Pattern)
+
+    test_field = data[field].copy()
+    ref_field  = data[ref_datafield].copy()
+    # Test : Referenzhut ist der 30. Juni 2019
+    ref_field = ref_field['2019-06-30 11:00:00':'2019-06-30 18:00:00']
+
+    # Get start and end time from reference field
+    Start_date = ref_field.head(1).index.item()
+    Start_time = datetime.datetime.time(Start_date)
+    End_date = ref_field.tail(1).index.item()
+    End_time = datetime.datetime.time(End_date)
+
+    # Count length of partition
+    partition_count = End_date.day - Start_date.day + 1
+    partition_freq = "%d D" % partition_count
+
+    # Harmonization
+    test_field = test_field.resample(sample_freq).first().interpolate('time')
+    ref_field = ref_field.resample(sample_freq).first().interpolate('time')
+
+
+
+
+    # Calculate Partition
+#    if not partition_freq:
+#        partition_freq = test_field.shape[0]
+
+#    if isinstance(partition_freq, str):
+
+    partitions = test_field.groupby(pd.Grouper(freq=partition_freq))
+
+#    else:
+#        grouper_series = pd.Series(data=np.arange(0, test_field.shape[0]), index=test_field.index)
+#        grouper_series = grouper_series.transform(lambda x: int(np.floor(x / partition_freq)))
+#        partitions = test_field.groupby(grouper_series)
+
+    # Initialize the chosen pattern method
+    # DTW as standard
+    if (not method) or (method != 'dtw' and method != 'wavelet'):
+        method = 'dtw'
+    # Initializing DTW
+    if method == 'dtw':
+        # Set minimal distance
+        if not min_distance:
+            min_distance = 4.5
+    # Initializing Wavelets
+    if method == 'wavelet':
+        # calculate reference wavelet transform
+        ref_field_wl = ref_field.values.ravel()
+        # Widths lambda as in Ann Maharaj
+        widths = [1, 2, 4, 8]
+        cwtmat_ref, freqs = pywt.cwt(ref_field_wl, widths, 'mexh')
+        # Square of matrix elements as Power sum of the matrix
+        wavepower_ref = np.power(cwtmat_ref, 2)
+
+
+
+    flags = pd.Series(data=0, index=test_field.index)
+    # calculate flags for every partition
+    partition_min = 0
+    # Set time frames for partition
+    if not Start_time:
+        Start_time = '00:00'
+    if not End_time:
+        End_time = '23:59'
+    for _, partition in partitions:
+        if partition.empty | (partition.shape[0] < partition_min):
+            continue
+        # Use only the given time frame
+        pattern = partition.between_time(Start_time, End_time)
+        # Choose method
+        if method == 'dtw':
+            distance = dtw.dtw(pattern, ref_field, open_end=True, distance_only=True).normalizedDistance
+            # Partition labeled as pattern by dtw
+            if distance < min_distance:
+                flags[partition.index] = 1
+        elif method == 'wavelet':
+            # calculate reference wavelet transform
+            test_field_wl = pattern.values.ravel()
+            cwtmat_test, freqs = pywt.cwt(test_field_wl, widths, 'mexh')
+            # Square of matrix elements as Power sum of the matrix
+            wavepower_test = np.power(cwtmat_test, 2)
+
+            # Permutation test on Powersum of matrix
+            p_value = []
+            for i in range(len(widths)):
+                x = wavepower_ref[i]
+                y = wavepower_test[i]
+                pval = permutation_test(x, y, method='approximate', num_rounds=200, func=lambda x, y: x.sum() / y.sum(),
+                                        seed=0)
+                p_value.append(min(pval, 1 - pval))
+            # Partition labeled as pattern by wavelet
+            if min(p_value) >= 0.01:
+                flags[partition.index] = 1
+
+    mask = (flags == 1)
+
+    flagger = flagger.setFlags(field, mask, **kwargs)
+    return data, flagger
+
+
+
 @register()
 def flagMissing(data, field, flagger, nodata=np.nan, **kwargs):
     datacol = data[field]
-- 
GitLab


From 6486433b200cb572e74755292951da90c0bf4ea3 Mon Sep 17 00:00:00 2001
From: gellerj <juliane.geller@ufz.de>
Date: Mon, 6 Jul 2020 14:06:23 +0200
Subject: [PATCH 2/2] Pattern recognition wavelets/dtw

---
 dios                                |   2 +-
 requirements.txt                    |   4 +
 ressources/data/config.csv          |   6 --
 ressources/data/config_ci.csv       |   7 --
 saqc/funcs/breaks_detection.py      |   2 +-
 saqc/funcs/functions.py             | 114 +++++++++++-----------------
 saqc/funcs/spikes_detection.py      |   4 +-
 test/funcs/conftest.py              |  94 +++++++++++++++++++++++
 test/funcs/test_functions.py        |  37 +++++++++
 test/funcs/test_spikes_detection.py |   5 +-
 10 files changed, 188 insertions(+), 87 deletions(-)
 delete mode 100644 ressources/data/config.csv
 delete mode 100644 ressources/data/config_ci.csv

diff --git a/dios b/dios
index 8264b2540..ac9f446e7 160000
--- a/dios
+++ b/dios
@@ -1 +1 @@
-Subproject commit 8264b2540025709465b2db0c23a074d97e31182c
+Subproject commit ac9f446e70ab06d3a6bd3b56a8b6789e56f2e861
diff --git a/requirements.txt b/requirements.txt
index 654dadb61..c20b21134 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,14 @@
 attrs==19.3.0
 Click==7.0
 cycler==0.10.0
+datetime==4.3
+dtw==1.4.0
 importlib-metadata==1.5.0
 joblib==0.14.1
 kiwisolver==1.1.0
 llvmlite==0.31.0
 matplotlib==3.1.3
+mlxtend==0.17.2
 more-itertools==8.2.0
 numba==0.48.0
 numpy==1.18.1
@@ -20,6 +23,7 @@ pytest==5.3.5
 python-dateutil==2.8.1
 python-intervals==1.10.0
 pytz==2019.3
+PyWavelets==1.0.0
 scikit-learn==0.22.1
 scipy==1.4.1
 six==1.14.0
diff --git a/ressources/data/config.csv b/ressources/data/config.csv
deleted file mode 100644
index e1e00c54e..000000000
--- a/ressources/data/config.csv
+++ /dev/null
@@ -1,6 +0,0 @@
-varname    ; test                                ; plot
-#----------;-------------------------------------;------
-SM2        ; harm_shift2Grid(freq="15Min")       ; False
-SM2        ; flagMissing(nodata=NAN)             ; False
-'SM(1|2)+' ; flagRange(min=10, max=60)           ; False
-SM2        ; spikes_flagMad(window="30d", z=3.5) ; True
diff --git a/ressources/data/config_ci.csv b/ressources/data/config_ci.csv
deleted file mode 100644
index f613a9385..000000000
--- a/ressources/data/config_ci.csv
+++ /dev/null
@@ -1,7 +0,0 @@
-varname;test;plot
-SM2;harm_shift2Grid(freq="15Min");False
-SM1;flagRange(min=10, max=60);False
-SM2;flagMissing(nodata=NAN);False
-SM2;flagRange(min=10, max=60);False
-SM2;spikes_flagMad(window="30d", z=3.5);False
-Dummy;flagGeneric(func=(isflagged(SM1) | isflagged(SM2)))
diff --git a/saqc/funcs/breaks_detection.py b/saqc/funcs/breaks_detection.py
index e8b5d1a79..7bc7823da 100644
--- a/saqc/funcs/breaks_detection.py
+++ b/saqc/funcs/breaks_detection.py
@@ -27,7 +27,7 @@ def breaks_flagSpektrumBased(
     **kwargs
 ):
 
-    """ This Function is an generalization of the Spectrum based break flagging mechanism as presented in:
+    """ This Function is a generalization of the Spectrum based break flagging mechanism as presented in:
 
     Dorigo,W,.... Global Automated Quality Control of In Situ Soil Moisture Data from the international
     Soil Moisture Network. 2013. Vadoze Zone J. doi:10.2136/vzj2012.0097.
diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index 16f31f5e0..2a33e2d59 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -58,91 +58,69 @@ def flagRange(data, field, flagger, min, max, **kwargs):
 
 
 @register()
-def flagPattern(data, field, flagger, ref_datafield, sample_freq = '15 Min', method = 'dtw', min_distance = None, **kwargs):
-    # To Do: Bisher geht der Algo davon aus, dass sich das Pattern über einen Tag erstreckt -> noch erweitern auf > 1 Tag bzw. < 1 Tag (z.B. alle zwei Stunden ein Pattern)
-
-    test_field = data[field].copy()
-    ref_field  = data[ref_datafield].copy()
-    # Test : Referenzhut ist der 30. Juni 2019
-    ref_field = ref_field['2019-06-30 11:00:00':'2019-06-30 18:00:00']
-
-    # Get start and end time from reference field
-    Start_date = ref_field.head(1).index.item()
-    Start_time = datetime.datetime.time(Start_date)
-    End_date = ref_field.tail(1).index.item()
-    End_time = datetime.datetime.time(End_date)
-
-    # Count length of partition
-    partition_count = End_date.day - Start_date.day + 1
-    partition_freq = "%d D" % partition_count
-
-    # Harmonization
-    test_field = test_field.resample(sample_freq).first().interpolate('time')
-    ref_field = ref_field.resample(sample_freq).first().interpolate('time')
-
-
-
-
-    # Calculate Partition
-#    if not partition_freq:
-#        partition_freq = test_field.shape[0]
-
-#    if isinstance(partition_freq, str):
-
-    partitions = test_field.groupby(pd.Grouper(freq=partition_freq))
-
-#    else:
-#        grouper_series = pd.Series(data=np.arange(0, test_field.shape[0]), index=test_field.index)
-#        grouper_series = grouper_series.transform(lambda x: int(np.floor(x / partition_freq)))
-#        partitions = test_field.groupby(grouper_series)
+def flagPattern(data, field, flagger, reference_field, method = 'dtw', partition_freq = "days", partition_offset = 0, max_distance = 0.03, normalized_distance = True, widths = [1,2,4,8], waveform = 'mexh', **kwargs):
+
+    test = data[field].copy()
+    ref = data[reference_field].copy()
+    #ref = ref['2019-06-29 11:00:00':'2019-06-29 18:00:00']
+    Pattern_start_date = ref.index[0]
+    Pattern_start_time = datetime.datetime.time(Pattern_start_date)
+    Pattern_end_date = ref.index[-1]
+    Pattern_end_time = datetime.datetime.time(Pattern_end_date)
+
+    ### Extract partition frequency from pattern if needed
+    if not isinstance(partition_freq, str):
+        raise ValueError('Partition frequency has to be given in string format.')
+    elif partition_freq == "days" or partition_freq == "months":
+            # Get partition frequency from reference field
+            partition_count = (Pattern_end_date - Pattern_start_date).days
+            partitions = test.groupby(pd.Grouper(freq="%d D" % (partition_count + 1)))
+    else:
+        partitions = test.groupby(pd.Grouper(freq=partition_freq))
 
-    # Initialize the chosen pattern method
-    # DTW as standard
-    if (not method) or (method != 'dtw' and method != 'wavelet'):
-        method = 'dtw'
-    # Initializing DTW
-    if method == 'dtw':
-        # Set minimal distance
-        if not min_distance:
-            min_distance = 4.5
     # Initializing Wavelets
     if method == 'wavelet':
         # calculate reference wavelet transform
-        ref_field_wl = ref_field.values.ravel()
+        ref_wl = ref.values.ravel()
         # Widths lambda as in Ann Maharaj
-        widths = [1, 2, 4, 8]
-        cwtmat_ref, freqs = pywt.cwt(ref_field_wl, widths, 'mexh')
+        cwtmat_ref, freqs = pywt.cwt(ref_wl, widths, waveform)
         # Square of matrix elements as Power sum of the matrix
         wavepower_ref = np.power(cwtmat_ref, 2)
+    elif not method == 'dtw':
+    # No correct method given
+        raise ValueError('Unable to interpret {} as method.'.format(method))
 
-
-
-    flags = pd.Series(data=0, index=test_field.index)
-    # calculate flags for every partition
-    partition_min = 0
-    # Set time frames for partition
-    if not Start_time:
-        Start_time = '00:00'
-    if not End_time:
-        End_time = '23:59'
+    flags = pd.Series(data=0, index=test.index)
+    ### Calculate flags for every partition
+    partition_min = ref.shape[0]
     for _, partition in partitions:
-        if partition.empty | (partition.shape[0] < partition_min):
+        # Ensuring that partition is at least as long as reference pattern
+        if partition.empty or (partition.shape[0] < partition_min):
             continue
-        # Use only the given time frame
-        pattern = partition.between_time(Start_time, End_time)
-        # Choose method
+        if partition_freq == "days" or partition_freq == "months":
+            # Use only the time frame given by the pattern
+            test = partition.between_time(Pattern_start_time, Pattern_end_time)
+            mask = (partition.index >= test.index[0]) & (partition.index <= test.index[-1])
+            test = partition.loc[mask]
+        else:
+            # cut partition according to pattern and offset
+            start_time = pd.Timedelta(partition_offset) + partition.index[0]
+            end_time = start_time + pd.Timedelta(Pattern_end_date - Pattern_start_date)
+            test = partition[start_time:end_time]
+        ### Switch method
         if method == 'dtw':
-            distance = dtw.dtw(pattern, ref_field, open_end=True, distance_only=True).normalizedDistance
+            distance = dtw.dtw(test, ref, open_end=True, distance_only=True).normalizedDistance
+            if normalized_distance:
+                distance = distance/abs(ref.mean())
             # Partition labeled as pattern by dtw
-            if distance < min_distance:
+            if distance < max_distance:
                 flags[partition.index] = 1
         elif method == 'wavelet':
             # calculate reference wavelet transform
-            test_field_wl = pattern.values.ravel()
-            cwtmat_test, freqs = pywt.cwt(test_field_wl, widths, 'mexh')
+            test_wl = test.values.ravel()
+            cwtmat_test, freqs = pywt.cwt(test_wl, widths, 'mexh')
             # Square of matrix elements as Power sum of the matrix
             wavepower_test = np.power(cwtmat_test, 2)
-
             # Permutation test on Powersum of matrix
             p_value = []
             for i in range(len(widths)):
diff --git a/saqc/funcs/spikes_detection.py b/saqc/funcs/spikes_detection.py
index 2e4c12c39..3cf93fdae 100644
--- a/saqc/funcs/spikes_detection.py
+++ b/saqc/funcs/spikes_detection.py
@@ -126,8 +126,8 @@ def _expFit(val_frame, scoring_method='kNNMaxGap', n_neighbors=10, iter_start=0.
 
         # fitting
         lambdA, _ = curve_fit(fit_function, xdata=binzenters[upper_tail_index:iter_max_bin_index],
-                                ydata=upper_tail_hist,
-                                p0=[-np.log(alpha/resids[iter_index])])
+                              ydata=upper_tail_hist,
+                              p0=[-np.log(alpha / resids[iter_index])])
 
         crit_val = neg_log_alpha / lambdA
         test_val = resids[iter_index]
diff --git a/test/funcs/conftest.py b/test/funcs/conftest.py
index 933e4539d..d913a2944 100644
--- a/test/funcs/conftest.py
+++ b/test/funcs/conftest.py
@@ -13,6 +13,7 @@ def char_dict():
             "return": pd.DatetimeIndex([])}
 
 
+
 @pytest.fixture
 def course_1(char_dict):
     # MONOTONOUSLY ASCENDING/DESCENDING
@@ -65,6 +66,99 @@ def course_2(char_dict):
 
     return fix_funk
 
+@pytest.fixture
+def course_2(char_dict):
+    # SINGLE_SPIKE
+    # values , that linearly  develop over the whole timeseries, from "initial_level" to "final_level", exhibiting
+    # one "anomalous" or "outlierish" value of magnitude "out_val" at position "periods/2"
+    # number of periods better be even!
+    def fix_funk(freq='10min', periods=10, initial_level=0, final_level=0, out_val=5,
+                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), char_dict=char_dict):
+
+        t_index = pd.date_range(initial_index, freq=freq, periods=periods)
+        data = np.linspace(initial_level, final_level, int(np.floor(len(t_index))))
+
+        data = pd.Series(data=data, index=t_index)
+        data.iloc[int(np.floor(periods / 2))] = out_val
+
+        if out_val > data.iloc[int(np.floor(periods / 2) - 1)]:
+            kind = 'raise'
+        else:
+            kind = 'drop'
+
+        char_dict[kind] = data.index[int(np.floor(periods / 2))]
+        char_dict['return'] = data.index[int(np.floor(len(t_index) / 2)) + 1]
+
+        data = DictOfSeries(data=data, columns=['data'])
+        return data, char_dict
+
+    return fix_funk
+
+
+
+@pytest.fixture
+def course_pattern_1(char_dict):
+    # Test spike pattern
+    def fix_funk(freq='10min',
+                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
+
+        t_index = pd.date_range(initial_index, freq=freq, periods=6)
+        #data = np.linspace(initial_level, final_level, int(np.floor(len(t_index))))
+
+        data = pd.Series(data=0, index=t_index)
+        data.iloc[2] = out_val
+        data.iloc[3] = out_val
+        char_dict['pattern_1'] = data.index
+
+        data = DictOfSeries(data=data, columns=['pattern_1'])
+        return data, char_dict
+
+    return fix_funk
+
+
+
+@pytest.fixture
+def course_pattern_2(char_dict):
+    # Test spike pattern
+    def fix_funk(freq='1 H',
+                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
+
+        t_index = pd.date_range(initial_index, freq=freq, periods=4)
+        #data = np.linspace(initial_level, final_level, int(np.floor(len(t_index))))
+
+        data = pd.Series(data=0, index=t_index)
+        data.iloc[2] = out_val
+        data.iloc[3] = out_val
+        char_dict['pattern_2'] = data.index
+
+        data = DictOfSeries(data=data, columns=['pattern_2'])
+        return data, char_dict
+
+    return fix_funk
+
+
+
+
+@pytest.fixture
+def course_test(char_dict):
+    # Test function for pattern detection - same as test pattern for first three values, than constant function
+    def fix_funk(freq='1 D',
+                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
+
+        t_index = pd.date_range(initial_index, freq=freq, periods=100)
+        #data = np.linspace(initial_level, final_level, int(np.floor(len(t_index))))
+
+        data = pd.Series(data=0, index=t_index)
+        data.iloc[2] = out_val
+        data.iloc[3] = out_val
+
+
+        data = DictOfSeries(data=data, columns=['data'])
+        return data, char_dict
+
+    return fix_funk
+
+
 
 @pytest.fixture
 def course_3(char_dict):
diff --git a/test/funcs/test_functions.py b/test/funcs/test_functions.py
index 429e8ea36..c34d7ca6d 100644
--- a/test/funcs/test_functions.py
+++ b/test/funcs/test_functions.py
@@ -4,15 +4,22 @@
 import pytest
 import numpy as np
 
+
 from saqc.funcs.functions import (
     flagRange,
     flagSesonalRange,
     forceFlags,
     clearFlags,
     flagIsolated,
+    flagPattern
 )
+
 from test.common import initData, TESTFLAGGER
 
+import pandas as pd
+import dios
+
+
 
 @pytest.fixture
 def data():
@@ -34,6 +41,36 @@ def test_flagRange(data, field, flagger):
     assert (flagged == expected).all()
 
 
+@pytest.mark.parametrize("flagger", TESTFLAGGER)
+@pytest.mark.parametrize("method", ['wavelet', 'dtw'])
+@pytest.mark.parametrize("pattern", [pytest.lazy_fixture("course_pattern_1"),
+                                     pytest.lazy_fixture("course_pattern_2"),] ,)
+                            #     pytest.lazy_fixture("course_3"),
+                            #     pytest.lazy_fixture("course_4"), ],
+                                           #  )
+def test_flagPattern(course_test, flagger, method, pattern):
+    pattern_data, dict_pattern = pattern()
+
+    # testing the same pattern sampled at different frequencies
+    if pattern_data.columns == "pattern1":
+        test_data, *_ = course_test(freq="10 min")
+        test_data['pattern_data'] = pattern_data.to_df()
+        flagger = flagger.initFlags(test_data)
+        data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="1 H", method=method)
+        assert flagger.isFlagged("data")[dict_pattern["pattern_1"]].all()
+    if pattern_data.columns == "pattern2":
+        test_data, *_ = course_test(freq="1 H")
+        test_data['pattern_data'] = pattern_data.to_df()
+        flagger = flagger.initFlags(test_data)
+        data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="days", method=method)
+        assert flagger.isFlagged("data")[dict_pattern["pattern_2"]].all()
+
+
+
+
+
+
+
 @pytest.mark.parametrize("flagger", TESTFLAGGER)
 def test_flagSesonalRange(data, field, flagger):
     # prepare
diff --git a/test/funcs/test_spikes_detection.py b/test/funcs/test_spikes_detection.py
index 77f729c2a..daac97d2c 100644
--- a/test/funcs/test_spikes_detection.py
+++ b/test/funcs/test_spikes_detection.py
@@ -100,9 +100,10 @@ def test_flagSpikesLimitRaise(dat, flagger):
 
 # see test/functs/conftest.py for the 'course_N'
 @pytest.mark.parametrize("flagger", TESTFLAGGER)
-@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_3")])
+@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_pattern")])
 def test_flagSpikesOddWater(dat, flagger):
-    data1, characteristics = dat(periods=1000, initial_level=5, final_level=15, out_val=50)
+    #data1, characteristics = dat(periods=100, initial_level=5, final_level=15, out_val=50)
+    data1, characteristics = dat()
     data2, characteristics = dat(periods=1000, initial_level=20, final_level=1, out_val=30)
     field = "dummy"
     fields = ["data1", "data2"]
-- 
GitLab