# David Schäfer authored a884f8df
# test_harm_funcs.py 10.66 KiB
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
import numpy as np
import pandas as pd
from test.common import TESTFLAGGER
from saqc.funcs.harm_functions import harmonize, deharmonize,\
_interpolate, _interpolateGrid, _insertGrid, _outsortCrap
# Use every shared test flagger except the last entry of the imported list.
# NOTE(review): the reason for excluding that last flagger is not visible in
# this module - confirm against test.common.
TESTFLAGGER = TESTFLAGGER[:-1]
# Parametrization values shared by the tests in this module.

# Reshape methods passed to harmonize() in the intermediate-flagging test.
RESHAPERS = ['nearest_shift', 'fshift', 'bshift']

# Values for the `co_flagging` argument of deharmonize().
COFLAGGING = [False, True]

# Values for the `reshape_shift_comment` argument of harmonize().
SETSHIFTCOMMENT = [False, True]

# Interpolation methods whose harmonized output is compared to fixed values.
INTERPOLATIONS = ['fshift', 'bshift', 'nearest_shift', 'nearest_agg', 'bagg']

# Interpolation methods that are only smoke-tested (must run without error).
INTERPOLATIONS2 = ['fagg', 'time', 'polynomial']

# Target grid frequencies.
FREQS = ['15min', '30min']
@pytest.fixture
def data():
    """Return a single-column frame ('data') on an irregular time index.

    The index starts as a 15 minute grid from 00:00 to 01:00 on 2011-01-01.
    Off-grid timestamps are inserted around 00:30 and before 00:00, and the
    on-grid 00:30 stamp is removed, leaving nine timestamps. The values are
    evenly spaced from -50 to 50, with the third-to-last one set to NaN.
    """
    index = pd.date_range(start='1.1.2011 00:00:00', end='1.1.2011 01:00:00', freq='15min')
    # push the series off the regular grid
    index = index.insert(2, pd.Timestamp(2011, 1, 1, 0, 29, 0))
    index = index.insert(2, pd.Timestamp(2011, 1, 1, 0, 28, 0))
    index = index.insert(5, pd.Timestamp(2011, 1, 1, 0, 32, 0))
    index = index.insert(5, pd.Timestamp(2011, 1, 1, 0, 31, 0))
    index = index.insert(0, pd.Timestamp(2010, 12, 31, 23, 57, 0))
    index = index.drop(pd.Timestamp('2011-01-01 00:30:00'))
    dat = pd.Series(np.linspace(-50, 50, index.size), index=index, name='data')
    # good to have some nan
    # .iloc makes the positional intent explicit; a plain integer key on a
    # datetime-indexed Series relies on a deprecated positional fallback.
    dat.iloc[-3] = np.nan
    return dat.to_frame()
@pytest.fixture
def multi_data():
    """Return a three-column frame ('data', 'data2', 'data3') on one index.

    'data' is an irregularly sampled series with one NaN, shifted by 2 minutes.
    'data2' is a copy of it shifted by a further 17 minutes. 'data3' is a copy
    shifted by one hour that keeps only its first two and last two rows. The
    columns are outer-merged on their indexes, so each column is NaN wherever
    only the other columns have a timestamp.
    """
    index = pd.date_range(start='1.1.2011 00:00:00', end='1.1.2011 01:00:00', freq='15min')
    # push the series off the regular grid
    index = index.insert(2, pd.Timestamp(2011, 1, 1, 0, 29, 0))
    index = index.insert(2, pd.Timestamp(2011, 1, 1, 0, 28, 0))
    index = index.insert(5, pd.Timestamp(2011, 1, 1, 0, 32, 0))
    index = index.insert(5, pd.Timestamp(2011, 1, 1, 0, 31, 0))
    index = index.insert(0, pd.Timestamp(2010, 12, 31, 23, 57, 0))
    index = index.drop(pd.Timestamp('2011-01-01 00:30:00'))
    dat = pd.Series(np.linspace(-50, 50, index.size), index=index, name='data')
    # good to have some nan
    # .iloc makes the positional intent explicit; a plain integer key on a
    # datetime-indexed Series relies on a deprecated positional fallback.
    dat.iloc[-3] = np.nan
    data = dat.to_frame()
    data.index = data.index.shift(1, '2min')

    # second variable: same values, offset by another 17 minutes
    dat2 = data.copy()
    dat2.index = dat2.index.shift(1, '17min')
    dat2 = dat2.rename(columns={'data': 'data2'})

    # third variable: offset by one hour, keeping only the two leading and
    # the two trailing rows
    dat3 = data.copy()
    dat3.index = dat3.index.shift(1, '1h')
    dat3 = dat3.rename(columns={'data': 'data3'})
    dat3 = dat3.drop(dat3.index[2:-2])

    # merge
    data = pd.merge(data, dat2, how='outer', left_index=True, right_index=True)
    data = pd.merge(data, dat3, how='outer', left_index=True, right_index=True)
    return data
