From a0147299b6c29943950ca538b8dce396358a46f6 Mon Sep 17 00:00:00 2001
From: Bert Palm <bert.palm@ufz.de>
Date: Sat, 20 Mar 2021 02:41:03 +0100
Subject: [PATCH] fixed harmonization tests

---
 tests/funcs/test_harm_funcs.py | 273 +++++++++++++--------------------
 1 file changed, 103 insertions(+), 170 deletions(-)

diff --git a/tests/funcs/test_harm_funcs.py b/tests/funcs/test_harm_funcs.py
index 0675b3aeb..f78f8e573 100644
--- a/tests/funcs/test_harm_funcs.py
+++ b/tests/funcs/test_harm_funcs.py
@@ -1,17 +1,13 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
 
-
-# see test/functs/conftest.py for global fixtures "course_..."
 import pytest
 import numpy as np
 import pandas as pd
 import dios
 
-from test.common import TESTFLAGGER
 from saqc.flagger import Flagger, initFlagsLike
-from saqc.common import BAD
-
+from saqc.constants import BAD, UNFLAGGED
 from saqc.funcs.resampling import (
     linear,
     interpolate,
@@ -20,10 +16,6 @@ from saqc.funcs.resampling import (
     mapToOriginal,
 )
 
-RESHAPERS = ["nshift", "fshift", "bshift", "nagg", "bagg", "fagg", "interpolation"]
-
-INTERPOLATIONS = ["time", "polynomial"]
-
 
 @pytest.fixture
 def data():
@@ -41,184 +33,125 @@ def data():
     return data
 
 
-@pytest.mark.parametrize("flagger", TESTFLAGGER)
-@pytest.mark.parametrize("reshaper", RESHAPERS)
-def test_harmSingleVarIntermediateFlagging(data, flagger, reshaper):
+@pytest.mark.parametrize("reshaper", ["nshift", "fshift", "bshift", "nagg", "bagg", "fagg", "interpolation"])
+def test_harmSingleVarIntermediateFlagging(data, reshaper):
     flagger = initFlagsLike(data)
-    # make pre harm copies:
+    field = 'data'
+
     pre_data = data.copy()
-    pre_flags = flagger['data']
-    freq = "15min"
-    assert len(data.columns) == 1
-    field = data.columns[0]
-    data, flagger = linear(data, "data", flagger, freq)
+    pre_flagger = flagger.copy()
+
+    data, flagger = linear(data, field, flagger, freq="15min")
+
     # flag something bad
-    f_ser = pd.Series(data=[-np.inf] * len(data[field]), index=data[field].index)
-    f_ser[3:4] = BAD
-    flagger[field] = f_ser
-    data, flagger = mapToOriginal(data, "data", flagger, method="inverse_" + reshaper)
-    d = data[field]
-    if reshaper == "nagg":
-        assert flagger.isFlagged(loc=d.index[3:7]).squeeze().all()
-        assert (~flagger.isFlagged(loc=d.index[0:3]).squeeze()).all()
-        assert (~flagger.isFlagged(loc=d.index[7:]).squeeze()).all()
-    if reshaper == "nshift":
-        assert (flagger.isFlagged().squeeze() == [False, False, False, False, True, False, False, False, False]).all()
-    if reshaper == "bagg":
-        assert flagger.isFlagged(loc=d.index[5:7]).squeeze().all()
-        assert (~flagger.isFlagged(loc=d.index[0:5]).squeeze()).all()
-        assert (~flagger.isFlagged(loc=d.index[7:]).squeeze()).all()
-    if reshaper == "bshift":
-        assert (flagger.isFlagged().squeeze() == [False, False, False, False, False, True, False, False, False]).all()
-    if reshaper == "fagg":
-        assert flagger.isFlagged(loc=d.index[3:5]).squeeze().all()
-        assert (~flagger.isFlagged(loc=d.index[0:3]).squeeze()).all()
-        assert (~flagger.isFlagged(loc=d.index[5:]).squeeze()).all()
-    if reshaper == "fshift":
-        assert (flagger.isFlagged().squeeze() == [False, False, False, False, True, False, False, False, False]).all()
-
-    flags = flagger.getFlags()
-    assert pre_data[field].equals(data[field])
-    assert len(data[field]) == len(flags[field])
-    assert (pre_flags[field].index == flags[field].index).all()
-
-
-@pytest.mark.parametrize("flagger", TESTFLAGGER)
-def test_harmSingleVarInterpolations(data, flagger):
-    flagger = flagger.initFlags(data)
-    field = data.columns[0]
-    pre_data = data[field]
-    pre_flags = flagger.getFlags(field)
-    tests = [
-        (
-            "nagg",
-            "15Min",
-            pd.Series(
-                data=[-87.5, -25.0, 0.0, 37.5, 50.0],
-                index=pd.date_range("2011-01-01 00:00:00", "2011-01-01 01:00:00", freq="15min"),
-            ),
-        ),
-        (
-            "nagg",
-            "30Min",
-            pd.Series(
-                data=[-87.5, -25.0, 87.5],
-                index=pd.date_range("2011-01-01 00:00:00", "2011-01-01 01:00:00", freq="30min"),
-            ),
-        ),
-        (
-            "bagg",
-            "15Min",
-            pd.Series(
-                data=[-50.0, -37.5, -37.5, 12.5, 37.5, 50.0],
-                index=pd.date_range("2010-12-31 23:45:00", "2011-01-01 01:00:00", freq="15min"),
-            ),
-        ),
-        (
-            "bagg",
-            "30Min",
-            pd.Series(
-                data=[-50.0, -75.0, 50.0, 50.0],
-                index=pd.date_range("2010-12-31 23:30:00", "2011-01-01 01:00:00", freq="30min"),
-            ),
-        ),
-    ]
-
-    for interpolation, freq, expected in tests:
-        data_harm, flagger_harm = aggregate(
-            data, field, flagger, freq, value_func=np.sum, method=interpolation
-        )
-        assert data_harm[field].equals(expected)
-        data_deharm, flagger_deharm = mapToOriginal(
-            data_harm, "data", flagger_harm, method="inverse_" + interpolation
-        )
-        assert data_deharm[field].equals(pre_data)
-        assert flagger_deharm.getFlags([field]).squeeze().equals(pre_flags)
-
-    tests = [
-        (
-            "fshift",
-            "15Min",
-            pd.Series(
-                data=[np.nan, -37.5, -25.0, 0.0, 37.5, 50.0],
-                index=pd.date_range("2010-12-31 23:45:00", "2011-01-01 01:00:00", freq="15Min"),
-            ),
-        ),
-        (
-            "fshift",
-            "30Min",
-            pd.Series(
-                data=[np.nan, -37.5, 0.0, 50.0],
-                index=pd.date_range("2010-12-31 23:30:00", "2011-01-01 01:00:00", freq="30Min"),
-            ),
-        ),
-        (
-            "bshift",
-            "15Min",
-            pd.Series(
-                data=[-50.0, -37.5, -25.0, 12.5, 37.5, 50.0],
-                index=pd.date_range("2010-12-31 23:45:00", "2011-01-01 01:00:00", freq="15Min"),
-            ),
-        ),
-        (
-            "bshift",
-            "30Min",
-            pd.Series(
-                data=[-50.0, -37.5, 12.5, 50.0],
-                index=pd.date_range("2010-12-31 23:30:00", "2011-01-01 01:00:00", freq="30Min"),
-            ),
-        ),
-        (
-            "nshift",
-            "15min",
-            pd.Series(
-                data=[np.nan, -37.5, -25.0, 12.5, 37.5, 50.0],
-                index=pd.date_range("2010-12-31 23:45:00", "2011-01-01 01:00:00", freq="15Min"),
-            ),
-        ),
-        (
-            "nshift",
-            "30min",
-            pd.Series(
-                data=[np.nan, -37.5, 12.5, 50.0],
-                index=pd.date_range("2010-12-31 23:30:00", "2011-01-01 01:00:00", freq="30Min"),
-            ),
-        ),
-    ]
-
-    for interpolation, freq, expected in tests:
-        data_harm, flagger_harm = shift(data, field, flagger, freq, method=interpolation)
-        assert data_harm[field].equals(expected)
-        data_deharm, flagger_deharm = mapToOriginal(
-            data_harm, "data", flagger_harm, method="inverse_" + interpolation
-        )
-        assert data_deharm[field].equals(pre_data)
-        assert flagger_deharm.getFlags([field]).squeeze().equals(pre_flags)
-
-
-@pytest.mark.parametrize("method", INTERPOLATIONS)
+    flagger[data[field].index[3:4], field] = BAD
+    data, flagger = mapToOriginal(data, field, flagger, method="inverse_" + reshaper)
+
+    # the round trip has to restore the original data and the original flags index
+    assert len(data[field]) == len(flagger[field])
+    assert data[field].equals(pre_data[field])
+    assert flagger[field].index.equals(pre_flagger[field].index)
+
+    # flags carry flag values (BAD was assigned above), so build a boolean mask first
+    flagged = flagger[field] > UNFLAGGED
+
+    if 'agg' in reshaper:
+        # positions [start, end) of the original index that are expected to be flagged
+        bounds = {"nagg": (3, 7), "fagg": (3, 5), "bagg": (5, 7)}
+        if reshaper not in bounds:
+            raise NotImplementedError('untested test case')
+        start, end = bounds[reshaper]
+
+        assert flagged.iloc[start:end].all()
+        assert not flagged.iloc[:start].any()
+        assert not flagged.iloc[end:].any()
+
+    elif 'shift' in reshaper:
+        if reshaper in ("nshift", "fshift"):
+            exp = [False, False, False, False, True, False, False, False, False]
+        elif reshaper == "bshift":
+            exp = [False, False, False, False, False, True, False, False, False]
+        else:
+            raise NotImplementedError('untested test case')
+
+        assert all(flagged == exp)
+
+    elif reshaper != "interpolation":
+        # "interpolation" has no method specific expectation; for it only the
+        # round trip checks above apply, anything else is an untested case
+        raise NotImplementedError('untested test case')
+
+
+@pytest.mark.parametrize('params, expected', [
+    (("nagg", "15Min"), pd.Series(data=[-87.5, -25.0, 0.0, 37.5, 50.0], index=pd.date_range("2011-01-01 00:00:00", "2011-01-01 01:00:00", freq="15min"))),
+    (("nagg", "30Min"), pd.Series(data=[-87.5, -25.0, 87.5], index=pd.date_range("2011-01-01 00:00:00", "2011-01-01 01:00:00", freq="30min"))),
+    (("bagg", "15Min"), pd.Series(data=[-50.0, -37.5, -37.5, 12.5, 37.5, 50.0], index=pd.date_range("2010-12-31 23:45:00", "2011-01-01 01:00:00", freq="15min"))),
+    (("bagg", "30Min"), pd.Series(data=[-50.0, -75.0, 50.0, 50.0], index=pd.date_range("2010-12-31 23:30:00", "2011-01-01 01:00:00", freq="30min"))),
+])
+def test_harmSingleVarInterpolationAgg(data, params, expected):
+    """Aggregate `data` with `method` at `freq`, compare with `expected` and map it back."""
+    flagger = initFlagsLike(data)
+    field = 'data'
+    pre_data = data.copy()
+    pre_flagger = flagger.copy()
+    method, freq = params
+
+    data_harm, flagger_harm = aggregate(data, field, flagger, freq, value_func=np.sum, method=method)
+    assert data_harm[field].equals(expected)
+
+    # the matching inverse method has to give back the untouched data and flags
+    data_deharm, flagger_deharm = mapToOriginal(data_harm, field, flagger_harm, method="inverse_" + method)
+    assert data_deharm[field].equals(pre_data[field])
+    assert flagger_deharm[field].equals(pre_flagger[field])
+
+
+@pytest.mark.parametrize('params, expected', [
+    (("fshift", "15Min"), pd.Series(data=[np.nan, -37.5, -25.0, 0.0, 37.5, 50.0], index=pd.date_range("2010-12-31 23:45:00", "2011-01-01 01:00:00", freq="15Min"))),
+    (("fshift", "30Min"), pd.Series(data=[np.nan, -37.5, 0.0, 50.0], index=pd.date_range("2010-12-31 23:30:00", "2011-01-01 01:00:00", freq="30Min"))),
+    (("bshift", "15Min"), pd.Series(data=[-50.0, -37.5, -25.0, 12.5, 37.5, 50.0], index=pd.date_range("2010-12-31 23:45:00", "2011-01-01 01:00:00", freq="15Min"))),
+    (("bshift", "30Min"), pd.Series(data=[-50.0, -37.5, 12.5, 50.0], index=pd.date_range("2010-12-31 23:30:00", "2011-01-01 01:00:00", freq="30Min"))),
+    (("nshift", "15min"), pd.Series(data=[np.nan, -37.5, -25.0, 12.5, 37.5, 50.0], index=pd.date_range("2010-12-31 23:45:00", "2011-01-01 01:00:00", freq="15Min"))),
+    (("nshift", "30min"), pd.Series(data=[np.nan, -37.5, 12.5, 50.0], index=pd.date_range("2010-12-31 23:30:00", "2011-01-01 01:00:00", freq="30Min"))),
+])
+def test_harmSingleVarInterpolationShift(data, params, expected):
+    """Shift `data` with `method` at `freq`, compare with `expected` and map it back."""
+    flagger = initFlagsLike(data)
+    field = 'data'
+    pre_data = data.copy()
+    pre_flagger = flagger.copy()
+    method, freq = params
+
+    data_harm, flagger_harm = shift(data, field, flagger, freq, method=method)
+    assert data_harm[field].equals(expected)
+
+    # the matching inverse method has to give back the untouched data and flags
+    data_deharm, flagger_deharm = mapToOriginal(data_harm, field, flagger_harm, method="inverse_" + method)
+    assert data_deharm[field].equals(pre_data[field])
+    assert flagger_deharm[field].equals(pre_flagger[field])
+
+
+@pytest.mark.parametrize("method", ["time", "polynomial"])
 def test_gridInterpolation(data, method):
     freq = "15min"
-    data = data.squeeze()
-    field = data.name
+    field = 'data'
+    data = data[field]
     data = (data * np.sin(data)).append(data.shift(1, "2h")).shift(1, "3s")
     data = dios.DictOfSeries(data)
-    flagger = TESTFLAGGER[0].initFlags(data)
+    flagger = initFlagsLike(data)
 
     # we are just testing if the interpolation gets passed to the series without causing an error:
-
     interpolate(data, field, flagger, freq, method=method, downcast_interpolation=True)
+
     if method == "polynomial":
         interpolate(data, field, flagger, freq, order=2, method=method, downcast_interpolation=True)
         interpolate(data, field, flagger, freq, order=10, method=method, downcast_interpolation=True)
 
 
-@pytest.mark.parametrize("flagger", TESTFLAGGER)
-def test_wrapper(data, flagger):
+def test_wrapper(data):
     # we are only testing, whether the wrappers do pass processing:
-    field = data.columns[0]
+    field = 'data'
     freq = "15min"
-    flagger = flagger.initFlags(data)
+    flagger = initFlagsLike(data)
 
     linear(data, field, flagger, freq, to_drop=None)
     aggregate(data, field, flagger, freq, value_func=np.nansum, method="nagg", to_drop=None)
-- 
GitLab