From f188a38ca853f9b78043698e881da58791236821 Mon Sep 17 00:00:00 2001
From: Bert Palm <bert.palm@ufz.de>
Date: Thu, 26 Nov 2020 02:32:31 +0100
Subject: [PATCH] advanced test suit... failing tests have reasons eg. skew
 need 3 min_periods, kurt at least 4, etc. count produce NaN, where
 pandas.rolling produce 0.0, due to a pandas internal function call to .count
 if a indexer is given (freaky!)..

---
 saqc/lib/rolling.py      | 39 ++++++++--------
 test/lib/test_rolling.py | 97 ++++++++++++++++++++++++++++------------
 2 files changed, 87 insertions(+), 49 deletions(-)

diff --git a/saqc/lib/rolling.py b/saqc/lib/rolling.py
index ba529fa53..337f59916 100644
--- a/saqc/lib/rolling.py
+++ b/saqc/lib/rolling.py
@@ -75,8 +75,6 @@ class _CustomBaseIndexer(BaseIndexer):
             self.skip = ~self.skip
 
     def get_window_bounds(self, num_values=0, min_periods=None, center=False, closed=None):
-        if min_periods is not None:
-            self.min_periods = max(self.min_periods, min_periods)
         num_values = self.num_values
         min_periods = self.min_periods
         center = self.center
@@ -134,17 +132,18 @@ class _FixedWindowDirectionIndexer(_CustomBaseIndexer):
 
     def validate(self) -> None:
         super().validate()
-        # if self.closed is not None:
-        #     raise ValueError("closed only implemented for datetimelike and offset based windows")
+        if self.min_periods is None:
+            self.min_periods = self.window_size
 
     def _get_bounds(self, num_values=0, min_periods=None, center=False, closed=None):
+        # closed is always ignored and handled as 'both' other cases not implemented
         offset = calculate_center_offset(self.window_size) if center else 0
         num_values += offset
 
         if self.forward:
-            start, end = self._fw(num_values, min_periods, center, closed, offset)
+            start, end = self._fw(num_values, offset)
         else:
-            start, end = self._bw(num_values, min_periods, center, closed, offset)
+            start, end = self._bw(num_values, offset)
 
         if center:
             start, end = self._center_result(start, end, offset)
@@ -156,6 +155,8 @@ class _FixedWindowDirectionIndexer(_CustomBaseIndexer):
         return start, end
 
     def _center_result(self, start, end, offset):
+        # cut N values at the front that was inserted in _fw()
+        # or cut N values at the end if _bw()
         if offset > 0:
             if self.forward:
                 start = start[:-offset]
@@ -178,20 +179,14 @@ class _FixedWindowDirectionIndexer(_CustomBaseIndexer):
 
         return start, end
 
-    def _bw(self, num_values=0, min_periods=None, center=False, closed=None, offset=0):
-        # code taken from pd.core.windows.indexer.FixedWindowIndexer
-        start_s = np.zeros(self.window_size, dtype="int64")
-        start_e = (np.arange(self.window_size, num_values, dtype="int64") - self.window_size + 1)
-        start = np.concatenate([start_s, start_e])[:num_values]
-
-        end_s = np.arange(self.window_size, dtype="int64") + 1
-        end_e = start_e + self.window_size
-        end = np.concatenate([end_s, end_e])[:num_values]
-        # end stolen code
+    def _bw(self, num_values=0, offset=0):
+        start = np.arange(-self.window_size, num_values + offset, dtype="int64") + 1
+        end = start + self.window_size
+        start[:self.window_size] = 0
         return start, end
 
-    def _fw(self, num_values=0, min_periods=None, center=False, closed=None, offset=0):
-        start = np.arange(-offset, num_values, dtype="int64")[:num_values]
+    def _fw(self, num_values=0, offset=0):
+        start = np.arange(-offset, num_values, dtype="int64")
         end = start + self.window_size
         start[:offset] = 0
         return start, end
@@ -208,6 +203,8 @@ class _VariableWindowDirectionIndexer(_CustomBaseIndexer):
         super().validate()
         if self.min_periods is None:
             self.min_periods = 1