@pytest.mark.parametrize('flagger', TESTFLAGGER)
@pytest.mark.parametrize('reshaper', RESHAPERS)
@pytest.mark.parametrize('co_flagging', COFLAGGING)
def test_harmSingleVarIntermediateFlagging(data, flagger, reshaper, co_flagging):
    """Check how a flag set on the harmonized grid maps back onto the raw data.

    The 'data' column is harmonized to a 15 minute grid ('time' interpolation,
    parametrized reshaper), the fourth grid point is flagged, and the series is
    deharmonized again. The test asserts which original rows end up flagged for
    each reshaper / co_flagging combination, and that data and flag index match
    their pre-harmonization state.
    """
    flagger = flagger.initFlags(data)
    # make pre harm copies:
    pre_data = data.copy()
    pre_flags = flagger.getFlags()
    freq = '15min'

    # harmonize data:
    data, flagger = harmonize(data, 'data', flagger, freq, 'time', reshaper)

    # flag something bad
    flagger = flagger.setFlags('data', loc=data.index[3:4])
    data, flagger = deharmonize(
        data, 'data', flagger, co_flagging=co_flagging)

    # Compare strings by value: identity comparison against a literal only
    # works when the interpreter happens to intern both strings.
    if reshaper == 'nearest_shift':
        if co_flagging:
            assert flagger.isFlagged(loc=data.index[3:7]).squeeze().all()
            assert (~flagger.isFlagged(loc=data.index[0:3]).squeeze()).all()
            assert (~flagger.isFlagged(loc=data.index[7:]).squeeze()).all()
        else:
            assert (flagger.isFlagged().squeeze() ==
                    [False, False, False, False, True, False, True, False, False]).all()
    elif reshaper == 'bshift':
        if co_flagging:
            assert flagger.isFlagged(loc=data.index[5:7]).squeeze().all()
            assert (~flagger.isFlagged(loc=data.index[0:5]).squeeze()).all()
            assert (~flagger.isFlagged(loc=data.index[7:]).squeeze()).all()
        else:
            assert (flagger.isFlagged().squeeze() ==
                    [False, False, False, False, False, True, True, False, False]).all()
    elif reshaper == 'fshift':
        if co_flagging:
            assert flagger.isFlagged(loc=data.index[3:5]).squeeze().all()
            assert flagger.isFlagged(loc=data.index[6:7]).squeeze().all()
            assert (~flagger.isFlagged(loc=data.index[0:3]).squeeze()).all()
            assert (~flagger.isFlagged(loc=data.index[7:]).squeeze()).all()
        else:
            assert (flagger.isFlagged().squeeze() ==
                    [False, False, False, False, True, False, True, False, False]).all()

    # deharmonizing must give back the original data and flag index
    flags = flagger.getFlags()
    assert pre_data.equals(data)
    assert len(data) == len(flags)
    assert (pre_flags.index == flags.index).all()
