Skip to content
Snippets Groups Projects
Commit 5b87cff6 authored by David Schäfer's avatar David Schäfer
Browse files

timeseries frequency not longer required

parent cabbd1bc
No related branches found
No related tags found
No related merge requests found
......@@ -19,12 +19,13 @@ class FlagParams:
ASSIGN = "assign"
def _inferFrequency(data):
return pd.tseries.frequencies.to_offset(pd.infer_freq(data.index))
def _periodToTicks(period, freq):
return int(ceil(pd.to_timedelta(period)/pd.to_timedelta(freq)))
def flagPeriod(flagger: BaseFlagger, flags: pd.Series, freq: str) -> pd.Series:
out = flags.copy()
for start in flags.index[flagger.isFlagged(flags)]:
stop = start + pd.to_timedelta(freq)
out.loc[start:stop] = flagger.setFlag(flags.loc[start:stop],
*np.atleast_1d(flags.loc[start]))
return out
def flagNext(flagger: BaseFlagger, flags: pd.Series, n: int) -> pd.Series:
......@@ -41,23 +42,18 @@ def runner(meta, flagger, data, flags=None, nodata=np.nan):
if flags is None:
flags = pd.DataFrame(index=data.index)
# NOTE:
# We need an index frequency in order to calculate ticks
# from given periods further down the road. Maybe this
# restriction should only be enforced when we really
# need a time series...
data.index.freq = _inferFrequency(data)
if not data.index.freq:
raise TypeError("cannot infer time frequency from dataset")
# the required meta data columns
fields = [Fields.VARNAME, Fields.STARTDATE, Fields.ENDDATE, Fields.ASSIGN]
# NOTE:
# get to know every variable from meta
# should go into a separate function
for idx, configrow in meta.iterrows():
varname, _, _, assign = configrow[fields]
if varname not in flags and (varname in data or varname not in data and assign is True):
col_flags = flagger.initFlags(pd.DataFrame(index=data.index, columns=[varname]))
if varname not in flags and \
(varname in data or varname not in data and assign is True):
col_flags = flagger.initFlags(pd.DataFrame(index=data.index,
columns=[varname]))
flags = col_flags if flags.empty else flags.join(col_flags)
# NOTE:
......@@ -102,8 +98,9 @@ def runner(meta, flagger, data, flags=None, nodata=np.nan):
# duration given in 'flag_period'
flag_period = flag_params.pop(Params.FLAGPERIOD, None)
if flag_period:
flag_params[Params.FLAGVALUES] = _periodToTicks(
flag_period, data.index.freq)
fchunk[varname] = flagPeriod(flagger,
fchunk[varname],
data.index.freq)
# flag a certain amount of values after condition is met,
# number given in 'flag_values'
......
......@@ -5,7 +5,7 @@ import pytest
import numpy as np
import pandas as pd
from core import runner, flagNext, prepareMeta
from core import runner, flagNext, flagPeriod, prepareMeta
from config import Fields
from flagger import SimpleFlagger, DmpFlagger, PositionalFlagger
from test.common import initData
......@@ -131,6 +131,29 @@ def test_flagNext(flagger):
assert (result_idx == expected_idx).all()
@pytest.mark.parametrize("flagger", TESTFLAGGERS)
def test_flagPeriod(flagger):
"""
Test if the flagNext functionality works as expected
"""
data = initData().iloc[:, 1]
flags = flagger.initFlags(data)
idx = [0, 1, 2]
flags.iloc[idx] = flagger.setFlag(flags.iloc[idx])
tdelta = pd.to_timedelta("4h")
flags = flagPeriod(flagger, flags.copy(), tdelta)
expected_dates = set(flags[flagger.isFlagged(flags)].index)
dates = set()
for start in flags.index[idx]:
stop = start + tdelta
dates = dates | set(flags[start:stop].index)
assert expected_dates == dates
if __name__ == "__main__":
# NOTE: PositionalFlagger is currently broken, going to fix it when needed
......@@ -139,6 +162,7 @@ if __name__ == "__main__":
# for flagger in [DmpFlagger()]:
test_temporalPartitioning(flagger)
test_flagNext(flagger)
test_flagPeriod(flagger)
test_missingConfig(flagger)
test_missingVariable(flagger)
test_assignVariable(flagger)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment