Commit eb1aa14d authored by Peter Lünenschloß

changed functions package structure

parent 61ffcfc9
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
from .functions import (
register)
from ..lib.tools import (
retrieveTrustworthyOriginal,
getPandasVarNames,
getPandasData,
offset2periods,
checkQCParameters)
@register("Breaks_SpektrumBased")
def flagBreaks_SpektrumBased(data, flags, field, flagger, diff_method='raw', filter_window_size='3h',
rel_change_rate_min=0.1, abs_change_min=0.01, first_der_factor=10,
first_der_window_size='12h', scnd_der_ratio_margin_1=0.05,
scnd_der_ratio_margin_2=10, smooth_poly_order=2, **kwargs):
""" This Function is an generalization of the Spectrum based break flagging mechanism as presented in:
Dorigo,W,.... Global Automated Quality Control of In Situ Soil Moisture Data from the international
Soil Moisture Network. 2013. Vadoze Zone J. doi:10.2136/vzj2012.0097.
The function flags breaks (jumps/drops) in input measurement series by evaluating its derivatives.
A measurement y_t is flagged a, break, if:
(1) y_t is changing relatively to its preceeding value by at least (100*rel_change_rate_min) percent
(2) y_t differs from its preceding value by an absolute margin of at least "abs_change_min",
(3) the absolute first derivative |(y_t)'| is at least "first_der_factor" times as big as the arithmetic mean
of all first derivative values within a window of twice "first_der_window_size", centered at t,
(4) the ratio of the second derivatives at t and t+1 is "approximately" 1, i.e. it lies within
[1 - scnd_der_ratio_margin_1, 1 + scnd_der_ratio_margin_1],
(5) the ratio of the second derivatives at t+1 and t+2 is larger than scnd_der_ratio_margin_2.
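For illustration: with the defaults rel_change_rate_min=0.1 and abs_change_min=0.01, a step from 0.20 to 0.25
satisfies (1) and (2), since the absolute change of 0.05 exceeds 0.01 and corresponds to a relative change well
above 10 percent; conditions (3) to (5) then decide on the derivatives around t.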
NOTE 1: Since no reliable statement about the plausibility of the measurements before and after the jump is possible,
only the jump itself is flagged. For flagging constant values following a jump, use a flagConstants test.
NOTE 2: All derivatives in the reference publication are obtained by applying a Savitzky-Golay filter to the data
before differentiating. However, I was not able to reproduce satisfaction of all the conditions for synthetically
constructed breaks - especially conditions (4) and (5). This is because smoothing distributes the harshness of the
break over the smoothing window. Since simply taking the difference series as derivatives worked well for my
empirical data set, the parameter "diff_method" defaults to "raw", meaning that derivatives are obtained from the
plain difference series.
You are of course free to change this parameter to "savgol" and experiment with the associated filter options
(see the parameter descriptions below).
:param data: The pandas dataframe holding the data-to-be flagged.
Data must be indexed by a datetime series and be harmonized onto a
time raster with seconds precision (skips allowed).
:param flags: A dataframe holding the flags/flag-entries associated with "data".
:param field: Fieldname of the Soil moisture measurements field in data.
:param flagger: A flagger - object. (saqc.flagger.X)
:param diff_method: String. Method for obtaining dataseries' derivatives.
'raw': Just take series step differences (default)
'savgol': Smooth data with a Savitzky Golay Filter before differentiating.
:param filter_window_size: Offset string. Size of the filter window, used to calculate the derivatives.
(relevant only, if: diff_method='savgol')
:param smooth_poly_order: Integer. Polynomial order, used for smoothing with the Savitzky-Golay filter.
(relevant only, if: diff_method='savgol')
:param rel_change_rate_min: Float in [0,1]. See (1) of the function description above to learn more.
:param abs_change_min: Float > 0. See (2) of the function description above to learn more.
:param first_der_factor: Float > 0. See (3) of the function description above to learn more.
:param first_der_window_size: Offset string. See (3) of the function description to learn more.
:param scnd_der_ratio_margin_1: Float in [0,1]. See (4) of the function description above to learn more.
:param scnd_der_ratio_margin_2: Float > 0. See (5) of the function description above to learn more.
"""
# retrieve data series input at its original sampling rate
para_check_1 = checkQCParameters({'data': {'value': data,
'type': [pd.Series, pd.DataFrame],
'tests': {'harmonized': lambda x: pd.infer_freq(x.index) is not None}},
'flags': {'value': flags,
'type': [pd.Series, pd.DataFrame]},
'field': {'value': field,
'type': [str],
'tests': {'scheduled in data': lambda x: x in
getPandasVarNames(data)}}},
kwargs['func_name'])
dataseries, data_rate = retrieveTrustworthyOriginal(getPandasData(data, field), getPandasData(flags, field),
flagger)
para_check_2 = checkQCParameters({'diff_method': {'value': diff_method,
'member': ['raw', 'savgol']},
'filter_window_size': {'value': filter_window_size,
'type': [str],
'tests': {'Valid Offset String': lambda x: pd.Timedelta(
x).total_seconds() % 1 == 0}},
'first_der_window_size': {'value': first_der_window_size,
'type': [str],
'tests': {'Valid Offset String': lambda x: pd.Timedelta(
x).total_seconds() % 1 == 0}},
'smooth_poly_order': {'value': smooth_poly_order,
'type': [int],
'range': [0, np.inf]},
'rel_change_rate_min': {'value': rel_change_rate_min,
'type': [int, float],
'range': [0, 1]},
'scnd_der_ratio_margin_1': {'value': scnd_der_ratio_margin_1,
'type': [int, float],
'range': [0, 1]},
'scnd_der_ratio_margin_2': {'value': scnd_der_ratio_margin_2,
'type': [int, float],
'range': [0, np.inf]},
'abs_change_min': {'value': abs_change_min,
'type': [int, float],
'range': [0, np.inf]},
'first_der_factor': {'value': first_der_factor,
'type': [int, float],
'range': [0, np.inf]}},
kwargs['func_name'])
if (para_check_1 < 0) | (para_check_2 < 0):
logging.warning('test {} will be skipped because not all input parameters satisfied '
'the requirements'.format(kwargs['func_name']))
return data, flags
# relative - change - break criteria testing:
abs_change = np.abs(dataseries.shift(+1) - dataseries)
breaks = (abs_change > abs_change_min) & (abs_change / dataseries > rel_change_rate_min)
breaks = breaks[breaks == True]
# First derivative criterion
smoothing_periods = int(np.ceil(offset2periods(filter_window_size, data_rate)))
if smoothing_periods % 2 == 0:
smoothing_periods += 1
for brake in breaks.index:
# slice out slice-to-be-filtered (with some safety extension of 3 hours)
slice_start = brake - pd.Timedelta(first_der_window_size) - pd.Timedelta('3h')
slice_end = brake + pd.Timedelta(first_der_window_size) + pd.Timedelta('3h')
data_slice = dataseries[slice_start:slice_end]
# obtain first derivative:
if diff_method == 'savgol':
first_deri_series = pd.Series(data=savgol_filter(data_slice,
window_length=smoothing_periods,
polyorder=smooth_poly_order,
deriv=1),
index=data_slice.index)
if diff_method == 'raw':
first_deri_series = data_slice.diff()
# condition constructing and testing:
test_slice = first_deri_series[brake - pd.Timedelta(first_der_window_size):
brake + pd.Timedelta(first_der_window_size)]
test_sum = abs((test_slice.sum()*first_der_factor) / test_slice.size)
if abs(first_deri_series[brake]) > test_sum:
# second derivative criterion:
slice_start = brake - pd.Timedelta('3h')
slice_end = brake + pd.Timedelta('3h')
data_slice = data_slice[slice_start:slice_end]
# obtain second derivative:
if diff_method == 'savgol':
second_deri_series = pd.Series(data=savgol_filter(data_slice,
window_length=smoothing_periods,
polyorder=smooth_poly_order,
deriv=2),
index=data_slice.index)
if diff_method == 'raw':
second_deri_series = data_slice.diff().diff()
# criterion evaluation:
# condition (4): ratio of the second derivatives at t and t+1 has to be close to 1
first_second = (1 - scnd_der_ratio_margin_1) < \
abs((second_deri_series[brake] / second_deri_series.shift(-1)[brake])) < \
1 + scnd_der_ratio_margin_1
# condition (5): ratio of the second derivatives at t+1 and t+2 has to exceed the margin
second_second = abs(second_deri_series.shift(-1)[brake] / second_deri_series.shift(-2)[brake]) > \
scnd_der_ratio_margin_2
if not (first_second and second_second):
breaks[brake] = False
else:
breaks[brake] = False
breaks = breaks[breaks == True]
if isinstance(flags, pd.Series):
flags.loc[breaks.index] = flagger.setFlag(flags.loc[breaks.index], **kwargs)
else:
flags.loc[breaks.index, field] = flagger.setFlag(flags.loc[breaks.index, field], **kwargs)
return data, flags
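# A minimal, self-contained sketch (illustration only, not part of the module) of break conditions (1)-(3)
# on a made-up series, using plain pandas; the names "toy" and "candidates" and the 15min/24-point setup are
# assumptions chosen for the example, while the thresholds are the function defaults:
import pandas as pd

idx = pd.date_range("2020-01-01", periods=24, freq="15min")
toy = pd.Series([0.20] * 12 + [0.30] * 12, index=idx)              # one clear jump
abs_change = (toy - toy.shift(1)).abs()
candidates = (abs_change > 0.01) & (abs_change / toy > 0.1)        # conditions (1) + (2)
first_der = toy.diff().abs()
window_mean = first_der.rolling(window=17, center=True, min_periods=1).mean()
candidates &= first_der > 10 * window_mean                         # condition (3), first_der_factor = 10
print(candidates[candidates].index)                                # the timestamp of the jump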
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import numpy as np
import pandas as pd
from .functions import (
register)
from ..lib.tools import (
valueRange,
slidingWindowIndices,
retrieveTrustworthyOriginal,
getPandasVarNames,
getPandasData,
offset2periods,
checkQCParameters)
@register("constant")
def flagConstant(data, flags, field, flagger, eps,
length, thmin=None, **kwargs):
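# Flags intervals of near-constant values: every sliding window of duration `length` whose value
# range stays below `eps` gets flagged. Values below `thmin` and NaNs are masked out beforehand,
# and windows that, after masking, no longer span the full `length` are skipped.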
datacol = data[field]
flagcol = flags[field]
length = ((pd.to_timedelta(length) - data.index.freq)
.to_timedelta64()
.astype(np.int64))
# keep the masked values as floats, so that NaNs survive the mask and are detected below
values = (datacol
.mask((datacol < thmin) | datacol.isnull())
.values)
dates = datacol.index.values.astype(np.int64)
mask = np.isfinite(values)
for start_idx, end_idx in slidingWindowIndices(datacol.index, length):
mask_chunk = mask[start_idx:end_idx]
values_chunk = values[start_idx:end_idx][mask_chunk]
dates_chunk = dates[start_idx:end_idx][mask_chunk]
# we might have removed dates from the start/end of the
# chunk resulting in a period shorter than 'length'
# print (start_idx, end_idx)
if valueRange(dates_chunk) < length:
continue
if valueRange(values_chunk) < eps:
flagcol[start_idx:end_idx] = flagger.setFlags(flagcol[start_idx:end_idx], **kwargs)
data[field] = datacol
flags[field] = flagcol
return data, flags
@register("constants_varianceBased")
def flagConstants_VarianceBased(data, flags, field, flagger, plateau_window_min='12h', plateau_var_limit=0.0005,
**kwargs):
"""Function flags plateaus/series of constant values. Any interval of values y(t),..y(t+n) is flagged, if:
(1) n > "plateau_interval_min"
(2) variance(y(t),...,y(t+n) < plateau_var_limit
:param data: The pandas dataframe holding the data-to-be flagged.
Data must be indexed by a datetime series and be harmonized onto a
time raster with seconds precision (skips allowed).
:param flags: A dataframe holding the flags/flag-entries associated with "data".
:param field: Fieldname of the Soil moisture measurements field in data.
:param flagger: A flagger - object. (saqc.flagger.X)
:param plateau_window_min: Offset string. Only intervals of minimum size "plateau_window_min" have a
chance to get flagged as constant intervals.
:param plateau_var_limit: Float. The upper bound the variance of an interval must not exceed, for the
interval to be flagged as a plateau.
"""
para_check_1 = checkQCParameters({'data': {'value': data,
'type': [pd.Series, pd.DataFrame],
'tests': {'harmonized': lambda x: pd.infer_freq(x.index) is not None}},
'flags': {'value': flags,
'type': [pd.Series, pd.DataFrame]},
'field': {'value': field,
'type': [str],
'tests': {'scheduled in data':
lambda x: x in getPandasVarNames(data)}}},
kwargs['func_name'])
dataseries, data_rate = retrieveTrustworthyOriginal(getPandasData(data, field), getPandasData(flags, field), flagger)
para_check_2 = checkQCParameters({'plateau_window_min': {'value': plateau_window_min,
'type': [str],
'tests': {'Valid Offset String': lambda x: pd.Timedelta(x).total_seconds() % 1 == 0}},
'plateau_var_limit': {'value': plateau_var_limit,
'type': [int, float],
'range': [0, np.inf]},
'data_rate': {'value': data_rate,
'tests': {'not nan': lambda x: x is not np.nan}}},
kwargs['func_name'])
if (para_check_1 < 0) | (para_check_2 < 0):
logging.warning('test {} will be skipped because not all input parameters satisfied '
'the requirements'.format(kwargs['func_name']))
return data, flags
min_periods = int(offset2periods(plateau_window_min, data_rate))
# identify minimal plateaus:
plateaus = dataseries.rolling(window=plateau_window_min).apply(lambda x: (x.var() > plateau_var_limit) |
(x.size < min_periods), raw=False)
plateaus = (~plateaus.astype(bool))
# are there any candidates for being flagged as plateaus?
if plateaus.sum() == 0:
return data, flags
# nice reverse trick to cover total interval size
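# (the rolling test above only marks the right endpoint of each qualifying window; rolling once more
# over the reversed series and flipping the result back spreads that mark over every timestamp
# inside the window)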
plateaus_reverse = pd.Series(np.flip(plateaus.values), index=plateaus.index)
reverse_check = plateaus_reverse.rolling(window=plateau_window_min).apply(
lambda x: True if True in x.values else False, raw=False).astype(bool)
# result:
plateaus = pd.Series(np.flip(reverse_check.values), index=plateaus.index)
# reduce to the timestamps that actually belong to a plateau:
plateaus = plateaus[plateaus]
if isinstance(flags, pd.Series):
flags.loc[plateaus.index] = flagger.setFlag(flags.loc[plateaus.index], **kwargs)
else:
flags.loc[plateaus.index, field] = flagger.setFlag(flags.loc[plateaus.index, field], **kwargs)
return data, flags
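# A minimal, self-contained sketch of the "reverse trick" used above (illustration only; the series
# name "marks" and the 3h window are made up for the example). A forward rolling test can only mark
# the right endpoint of a qualifying window; rolling over the flipped series and flipping back covers
# the whole interval. rolling(...).max() over 0/1 values plays the role of the "True in window" check.
import numpy as np
import pandas as pd

idx = pd.date_range("2020-01-01", periods=10, freq="1h")
marks = pd.Series([0.0] * 6 + [1.0] * 2 + [0.0] * 2, index=idx)   # endpoint marks of qualifying windows
reverse = pd.Series(np.flip(marks.values), index=idx)
covered = reverse.rolling(window="3h").max().astype(bool)
covered = pd.Series(np.flip(covered.values), index=idx)
print(covered[covered].index)   # the marked endpoints plus the timestamps preceding them inside the window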
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
from .functions import (
register)
from ..lib.tools import (
inferFrequency,
retrieveTrustworthyOriginal,
getPandasVarNames,
getPandasData,
offset2seconds,
checkQCParameters)
@register("mad")
def flagMad(data, flags, field, flagger, length, z, freq=None, **kwargs):
d = data[field].copy()
freq = inferFrequency(d) if freq is None else freq
if freq is None:
raise ValueError("freqency cannot inferred, provide `freq` as a param to mad().")
winsz = int(pd.to_timedelta(length) / freq)
median = d.rolling(window=winsz, center=True, closed='both').median()
diff = abs(d - median)
mad = diff.rolling(window=winsz, center=True, closed='both').median()
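# 0.6745 is the 0.75 quantile of the standard normal distribution: scaling by it makes the MAD comparable
# to a standard deviation, so the test below reads |x - median| / (MAD / 0.6745) > z, i.e. a modified
# z-score cutoff.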
mask = (mad > 0) & (0.6745 * diff > z * mad)
flags.loc[mask, field] = flagger.setFlag(flags.loc[mask, field], **kwargs)
return data, flags
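# A minimal sketch of the modified z-score criterion that flagMad applies within each centered rolling
# window - shown here on a whole (made-up) array at once, with the common cutoff z = 3.5:
import numpy as np

x = np.array([12.1, 12.3, 12.2, 12.4, 12.2, 19.0, 12.3, 12.1, 12.2])
med = np.median(x)                        # 12.2
mad = np.median(np.abs(x - med))          # ~0.1
mod_z = 0.6745 * np.abs(x - med) / mad    # modified z-scores
print(mod_z > 3.5)                        # only the 19.0 reading exceeds the cutoff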
@register("Spikes_SpektrumBased")
def flagSpikes_SpektrumBased(data, flags, field, flagger, diff_method='raw', filter_window_size='3h',
raise_factor=0.15, dev_cont_factor=0.2, noise_barrier=1, noise_window_size='12h',
noise_statistic='rVar', smooth_poly_order=2, **kwargs):
"""This Function is an generalization of the Spectrum based Spike flagging mechanism as presented in:
Dorigo,W,.... Global Automated Quality Control of In Situ Soil Moisture Data from the international
Soil Moisture Network. 2013. Vadoze Zone J. doi:10.2136/vzj2012.0097.
Function detects and flags spikes in input data series by evaluating its derivatives and applying some
conditions to it. A datapoint is considered a spike, if:
(1) the quotient to its preceeding datapoint exceeds a certain bound
(controlled by param "raise_factor")
(2) the quotient of the datas second derivate at the preceeding and subsequent timestamps is close enough to 1.
(controlled by param "dev_cont_factor")
(3) the surrounding data is not too noisy. (Coefficient of Variation[+/- noise_window] < 1)
(controlled by param "noise_barrier")
Some things you should be aware of when applying this test:
NOTE 1: You should run less complex tests, especially range tests or absolute spike tests, prior to this one,
since the spike check for any potential, unflagged spike is relatively costly
(1 x smoothing + 2 x differentiation + 2 x condition application).
NOTE 2: Due to an inconsistency in the paper that provided the concept of this test [paper:], it is not really
clear whether to use the coefficient of variation or the relative variance for noise testing.
Since the relative variance was explicitly denoted in the formulas, the function defaults to the relative variance,
but it can be switched to the coefficient of variation via the parameter "noise_statistic".
NOTE 3: All derivatives in the reference publication are obtained by applying a Savitzky-Golay filter to the data
before differentiating. For the break detection algorithm in that publication,
some of the conditions did not work well with smoothed derivatives.
This is because smoothing distributes the harshness of breaks and jumps over the
smoothing window and makes them "smoother".
Since simply taking the difference series as derivatives worked well for my empirical data set,
the parameter "diff_method" defaults to "raw", meaning that derivatives are obtained from the plain
difference series.
You are of course free to change this parameter to "savgol" and experiment with the associated filter options
(see the parameter descriptions below).
:param data: The pandas dataframe holding the data-to-be flagged.
Data must be indexed by a datetime series and be harmonized onto a
time raster with seconds precision.
:param flags: A dataframe holding the flags/flag-entries associated with "data".
:param field: Fieldname of the Soil moisture measurements field in data.
:param flagger: A flagger - object. (saqc.flagger.X)
:param diff_method: String. Method for obtaining dataseries' derivatives.
'raw': Just take series step differences (default)
'savgol': Smooth data with a Savitzky Golay Filter before differentiating.
:param filter_window_size: Offset string. Size of the filter window, used to calculate the derivatives.
(relevant only, if: diff_method='savgol')
:param smooth_poly_order: Integer. Polynomial order, used for smoothing with the Savitzky-Golay filter.
(relevant only, if: diff_method='savgol')
:param raise_factor: A float, determining the bound the quotient of two consecutive values
has to exceed for y_t to be regarded as a potential spike. A value of 0.x will
trigger the spike test for the value y_t, if:
(y_t)/(y_t-1) > 1 + x or
(y_t)/(y_t-1) < 1 - x.
:param dev_cont_factor: A float, determining the interval the quotient of the data's second derivative
around a potential spike has to lie in, to trigger spike flagging for
this value. A datapoint y_t passes this spike condition if,
for dev_cont_factor = 0.x and the second derivative y'' of y, the condition
1 - x < abs((y''_(t-1))/(y''_(t+1))) < 1 + x
holds.
:param noise_barrier: A float, determining the bound the noisiness of the data around a potential spike
must not exceed, in order to guarantee a justified judgement:
to this end, the statistic selected by the parameter "noise_statistic",
taken over all values within t +/- "noise_window_size",
but excluding the point y_t itself, is evaluated and tested
for: statistic < noise_barrier.
:param noise_window_size: Offset string, determining the size of the window the noise statistic
is calculated over, to measure the noisiness of the data around a potential
spike.
The potential spike y_t is centered in a window of expansion:
[y_t - noise_window_size, y_t + noise_window_size].
:param noise_statistic: String. Determines whether to use the
"relative variance" or the "coefficient of variation" to check against the noise
barrier.
'CoVar' -> "coefficient of variation"
'rVar' -> "relative variance"
"""
para_check_1 = checkQCParameters({'data': {'value': data,
'type': [pd.Series, pd.DataFrame],
'tests': {'harmonized': lambda x: pd.infer_freq(x.index) is not None}},
'flags': {'value': flags,
'type': [pd.Series, pd.DataFrame]},
'field': {'value': field,
'type': [str],
'tests': {'scheduled in data': lambda x: x in
getPandasVarNames(data)}}},
kwargs['func_name'])
dataseries, data_rate = retrieveTrustworthyOriginal(getPandasData(data, field), getPandasData(flags, field),
flagger)
para_check_2 = checkQCParameters({'diff_method': {'value': diff_method,
'member': ['raw', 'savgol']},
'noise_statistic': {'value': noise_statistic,
'member': ['CoVar', 'rVar']},
'filter_window_size': {'value': filter_window_size,
'type': [str],
'tests': {'Valid Offset String': lambda x: pd.Timedelta(
x).total_seconds() % 1 == 0}},
'noise_window_size': {'value': noise_window_size,
'type': [str],
'tests': {'Valid Offset String': lambda x: pd.Timedelta(
x).total_seconds() % 1 == 0}},
'smooth_poly_order': {'value': smooth_poly_order,
'type': [int],
'range': [0, np.inf]},
'raise_factor': {'value': raise_factor,
'type': [int, float],
'range': [0, 1]},
'noise_barrier': {'value': noise_barrier,
'type': [int, float],
'range': [0, np.inf]},
'dev_cont_factor': {'value': dev_cont_factor,
'type': [int, float],
'range': [0, 1]}},
kwargs['func_name'])
# (the data series input was retrieved above at its original sampling rate; the case distinction for pure
# series input avoids errors from trying to access pd.Series[field])
if (para_check_1 < 0) | (para_check_2 < 0):
logging.warning('test {} will be skipped because not all input parameters satisfied '
'the requirements'.format(kwargs['func_name']))
return data, flags
# retrieve noise statistic:
if noise_statistic == 'CoVar':
# coefficient of variation: std(window) / mean(window)
noise_func = pd.Series.std
if noise_statistic == 'rVar':
# relative variance: var(window) / mean(window)
noise_func = pd.Series.var
quotient_series = dataseries/dataseries.shift(+1)
spikes = (quotient_series > (1 + raise_factor)) | (quotient_series < (1 - raise_factor))
spikes = spikes[spikes == True]
# loop through the spike candidates (a loop may sound ugly, but since the number of spikes is not supposed
# to exceed the thousands for a year of data, looping over the spike instances is much faster than
# rolling a window through a whole year's dataframe)
# calculate some values, repeatedly needed in the course of the loop:
filter_window_seconds = offset2seconds(filter_window_size)
smoothing_periods = int(np.ceil((filter_window_seconds / data_rate.n)))
lower_dev_bound = 1 - dev_cont_factor
upper_dev_bound = 1 + dev_cont_factor
if smoothing_periods % 2 == 0:
smoothing_periods += 1
for spike in spikes.index:
start_slice = spike - pd.Timedelta(filter_window_size)
end_slice = spike + pd.Timedelta(filter_window_size)
if diff_method == 'savgol':
scnd_derivate = savgol_filter(dataseries[start_slice:end_slice],
window_length=smoothing_periods,
polyorder=smooth_poly_order,
deriv=2)
if diff_method == 'raw':
scnd_derivate = dataseries[start_slice:end_slice].diff().diff()
length = scnd_derivate.size
test_ratio_1 = np.abs(scnd_derivate[int((length-1) / 2)] / scnd_derivate[int((length+1) / 2)])
if lower_dev_bound < test_ratio_1 < upper_dev_bound:
# apply noise condition:
start_slice = spike - pd.Timedelta(noise_window_size)
end_slice = spike + pd.Timedelta(noise_window_size)
test_slice = dataseries[start_slice:end_slice].drop(spike)
test_ratio_2 = np.abs(noise_func(test_slice) / test_slice.mean())
# not a spike we want to flag, if the noise condition is not satisfied:
if test_ratio_2 > noise_barrier:
spikes[spike] = False
# not a spike we want to flag, if the second-derivative condition is not satisfied
else:
spikes[spike] = False
spikes = spikes[spikes == True]
if isinstance(flags, pd.Series):
flags.loc[spikes.index] = flagger.setFlag(flags.loc[spikes.index], **kwargs)
else:
flags.loc[spikes.index, field] = flagger.setFlag(flags.loc[spikes.index, field], **kwargs)
return data, flags
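# A minimal, self-contained sketch (illustration only, not part of the module) of the raise-factor candidate
# selection and the noise condition on a made-up hourly series; the names "toy" and "candidates" and the window
# sizes are assumptions for the example, and the full test additionally applies the second-derivative check:
import pandas as pd

idx = pd.date_range("2020-01-01", periods=9, freq="1h")
toy = pd.Series([0.20, 0.21, 0.20, 0.45, 0.20, 0.21, 0.20, 0.21, 0.20], index=idx)

raise_factor = 0.15
quotient = toy / toy.shift(1)
candidates = (quotient > 1 + raise_factor) | (quotient < 1 - raise_factor)
# note: both the jump up to 0.45 and the drop back qualify as candidates - just as in the function above

for ts in candidates[candidates].index:
    neighbourhood = toy[ts - pd.Timedelta('3h'): ts + pd.Timedelta('3h')].drop(ts)
    co_var = neighbourhood.std() / neighbourhood.mean()    # coefficient of variation of the surroundings
    print(ts, co_var < 1)                                   # noise condition with noise_barrier = 1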