@pytest.mark.parametrize('flagger', TESTFLAGGER)
@pytest.mark.parametrize('interpolation', INTERPOLATIONS)
@pytest.mark.parametrize('freq', FREQS)
def test_harmSingleVarInterpolations(data, flagger, interpolation, freq):
    """Compare harmonized values against fixed expectations per method/frequency.

    The 'data' column is harmonized with the parametrized interpolation method
    and grid frequency ('fshift' reshaping, np.sum as aggregation function) and
    the result is compared to hard-coded values. Afterwards the series is
    deharmonized and checked against its pre-harmonization state.
    """
    flagger = flagger.initFlags(data)
    flags = flagger.getFlags()
    # make pre harm copies:
    pre_data = data.copy()
    pre_flags = flags.copy()

    # the target grid spans the data, rounded outwards to multiples of freq
    harm_start = data.index[0].floor(freq=freq)
    harm_end = data.index[-1].ceil(freq=freq)
    test_index = pd.date_range(start=harm_start, end=harm_end, freq=freq)

    data, flagger = harmonize(
        data, 'data', flagger, freq, interpolation, 'fshift',
        reshape_shift_comment=False, inter_agg=np.sum)

    # Expected grid values, keyed by (interpolation, freq). A value lookup
    # replaces the former `interpolation is '<literal>'` identity checks, which
    # only worked when the interpreter interned the strings.
    expected = {
        ('fshift', '15min'): [np.nan, -37.5, -25.0, 0.0, 37.5, 50.0],
        ('fshift', '30min'): [np.nan, -37.5, 0.0, 50.0],
        ('bshift', '15min'): [-50.0, -37.5, -25.0, 12.5, 37.5, 50.0],
        ('bshift', '30min'): [-50.0, -37.5, 12.5, 50.0],
        ('nearest_shift', '15min'): [np.nan, -37.5, -25.0, 12.5, 37.5, 50.0],
        ('nearest_shift', '30min'): [np.nan, -37.5, 12.5, 50.0],
        ('nearest_agg', '15min'): [np.nan, -87.5, -25.0, 0.0, 37.5, 50.0],
        ('nearest_agg', '30min'): [np.nan, -87.5, -25.0, 87.5],
        ('bagg', '15min'): [-50.0, -37.5, -37.5, 12.5, 37.5, 50.0],
        ('bagg', '30min'): [-50.0, -75.0, 50.0, 50.0],
    }
    # Combinations without an entry are not value-checked, as before.
    values = expected.get((interpolation, freq))
    if values is not None:
        assert data.equals(pd.DataFrame({'data': values}, index=test_index))

    # NOTE(review): deharmonize is called twice in a row - possibly to check
    # that a repeated call is harmless, possibly an accidental duplicate; confirm.
    data, flagger = deharmonize(data, 'data', flagger, co_flagging=True)
    data, flagger = deharmonize(data, 'data', flagger, co_flagging=True)

    # deharmonizing must give back the original data and flag index
    flags = flagger.getFlags()
    assert pre_data.equals(data)
    assert len(data) == len(flags)
    assert (pre_flags.index == flags.index).all()
