Commit 1e967201 authored by Peter Lünenschloß

flag spikes function for soil moisture variable implemented at working level

parent 61673605
@@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
+from scipy.signal import savgol_filter
from ..lib.tools import valueRange, slidingWindowIndices, inferFrequency, estimateSamplingRate, \
    retrieveTrustworthyOriginal
@@ -108,7 +109,7 @@ def flagMad(data, flags, field, flagger, length, z, freq=None, **kwargs):
    return data, flags
-def flagSoilMoistureBySoilFrost(data, flags, field, flagger, soil_temp_reference, tolerated_deviation,
+def flagSoilMoistureBySoilFrost(data, flags, field, flagger, soil_temp_reference, tolerated_deviation='1h',
                                frost_level=0, **kwargs):
"""Function flags Soil moisture measurements by evaluating the soil-frost-level in the moment of measurement.
Soil temperatures below "frost_level" are regarded as denoting frozen soil state.
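
The frost criterion amounts to looking up the reference temperature nearest to each moisture timestamp (within "tolerated_deviation") and checking it against "frost_level". A minimal sketch of that lookup, with illustrative names rather than the module's API:

    import pandas as pd

    def frost_mask(moisture, soil_temp, tolerated_deviation='1h', frost_level=0):
        # match each moisture timestamp with the nearest reference temperature,
        # but only if it lies within the tolerated deviation
        temp = soil_temp.reindex(moisture.index, method='nearest',
                                 tolerance=pd.Timedelta(tolerated_deviation))
        # below frost_level the soil counts as frozen -> candidates for flagging
        return temp < frost_level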
@@ -211,7 +212,7 @@ def flagSoilMoistureByPrecipitationEvents(data, flags, field, flagger, prec_refe
    prec_count = prec_count.resample(input_rate).pad()
    # now we can project precipitation onto the dataseries sampling (and stack the result to be able to apply df.rolling())
-    eval_frame = pd.merge(dataseries, prec_count, how='left', left_index=True, right_index=True).stack().reset_index()
+    eval_frame = pd.merge(dataseries, prec_count, how='left', left_index=True, right_index=True).stack(dropna=False).reset_index()
    # following reshaping operations make all columns available to a function applied on rolling windows (rolling will
    # only eat one column of a dataframe at a time and doesn't like multi-indexes either)
@@ -220,11 +221,11 @@
    # make raise and std.-dev. tester function (returns False for values that
    # should be flagged bad and True otherwise; it has to be this way, since np.nan gets cast to True)
-    def prec_test(x):
+    def prec_test(x, std_fac=std_factor):
        x_moist = x[0::2]
        x_rain = x[1::2]
        if x_moist[-1] > x_moist[-2]:
-            if (x_moist[-1] - x_moist[0]) > std_factor*x_moist.std():
+            if (x_moist[-1] - x_moist[0]) > std_fac*x_moist.std():
                return ~(x_rain[-1] <= (sensor_meas_depth*soil_porosity*sensor_accuracy))
            else:
                return True
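
Each rolling window interleaves soil moisture (even positions) and precipitation (odd positions), which is why the stack(dropna=False) change above matters: dropped NaN rows would break the x[0::2]/x[1::2] alignment. A self-contained restatement of the tester's logic, with illustrative constants standing in for the function's sensor parameters:

    import numpy as np

    def prec_test_sketch(window, std_fac=2.0, depth=0.2, porosity=0.4, accuracy=0.02):
        # window interleaves soil moisture (even positions) and precipitation (odd positions)
        x_moist, x_rain = window[0::2], window[1::2]
        if x_moist[-1] > x_moist[-2] and (x_moist[-1] - x_moist[0]) > std_fac * x_moist.std():
            # a significant raise is only plausible if enough rain fell on the last step
            return x_rain[-1] > depth * porosity * accuracy
        return True  # no significant raise -> nothing to flag

    window = np.array([0.20, 0.0, 0.21, 0.0, 0.35, 0.0])  # sharp raise, but no rain
    print(prec_test_sketch(window))  # -> False, i.e. flag the raise as implausible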
@@ -244,3 +245,73 @@ def flagSoilMoistureByPrecipitationEvents(data, flags, field, flagger, prec_refe
    # set Flags
    flags.loc[invalid_indices, field] = flagger.setFlag(flags.loc[invalid_indices, field], **kwargs)
    return data, flags
def flagSoilMoistureBySpikeDetection(data, flags, field, flagger, raise_factor=0.15, filter_window_size='3h',
                                     normalized_data=True, **kwargs):
"""Function
NOTE1: You should run less complex tests, especially range-tests, the flag-by-precipitation-test and the
flag-by-frost test previously to this one, since the spike check for any potential, unflagged spike,
is relatively costly (1 x smoothing + 2 x deviating + 2 x condition application).
NOTE2: Test will only provide meaningful results, if dataseries input of soilmoisture data is projected onto
[0,1] interval
    :param data:        The pandas dataframe holding the data-to-be-flagged, as well as the reference
                        series. Data must be indexed by a datetime series and be harmonized onto a
                        time raster with seconds precision.
    :param flags:       A dataframe holding the flags/flag-entries associated with "data".
    :param field:       Fieldname of the soil moisture measurements field in data.
    :param flagger:     A flagger object. (saqc.flagger.X)
    """
    # retrieve data series input at its original sampling rate
    # (Note: case distinction for pure series input, to avoid an error resulting from trying to access pd.Series[field])
    if isinstance(data, pd.Series):
        dataseries, data_rate = retrieveTrustworthyOriginal(data, flags, flagger)
    else:
        dataseries, data_rate = retrieveTrustworthyOriginal(data[field], flags[field], flagger)
    # abort processing if the original series has no valid entries!
    if data_rate is np.nan:
        return data, flags
    # normalize data if necessary
    if not normalized_data:
        dataseries = dataseries / 100
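    # mark candidates: a value is suspicious if it deviates from its successor by more than raise_factor (relative)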
    quotient_series = dataseries / dataseries.shift(-1)
    spikes = (quotient_series > (1 + raise_factor)) | (quotient_series < (1 - raise_factor))
    spikes = spikes[spikes]
    # loop through spikes (the loop may sound ugly, but since the number of spikes is not supposed to exceed some
    # thousands for a year of data, looping over all spike instances is much faster than rolling a window
    # through a whole stacked year's dataframe)
    filter_window_seconds = pd.Timedelta.total_seconds(pd.Timedelta(filter_window_size))
    smoothing_periods = int(np.ceil(filter_window_seconds / data_rate.n))
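    # savgol_filter requires an odd window length, so round the period count up to the next odd number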
    if smoothing_periods % 2 == 0:
        smoothing_periods += 1
    for spike in spikes.index:
        start_slice = spike - pd.Timedelta(filter_window_size)
        end_slice = spike + pd.Timedelta(filter_window_size)
        scnd_derivative = savgol_filter(dataseries[start_slice:end_slice],
                                        window_length=smoothing_periods,
                                        polyorder=2,
                                        deriv=2)
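        # compare the curvature right at the suspected spike with the curvature one step after it:
        # for a genuine one-point spike both should have similar magnitude, i.e. a ratio near 1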
        length = scnd_derivative.size
        test_ratio_1 = np.abs(scnd_derivative[int((length-1) / 2)] / scnd_derivative[int((length+1) / 2)])
        if 0.8 < test_ratio_1 < 1.2:
            start_slice = spike - pd.Timedelta('12h')
            end_slice = spike + pd.Timedelta('12h')
            test_slice = dataseries[start_slice:end_slice]
            test_ratio_2 = np.abs(test_slice.var() / test_slice.mean())
            if test_ratio_2 > 1:
                spikes[spike] = False
        else:
            spikes[spike] = False
    spikes = spikes[spikes]
    flags.loc[spikes.index, field] = flagger.setFlag(flags.loc[spikes.index, field], **kwargs)
    return data, flags
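
To see the confirmation step in isolation, here is a minimal, self-contained reproduction of the curvature check (the data values and the window length are made up; the function above derives smoothing_periods from filter_window_size and the sampling rate instead):

    import numpy as np
    from scipy.signal import savgol_filter

    # hypothetical excerpt around a suspected spike; in the function above this
    # would be dataseries[spike - filter_window_size : spike + filter_window_size]
    y = np.array([0.30, 0.30, 0.31, 0.30, 0.60, 0.30, 0.31, 0.30, 0.30])
    d2 = savgol_filter(y, window_length=5, polyorder=2, deriv=2)

    length = d2.size
    # ratio of the curvature at the suspected spike to the curvature one step after it
    ratio = np.abs(d2[int((length - 1) / 2)] / d2[int((length + 1) / 2)])
    print(0.8 < ratio < 1.2)  # if False, the function drops the spike hypothesis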