Skip to content
Snippets Groups Projects
Commit 2688e56d authored by Bert Palm's avatar Bert Palm 🎇
Browse files

Merge branch 'pattern_newmodule' into 'develop'

Pattern newmodule

See merge request !105
parents 4d742223 fc3df6d3
No related branches found
No related tags found
3 merge requests!193Release 1.4,!188Release 1.4,!105Pattern newmodule
Pipeline #11862 failed with stages
in 11 minutes and 28 seconds
......@@ -8,8 +8,6 @@ import dios
import numpy as np
import pandas as pd
import scipy
import dtw
import pywt
import itertools
import collections
import numba
......@@ -260,151 +258,6 @@ def flagRange(data, field, flagger, min=-np.inf, max=np.inf, **kwargs):
return data, flagger
@register(masking='all')
def flagPattern(data, field, flagger, reference_field, method='dtw', partition_freq="days", partition_offset='0',
                max_distance=0.03, normalized_distance=True, open_end=True, widths=(1, 2, 4, 8),
                waveform='mexh', **kwargs):
    """
    Implementation of two pattern recognition algorithms:

    * Dynamic Time Warping (dtw) [1]
    * Pattern recognition via wavelets [2]

    The steps are:

    1. Get the frequency of partitions, in which the time series has to be divided
       (for example: a pattern occurs daily, or every hour)
    2. Compare each partition with the given pattern
    3. Check if the compared partition contains the pattern or not
    4. Flag partition if it contains the pattern

    Parameters
    ----------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    field : str
        The fieldname of the column, holding the data-to-be-flagged.
    flagger : saqc.flagger.BaseFlagger
        A flagger object, holding flags and additional Informations related to `data`.
    reference_field : str
        Fieldname in `data`, that holds the pattern.
    method : {'dtw', 'wavelet'}, default 'dtw'
        Pattern Recognition method to be used.
    partition_freq : str, default 'days'
        Frequency, in which the pattern occurs.
        Has to be an offset string or one out of {``'days'``, ``'months'``}. If ``'days'`` or ``'months'`` is passed,
        then precise length of partition is calculated from pattern length.
    partition_offset : str, default '0'
        If partition frequency is given by an offset string and the pattern starts after a timely offset, this offset
        is given by `partition_offset`.
        (e.g., partition frequency is "1h", pattern starts at 10:15, then offset is "15min").
    max_distance : float, default 0.03
        Only effective if method = 'dtw'.
        Maximum dtw-distance between partition and pattern, so that partition is recognized as pattern.
        (And thus gets flagged.)
    normalized_distance : bool, default True
        For dtw. Normalizing dtw-distance (Doesn't refer to statistical normalization, but to a normalization that
        makes comparable dtw-distances for probes of different length, see [1] for more details).
    open_end : bool, default True
        Only effective if method = 'dtw'.
        Whether or not the ending of the probe and of the pattern have to be projected onto each other in the search
        for the optimal dtw-mapping. Recommendation of [1].
    widths : tuple[int], default (1, 2, 4, 8)
        Only effective if `method` = ``'wavelet'``.
        Widths for wavelet decomposition. [2] recommends a dyadic scale.
    waveform : str, default 'mexh'
        Only effective if `method` = ``'wavelet'``.
        Wavelet to be used for decomposition. Default: 'mexh'

    Returns
    -------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    flagger : saqc.flagger.BaseFlagger
        The flagger object, holding flags and additional Informations related to `data`.
        Flags values may have changed relatively to the flagger input.

    References
    ----------
    [1] https://cran.r-project.org/web/packages/dtw/dtw.pdf
    [2] Maharaj, E.A. (2002): Pattern Recognition of Time Series using Wavelets. In: Härdle W., Rönz B. (eds) Compstat.
        Physica, Heidelberg, 978-3-7908-1517-7.
    """
    test = data[field].copy()
    ref = data[reference_field].copy()
    # time-of-day boundaries of the pattern, used to cut each partition
    pattern_start_date = ref.index[0].time()
    pattern_end_date = ref.index[-1].time()
    # BUGFIX: the pattern's duration must be computed from the full timestamps;
    # subtracting two datetime.time objects raises a TypeError.
    pattern_duration = ref.index[-1] - ref.index[0]

    # ## Extract partition frequency from pattern if needed
    if not isinstance(partition_freq, str):
        raise ValueError('Partition frequency has to be given in string format.')
    elif partition_freq == "days" or partition_freq == "months":
        # Derive the partition length (in days) from the reference pattern itself.
        partition_count = pattern_duration.days
        partitions = test.groupby(pd.Grouper(freq="%d D" % (partition_count + 1)))
    else:
        partitions = test.groupby(pd.Grouper(freq=partition_freq))

    # ## Initializing Wavelets
    if method == 'wavelet':
        # calculate reference wavelet transform
        ref_wl = ref.values.ravel()
        # Widths lambda as in Ann Maharaj
        cwtmat_ref, freqs = pywt.cwt(ref_wl, widths, waveform)
        # Square of matrix elements as Power sum of the matrix
        wavepower_ref = np.power(cwtmat_ref, 2)
    elif not method == 'dtw':
        # No correct method given
        raise ValueError('Unable to interpret {} as method.'.format(method))

    flags = pd.Series(data=False, index=test.index)

    # ## Calculate flags for every partition
    partition_min = ref.shape[0]
    for _, partition in partitions:
        # Ensuring that partition is at least as long as reference pattern
        if partition.empty or (partition.shape[0] < partition_min):
            continue
        if partition_freq == "days" or partition_freq == "months":
            # Use only the time frame given by the pattern.
            # BUGFIX: slice from pattern start to pattern END time (was start:start,
            # which selects only the single start timestamp).
            test = partition[pattern_start_date:pattern_end_date]
            mask = (partition.index >= test.index[0]) & (partition.index <= test.index[-1])
            test = partition.loc[mask]
        else:
            # cut partition according to pattern and offset
            start_time = pd.Timedelta(partition_offset) + partition.index[0]
            # BUGFIX: use the timestamp-based duration (time - time raises TypeError)
            end_time = start_time + pattern_duration
            test = partition[start_time:end_time]
        # ## Switch method
        if method == 'dtw':
            distance = dtw.dtw(test, ref, open_end=open_end, distance_only=True).normalizedDistance
            # NOTE(review): this divides the (already length-normalized) dtw distance
            # by the pattern variance, mirroring the project's other dtw flagging —
            # it is not the normalization described in [1]; verify intent.
            if normalized_distance:
                distance = distance / ref.var()
            # Partition labeled as pattern by dtw
            if distance < max_distance:
                flags[partition.index] = True
        elif method == 'wavelet':
            # calculate test wavelet transform
            test_wl = test.values.ravel()
            cwtmat_test, freqs = pywt.cwt(test_wl, widths, 'mexh')
            # Square of matrix elements as Power sum of the matrix
            wavepower_test = np.power(cwtmat_test, 2)
            # Permutation test on Powersum of matrix
            p_value = []
            for i in range(len(widths)):
                x = wavepower_ref[i]
                y = wavepower_test[i]
                pval = permutation_test(x, y, method='approximate', num_rounds=200,
                                        func=lambda x, y: x.sum() / y.sum(), seed=0)
                p_value.append(min(pval, 1 - pval))
            # Partition labeled as pattern by wavelet
            if min(p_value) >= 0.01:
                flags[partition.index] = True

    # BUGFIX: write the accumulated `flags` series, not the loop-local `mask`
    # (which only reflected the last partition and is unset for offset partitions).
    flagger = flagger.setFlags(field, loc=flags, **kwargs)
    return data, flagger
