From 1e967201e1ebc2592470e8208a8617787262a376 Mon Sep 17 00:00:00 2001
From: Peter Luenenschloss <peter.luenenschloss@ufz.de>
Date: Mon, 8 Jul 2019 18:29:40 +0200
Subject: [PATCH] flag spikes function for soil moisture variable implemented
 at working level

---
 saqc/funcs/functions.py | 79 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 75 insertions(+), 4 deletions(-)

diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index 64d4907f6..31e2dc217 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -3,6 +3,7 @@
 
 import numpy as np
 import pandas as pd
+from scipy.signal import savgol_filter
 
 from ..lib.tools import valueRange, slidingWindowIndices, inferFrequency, estimateSamplingRate, \
     retrieveTrustworthyOriginal
@@ -108,7 +109,7 @@ def flagMad(data, flags, field, flagger, length, z, freq=None, **kwargs):
     return data, flags
 
 
-def flagSoilMoistureBySoilFrost(data, flags, field, flagger, soil_temp_reference, tolerated_deviation,
+def flagSoilMoistureBySoilFrost(data, flags, field, flagger, soil_temp_reference, tolerated_deviation='1h',
                                 frost_level=0, **kwargs):
     """Function flags Soil moisture measurements by evaluating the soil-frost-level in the moment of measurement.
     Soil temperatures below "frost_level" are regarded as denoting frozen soil state.
@@ -211,7 +212,7 @@ def flagSoilMoistureByPrecipitationEvents(data, flags, field, flagger, prec_refe
     prec_count = prec_count.resample(input_rate).pad()
 
     # now we can: project precipitation onto dataseries sampling (and stack result to be able to apply df.rolling())
-    eval_frame = pd.merge(dataseries, prec_count, how='left', left_index=True, right_index=True).stack().reset_index()
+    eval_frame = pd.merge(dataseries, prec_count, how='left', left_index=True, right_index=True).stack(dropna=False).reset_index()
 
     # following reshaping operations make all columns available to a function applied on rolling windows (rolling will
     # only eat one column of a dataframe at a time and doesnt like multi indexes as well)
@@ -220,11 +221,11 @@ def flagSoilMoistureByPrecipitationEvents(data, flags, field, flagger, prec_refe
 
     # make raise and std. dev tester function (returns False for values that
     # should be flagged bad and True respectively. (must be this way, since np.nan gets casted to True)))
-    def prec_test(x):
+    def prec_test(x, std_fac=std_factor):
         x_moist = x[0::2]
         x_rain = x[1::2]
         if x_moist[-1] > x_moist[-2]:
-            if (x_moist[-1] - x_moist[0]) > std_factor*x_moist.std():
+            if (x_moist[-1] - x_moist[0]) > std_fac*x_moist.std():
                 return ~(x_rain[-1] <= (sensor_meas_depth*soil_porosity*sensor_accuracy))
             else:
                 return True
@@ -244,3 +245,73 @@ def flagSoilMoistureByPrecipitationEvents(data, flags, field, flagger, prec_refe
     # set Flags
     flags.loc[invalid_indices, field] = flagger.setFlag(flags.loc[invalid_indices, field], **kwargs)
     return data, flags
+
+
+def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, raise_factor=0.15, filter_window_size='3h',
+                                     normalized_data=True):
+    """Function
+
+       NOTE1: You should run less complex tests, especially range-tests, the flag-by-precipitation-test and the
+       flag-by-frost test previously to this one, since the spike check for any potential, unflagged spike,
+       is relatively costly (1 x smoothing + 2 x deviating + 2 x condition application).
+
+       NOTE2: Test will only provide meaningful results, if dataseries input of soilmoisture data is projected onto
+       [0,1] interval
+
+
+
+
+       :param data:                        The pandas dataframe holding the data-to-be flagged, as well as the reference
+                                           series. Data must be indexed by a datetime series and be harmonized onto a
+                                           time raster with seconds precision.
+       :param flags:                       A dataframe holding the flags/flag-entries associated with "data".
+       :param field:                       Fieldname of the Soil moisture measurements field in data.
+       :param flagger:                     A flagger - object. (saqc.flagger.X)
+
+    """
+
+    # retrieve data series input at its original sampling rate
+    # (Note: case distinction for pure series input to avoid error resulting from trying to access pd.Series[field]
+    if isinstance(data, pd.Series):
+        dataseries, data_rate = retrieveTrustworthyOriginal(data, flags, flagger)
+    else:
+        dataseries, data_rate = retrieveTrustworthyOriginal(data[field], flags[field], flagger)
+    # abort processing if original series has no valid entries!
+    if data_rate is np.nan:
+        return data, flags
+    # normalize data if nessecary
+    if ~normalized_data:
+        dataseries=dataseries/100
+
+    quotient_series = dataseries/dataseries.shift(-1)
+    spikes = (quotient_series > (1 + raise_factor)) | (quotient_series < (1 - raise_factor))
+    spikes = spikes[spikes == True]
+    # loop through spikes: (loop may sound ugly - but since the number of spikes is supposed to not exceed the
+    # thousands for year data - a loop going through all the spikes instances is much faster than
+    # a rolling window, rolling all through a stacked year dataframe )
+    filter_window_seconds = pd.Timedelta.total_seconds(pd.Timedelta(filter_window_size))
+    smoothing_periods = int(np.ceil((filter_window_seconds / data_rate.n)))
+    if smoothing_periods % 2 == 0:
+        smoothing_periods += 1
+    for spike in spikes.index:
+        start_slice = spike - pd.Timedelta(filter_window_size)
+        end_slice = spike + pd.Timedelta(filter_window_size)
+        scnd_derivate = savgol_filter(dataseries[start_slice:end_slice],
+                               window_length=smoothing_periods,
+                               polyorder=2,
+                               deriv=2)
+        length = scnd_derivate.size
+        test_ratio_1 = np.abs(scnd_derivate[int((length-1) / 2)] / scnd_derivate[int((length+1) / 2)])
+        if 0.8 < test_ratio_1 < 1.2:
+            start_slice = spike - pd.Timedelta('12h')
+            end_slice = spike + pd.Timedelta('12h')
+            test_slice = dataseries[start_slice:end_slice]
+            test_ratio_2 = np.abs(test_slice.var() / test_slice.mean())
+            if test_ratio_2 > 1:
+                spikes[spike] = False
+        else:
+            spikes[spike] = False
+
+    spikes = spikes[spikes == True]
+    flags.loc[spikes.index, field] = flagger.setFlag(flags.loc[spikes.index, field], **kwargs)
+    return data, flags
-- 
GitLab