@pytest.mark.parametrize('flagger', TESTFLAGGER)
@pytest.mark.parametrize('shift_comment', SETSHIFTCOMMENT)
def test_multivariatHarmonization(multi_data, flagger, shift_comment):
    """Harmonize three differently sampled columns onto one grid and back.

    Each column is harmonized to a 15 minute grid with its own interpolation
    and reshape settings. The resulting index must equal the expected regular
    grid. The columns are then deharmonized in reverse order, after which the
    original data and flag index must be recovered.
    """
    flagger = flagger.initFlags(multi_data)
    # for comparison
    pre_data = multi_data.copy()
    pre_flags = flagger.getFlags().copy()

    freq = '15min'
    first_stamp, last_stamp = multi_data.index[0], multi_data.index[-1]
    test_index = pd.date_range(
        start=first_stamp.floor(freq=freq),
        end=last_stamp.ceil(freq=freq),
        freq=freq)

    # harm: (field, interpolation method, reshape method, extra keyword args)
    harm_settings = (
        ('data', 'time', 'nearest_shift', {}),
        ('data2', 'bagg', 'bshift', {'inter_agg': sum, 'reshape_agg': max}),
        ('data3', 'fshift', 'fshift', {}),
    )
    for field, inter_method, reshape_method, extra in harm_settings:
        multi_data, flagger = harmonize(
            multi_data, field, flagger,
            freq, inter_method, reshape_method,
            **extra,
            reshape_shift_comment=shift_comment)

    assert multi_data.index.equals(test_index)
    assert pd.Timedelta(pd.infer_freq(multi_data.index)) == pd.Timedelta(freq)

    # deharm in reverse order: (field, co_flagging)
    for field, co_flag in (('data3', False), ('data2', True), ('data', True)):
        multi_data, flagger = deharmonize(
            multi_data, field, flagger,
            co_flagging=co_flag)

    flags = flagger.getFlags()
    original_columns = pre_data.columns.to_list()
    assert pre_data.equals(multi_data[original_columns])
    assert len(multi_data) == len(flags)
    assert (pre_flags.index == flags.index).all()
@pytest.mark.parametrize('method', INTERPOLATIONS2)
def test_gridInterpolation(data, method):
    """Smoke-test the grid interpolation helpers for the given method.

    Nothing is asserted about the results; the test passes when the helper
    calls complete without raising.
    """
    freq = '15min'
    # Build a longer input: the sine-modulated series followed by a copy of
    # the raw series shifted by 2 hours, all offset by 3 seconds.
    # pd.concat replaces DataFrame.append, which pandas 2.0 removed.
    data = pd.concat([data * np.sin(data), data.shift(1, '2h')]).shift(1, '3s')

    # we are just testing if the interpolation gets passed to the series without causing an error:
    _interpolateGrid(data, freq, method, order=1, agg_method=sum, downcast_interpolation=True)
    if method == 'polynomial':
        # additionally run the polynomial method with higher orders
        _interpolateGrid(data, freq, method, order=2, agg_method=sum, downcast_interpolation=True)
        _interpolateGrid(data, freq, method, order=10, agg_method=sum, downcast_interpolation=True)

    data = _insertGrid(data, freq)
    _interpolate(data, method, inter_limit=3)
@pytest.mark.parametrize('flagger', TESTFLAGGER)
def test_outsortCrap(data, flagger):
    """Check which rows _outsortCrap removes for different drop settings.

    Rows 5 and 6 are flagged (presumably with the flagger's default flag -
    confirm) and must be missing from the result when either drop_suspicious
    or drop_bad is set. Row 0 is then flagged GOOD; with an explicit drop_list
    of [BAD, GOOD], rows 0, 5 and 6 must be missing. With return_drops=True
    the index of the first return value must equal those rows.
    """
    field = data.columns[0]
    flagger = flagger.initFlags(data).setFlags(field, iloc=slice(5, 7))
    flagged_rows = data.index[5:7]

    # either switch on its own has to remove the flagged rows
    for suspicious, bad in ((True, False), (False, True)):
        kept, _ = _outsortCrap(
            data, field, flagger, drop_suspicious=suspicious, drop_bad=bad)
        assert flagged_rows.difference(kept.index).equals(flagged_rows)

    # an explicit drop_list that also names GOOD removes the first row too
    flagger = flagger.setFlags(field, iloc=slice(0, 1), flag=flagger.GOOD)
    expected_drops = flagged_rows.insert(-1, data.index[0]).sort_values()

    kept, _ = _outsortCrap(
        data, field, flagger, drop_suspicious=False, drop_bad=False,
        drop_list=[flagger.BAD, flagger.GOOD])
    assert expected_drops.difference(kept.index).equals(expected_drops)

    dropped, _ = _outsortCrap(
        data, field, flagger, drop_suspicious=False, drop_bad=False,
        drop_list=[flagger.BAD, flagger.GOOD], return_drops=True)
    assert dropped.index.sort_values().equals(expected_drops)