Skip to content
Snippets Groups Projects
Commit fd3890b2 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

added test module - fixed minor bugs encountered while writing the tests

parent be7208a1
No related branches found
No related tags found
4 merge requests!193Release 1.4,!188Release 1.4,!49Dataprocessing features,!44Dataprocessing features
Pipeline #3637 passed with stage
in 14 minutes and 3 seconds
......@@ -41,9 +41,11 @@ def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total=Non
flag_agg_func='max', **kwargs):
data = data.copy()
datcol = data[field]
d_start = datcol.index[0].floor(freq)
d_end = datcol.index[-1].ceil(freq)
# filter data for invalid patterns
if (max_invalid_total is None) | (max_invalid_consec is None):
if (max_invalid_total is not None) | (max_invalid_consec is not None):
if not max_invalid_total:
max_invalid_total = np.inf
if not max_invalid_consec:
......@@ -72,6 +74,11 @@ def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total=Non
flag_agg_func = composeFunction(flag_agg_func)
datflags = flagsresampler.apply(flag_agg_func)
# insert freqgrid (for consistency reasons -> in above step, start and ending chunks can get lost due to invalid
# intervals):
grid = pd.date_range(d_start, d_end, freq=freq)
datcol = datcol.reindex(grid)
datflags = datflags.reindex(grid)
# data/flags reshaping:
data[field] = datcol
reshape_flagger = flagger.initFlags(datcol).setFlags(field, flag=datflags, force=True, **kwargs)
......@@ -83,5 +90,8 @@ def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total=Non
def proc_transform(data, field, flagger, func, **kwargs):
    """
    Replace the series ``data[field]`` by ``func`` applied to the whole series.

    :param data: dios.DictOfSeries-like container holding the column ``field``
    :param field: name of the column to transform
    :param flagger: flagger instance (returned unchanged)
    :param func: function (or function string resolved by ``composeFunction``)
                 that is applied to the full series at once
    :return: (transformed data, flagger)
    """
    data = data.copy()
    func = composeFunction(func)
    # NOTE: avoiding pd.Series.transform() here, because transform processes the
    # column element-wise (so interpolations wouldn't work). Instead the composed
    # function is applied to the whole series and the result re-wrapped with the
    # original index.
    # (The stale pre-commit line ``data[field] = data[field].transform(func)`` was
    # removed - it only did redundant work whose result was overwritten below.)
    data[field] = pd.Series(func(data[field]), index=data[field].index)
    return data, flagger