@register(masking='field')
def flagMissing(data, field, flagger, nodata=np.nan, **kwargs):
......@@ -871,7 +724,6 @@ def flagCrossScoring(data, field, flagger, fields, thresh, cross_stat='modZscore
return data, flagger
@register(masking='all')
def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, norm_frac=0.5,
metric=lambda x, y: scipy.spatial.distance.pdist(np.array([x, y]),
......@@ -895,7 +747,7 @@ def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, n
flagger : saqc.flagger.BaseFlagger
A flagger object, holding flags and additional informations related to `data`.
fields : str
List of fieldnames in data, determining wich variables are to be included into the flagging process.
List of fieldnames in data, determining which variables are to be included into the flagging process.
segment_freq : str
An offset string, determining the size of the seperate datachunks that the algorihm is to be piecewise
applied on.
......
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import dtw
import pywt
from mlxtend.evaluate import permutation_test
from saqc.core.register import register
from saqc.lib.tools import customRoller
@register(masking='field')
def flagPattern_wavelet(data, field, flagger, ref_field, widths=(1, 2, 4, 8), waveform='mexh', **kwargs):
    """
    Pattern recognition via wavelets.

    A window of the pattern's length is slid over the data; each window is
    compared against the reference pattern with the wavelet-based procedure
    of [1], and matching windows are flagged.

    Parameters
    ----------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    field : str
        The fieldname of the data column, you want to correct.
    flagger : saqc.flagger.BaseFlagger
        A flagger object, holding flags and additional Informations related to `data`.
    ref_field : str
        The fieldname in `data` which holds the pattern.
    widths : tuple of int
        Widths for wavelet decomposition. [1] recommends a dyadic scale. Default: (1,2,4,8)
    waveform : str
        Wavelet to be used for decomposition. Default: 'mexh'. See [2] for a list.
    kwargs

    Returns
    -------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
        Data values may have changed relatively to the data input.
    flagger : saqc.flagger.BaseFlagger
        The flagger object, holding flags and additional Informations related to `data`.
        Flags values may have changed relatively to the flagger input.

    References
    ----------
    The underlying pattern recognition algorithm using wavelets is documented here:
    [1] Maharaj, E.A. (2002): Pattern Recognition of Time Series using Wavelets. In: Härdle W., Rönz B. (eds) Compstat. Physica, Heidelberg, 978-3-7908-1517-7.
    The documentation of the python package used for the wavelt decomposition can be found here:
    [2] https://pywavelets.readthedocs.io/en/latest/ref/cwt.html#continuous-wavelet-families
    """
    pattern = data[ref_field].to_numpy()
    # wavelet power of the reference pattern, computed once up front
    coeffs_ref, _ = pywt.cwt(pattern, widths, waveform)
    power_ref = np.power(coeffs_ref, 2)
    n_scales = len(widths)

    def _power_ratio(a, b):
        # test statistic of the permutation test: ratio of summed wavelet powers
        return a.sum() / b.sum()

    def _is_pattern(chunk):
        # compare the chunk's wavelet power with the reference, scale by scale
        coeffs_chunk, _ = pywt.cwt(chunk, widths, waveform)
        power_chunk = np.power(coeffs_chunk, 2)
        for scale in range(n_scales):
            pval = permutation_test(power_ref[scale], power_chunk[scale],
                                    method='approximate', num_rounds=200,
                                    func=_power_ratio, seed=0)
            if min(pval, 1 - pval) > 0.01:
                return True
        return False

    series = data[field]
    window = len(pattern)
    hits = customRoller(series, window=window, min_periods=window).apply(_is_pattern, raw=True)
    flagger = flagger.setFlags(field, loc=hits, **kwargs)
    return data, flagger
@register(masking='field')
def flagPattern_dtw(data, field, flagger, ref_field, max_distance=0.03, normalize=True, **kwargs):
    """
    Pattern Recognition via Dynamic Time Warping.

    A window of the pattern's length is slid over the data; each window is
    compared against the reference pattern with the dtw algorithm of [1], and
    matching windows are flagged.

    Parameters
    ----------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
    field : str
        The fieldname of the data column, you want to correct.
    flagger : saqc.flagger.BaseFlagger
        A flagger object, holding flags and additional Informations related to `data`.
    ref_field : str
        The fieldname in `data` which holds the pattern.
    max_distance : float
        Maximum dtw-distance between partition and pattern, so that partition is recognized as pattern. Default: 0.03
    normalize : bool
        Normalizing dtw-distance (see [1]). Default: True
    kwargs

    Returns
    -------
    data : dios.DictOfSeries
        A dictionary of pandas.Series, holding all the data.
        Data values may have changed relatively to the data input.
    flagger : saqc.flagger.BaseFlagger
        The flagger object, holding flags and additional Informations related to `data`.
        Flags values may have changed relatively to the flagger input.

    References
    ----------
    Find a nice description of underlying the Dynamic Time Warping Algorithm here:
    [1] https://cran.r-project.org/web/packages/dtw/dtw.pdf
    """
    pattern = data[ref_field]
    # hoisted: variance of the pattern, used to scale the distance
    pattern_var = pattern.var()

    def _euclid(p, q):
        # pointwise distance measure fed to the dtw algorithm
        return np.linalg.norm(p - q)

    def _is_pattern(chunk):
        distance = dtw.dtw(chunk, pattern, _euclid)[0]
        if normalize:
            distance = distance / pattern_var
        return distance < max_distance

    series = data[field]
    window = len(pattern)
    hits = customRoller(series, window=window, min_periods=window).apply(_is_pattern, raw=True)
    flagger = flagger.setFlags(field, loc=hits, **kwargs)
    return data, flagger
......@@ -83,52 +83,6 @@ def course_2(char_dict):
return fix_funk
@pytest.fixture
def course_pattern_1(char_dict):
    # Rectangular bump pattern: six samples at 10 min frequency, all zero except
    # positions 2 and 3, which carry `out_val`. The pattern's index is recorded
    # in char_dict under 'pattern_1'.
    def fix_funk(freq='10min',
                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
        index = pd.date_range(initial_index, freq=freq, periods=6)
        series = pd.Series(data=0, index=index)
        series.iloc[2:4] = out_val
        char_dict['pattern_1'] = series.index
        frame = DictOfSeries(data=series, columns=['pattern_1'])
        return frame, char_dict

    return fix_funk
@pytest.fixture
def course_pattern_2(char_dict):
    # Rectangular bump pattern: four samples at 1 H frequency, all zero except
    # positions 2 and 3, which carry `out_val`. The pattern's index is recorded
    # in char_dict under 'pattern_2'.
    def fix_funk(freq='1 H',
                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
        index = pd.date_range(initial_index, freq=freq, periods=4)
        series = pd.Series(data=0, index=index)
        series.iloc[2:4] = out_val
        char_dict['pattern_2'] = series.index
        frame = DictOfSeries(data=series, columns=['pattern_2'])
        return frame, char_dict

    return fix_funk
@pytest.fixture
def course_test(char_dict):
# Test function for pattern detection - same as test pattern for first three values, than constant function
......
......@@ -32,34 +32,6 @@ def test_flagRange(data, field, flagger):
assert (flagged == expected).all()
# NOTE(review): legacy test for the removed monolithic `flagPattern`, disabled
# by wrapping it in a module-level string literal. Prefer deleting dead test
# code or marking it with @pytest.mark.skip(reason=...) so it stays visible
# to the test runner.
'''@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("method", ['wavelet', 'dtw'])
@pytest.mark.parametrize("pattern", [pytest.lazy_fixture("course_pattern_1"),
pytest.lazy_fixture("course_pattern_2"),] ,)
def test_flagPattern(course_test, flagger, method, pattern):
pattern_data, dict_pattern = pattern()
# testing the same pattern sampled at different frequencies
if pattern_data.columns == "pattern1":
test_data, *_ = course_test(freq="10 min")
test_data['pattern_data'] = pattern_data.to_df()
flagger = flagger.initFlags(test_data)
data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="1 H", method=method)
assert flagger.isFlagged("data")[dict_pattern["pattern_1"]].all()
if pattern_data.columns == "pattern2":
test_data, *_ = course_test(freq="1 H")
test_data['pattern_data'] = pattern_data.to_df()
flagger = flagger.initFlags(test_data)
data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="days", method=method)
assert flagger.isFlagged("data")[dict_pattern["pattern_2"]].all()
'''
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagSesonalRange(data, field, flagger):
# prepare
......
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
from dios import dios
from saqc.funcs.pattern_rec import *
from test.common import initData, TESTFLAGGER
@pytest.fixture
def data():
    # Single daily-sampled column spanning three years.
    frame = initData(cols=1, start_date="2016-01-01", end_date="2018-12-31", freq="1D")
    return frame
@pytest.fixture
def field(data):
    # Name of the single column provided by the `data` fixture.
    columns = data.columns
    return columns[0]
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagPattern_wavelet(flagger):
    # Daily zero series with a bump (value 7) at positions 2-3; the pattern is
    # the five-sample slice [1:6] enclosing the bump.
    data = pd.Series(0, index=pd.date_range(start="2000", end='2001', freq='1d'))
    data.iloc[2:4] = 7
    pattern = data.iloc[1:6]
    data = dios.DictOfSeries(dict(data=data, pattern_data=pattern))
    flagger = flagger.initFlags(data)
    data, flagger = flagPattern_wavelet(data, "data", flagger, ref_field="pattern_data")
    # The pattern occurrence itself must be flagged.
    assert (flagger.isFlagged("data")[1:6]).all()
    # NOTE(review): the two `.any()` checks below pass when points OUTSIDE the
    # pattern window ARE flagged — presumably the intent was the opposite
    # (`not ... .any()` / `~...`); verify against the expected behavior.
    assert (flagger.isFlagged("data")[:1]).any()
    assert (flagger.isFlagged("data")[7:]).any()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagPattern_dtw(flagger):
    # Daily zero series with a bump (value 7) at positions 2-3; the pattern is
    # the five-sample slice [1:6] enclosing the bump.
    data = pd.Series(0, index=pd.date_range(start="2000", end='2001', freq='1d'))
    data.iloc[2:4] = 7
    pattern = data.iloc[1:6]
    data = dios.DictOfSeries(dict(data=data, pattern_data=pattern))
    flagger = flagger.initFlags(data)
    data, flagger = flagPattern_dtw(data, "data", flagger, ref_field="pattern_data")
    # The pattern occurrence itself must be flagged.
    assert (flagger.isFlagged("data")[1:6]).all()
    # NOTE(review): the two `.any()` checks below pass when points OUTSIDE the
    # pattern window ARE flagged — presumably the intent was the opposite
    # (`not ... .any()` / `~...`); verify against the expected behavior.
    assert (flagger.isFlagged("data")[:1]).any()
    assert (flagger.isFlagged("data")[7:]).any()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment