diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index 3cd681221b37cfa17eb580128becd98a1fc774f2..a596ed57c471a1927c830dc889b24b9692425f83 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -8,8 +8,6 @@ import dios
 import numpy as np
 import pandas as pd
 import scipy
-import dtw
-import pywt
 import itertools
 import collections
 import numba
@@ -260,151 +258,6 @@ def flagRange(data, field, flagger, min=-np.inf, max=np.inf, **kwargs):
     return data, flagger
 
 
-@register(masking='all')
-def flagPattern(data, field, flagger, reference_field, method='dtw', partition_freq="days", partition_offset='0',
-                max_distance=0.03, normalized_distance=True, open_end=True, widths=(1, 2, 4, 8),
-                waveform='mexh', **kwargs):
-    """
-    Implementation of two pattern recognition algorithms:
-
-    * Dynamic Time Warping (dtw) [1]
-    * Pattern recognition via wavelets [2]
-
-    The steps are:
-
-    1. Get the frequency of partitions, in which the time series has to be divided (for example: a pattern occurs daily,
-       or every hour)
-    2. Compare each partition with the given pattern
-    3. Check if the compared partition contains the pattern or not
-    4. Flag partition if it contains the pattern
-
-    Parameters
-    ----------
-    data : dios.DictOfSeries
-        A dictionary of pandas.Series, holding all the data.
-    field : str
-        The fieldname of the column, holding the data-to-be-flagged.
-    flagger : saqc.flagger.BaseFlagger
-        A flagger object, holding flags and additional Informations related to `data`.
-    reference_field : str
-        Fieldname in `data`, that holds the pattern
-    method : {'dtw', 'wavelets'}, default 'dtw'.
-        Pattern Recognition method to be used.
-    partition_freq : str, default 'days'
-        Frequency, in which the pattern occurs.
-        Has to be an offset string or one out of {``'days'``, ``'months'``}. If ``'days'`` or ``'months'`` is passed,
-        then precise length of partition is calculated from pattern length.
-    partition_offset : str, default '0'
-        If partition frequency is given by an offset string and the pattern starts after a timely offset, this offset
-        is given by `partition_offset`.
-        (e.g., partition frequency is "1h", pattern starts at 10:15, then offset is "15min").
-    ax_distance : float, default 0.03
-        Only effective if method = 'dtw'.
-        Maximum dtw-distance between partition and pattern, so that partition is recognized as pattern.
-        (And thus gets flagged.)
-    normalized_distance : bool, default True.
-        For dtw. Normalizing dtw-distance (Doesnt refer to statistical normalization, but to a normalization that
-        makes comparable dtw-distances for probes of different length, see [1] for more details).
-    open_end : boolean, default True
-        Only effective if method = 'dtw'.
-        Weather or not, the ending of the probe and of the pattern have to be projected onto each other in the search
-        for the optimal dtw-mapping. Recommendation of [1].
-    widths : tuple[int], default (1,2,4,8)
-        Only effective if `method` = ``'wavelets'``.
-        Widths for wavelet decomposition. [2] recommends a dyadic scale.
-    waveform: str, default 'mexh'
-        Only effective if `method` = ``'wavelets'``.
-        Wavelet to be used for decomposition. Default: 'mexh'
-
-    Returns
-    -------
-    data : dios.DictOfSeries
-        A dictionary of pandas.Series, holding all the data.
-    flagger : saqc.flagger.BaseFlagger
-        The flagger object, holding flags and additional Informations related to `data`.
-        Flags values may have changed relatively to the flagger input.
-
-    References
-    ----------
-    [1] https://cran.r-project.org/web/packages/dtw/dtw.pdf
-    [2] Maharaj, E.A. (2002): Pattern Recognition of Time Series using Wavelets. In: Härdle W., Rönz B. (eds) Compstat.
-        Physica, Heidelberg, 978-3-7908-1517-7.
-    """
-
-    test = data[field].copy()
-    ref = data[reference_field].copy()
-    pattern_start_date = ref.index[0].time()
-    pattern_end_date = ref.index[-1].time()
-
-    # ## Extract partition frequency from pattern if needed
-    if not isinstance(partition_freq, str):
-        raise ValueError('Partition frequency has to be given in string format.')
-    elif partition_freq == "days" or partition_freq == "months":
-        # Get partition frequency from reference field
-        partition_count = (pattern_end_date - pattern_start_date).days
-        partitions = test.groupby(pd.Grouper(freq="%d D" % (partition_count + 1)))
-    else:
-        partitions = test.groupby(pd.Grouper(freq=partition_freq))
-
-    # Initializing Wavelets
-    if method == 'wavelet':
-        # calculate reference wavelet transform
-        ref_wl = ref.values.ravel()
-        # Widths lambda as in Ann Maharaj
-        cwtmat_ref, freqs = pywt.cwt(ref_wl, widths, waveform)
-        # Square of matrix elements as Power sum of the matrix
-        wavepower_ref = np.power(cwtmat_ref, 2)
-    elif not method == 'dtw':
-        # No correct method given
-        raise ValueError('Unable to interpret {} as method.'.format(method))
-
-    flags = pd.Series(data=False, index=test.index)
-    # ## Calculate flags for every partition
-    partition_min = ref.shape[0]
-    for _, partition in partitions:
-
-        # Ensuring that partition is at least as long as reference pattern
-        if partition.empty or (partition.shape[0] < partition_min):
-            continue
-        if partition_freq == "days" or partition_freq == "months":
-            # Use only the time frame given by the pattern
-            test = partition[pattern_start_date:pattern_start_date]
-            mask = (partition.index >= test.index[0]) & (partition.index <= test.index[-1])
-            test = partition.loc[mask]
-        else:
-            # cut partition according to pattern and offset
-            start_time = pd.Timedelta(partition_offset) + partition.index[0]
-            end_time = start_time + pd.Timedelta(pattern_end_date - pattern_start_date)
-            test = partition[start_time:end_time]
-        # ## Switch method
-        if method == 'dtw':
-            distance = dtw.dtw(test, ref, open_end=open_end, distance_only=True).normalizedDistance
-            if normalized_distance:
-                distance = distance / ref.var()
-            # Partition labeled as pattern by dtw
-            if distance < max_distance:
-                flags[partition.index] = True
-        elif method == 'wavelet':
-            # calculate reference wavelet transform
-            test_wl = test.values.ravel()
-            cwtmat_test, freqs = pywt.cwt(test_wl, widths, 'mexh')
-            # Square of matrix elements as Power sum of the matrix
-            wavepower_test = np.power(cwtmat_test, 2)
-            # Permutation test on Powersum of matrix
-            p_value = []
-            for i in range(len(widths)):
-                x = wavepower_ref[i]
-                y = wavepower_test[i]
-                pval = permutation_test(x, y, method='approximate', num_rounds=200, func=lambda x, y: x.sum() / y.sum(),
-                                        seed=0)
-                p_value.append(min(pval, 1 - pval))
-            # Partition labeled as pattern by wavelet
-            if min(p_value) >= 0.01:
-                flags[partition.index] = True
-
-    flagger = flagger.setFlags(field, mask, **kwargs)
-    return data, flagger
-
 
 @register(masking='field')
 def flagMissing(data, field, flagger, nodata=np.nan, **kwargs):
@@ -871,7 +724,6 @@ def flagCrossScoring(data, field, flagger, fields, thresh, cross_stat='modZscore'
     return data, flagger
 
 
-
 @register(masking='all')
 def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, norm_frac=0.5, metric=lambda x,
                       y: scipy.spatial.distance.pdist(np.array([x, y]),
@@ -895,7 +747,7 @@ def flagDriftFromNorm(data, field, flagger, fields, segment_freq, norm_spread, n
     flagger : saqc.flagger.BaseFlagger
         A flagger object, holding flags and additional informations related to `data`.
     fields : str
-        List of fieldnames in data, determining wich variables are to be included into the flagging process.
+        List of fieldnames in data, determining which variables are to be included into the flagging process.
     segment_freq : str
        An offset string, determining the size of the seperate datachunks that the algorihm is to be piecewise
        applied on.
diff --git a/saqc/funcs/pattern_rec.py b/saqc/funcs/pattern_rec.py
new file mode 100644
index 0000000000000000000000000000000000000000..83f392df76a9c0dfc5a1a1af2e7fce108c92caf9
--- /dev/null
+++ b/saqc/funcs/pattern_rec.py
@@ -0,0 +1,152 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import numpy as np
+import pandas as pd
+import dtw
+import pywt
+from mlxtend.evaluate import permutation_test
+
+from saqc.core.register import register
+from saqc.lib.tools import customRoller
+
+
+@register(masking='field')
+def flagPattern_wavelet(data, field, flagger, ref_field, widths=(1, 2, 4, 8), waveform='mexh', **kwargs):
+    """
+    Pattern recognition via wavelets.
+
+    The steps are:
+    1. work on chunks returned by a moving window
+    2. each chunk is compared to the given pattern, using the wavelet algorithm as presented in [1]
+    3. if the compared chunk is equal to the given pattern, it gets flagged
+
+    Parameters
+    ----------
+    data : dios.DictOfSeries
+        A dictionary of pandas.Series, holding all the data.
+    field : str
+        The fieldname of the column, holding the data-to-be-flagged.
+    flagger : saqc.flagger.BaseFlagger
+        A flagger object, holding flags and additional information related to `data`.
+    ref_field : str
+        The fieldname in `data` which holds the pattern.
+    widths : tuple of int, default (1, 2, 4, 8)
+        Widths for the wavelet decomposition. [1] recommends a dyadic scale.
+    waveform : str, default 'mexh'
+        Wavelet to be used for the decomposition. See [2] for a list.
+    kwargs : dict
+        Passed on to the flagger, when the flags are set.
+
+    Returns
+    -------
+    data : dios.DictOfSeries
+        A dictionary of pandas.Series, holding all the data.
+    flagger : saqc.flagger.BaseFlagger
+        The flagger object, holding flags and additional information related to `data`.
+        Flags values may have changed relatively to the flagger input.
+
+    References
+    ----------
+    The underlying pattern recognition algorithm using wavelets is documented here:
+    [1] Maharaj, E.A. (2002): Pattern Recognition of Time Series using Wavelets.
+        In: Härdle W., Rönz B. (eds) Compstat. Physica, Heidelberg, 978-3-7908-1517-7.
+
+    The documentation of the Python package used for the wavelet decomposition can be found here:
+    [2] https://pywavelets.readthedocs.io/en/latest/ref/cwt.html#continuous-wavelet-families
+    """
+
+    ref = data[ref_field].to_numpy()
+    cwtmat_ref, _ = pywt.cwt(ref, widths, waveform)
+    wavepower_ref = np.power(cwtmat_ref, 2)
+    len_width = len(widths)
+
+    def func(x, y):
+        return x.sum() / y.sum()
+
+    def isPattern(chunk):
+        cwtmat_chunk, _ = pywt.cwt(chunk, widths, waveform)
+        wavepower_chunk = np.power(cwtmat_chunk, 2)
+
+        # Permutation test on the power sum of the wavelet matrix: a chunk only
+        # counts as a pattern match if the test passes for every decomposition
+        # width, as in the removed flagPattern implementation
+        for i in range(len_width):
+            x = wavepower_ref[i]
+            y = wavepower_chunk[i]
+            pval = permutation_test(x, y, method='approximate', num_rounds=200, func=func, seed=0)
+            if min(pval, 1 - pval) <= 0.01:
+                return False
+        return True
+
+    dat = data[field]
+    sz = len(ref)
+    # rolling apply returns a float series with NaN for incomplete windows,
+    # so reduce it to a boolean mask before setting flags
+    mask = customRoller(dat, window=sz, min_periods=sz).apply(isPattern, raw=True) > 0
+
+    flagger = flagger.setFlags(field, loc=mask, **kwargs)
+    return data, flagger
+
+
+@register(masking='field')
+def flagPattern_dtw(data, field, flagger, ref_field, max_distance=0.03, normalize=True, **kwargs):
+    """
+    Pattern recognition via Dynamic Time Warping.
+
+    The steps are:
+    1. work on chunks returned by a moving window
+    2. each chunk is compared to the given pattern, using the dynamic time warping algorithm as presented in [1]
+    3. if the compared chunk is equal to the given pattern, it gets flagged
+
+    Parameters
+    ----------
+    data : dios.DictOfSeries
+        A dictionary of pandas.Series, holding all the data.
+    field : str
+        The fieldname of the column, holding the data-to-be-flagged.
+    flagger : saqc.flagger.BaseFlagger
+        A flagger object, holding flags and additional information related to `data`.
+    ref_field : str
+        The fieldname in `data` which holds the pattern.
+    max_distance : float, default 0.03
+        Maximum dtw-distance between a chunk and the pattern, up to which the chunk is recognized as the pattern.
+    normalize : bool, default True
+        Normalize the dtw-distance by the variance of the reference pattern, making distances
+        comparable across differently scaled probes (see [1]).
+    kwargs : dict
+        Passed on to the flagger, when the flags are set.
+
+    Returns
+    -------
+    data : dios.DictOfSeries
+        A dictionary of pandas.Series, holding all the data.
+    flagger : saqc.flagger.BaseFlagger
+        The flagger object, holding flags and additional information related to `data`.
+        Flags values may have changed relatively to the flagger input.
+
+    References
+    ----------
+    A thorough description of the underlying Dynamic Time Warping algorithm can be found here:
+    [1] https://cran.r-project.org/web/packages/dtw/dtw.pdf
+    """
+    ref = data[ref_field]
+    ref_var = ref.var()
+
+    def func(a, b):
+        return np.linalg.norm(a - b)
+
+    def isPattern(chunk):
+        # only the distance itself is needed here; the cost matrices and the
+        # warping path returned alongside it are discarded
+        dist, *_ = dtw.dtw(chunk, ref, func)
+        if normalize:
+            dist /= ref_var
+        return dist < max_distance
+
+    dat = data[field]
+    sz = len(ref)
+    # rolling apply returns a float series with NaN for incomplete windows,
+    # so reduce it to a boolean mask before setting flags
+    mask = customRoller(dat, window=sz, min_periods=sz).apply(isPattern, raw=True) > 0
+
+    flagger = flagger.setFlags(field, loc=mask, **kwargs)
+    return data, flagger
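For orientation on the wavelet variant: the reference decomposition computed once up front behaves as in this minimal sketch (made-up sample values; pywt.cwt returns one coefficient row per width, and the squared coefficients are the "wavepower" populations the permutation test compares chunk by chunk):

    import numpy as np
    import pywt

    widths = (1, 2, 4, 8)
    ref = np.array([0.0, 0.0, 7.0, 7.0, 0.0, 0.0])

    coeffs, _ = pywt.cwt(ref, widths, 'mexh')  # shape: (len(widths), len(ref))
    wavepower_ref = coeffs ** 2                # one power series per width
    print(wavepower_ref.shape)                 # (4, 6)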
diff --git a/test/funcs/conftest.py b/test/funcs/conftest.py
index 32017f6b002e30cd3c3bd50ab74854d026b56d4e..1fd4685e6c0aca0015b8f2cbcb4cf67be9a4ec75 100644
--- a/test/funcs/conftest.py
+++ b/test/funcs/conftest.py
@@ -83,52 +83,6 @@ def course_2(char_dict):
     return fix_funk
 
 
-
-@pytest.fixture
-def course_pattern_1(char_dict):
-    # Test trapezium pattern - values , that linearly increase till out_val, stay constant for one more value, and then
-    # decrease again to 0. Sample frequency is 10 min.
-    def fix_funk(freq='10min',
-                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
-
-        t_index = pd.date_range(initial_index, freq=freq, periods=6)
-        #data = np.linspace(initial_level, final_level, int(np.floor(len(t_index))))
-
-        data = pd.Series(data=0, index=t_index)
-        data.iloc[2] = out_val
-        data.iloc[3] = out_val
-        char_dict['pattern_1'] = data.index
-
-        data = DictOfSeries(data=data, columns=['pattern_1'])
-        return data, char_dict
-
-    return fix_funk
-
-
-
-@pytest.fixture
-def course_pattern_2(char_dict):
-    # Test trapezium pattern - values , that linearly increase till out_val, stay constant for one more value, and then
-    # decrease again to 0. Sample frequency is 1 H.
-    def fix_funk(freq='1 H',
-                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), out_val=5, char_dict=char_dict):
-
-        t_index = pd.date_range(initial_index, freq=freq, periods=4)
-        #data = np.linspace(initial_level, final_level, int(np.floor(len(t_index))))
-
-        data = pd.Series(data=0, index=t_index)
-        data.iloc[2] = out_val
-        data.iloc[3] = out_val
-        char_dict['pattern_2'] = data.index
-
-        data = DictOfSeries(data=data, columns=['pattern_2'])
-        return data, char_dict
-
-    return fix_funk
-
-
-
-
 @pytest.fixture
 def course_test(char_dict):
     # Test function for pattern detection - same as test pattern for first three values, than constant function
diff --git a/test/funcs/test_functions.py b/test/funcs/test_functions.py
index d81efd6c9b5ee4e958bd3f4fdb613ebf9ac50cb6..8670e09a2e675c7fa5b9338916e4c62104948090 100644
--- a/test/funcs/test_functions.py
+++ b/test/funcs/test_functions.py
@@ -32,34 +32,6 @@ def test_flagRange(data, field, flagger):
     assert (flagged == expected).all()
 
 
-'''@pytest.mark.parametrize("flagger", TESTFLAGGER)
-@pytest.mark.parametrize("method", ['wavelet', 'dtw'])
-@pytest.mark.parametrize("pattern", [pytest.lazy_fixture("course_pattern_1"),
-                                     pytest.lazy_fixture("course_pattern_2"),] ,)
-
-def test_flagPattern(course_test, flagger, method, pattern):
-    pattern_data, dict_pattern = pattern()
-
-    # testing the same pattern sampled at different frequencies
-    if pattern_data.columns == "pattern1":
-        test_data, *_ = course_test(freq="10 min")
-        test_data['pattern_data'] = pattern_data.to_df()
-        flagger = flagger.initFlags(test_data)
-        data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="1 H", method=method)
-        assert flagger.isFlagged("data")[dict_pattern["pattern_1"]].all()
-    if pattern_data.columns == "pattern2":
-        test_data, *_ = course_test(freq="1 H")
-        test_data['pattern_data'] = pattern_data.to_df()
-        flagger = flagger.initFlags(test_data)
-        data, flagger = flagPattern(test_data, "data", flagger, reference_field="pattern_data", partition_freq="days", method=method)
-        assert flagger.isFlagged("data")[dict_pattern["pattern_2"]].all()
-'''
-
-
-
-
-
 @pytest.mark.parametrize("flagger", TESTFLAGGER)
 def test_flagSesonalRange(data, field, flagger):
     # prepare
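A reading aid for the new tests below: both pattern functions mark matches through a rolling window of pattern length, so a decision is produced wherever a full window of observations is available. customRoller is saqc-internal; only the rolling-window semantics used above are assumed in this plain pandas sketch (reducer and values are made up):

    import numpy as np
    import pandas as pd

    dat = pd.Series(np.random.rand(10), index=pd.date_range("2000", periods=10, freq="1D"))
    sz = 5  # length of the reference pattern

    def is_pattern(chunk):
        # stand-in for the dtw/wavelet decision; rolling-apply reducers
        # must return a scalar, hence the float
        return float(chunk.sum() > 2.5)

    # incomplete windows yield NaN, full windows a 0.0/1.0 decision
    mask = dat.rolling(window=sz, min_periods=sz).apply(is_pattern, raw=True) > 0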
diff --git a/test/funcs/test_pattern_rec.py b/test/funcs/test_pattern_rec.py
new file mode 100644
index 0000000000000000000000000000000000000000..66ebcbfd1fdf13f5cb30cb5bd34a5a457a31dc3d
--- /dev/null
+++ b/test/funcs/test_pattern_rec.py
@@ -0,0 +1,50 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+import pandas as pd
+from dios import dios
+
+from saqc.funcs.pattern_rec import *
+from test.common import initData, TESTFLAGGER
+
+
+@pytest.fixture
+def data():
+    return initData(cols=1, start_date="2016-01-01", end_date="2018-12-31", freq="1D")
+
+
+@pytest.fixture
+def field(data):
+    return data.columns[0]
+
+
+@pytest.mark.parametrize("flagger", TESTFLAGGER)
+def test_flagPattern_wavelet(flagger):
+
+    data = pd.Series(0, index=pd.date_range(start="2000", end='2001', freq='1d'))
+    data.iloc[2:4] = 7
+    pattern = data.iloc[1:6]
+
+    data = dios.DictOfSeries(dict(data=data, pattern_data=pattern))
+
+    flagger = flagger.initFlags(data)
+    data, flagger = flagPattern_wavelet(data, "data", flagger, ref_field="pattern_data")
+    # the pattern occurrence gets flagged, everything before and after it stays unflagged
+    assert (flagger.isFlagged("data")[1:6]).all()
+    assert not (flagger.isFlagged("data")[:1]).any()
+    assert not (flagger.isFlagged("data")[7:]).any()
+
+
+@pytest.mark.parametrize("flagger", TESTFLAGGER)
+def test_flagPattern_dtw(flagger):
+
+    data = pd.Series(0, index=pd.date_range(start="2000", end='2001', freq='1d'))
+    data.iloc[2:4] = 7
+    pattern = data.iloc[1:6]
+
+    data = dios.DictOfSeries(dict(data=data, pattern_data=pattern))
+
+    flagger = flagger.initFlags(data)
+    data, flagger = flagPattern_dtw(data, "data", flagger, ref_field="pattern_data")
+    # the pattern occurrence gets flagged, everything before and after it stays unflagged
+    assert (flagger.isFlagged("data")[1:6]).all()
+    assert not (flagger.isFlagged("data")[:1]).any()
+    assert not (flagger.isFlagged("data")[7:]).any()
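For completeness, an end-to-end call mirroring the tests above (a sketch, not shipped code: `flagger` stands for any saqc flagger instance, e.g. one of TESTFLAGGER from test/common.py):

    import pandas as pd
    from dios import dios

    from saqc.funcs.pattern_rec import flagPattern_dtw

    dat = pd.Series(0.0, index=pd.date_range("2000", periods=366, freq="1D"))
    dat.iloc[2:4] = 7.0
    data = dios.DictOfSeries(dict(data=dat, pattern_data=dat.iloc[1:6]))

    flagger = flagger.initFlags(data)  # `flagger`: assumed given, any saqc flagger
    data, flagger = flagPattern_dtw(data, "data", flagger, ref_field="pattern_data")
    flagged = flagger.isFlagged("data")  # boolean series marking the matches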