diff --git a/saqc/lib/rolling.py b/saqc/lib/rolling.py index ba529fa5359f1e9707d12510939b524d0f6d4e65..337f599161b316001ed93a6d7ff07cf8862c0330 100644 --- a/saqc/lib/rolling.py +++ b/saqc/lib/rolling.py @@ -75,8 +75,6 @@ class _CustomBaseIndexer(BaseIndexer): self.skip = ~self.skip def get_window_bounds(self, num_values=0, min_periods=None, center=False, closed=None): - if min_periods is not None: - self.min_periods = max(self.min_periods, min_periods) num_values = self.num_values min_periods = self.min_periods center = self.center @@ -134,17 +132,18 @@ class _FixedWindowDirectionIndexer(_CustomBaseIndexer): def validate(self) -> None: super().validate() - # if self.closed is not None: - # raise ValueError("closed only implemented for datetimelike and offset based windows") + if self.min_periods is None: + self.min_periods = self.window_size def _get_bounds(self, num_values=0, min_periods=None, center=False, closed=None): + # closed is always ignored and handled as 'both' other cases not implemented offset = calculate_center_offset(self.window_size) if center else 0 num_values += offset if self.forward: - start, end = self._fw(num_values, min_periods, center, closed, offset) + start, end = self._fw(num_values, offset) else: - start, end = self._bw(num_values, min_periods, center, closed, offset) + start, end = self._bw(num_values, offset) if center: start, end = self._center_result(start, end, offset) @@ -156,6 +155,8 @@ class _FixedWindowDirectionIndexer(_CustomBaseIndexer): return start, end def _center_result(self, start, end, offset): + # cut N values at the front that was inserted in _fw() + # or cut N values at the end if _bw() if offset > 0: if self.forward: start = start[:-offset] @@ -178,20 +179,14 @@ class _FixedWindowDirectionIndexer(_CustomBaseIndexer): return start, end - def _bw(self, num_values=0, min_periods=None, center=False, closed=None, offset=0): - # code taken from pd.core.windows.indexer.FixedWindowIndexer - start_s = np.zeros(self.window_size, dtype="int64") - start_e = (np.arange(self.window_size, num_values, dtype="int64") - self.window_size + 1) - start = np.concatenate([start_s, start_e])[:num_values] - - end_s = np.arange(self.window_size, dtype="int64") + 1 - end_e = start_e + self.window_size - end = np.concatenate([end_s, end_e])[:num_values] - # end stolen code + def _bw(self, num_values=0, offset=0): + start = np.arange(-self.window_size, num_values + offset, dtype="int64") + 1 + end = start + self.window_size + start[:self.window_size] = 0 return start, end - def _fw(self, num_values=0, min_periods=None, center=False, closed=None, offset=0): - start = np.arange(-offset, num_values, dtype="int64")[:num_values] + def _fw(self, num_values=0, offset=0): + start = np.arange(-offset, num_values, dtype="int64") end = start + self.window_size start[:offset] = 0 return start, end @@ -208,6 +203,8 @@ class _VariableWindowDirectionIndexer(_CustomBaseIndexer): super().validate() if self.min_periods is None: self.min_periods = 1 + if self.window_size == 0: + self.min_periods = 0 def _get_bounds(self, num_values=0, min_periods=None, center=False, closed=None): ws_bw, ws_fw = self._get_center_window_sizes(self.window_size) @@ -250,12 +247,12 @@ class _VariableWindowDirectionIndexer(_CustomBaseIndexer): return start, end - def _bw(self, num_values, window_size, closed=None): + def _bw(self, num_values, window_size, closed): arr = self.index_array start, end = calculate_variable_window_bounds(num_values, window_size, None, None, closed, arr) return start, end - def _fw(self, num_values, window_size, closed=None): + def _fw(self, num_values, window_size, closed): arr = self.index_array[::-1] s, _ = calculate_variable_window_bounds(num_values, window_size, None, None, closed, arr) start = np.arange(num_values) @@ -375,7 +372,7 @@ def customRoller(obj, window, min_periods=None, # aka minimum non-nan values indexer = indexer(index_array=x._on.asi8, window_size=x.window, **ours) # center offset is calculated from min_periods if a indexer is passed to rolling(). - # if instead a normal window is passed, it is used for offset calculation. + # if instead a normal (dt or num) window is passed, it is used for offset calculation. # also if we pass min_periods == None or 0, all values will Nan in the result even if # start[i]<end[i] as expected. So we cannot pass `center` to rolling. Instead we manually do the centering # in the Indexer. To calculate min_periods (!) including NaN count (!) we need to pass min_periods, but diff --git a/test/lib/test_rolling.py b/test/lib/test_rolling.py index 03da38f7c0d08144627b151dd86a8cf4c4938b2f..fc49a7c491e9d8567d8fa1095d972ad4d3bed79f 100644 --- a/test/lib/test_rolling.py +++ b/test/lib/test_rolling.py @@ -1,9 +1,16 @@ import pytest -from saqc.lib.rolling import customRoller +from saqc.lib.rolling import customRoller, Rolling import pandas as pd import numpy as np +FUNCTS = ['count', 'sum', 'mean', 'median', 'var', 'std', 'min', 'max', 'corr', 'cov', 'skew', 'kurt', ] + +OTHA = ['apply', + 'aggregate', # needs param func eg. func='min' + 'quantile', # needs param quantile=0.5 (0<=q<=1) + ] + @pytest.fixture def data(): @@ -19,26 +26,39 @@ def data_(): return s +def data__(): + s1 = pd.Series(1., index=pd.date_range("1999/12", periods=4, freq='1M') + pd.Timedelta('1d')) + s2 = pd.Series(1., index=pd.date_range('2000/05/15', periods=2, freq='1d')) + s = pd.concat([s1, s2]).sort_index() + s.name = 's' + s[5] = np.nan + return s + + len_s = len(data_()) def make_num_kws(): l = [] - for window in range(len_s + 2): - for min_periods in [None] + list(range(window + 1)): + n = [0, 1, 2, 10, 20, 30] + mp = [0, 1, 2, 10, 20, 30] + for window in n: + for min_periods in [None] + mp: + if min_periods is not None and min_periods > window: + continue for center in [False, True]: - for closed in [None] + ['left', 'right', 'both', 'neither']: - l.append(dict(window=window, min_periods=min_periods, center=center, closed=closed)) + l.append(dict(window=window, min_periods=min_periods, center=center)) return l def make_dt_kws(): l = [] - for closed in [None] + ['right', 'both', 'neither', 'left']: - for window in range(1, len_s + 3): - for min_periods in [None] + list(range(window + 1)): - for win in [f'{window}d', f'{window * 31}d']: - l.append(dict(window=win, min_periods=min_periods, closed=closed)) + n = [0, 1, 2, 10, 32, 70, 120] + mp = [0, 1, 2, 10, 20, 30] + for closed in ['right', 'both', 'neither', 'left']: + for window in n: + for min_periods in [None] + mp: + l.append(dict(window=f'{window}d', min_periods=min_periods, closed=closed)) return l @@ -60,20 +80,37 @@ def print_diff(s, result, expected): print(df) -def runtest_for_kw_combi(s, kws): +def runtest_for_kw_combi(s, kws, func='sum'): print(kws) forward = kws.pop('forward', False) + + def calc(roller): + if isinstance(func, str): + return getattr(roller, func)() + else: + return getattr(roller, 'apply')(func) + if forward: - result = customRoller(s, forward=True, **kws).sum() - expected = pd.Series(reversed(s), reversed(s.index)).rolling(**kws).sum()[::-1] + expR = pd.Series(reversed(s), reversed(s.index)).rolling(**kws) + resR = customRoller(s, forward=True, **kws) + try: + expected = calc(expR)[::-1] + except Exception: + pytest.skip("pandas faild") + result = calc(resR) success = check_series(result, expected) if not success: print_diff(s, result, expected) assert False, f"forward=True !! {kws}" else: - result = customRoller(s, **kws).sum() - expected = s.rolling(**kws).sum() + expR = s.rolling(**kws) + resR = customRoller(s, **kws) + try: + expected = calc(expR) + except Exception: + pytest.skip("pandas faild") + result = calc(resR) success = check_series(result, expected) if not success: @@ -81,26 +118,30 @@ def runtest_for_kw_combi(s, kws): assert False -@pytest.mark.parametrize("kws", make_num_kws()) -def test_pandas_conform_num(data, kws): - runtest_for_kw_combi(data, kws) +@pytest.mark.parametrize("kws", make_dt_kws(), ids=lambda x: str(x)) +@pytest.mark.parametrize("func", FUNCTS) +def test_pandas_conform_dt(data, kws, func): + runtest_for_kw_combi(data, kws, func=func) -@pytest.mark.parametrize("kws", make_dt_kws()) -def test_pandas_conform_dt(data, kws): - runtest_for_kw_combi(data, kws) +@pytest.mark.parametrize("kws", make_num_kws(), ids=lambda x: str(x)) +@pytest.mark.parametrize("func", FUNCTS) +def test_pandas_conform_num(data, kws, func): + runtest_for_kw_combi(data, kws, func=func) -@pytest.mark.parametrize("kws", make_num_kws()) -def test_forward_num(data, kws): +@pytest.mark.parametrize("kws", make_dt_kws(), ids=lambda x: str(x)) +@pytest.mark.parametrize("func", FUNCTS) +def test_forward_dt(data, kws, func): kws.update(forward=True) - runtest_for_kw_combi(data, kws) + runtest_for_kw_combi(data, kws, func=func) -@pytest.mark.parametrize("kws", make_dt_kws()) -def test_forward_dt(data, kws): +@pytest.mark.parametrize("kws", make_num_kws(), ids=lambda x: str(x)) +@pytest.mark.parametrize("func", FUNCTS) +def test_forward_num(data, kws, func): kws.update(forward=True) - runtest_for_kw_combi(data, kws) + runtest_for_kw_combi(data, kws, func=func) def dt_center_kws(): @@ -111,7 +152,7 @@ def dt_center_kws(): return l -@pytest.mark.parametrize("kws", dt_center_kws()) +@pytest.mark.parametrize("kws", make_num_kws(), ids=lambda x: str(x)) def test_centering_w_dtindex(kws): print(kws) s = pd.Series(0., index=pd.date_range("2000", periods=10, freq='1H'))