+            if self.window_size == 0:
+                self.min_periods = 0
 
     def _get_bounds(self, num_values=0, min_periods=None, center=False, closed=None):
         ws_bw, ws_fw = self._get_center_window_sizes(self.window_size)
@@ -250,12 +247,12 @@ class _VariableWindowDirectionIndexer(_CustomBaseIndexer):
 
         return start, end
 
-    def _bw(self, num_values, window_size, closed=None):
+    def _bw(self, num_values, window_size, closed):
         arr = self.index_array
         start, end = calculate_variable_window_bounds(num_values, window_size, None, None, closed, arr)
         return start, end
 
-    def _fw(self, num_values, window_size, closed=None):
+    def _fw(self, num_values, window_size, closed):
         arr = self.index_array[::-1]
         s, _ = calculate_variable_window_bounds(num_values, window_size, None, None, closed, arr)
         start = np.arange(num_values)
@@ -375,7 +372,7 @@ def customRoller(obj, window, min_periods=None,  # aka minimum non-nan values
     indexer = indexer(index_array=x._on.asi8, window_size=x.window, **ours)
 
     # center offset is calculated from min_periods if a indexer is passed to rolling().
-    # if instead a normal window is passed, it is used for offset calculation.
+    # if instead a normal (dt or num) window is passed, it is used for offset calculation.
     # also if we pass min_periods == None or 0, all values will Nan in the result even if
     # start[i]<end[i] as expected. So we cannot pass `center` to rolling. Instead we manually do the centering
     # in the Indexer. To calculate min_periods (!) including NaN count (!) we need to pass min_periods, but
diff --git a/test/lib/test_rolling.py b/test/lib/test_rolling.py
index 03da38f7c..fc49a7c49 100644
--- a/test/lib/test_rolling.py
+++ b/test/lib/test_rolling.py
@@ -1,9 +1,16 @@
 import pytest
 
-from saqc.lib.rolling import customRoller
+from saqc.lib.rolling import customRoller, Rolling
 import pandas as pd
 import numpy as np
 
+FUNCTS = ['count', 'sum', 'mean', 'median', 'var', 'std', 'min', 'max', 'corr', 'cov', 'skew', 'kurt', ]
+
+OTHA = ['apply',
+        'aggregate',  # needs param func eg. func='min'
+        'quantile',  # needs param quantile=0.5 (0<=q<=1)
+        ]
+
 
 @pytest.fixture
 def data():
@@ -19,26 +26,39 @@ def data_():
     return s
 
 
+def data__():
+    s1 = pd.Series(1., index=pd.date_range("1999/12", periods=4, freq='1M') + pd.Timedelta('1d'))
+    s2 = pd.Series(1., index=pd.date_range('2000/05/15', periods=2, freq='1d'))
+    s = pd.concat([s1, s2]).sort_index()
+    s.name = 's'
+    s[5] = np.nan
+    return s
+
+
 len_s = len(data_())
 
 
 def make_num_kws():
     l = []
-    for window in range(len_s + 2):
-        for min_periods in [None] + list(range(window + 1)):
+    n = [0, 1, 2, 10, 20, 30]
+    mp = [0, 1, 2, 10, 20, 30]
+    for window in n:
+        for min_periods in [None] + mp:
+            if min_periods is not None and min_periods > window:
+                continue
             for center in [False, True]:
-                for closed in [None] + ['left', 'right', 'both', 'neither']:
-                    l.append(dict(window=window, min_periods=min_periods, center=center, closed=closed))
+                l.append(dict(window=window, min_periods=min_periods, center=center))
     return l
 
 
 def make_dt_kws():
     l = []
-    for closed in [None] + ['right', 'both', 'neither', 'left']:
-        for window in range(1, len_s + 3):
-            for min_periods in [None] + list(range(window + 1)):
-                for win in [f'{window}d', f'{window * 31}d']:
-                    l.append(dict(window=win, min_periods=min_periods, closed=closed))
+    n = [0, 1, 2, 10, 32, 70, 120]
+    mp = [0, 1, 2, 10, 20, 30]
+    for closed in ['right', 'both', 'neither', 'left']:
+        for window in n:
+            for min_periods in [None] + mp:
+                l.append(dict(window=f'{window}d', min_periods=min_periods, closed=closed))
     return l
 
 
@@ -60,20 +80,37 @@ def print_diff(s, result, expected):
     print(df)
 
 
-def runtest_for_kw_combi(s, kws):
+def runtest_for_kw_combi(s, kws, func='sum'):
     print(kws)
     forward = kws.pop('forward', False)
+
+    def calc(roller):
+        if isinstance(func, str):
+            return getattr(roller, func)()
+        else:
+            return getattr(roller, 'apply')(func)
+
     if forward:
-        result = customRoller(s, forward=True, **kws).sum()
-        expected = pd.Series(reversed(s), reversed(s.index)).rolling(**kws).sum()[::-1]
+        expR = pd.Series(reversed(s), reversed(s.index)).rolling(**kws)
+        resR = customRoller(s, forward=True, **kws)
+        try:
+            expected = calc(expR)[::-1]
+        except Exception:
+            pytest.skip("pandas faild")
+        result = calc(resR)
 
         success = check_series(result, expected)
         if not success:
             print_diff(s, result, expected)
             assert False, f"forward=True !! {kws}"
     else:
-        result = customRoller(s, **kws).sum()
-        expected = s.rolling(**kws).sum()
+        expR = s.rolling(**kws)
+        resR = customRoller(s, **kws)
+        try:
+            expected = calc(expR)
+        except Exception:
+            pytest.skip("pandas faild")
+        result = calc(resR)
 
         success = check_series(result, expected)
         if not success:
@@ -81,26 +118,30 @@ def runtest_for_kw_combi(s, kws):
             assert False
 
 
-@pytest.mark.parametrize("kws", make_num_kws())
-def test_pandas_conform_num(data, kws):
-    runtest_for_kw_combi(data, kws)
+@pytest.mark.parametrize("kws", make_dt_kws(), ids=lambda x: str(x))
+@pytest.mark.parametrize("func", FUNCTS)
+def test_pandas_conform_dt(data, kws, func):
+    runtest_for_kw_combi(data, kws, func=func)
 
 
-@pytest.mark.parametrize("kws", make_dt_kws())
-def test_pandas_conform_dt(data, kws):
-    runtest_for_kw_combi(data, kws)
+@pytest.mark.parametrize("kws", make_num_kws(), ids=lambda x: str(x))
+@pytest.mark.parametrize("func", FUNCTS)
+def test_pandas_conform_num(data, kws, func):
+    runtest_for_kw_combi(data, kws, func=func)
 
 
-@pytest.mark.parametrize("kws", make_num_kws())
-def test_forward_num(data, kws):
+@pytest.mark.parametrize("kws", make_dt_kws(), ids=lambda x: str(x))
+@pytest.mark.parametrize("func", FUNCTS)
+def test_forward_dt(data, kws, func):
     kws.update(forward=True)
-    runtest_for_kw_combi(data, kws)
+    runtest_for_kw_combi(data, kws, func=func)
 
 
-@pytest.mark.parametrize("kws", make_dt_kws())
-def test_forward_dt(data, kws):
+@pytest.mark.parametrize("kws", make_num_kws(), ids=lambda x: str(x))
+@pytest.mark.parametrize("func", FUNCTS)
+def test_forward_num(data, kws, func):
     kws.update(forward=True)
-    runtest_for_kw_combi(data, kws)
+    runtest_for_kw_combi(data, kws, func=func)
 
 
 def dt_center_kws():
@@ -111,7 +152,7 @@ def dt_center_kws():
     return l
 
 
-@pytest.mark.parametrize("kws", dt_center_kws())
+@pytest.mark.parametrize("kws", make_num_kws(), ids=lambda x: str(x))
 def test_centering_w_dtindex(kws):
     print(kws)
     s = pd.Series(0., index=pd.date_range("2000", periods=10, freq='1H'))
-- 
GitLab