diff --git a/saqc/funcs/proc_functions.py b/saqc/funcs/proc_functions.py index 26dc78a7f35ca831aecb267603ed8f5118566945..d6a1f2cf81c0daca8faa11d343acd70edff05982 100644 --- a/saqc/funcs/proc_functions.py +++ b/saqc/funcs/proc_functions.py @@ -41,9 +41,11 @@ def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total=Non flag_agg_func='max', **kwargs): data = data.copy() datcol = data[field] + d_start = datcol.index[0].floor(freq) + d_end = datcol.index[-1].ceil(freq) # filter data for invalid patterns - if (max_invalid_total is None) | (max_invalid_consec is None): + if (max_invalid_total is not None) | (max_invalid_consec is not None): if not max_invalid_total: max_invalid_total = np.inf if not max_invalid_consec: @@ -72,6 +74,11 @@ def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total=Non flag_agg_func = composeFunction(flag_agg_func) datflags = flagsresampler.apply(flag_agg_func) + # insert freqgrid (for consistency reasons -> in above step, start and ending chunks can get lost due to invalid + # intervals): + grid = pd.date_range(d_start, d_end, freq=freq) + datcol = datcol.reindex(grid) + datflags = datflags.reindex(grid) # data/flags reshaping: data[field] = datcol reshape_flagger = flagger.initFlags(datcol).setFlags(field, flag=datflags, force=True, **kwargs) @@ -83,5 +90,8 @@ def proc_resample(data, field, flagger, freq, func="mean", max_invalid_total=Non def proc_transform(data, field, flagger, func, **kwargs): data = data.copy() func = composeFunction(func) - data[field] = data[field].transform(func) + # NOTE: avoiding pd.Series.transform() in the line below, because transform does process columns element wise + # (so interpolations wouldn't work) + new_col = pd.Series(func(data[field]), index=data[field].index) + data[field] = new_col return data, flagger \ No newline at end of file diff --git a/saqc/funcs/spikes_detection.py b/saqc/funcs/spikes_detection.py index 
666f97da7f4c395e55f0b2d0129d461b4cde3484..c8ba91e7b37d44aa2f9fb742214d6a563376c5e5 100644 --- a/saqc/funcs/spikes_detection.py +++ b/saqc/funcs/spikes_detection.py @@ -158,7 +158,9 @@ def spikes_flagMultivarScores(data, field, flagger, fields, trafo='normScale', a ) val_frame.dropna(inplace=True) - val_frame = val_frame.transform(trafo_dict) + for field in val_frame.columns: + val_frame[field] = trafo_dict[field](val_frame[field]) + if threshing == 'stray': to_flag_index = _stray(val_frame, diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py index 9825b39342389deabf2cd7e872b0a428c878050c..70c32da8f297bd3beebcf1c4f493fc3894043914 100644 --- a/saqc/lib/tools.py +++ b/saqc/lib/tools.py @@ -84,6 +84,7 @@ def evalFuncString(full_func_string): kwarg_dict = {} if len(paras) > 0: + paras = [float(x) if x.isnumeric() else x for x in paras] para_names = inspect.getfullargspec(func).args[1:1 + len(paras)] kwarg_dict.update(dict(zip(para_names, paras))) diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py index ebfa042e07647fdbf39b8563b85ee1f3816ab5f4..2f847387b51b315f5550eaa27f57aaa4be069481 100644 --- a/saqc/lib/ts_operators.py +++ b/saqc/lib/ts_operators.py @@ -167,7 +167,7 @@ def interpolateNANs(data, method, order=2, inter_limit=2, downgrade_interpolatio :return: """ - + inter_limit = int(inter_limit) data = pd.Series(data).copy() gap_mask = (data.rolling(inter_limit, min_periods=0).apply(lambda x: np.sum(np.isnan(x)), raw=True)) != inter_limit diff --git a/test/funcs/conftest.py b/test/funcs/conftest.py index 430aaae5159d260141e73223a8cf9723f14bdba1..c19280c01d8af2beb7881d718684b5cf556e9ecc 100644 --- a/test/funcs/conftest.py +++ b/test/funcs/conftest.py @@ -128,22 +128,20 @@ def course_4(char_dict): @pytest.fixture def course_5(char_dict): - # NAN_holes values , that remain on value level "base_level" and than begin exposing an outlierish or - # spikey value of magnitude "out_val" every second timestep, starting at periods/2, with the first spike. 
number + # NAN_holes values, that ascend from initial_level to final_level linearly and have missing data (=nan) + # at positions "nan_slice", (=a slice or a list, for iloc indexing) # periods better be even! # periods better be greater 5 - def fix_funk(freq='10min', periods=100, nan_slice=slice(0, None, 5), initial_level=0, final_level=10, + def fix_funk(freq='10min', periods=10, nan_slice=slice(0, None, 5), initial_level=0, final_level=10, initial_index=pd.Timestamp(2000, 1, 1, 0, 0, 0), char_dict=char_dict): t_index = pd.date_range(initial_index, freq=freq, periods=periods) values = np.linspace(initial_level, final_level, periods) s = pd.Series(values, index=t_index) - s[nan_slice] = np.nan - char_dict['missing'] = s[nan_slice].index + s.iloc[nan_slice] = np.nan + char_dict['missing'] = s.iloc[nan_slice].index data = DictOfSeries(data=s, columns=['data']) return data, char_dict return fix_funk - - return fix_funk \ No newline at end of file diff --git a/test/funcs/test_proc_functions.py b/test/funcs/test_proc_functions.py index 48777e7da7bfb9a0e3b8694c7e731d6399edadb9..5b9ef99c7e55c633543035c45806f4c65f9729f3 100644 --- a/test/funcs/test_proc_functions.py +++ b/test/funcs/test_proc_functions.py @@ -15,7 +15,45 @@ from saqc.funcs.proc_functions import ( from test.common import TESTFLAGGER @pytest.mark.parametrize("flagger", TESTFLAGGER) -def test_interpolateMissing(course_1, flagger): - data, *_ = course_1(periods=100) - data[1] = np.nan - data[] +def test_interpolateMissing(course_5, flagger): + data, characteristics = course_5(periods=10, nan_slice=[5]) + field = data.columns[0] + data = dios.DictOfSeries(data) + flagger = flagger.initFlags(data) + dataLin, *_ = proc_interpolateMissing(data, field, flagger, method='linear') + dataPoly, *_ = proc_interpolateMissing(data, field, flagger, method='polynomial') + assert dataLin[field][characteristics['missing']].notna().all() + assert dataPoly[field][characteristics['missing']].notna().all() + data, characteristics 
= course_5(periods=10, nan_slice=[5, 6, 7]) + dataLin1, *_ = proc_interpolateMissing(data, field, flagger, method='linear', inter_limit=2) + dataLin2, *_ = proc_interpolateMissing(data, field, flagger, method='linear', inter_limit=3) + dataLin3, *_ = proc_interpolateMissing(data, field, flagger, method='linear', inter_limit=4) + assert dataLin1[field][characteristics['missing']].isna().all() + assert dataLin2[field][characteristics['missing']].isna().all() + assert dataLin3[field][characteristics['missing']].notna().all() + + +@pytest.mark.parametrize("flagger", TESTFLAGGER) +def test_transform(course_5, flagger): + data, characteristics = course_5(periods=10, nan_slice=[5, 6]) + field = data.columns[0] + data = dios.DictOfSeries(data) + flagger = flagger.initFlags(data) + data1, *_ = proc_transform(data, field, flagger, func='linear') + assert data1[field][characteristics['missing']].isna().all() + data1, *_ = proc_transform(data, field, flagger, func='linear$3') + assert data1[field][characteristics['missing']].notna().all() + data1, *_ = proc_transform(data, field, flagger, func='polynomial$3$3') + assert data1[field][characteristics['missing']].notna().all() + + +@pytest.mark.parametrize("flagger", TESTFLAGGER) +def test_resample(course_5, flagger): + data, characteristics = course_5(freq='1min', periods=30, nan_slice=[1, 11, 12, 22, 24, 26]) + field = data.columns[0] + data = dios.DictOfSeries(data) + flagger = flagger.initFlags(data) + data1, *_ = proc_resample(data, field, flagger, '10min', 'mean', max_invalid_total=2, max_invalid_consec=1) + assert ~np.isnan(data1[field].iloc[0]) + assert np.isnan(data1[field].iloc[1]) + assert np.isnan(data1[field].iloc[2]) \ No newline at end of file diff --git a/test/funcs/test_spikes_detection.py b/test/funcs/test_spikes_detection.py index 77f729c2adc27d0ec98eb4f971593d29c681f5ba..3688f07e66f8692b7d072c13b7f6c28a8048d41e 100644 --- a/test/funcs/test_spikes_detection.py +++ b/test/funcs/test_spikes_detection.py @@ -101,7 
+101,7 @@ def test_flagSpikesLimitRaise(dat, flagger): # see test/functs/conftest.py for the 'course_N' @pytest.mark.parametrize("flagger", TESTFLAGGER) @pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_3")]) -def test_flagSpikesOddWater(dat, flagger): +def test_flagMultivarScores(dat, flagger): data1, characteristics = dat(periods=1000, initial_level=5, final_level=15, out_val=50) data2, characteristics = dat(periods=1000, initial_level=20, final_level=1, out_val=30) field = "dummy"