Skip to content
Snippets Groups Projects
Commit 3494b6ae authored by Peter Lünenschloß
Browse files

sampling rate estimator implemented

parent 8f4fa227
No related branches found
No related tags found
2 merge requests: !193 Release 1.4, !188 Release 1.4
Pipeline #5871 passed with stage
in 6 minutes and 52 seconds
Subproject commit 3d2d5945ef80beab65863f8159e03130ce760f9d
Subproject commit e9a80225b02799fa668882149a39f4a734b4f280
......@@ -7,13 +7,14 @@ from typing import Sequence, Union, Any, Iterator
import numpy as np
import numba as nb
import pandas as pd
import logging
import dios
import inspect
# from saqc.flagger import BaseFlagger
from saqc.lib.types import T
logger = logging.getLogger("SaQC")
def assertScalar(name, value, optional=False):
if (not np.isscalar(value)) and (value is not None) and (optional is True):
......@@ -349,3 +350,97 @@ def mutateIndex(index, old_name, new_name):
index = index.drop(index[pos])
index = index.insert(pos, new_name)
return index
def _sampling_mode_iterator(sub_index_dict, uniformity_dict, sample_rate_dict, x_data, bin_accuracy=60,
min_bandwidth_share=0.1):
"""
the function is called by the "estimate_sampling_rates" function.
Its purpose is to decompose a given index into its different sampling frequencies and return
frequencies and indices belonging to a frequencies sampling.
The "bin_accuracy" parameter refers to the detection accuracy. It has dimension of seconds.
The "min_bandwidth_share" refers to the minimum percentage the values associated with a frequencie must contribute
to the total number of samples, to be considered a significant frequency mode of the index.
(0.1 means, you can have up to 10 different frequencies, consisting of 10 percent of the total values each.)
"""
out_sub_dict = sub_index_dict.copy()
out_uni_dict = uniformity_dict.copy()
out_rate_dict = sample_rate_dict.copy()
for mode in sub_index_dict.keys():
if not uniformity_dict[mode]:
x_data_diff = np.diff(x_data[sub_index_dict[mode]])
q_mask = np.logical_and(np.quantile(x_data_diff, 0.01) - 60 < x_data_diff,
x_data_diff < np.quantile(x_data_diff, 0.99) + 60)
x_cutted_of = x_data_diff[q_mask]
bins = np.arange(30, int(np.ceil(max(x_cutted_of))) + 90)[::bin_accuracy]
bins = np.concatenate((np.array([0]), bins))
hist, bins = np.histogram(x_cutted_of, bins=bins)
sub_modes = np.where(hist > len(x_data) / min_bandwidth_share)[0]
if len(sub_modes) == 1:
out_uni_dict[mode] = True
out_rate_dict[mode] = (bins[sub_modes[0]], bins[sub_modes[0] + 1])
elif len(sub_modes) > 1:
sub_count = 1
for sub_mode in sub_modes:
sub_index = np.where(np.logical_and(bins[sub_mode] < x_data_diff,
x_data_diff < bins[sub_mode + 1]))[0]
new_mode_name = mode + '.' + str(sub_count)
out_sub_dict[new_mode_name] = sub_index
out_uni_dict[new_mode_name] = False
sub_count += 1
out_sub_dict.pop(mode)
out_uni_dict.pop(mode)
return out_sub_dict, out_uni_dict, out_rate_dict
def estimate_sampling_rates(index, freq=None):
    """
    Function estimates the sampling rate(s) an index includes.

    If freq is passed, additionally a warning is logged, if freq is inconsistent with the sampling rate estimate.

    In the current implementation, estimation accuracy is one minute. (With an extra bin for frequencies
    < 30 seconds.) So the function's purpose is not to detect slight drifts in the frequency, but to detect
    mixing of/changing between significantly differing sampling rates.

    Parameters
    ----------
    index : pd.DatetimeIndex
        Index, the sampling modes are estimated of.
    freq : Offsetstring or None, default None
        Frequency of which consistency with the estimate is checked. None (default) skips the check.

    Returns
    -------
    sample_rates : set
        Set of tuples (x, y). Any tuple indicates that there is a sampling frequency f detectable in the index,
        so that "x seconds" <= f.seconds < "y seconds". The set is empty, if the index holds fewer than
        two timestamps or if no sampling rate could be estimated.
    """
    # Without at least two timestamps there is no spacing to estimate a rate from.
    if len(index) < 2:
        return set()

    index_data = index.to_numpy(float)
    # NOTE(review): the scaling assumes the float representation is in nanoseconds - confirm for
    # indices with a different resolution.
    x_data = index_data * 10 ** (-9)
    sub_index_dict = {'mode_1': np.arange(0, len(x_data))}
    uniformity_dict = {'mode_1': False}
    sample_rate_dict = {}

    # Cap the number of refinement steps, so an index that can not be decomposed can not stall the caller.
    max_iterations = 20
    iterations = 0
    while not all(uniformity_dict.values()):
        if iterations >= max_iterations:
            logger.warning('Sample rate estimation failed. Too many iterations while splitting into modes.')
            break
        sub_index_dict, uniformity_dict, sample_rate_dict = _sampling_mode_iterator(sub_index_dict, uniformity_dict,
                                                                                    sample_rate_dict, x_data)
        iterations += 1

    sample_rates = set(sample_rate_dict.values())
    if len(sample_rates) > 1:
        logger.warning('Multiple sampling modes detected: %s (min seconds, max seconds)', sample_rates)

    if freq:
        t_seconds = pd.Timedelta(freq).total_seconds()
        # The rate tuples are histogram bin edges, so the lower edge belongs to the bin.
        eval_freq = any(x <= t_seconds < y for (x, y) in sample_rates)
        if not eval_freq:
            logger.warning('Frequency passed does not fit any of the estimated data sampling modes.')
    return sample_rates
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment