Commit 0b1b1390 authored by Bert Palm

implemented flagManual + test + madly detailed docstring

parent c792b390
Merge requests: !193 Release 1.4, !188 Release 1.4
Pipeline #5051 passed in 6 minutes and 29 seconds
@@ -10,6 +10,8 @@ from saqc.lib.tools import groupConsecutives, sesonalMask
from saqc.core.register import register, Func
from saqc.core.visitor import ENVIRONMENT
from dios import DictOfSeries
from typing import Any
def _dslIsFlagged(flagger, var, flag=None, comparator=None):
@@ -22,7 +24,6 @@ def _dslIsFlagged(flagger, var, flag=None, comparator=None):
def _execGeneric(flagger, data, func, field, nodata):
    # TODO:
    # - check series.index compatibility
    # - field is only needed to translate 'this' parameters
@@ -116,7 +117,7 @@ def flagMissing(data, field, flagger, nodata=np.nan, **kwargs):
@register
def flagSesonalRange(
    data, field, flagger, min, max, startmonth=1, endmonth=12, startday=1, endday=31, **kwargs,
):
smask = sesonalMask(data[field].index, startmonth, startday, endmonth, endday)
@@ -147,9 +148,8 @@ def forceFlags(data, field, flagger, flag, **kwargs):
@register
def flagIsolated(
    data, field, flagger, gap_window, group_window, **kwargs,
):
gap_window = pd.tseries.frequencies.to_offset(gap_window)
group_window = pd.tseries.frequencies.to_offset(group_window)
@@ -162,9 +162,9 @@ def flagIsolated(
            start = srs.index[0]
            stop = srs.index[-1]
            if stop - start <= group_window:
                left = mask[start - gap_window: start].iloc[:-1]
                if left.all():
                    right = mask[stop: stop + gap_window].iloc[1:]
                    if right.all():
                        flags[start:stop] = True
@@ -175,7 +175,137 @@
@register
def flagDummy(data, field, flagger, **kwargs):
""" Do nothing """
""" Do nothing """
return data, flagger
@register
def flagManual(data, field, flagger, mdata, mflag: Any = 1, method='plain', **kwargs):
""" Flag data by given manual data.
The data is flagged at locations where `mdata` is equal to a provided flag (`mflag`).
The format of mdata can be a indexed object, like pd.Series, pd.Dataframe or dios.DictOfSeries,
but also can be a plain list- or array-like.
How indexed mdata is aligned to data is specified via `method` argument.
Parameters
----------
data : DictOfSeries
field : str
The field chooses the column in flags and data in question.
It also determine the column in mdata if its of type pd.Dataframe or dios.DictOfSeries.
flagger : flagger
mdata : {pd.Series, pd.Dataframe, DictOfSeries, str}
The manual data
mflag : scalar
The flag that indicates data points in `mdata`, that should be flagged.
method : {'plain', 'ontime', 'left-open', 'right-open'}, default plain
Define how mdata is applied on data. Except 'plain' mdata must have a index.
* 'plain': mdata must have same length than data and is applied one-to-one on data.
* 'ontime': work only with indexed mdata, it is applied, where timestamps are match.
* 'right-open': mdata defines periods, which are defined by two consecutive timestamps, the
value of the first aka. left is applied on the whole period.
* 'left-open': like 'right-open' but the value is defined in the latter aka. right timestamp.
kwargs : Any
passed to flagger
Returns
-------
data, flagger: original data, modified flagger
Examples
--------
An example for mdata
>>> mdata = pd.Series([1,0,1], index=pd.to_datetime(['2000-02', '2000-03', '2001-05']))
>>> mdata
2000-02-01 1
2000-03-01 0
2001-05-01 1
dtype: int64
On *dayly* data, with the 'ontime' method, only the provided timestamnps are used.
Bear in mind that only exact timestamps apply, any offset will result in ignoring
the timestamp.
>>> _, fl = flagManual(data, field, flagger, mdata, mflag=1, method='ontime')
>>> fl.isFlagged(field)
2000-01-31 False
2000-02-01 True
2000-02-02 False
2000-02-03 False
.. ..
2000-02-29 False
2000-03-01 True
2000-03-02 False
Freq: D, dtype: bool
With the 'right-open' method, the mdata is forward fill:
>>> _, fl = flagManual(data, field, flagger, mdata, mflag=1, method='right-open')
>>> fl.isFlagged(field)
2000-01-31 False
2000-02-01 True
2000-02-02 True
.. ..
2000-02-29 True
2000-03-01 False
2000-03-02 False
Freq: D, dtype: bool
With the 'left-open' method, backward filling is used:
>>> _, fl = flagManual(data, field, flagger, mdata, mflag=1, method='left-open')
>>> fl.isFlagged(field)
2000-01-31 False
2000-02-01 False
2000-02-02 True
.. ..
2000-02-29 True
2000-03-01 True
2000-03-02 False
Freq: D, dtype: bool
"""
    dat = data[field]

    if isinstance(mdata, str):
        # TODO: support a file path in mdata, e.g. via
        #   s = pd.read_csv(mdata, index_col=N, usecols=[N, N, ...])  <- use positional indices,
        # use a list argument in the config to get the columns,
        # and finally fall through to the checks below
        raise NotImplementedError("giving a path is currently not supported")

    if isinstance(mdata, (pd.DataFrame, DictOfSeries)):
        mdata = mdata[field]

    hasindex = isinstance(mdata, (pd.Series, pd.DataFrame, DictOfSeries))
    if not hasindex and method != 'plain':
        raise ValueError("mdata has no index")

    if method == 'plain':
        if hasindex:
            mdata = mdata.to_numpy()
        if len(mdata) != len(dat):
            raise ValueError("mdata must have the same length as data")
        mdata = pd.Series(mdata, index=dat.index)
    elif method == 'ontime':
        pass  # reindex will do the job later
    elif method in ['left-open', 'right-open']:
        mdata = mdata.reindex(dat.index.union(mdata.index))

        # -->)[t0-->)[t1--> (ffill)
        if method == 'right-open':
            mdata = mdata.ffill()

        # <--t0](<--t1](<-- (bfill)
        if method == 'left-open':
            mdata = mdata.bfill()
    else:
        raise ValueError(method)

    mask = mdata == mflag
    mask = mask.reindex(dat.index).fillna(False)

    flagger = flagger.setFlags(field=field, loc=mask, **kwargs)
    return data, flagger
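The 'right-open' and 'left-open' handling above boils down to a reindex onto the union of both indexes followed by a forward or backward fill. A minimal standalone sketch of that mechanic in plain pandas, outside of saqc; the daily data index is made up for illustration:

# Sketch only: illustrates the reindex + ffill/bfill alignment used by flagManual.
import pandas as pd

data_index = pd.date_range("2000-01-31", "2000-03-02", freq="D")
mdata = pd.Series([1, 0, 1], index=pd.to_datetime(["2000-02-01", "2000-03-01", "2001-05-01"]))

union = data_index.union(mdata.index)
right_open = mdata.reindex(union).ffill()  # the left timestamp's value holds until the next timestamp
left_open = mdata.reindex(union).bfill()   # the right timestamp's value holds back to the previous one

# Restricted to the data index and compared against mflag, this yields the boolean
# mask that flagManual hands to flagger.setFlags:
mask = (right_open == 1).reindex(data_index).fillna(False)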
@@ -6,14 +6,7 @@ import numpy as np
import pandas as pd
import dios
from saqc.funcs.functions import *
from test.common import initData, TESTFLAGGER
@@ -97,6 +90,7 @@ def test_flagIsolated(data, flagger):
    )
    assert flagger_result.isFlagged(field)[[3, 5, 13, 14]].all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
@pytest.mark.parametrize("dat", [pytest.lazy_fixture("course_2")])
def test_flagCrossScoring(dat, flagger):
@@ -115,3 +109,85 @@ def test_flagCrossScoring(dat, flagger):
    for field in fields:
        isflagged = flagger_result.isFlagged(field)
        assert isflagged[characteristics['raise']].all()
@pytest.mark.parametrize("flagger", TESTFLAGGER)
def test_flagManual(data, flagger):
    field = data.columns[0]
    flagger = flagger.initFlags(data)
    args = data, field, flagger
    dat = data[field]

    mdata = pd.Series('lala', index=dat.index)
    index_exp = mdata.iloc[[10, 33, 200, 500]].index
    mdata.iloc[[101, 133, 220, 506]] = 'b'
    mdata.loc[index_exp] = 'a'
    shrinked = mdata.loc[index_exp.union(mdata.iloc[[1, 2, 3, 4, 600, 601]].index)]

    kwargs_list = [
        dict(mdata=mdata, mflag='a', method='plain'),
        dict(mdata=mdata.to_list(), mflag='a', method='plain'),
        dict(mdata=mdata, mflag='a', method='ontime'),
        dict(mdata=shrinked, mflag='a', method='ontime'),
    ]

    for kw in kwargs_list:
        _, fl = flagManual(*args, **kw)
        isflagged = fl.isFlagged(field)
        assert isflagged[isflagged].index.equals(index_exp)

    # a flag that does not exist in mdata
    _, fl = flagManual(*args, mdata=mdata, mflag="i do not exist", method='ontime')
    isflagged = fl.isFlagged(field)
    assert isflagged[isflagged].index.equals(pd.DatetimeIndex([]))

    # check right-open / ffill
    index = pd.date_range(start="2016-01-01", end="2018-12-31", periods=11)
    mdata = pd.Series(0, index=index)
    mdata.loc[index[[1, 5, 6, 7, 9, 10]]] = 1
    # >>> mdata
    # 2016-01-01 00:00:00    0
    # 2016-04-19 12:00:00    1
    # 2016-08-07 00:00:00    0
    # 2016-11-24 12:00:00    0
    # 2017-03-14 00:00:00    0
    # 2017-07-01 12:00:00    1
    # 2017-10-19 00:00:00    1
    # 2018-02-05 12:00:00    1
    # 2018-05-26 00:00:00    0
    # 2018-09-12 12:00:00    1
    # 2018-12-31 00:00:00    1
    # dtype: int64

    # add the first and the last index from data
    expected = mdata.copy()
    expected.loc[dat.index[0]] = 0
    expected.loc[dat.index[-1]] = 1
    expected = expected.astype(bool)

    _, fl = flagManual(*args, mdata=mdata, mflag=1, method='right-open')
    isflagged = fl.isFlagged(field)
    last = expected.index[0]
    for curr in expected.index[1:]:
        expected_value = mdata[last]
        # datetime slicing is inclusive!
        i = isflagged[last:curr].index[:-1]
        chunk = isflagged.loc[i]
        assert (chunk == expected_value).all()
        last = curr
    # check the last value
    assert isflagged[curr] == expected[curr]

    # check left-open / bfill
    expected.loc[dat.index[-1]] = 0  # this time the last value is False
    _, fl = flagManual(*args, mdata=mdata, mflag=1, method='left-open')
    isflagged = fl.isFlagged(field)
    last = expected.index[0]
    assert isflagged[last] == expected[last]
    for curr in expected.index[1:]:
        expected_value = mdata[curr]
        # datetime slicing is inclusive!
        i = isflagged[last:curr].index[1:]
        chunk = isflagged.loc[i]
        assert (chunk == expected_value).all()
        last = curr
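Passing a file path as mdata is still a TODO in flagManual (it raises NotImplementedError). Until that lands, a hedged workaround sketch is to read the manual flags yourself and hand the resulting Series to flagManual; the file name and the "flag" column are assumptions for illustration:

import pandas as pd

# Hypothetical CSV layout: a datetime index column plus one flag column named "flag".
mdata = pd.read_csv("manual_flags.csv", index_col=0, parse_dates=True)["flag"]
# data, flagger = flagManual(data, field, flagger, mdata=mdata, mflag=1, method="ontime")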