\ No newline at end of file
......@@ -158,7 +158,9 @@ def spikes_flagMultivarScores(data, field, flagger, fields, trafo='normScale', a
)
val_frame.dropna(inplace=True)
val_frame = val_frame.transform(trafo_dict)
for field in val_frame.columns:
val_frame[field] = trafo_dict[field](val_frame[field])
if threshing == 'stray':
to_flag_index = _stray(val_frame,
......
......@@ -84,6 +84,7 @@ def evalFuncString(full_func_string):
kwarg_dict = {}
if len(paras) > 0:
paras = [float(x) if x.isnumeric() else x for x in paras]
para_names = inspect.getfullargspec(func).args[1:1 + len(paras)]
kwarg_dict.update(dict(zip(para_names, paras)))
......
......@@ -167,7 +167,7 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio
:return:
"""
inter_limit = int(inter_limit)
data = pd.Series(data).copy()
gap_mask = (data.rolling(inter_limit, min_periods=0).apply(lambda x: np.sum(np.isnan(x)), raw=True)) != inter_limit
......
......@@ -128,22 +128,20 @@ def course_4(char_dict):
@pytest.fixture
def course_5(char_dict):
    # Generates a series of values that ascend linearly from ``initial_level`` to
    # ``final_level`` and have missing data (=nan) at positions "nan_slice"
    # (=a slice or a list, for iloc indexing).
    # periods better be greater 5
    def fix_funk(freq='10min', periods=10, nan_slice=slice(0, None, 5), initial_level=0, final_level=10,
                 initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), char_dict=char_dict):
        t_index = pd.date_range(initial_index, freq=freq, periods=periods)
        values = np.linspace(initial_level, final_level, periods)
        s = pd.Series(values, index=t_index)
        # positional (.iloc) indexing, so integer positions in ``nan_slice`` work
        # against the DatetimeIndex (label-based ``s[nan_slice]`` would not)
        s.iloc[nan_slice] = np.nan
        char_dict['missing'] = s.iloc[nan_slice].index
        data = DictOfSeries(data=s, columns=['data'])
        return data, char_dict

    return fix_funk
\ No newline at end of file
......@@ -15,7 +15,45 @@ from saqc.funcs.proc_functions import (
from test.common import TESTFLAGGER
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_interpolateMissing(course_5, flagger):
    # course_5: linear ramp with nan holes at the iloc positions in ``nan_slice``
    data, characteristics = course_5(periods=10, nan_slice=[5])
    field = data.columns[0]
    data = dios.DictOfSeries(data)
    flagger = flagger.initFlags(data)
    # a single nan should be filled by both interpolation methods
    dataLin, *_ = proc_interpolateMissing(data, field, flagger, method='linear')
    dataPoly, *_ = proc_interpolateMissing(data, field, flagger, method='polynomial')
    assert dataLin[field][characteristics['missing']].notna().all()
    assert dataPoly[field][characteristics['missing']].notna().all()
    # a 3-nan gap must only be filled when inter_limit exceeds the gap size
    data, characteristics = course_5(periods=10, nan_slice=[5, 6, 7])
    dataLin1, *_ = proc_interpolateMissing(data, field, flagger, method='linear', inter_limit=2)
    dataLin2, *_ = proc_interpolateMissing(data, field, flagger, method='linear', inter_limit=3)
    dataLin3, *_ = proc_interpolateMissing(data, field, flagger, method='linear', inter_limit=4)
    assert dataLin1[field][characteristics['missing']].isna().all()
    assert dataLin2[field][characteristics['missing']].isna().all()
    assert dataLin3[field][characteristics['missing']].notna().all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_transform(course_5, flagger):
    # proc_transform with a plain 'linear' func string leaves the nan holes in
    # place; the '$N'-parameterized variants fill them (presumably N acts as an
    # interpolation limit/order - see the func-string parser for the exact meaning)
    data, characteristics = course_5(periods=10, nan_slice=[5, 6])
    field = data.columns[0]
    data = dios.DictOfSeries(data)
    flagger = flagger.initFlags(data)

    result, *_ = proc_transform(data, field, flagger, func='linear')
    assert result[field][characteristics['missing']].isna().all()

    for func_string in ('linear$3', 'polynomial$3$3'):
        result, *_ = proc_transform(data, field, flagger, func=func_string)
        assert result[field][characteristics['missing']].notna().all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_resample(course_5, flagger):
    # 30 one-minute points resampled to 10min buckets; the nan pattern is chosen
    # so that bucket 0 stays valid while buckets 1 and 2 violate the
    # max_invalid_total / max_invalid_consec thresholds and get masked
    data, characteristics = course_5(freq='1min', periods=30, nan_slice=[1, 11, 12, 22, 24, 26])
    field = data.columns[0]
    data = dios.DictOfSeries(data)
    flagger = flagger.initFlags(data)

    resampled, *_ = proc_resample(data, field, flagger, '10min', 'mean',
                                  max_invalid_total=2, max_invalid_consec=1)
    assert ~np.isnan(resampled[field].iloc[0])
    assert np.isnan(resampled[field].iloc[1])
    assert np.isnan(resampled[field].iloc[2])
\ No newline at end of file
......@@ -101,7 +101,7 @@ def test_flagSpikesLimitRaise(dat, flagger):
# see test/functs/conftest.py for the 'course_N'
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_3")])
def test_flagSpikesOddWater(dat, flagger):
def test_flagMultivarScores(dat, flagger):
data1, characteristics = dat(periods=1000, initial_level=5, final_level=15, out_val=50)
data2, characteristics = dat(periods=1000, initial_level=20, final_level=1, out_val=30)
field = "dummy"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment