Skip to content
Snippets Groups Projects
Commit 9e781d1e authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

lost

parents 3db6a7d6 a9e269f5
No related branches found
No related tags found
No related merge requests found
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# import funcs
import numpy as np
class Fields:
VARNAME = "headerout"
......@@ -18,11 +15,4 @@ class Params:
FLAGPERIOD = "flag_period"
FLAGVALUES = "flag_values"
FLAG = "flag"
# FUNCMAP = {
# "manflag": funcs.flagManual,
# "mad": funcs.flagMad,
# "constant": funcs.flagConstant,
# "generic": funcs.flagGeneric
# }
PLOT = "plot"
......@@ -3,39 +3,46 @@
import numpy as np
import pandas as pd
import matplotlib as mpl
from warnings import warn
from config import Fields, Params
from funcs import flagDispatch
from dsl import parseFlag
from flagger import PositionalFlagger, BaseFlagger
class FlagParams:
    """Names of the flagging-related keys used in a test configuration."""
    # key selecting the flag value to set
    FLAG = "flag"
    # key for the time span to flag after a condition is met
    PERIODE = "flag_period"
    # key for the number of values to flag after a condition is met
    VALUES = "flag_values"
    # NOTE(review): presumably marks variables that are assigned rather than
    # read from data -- confirm against the config parser
    ASSIGN = "assign"
def flagWindow(flagger, flags, mask, direction='fw', window=0, **kwargs) -> pd.Series:
    """Extend existing flags onto neighbouring values.

    Every value that is flagged (according to ``flagger.isFlagged``) and
    selected by ``mask`` acts as a seed. All values inside the window around
    a seed are flagged through ``flagger.setFlag``.

    Parameters
    ----------
    flagger : object providing ``isFlagged(flags)`` and ``setFlag(flags, **kwargs)``
    flags : flags to extend; modified in place and returned
    mask : boolean selector (scalar or aligned with ``flags``) of the seeds
    direction : 'fw' (values after a seed), 'bw' (values before a seed) or 'both'
    window : ``int`` -> number of neighbouring values; anything else is passed
        to ``rolling`` as a time-based window (forward direction only)
    **kwargs : forwarded to ``flagger.setFlag``

    Raises
    ------
    ValueError : unknown ``direction``
    NotImplementedError : time-based window with 'bw' or 'both'
    """
    if direction not in ('fw', 'bw', 'both'):
        raise ValueError(
            f"Invalid direction '{direction}'. Possible choices are ['fw', 'bw', 'both']")
    forward = direction in ('fw', 'both')
    backward = direction in ('bw', 'both')

    seeds = flagger.isFlagged(flags) & mask
    counts = seeds.astype(float)

    fw = False
    bw = False
    if isinstance(window, int):
        # min_periods=1 keeps the window edges free of NaN: NaN would turn
        # into True when converted to bool and flag values that are not
        # within reach of any seed.
        if forward:
            fw = counts.rolling(window=window + 1, min_periods=1).sum() > 0
        if backward:
            # a backward window is a forward window on the reversed series
            reverse = counts.iloc[::-1].rolling(window=window + 1, min_periods=1).sum()
            bw = reverse.iloc[::-1] > 0
    else:
        # time-based windows
        if backward:
            raise NotImplementedError
        fw = counts.rolling(window=window, closed='both').sum().astype(bool)

    fmask = bw | fw
    flags[fmask] = flagger.setFlag(flags[fmask], **kwargs)
    return flags
def flagPeriod(flagger, flags, mask=True, flag_period=0, **kwargs) -> pd.Series:
    """Flag the period following every flagged value selected by ``mask``.

    Thin wrapper around ``flagWindow`` in forward direction: ``flag_period``
    is used as the window, all remaining keyword arguments are passed on.
    This is the only definition of the name; the earlier loop-based variant
    was shadowed by it and has been removed.
    """
    return flagWindow(flagger, flags, mask, 'fw', window=flag_period, **kwargs)
def flagNext(flagger, flags, mask=True, flag_values=0, **kwargs) -> pd.Series:
    """Flag the values following every flagged value selected by ``mask``.

    Thin wrapper around ``flagWindow`` in forward direction: ``flag_values``
    is used as the window, all remaining keyword arguments are passed on.
    This is the only definition of the name; the earlier index-based variant
    was shadowed by it and has been removed.
    """
    return flagWindow(flagger, flags, mask, 'fw', window=flag_values, **kwargs)
def runner(meta, flagger, data, flags=None, nodata=np.nan):
plotvars = []
if flags is None:
flags = pd.DataFrame(index=data.index)
......@@ -49,7 +56,7 @@ def runner(meta, flagger, data, flags=None, nodata=np.nan):
for idx, configrow in meta.iterrows():
varname, _, _, assign = configrow[fields]
if varname not in flags and \
(varname in data or varname not in data and assign is True):
(varname in data or varname not in data and assign is True):
col_flags = flagger.initFlags(pd.DataFrame(index=data.index,
columns=[varname]))
flags = col_flags if flags.empty else flags.join(col_flags)
......@@ -77,44 +84,145 @@ def runner(meta, flagger, data, flags=None, nodata=np.nan):
if varname not in data and varname not in flags:
continue
dchunk = data.loc[start_date:end_date]
dchunk = data.loc[start_date:end_date].copy()
if dchunk.empty:
continue
fchunk = flags.loc[start_date:end_date]
fchunk = flags.loc[start_date:end_date].copy()
try:
dchunk, fchunk = flagDispatch(func_name,
dchunk, fchunk, varname,
flagger, nodata=nodata,
**flag_params)
dchunk, ffchunk = flagDispatch(func_name,
dchunk, fchunk.copy(), varname,
flagger, nodata=nodata,
**flag_params)
except NameError:
raise NameError(
f"function name {func_name} is not definied (variable '{varname}, 'line: {idx + 1})")
# flag a timespan after the condition is met,
# duration given in 'flag_period'
flag_period = flag_params.pop(Params.FLAGPERIOD, None)
if flag_period:
fchunk[varname] = flagPeriod(flagger,
fchunk[varname],
data.index.freq)
# flag a certain amount of values after condition is met,
# number given in 'flag_values'
flag_values = flag_params.pop(Params.FLAGVALUES, None)
if flag_values:
fchunk[varname] = flagNext(flagger,
fchunk[varname],
flag_values)
old = flagger.isFlagged(fchunk[varname])
new = flagger.isFlagged(ffchunk[varname])
mask = old != new
# flag a timespan after the condition is met
if Params.FLAGPERIOD in flag_params:
ffchunk[varname] = flagPeriod(flagger, ffchunk[varname], mask, **flag_params)
# flag a certain amount of values after condition is met
if Params.FLAGVALUES in flag_params:
ffchunk[varname] = flagNext(flagger, ffchunk[varname], mask, **flag_params)
if Params.FLAGPERIOD in flag_params or Params.FLAGVALUES in flag_params:
# hack as assignments above don't preserve categorical type
ffchunk = ffchunk.astype({
c: flagger.flags for c in ffchunk.columns if flagger.flag_fields[0] in c})
if flag_params.get(Params.PLOT, False):
plotvars.append(varname)
plot(dchunk, ffchunk, mask, varname, flagger, title=flag_test)
data.loc[start_date:end_date] = dchunk
flags[start_date:end_date] = fchunk.squeeze()
flags[start_date:end_date] = ffchunk.squeeze()
flagger.nextTest()
# plot all together
if plotvars:
plot(data, flags, True, set(plotvars), flagger)
return data, flags
def plot(data, flags, flagmask, varname, flagger, interactive_backend=True, title="Data Plot"):
    """Plot one or more data columns together with their flags.

    Parameters
    ----------
    data : frame holding the values to plot, one column per variable
    flags : flags belonging to ``data``, indexable by variable name
    flagmask : True (scalar or per value) for flags that should be drawn in
        their flag color, False for flags that should be drawn in black
    varname : a single column name or a list/set of column names
    flagger : flagger providing ``flags`` and ``isFlagged``
    interactive_backend : use the 'TkAgg' backend if True, 'Agg' otherwise
    title : title of the figure

    Columns missing from ``data`` are skipped with a ``UserWarning``; if none
    of the requested columns exists nothing is plotted.
    """
    # The backend has to be selected before pyplot is imported. 'Agg' needs
    # neither tkinter nor a display, so it also works on machines without an
    # X server.
    mpl.use('TkAgg' if interactive_backend else 'Agg')
    from matplotlib import pyplot as plt
    # needed for datetime conversion
    from pandas.plotting import register_matplotlib_converters
    register_matplotlib_converters()

    if not isinstance(varname, (list, set)):
        varname = [varname]

    columns = []
    for var in varname:
        if var in data.columns:
            columns.append(var)
        else:
            warn(f"Cannot plot column '{var}' that is not present in data.", UserWarning)
    if not columns:
        return

    # `flagmask` may be the plain bool True; `~True` is the integer -2, so
    # the mask is negated logically instead of bitwise.
    not_flagmask = np.logical_not(flagmask)

    def plot_vline(ax, points, color='blue'):
        # workaround for ax.vlines(), which did not behave as expected
        for point in points:
            ax.axvline(point, color=color, linestyle=':')

    def _plot(column, ax):
        x = data.index
        y = data[column]
        flags_ = flags[column]

        nrofflags = len(flagger.flags.categories)
        if nrofflags == 3:
            colors = {0: 'silver', 1: 'lime', 2: 'red'}
        elif nrofflags == 4:
            colors = {0: 'silver', 1: 'lime', 2: 'yellow', 3: 'red'}
        else:
            # no hand-picked palette: fall back to the default color cycle
            colors = {}

        # plot (all) data in silver
        ax.plot(x, y, '-', color='silver', label='data')

        # mark (all) missing, unflagged data in silver
        nans = y.isna()
        flagged = flagger.isFlagged(flags_)
        plot_vline(ax, y.index[nans & ~flagged], color='silver')

        # plot all flagged data in black
        ax.plot(x[flagged], y[flagged], '.', color='black', label="flagged by other test")
        # mark all flagged missing data (flagged before) in black
        plot_vline(ax, y.index[nans & flagged & not_flagmask], color='black')

        ax.set_ylabel(column)

        # plot currently flagged data in the color of its flag,
        # the first (unflagged) category is skipped
        for i, f in enumerate(flagger.flags):
            if i == 0:
                continue
            current = flagger.isFlagged(flags_, flag=f) & flagmask
            color = colors.get(i, f"C{i}")
            ax.plot(x[current], y[current], '.', color=color, label=f"flag: {f}")
            plot_vline(ax, y.index[nans & current], color=color)

    if len(columns) > 1:
        fig, axes = plt.subplots(len(columns), 1, sharex=True)
        axes[0].set_title(title)
        for ax, column in zip(axes, columns):
            _plot(column, ax)
    else:
        fig, ax = plt.subplots()
        plt.title(title)
        _plot(columns[0], ax)

    plt.xlabel('time')
    # dummy plot for label `missing`, see plot_vline for more info
    plt.plot([], [], ':', color='silver', label="missing data")
    plt.legend()
    plt.show()
def prepareMeta(meta, data):
# NOTE: an option needed to only pass tests within an file and deduce
# everything else from data
......@@ -150,7 +258,7 @@ def readData(fname, index_col, nans):
if __name__ == "__main__":
from flagger import DmpFlagger
datafname = "resources/data.csv"
metafname = "resources/meta.csv"
......@@ -159,5 +267,5 @@ if __name__ == "__main__":
nans=["-9999", "-9999.0"])
meta = prepareMeta(pd.read_csv(metafname), data)
flagger = PositionalFlagger()
flagger = DmpFlagger()
pdata, pflags = runner(meta, flagger, data)
......@@ -14,8 +14,11 @@ class Flags(pd.CategoricalDtype):
assert len(flags) > 2
super().__init__(flags, ordered=True)
def unflagged(self):
    """Return the first flag, i.e. the element at position 0."""
    return self[0]
def min(self):
    """Return the lowest flag above the unflagged one (position 1).

    The previous body returned position 2 and left the intended
    ``return self[1]`` unreachable.
    """
    return self[1]
def max(self):
    """Return the last flag, i.e. the element at position -1."""
    return self[-1]
......@@ -39,20 +42,28 @@ class BaseFlagger:
in assignments, especially if a multi column index is used
"""
if flag is None:
flag = self.flags[-1]
flag = self.flags.max()
else:
self._checkFlag(flag)
flags = flags.copy()
flags[flags < flag] = flag
return flags.values
def initFlags(self, data: pd.DataFrame) -> pd.DataFrame:
    """Return a flags frame shaped like ``data`` with every cell unflagged.

    A copy of ``data`` is cast to the flag dtype ``self.flags`` and all of
    its cells are set to the first flag ``self.flags[0]``.
    """
    initialized = data.copy().astype(self.flags)
    initialized.loc[:] = self.flags[0]
    return initialized
def isFlagged(self, flags: ArrayLike, flag: T = None) -> ArrayLike:
    """Return a boolean mask of the flagged entries in ``flags``.

    Without ``flag`` every non-null entry greater than the first
    (unflagged) flag ``self.flags[0]`` counts as flagged. With ``flag``
    only entries equal to that flag are reported; the flag is validated by
    ``self._checkFlag`` first.

    The previous body compared against ``self.flags[1]`` and left the
    intended comparison against ``self.flags[0]`` unreachable.
    """
    if flag is None:
        return pd.notnull(flags) & (flags > self.flags[0])
    self._checkFlag(flag)
    return flags == flag
def _checkFlag(self, flag):
    """Raise a ``ValueError`` if ``flag`` is not contained in ``self.flags``.

    The error message lists all categories of ``self.flags`` except the
    first one as possible choices.
    """
    if flag in self.flags:
        return
    choices = list(self.flags.categories)[1:]
    raise ValueError(f"Invalid flag '{flag}'. "
                     f"Possible choices are {choices}")
def nextTest(self):
    """Hook invoked by ``runner`` after a test has been applied.

    The base implementation does nothing.
    """
    pass
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import subprocess
import pandas as pd
from .baseflagger import BaseFlagger
class Keywords:
    """Placeholders that may appear in flag comments."""
    # replaced by the project version in DmpFlagger.setFlag
    VERSION = "$version"
class FlagFields:
FLAG = "quality_flag"
CAUSE = "quality_cause"
......@@ -24,6 +29,8 @@ class DmpFlagger(BaseFlagger):
def __init__(self):
    """Set up the flag fields and determine the project version.

    The version is taken from ``git describe`` in the current working
    directory. If git is not installed or the directory is not a git
    checkout, the version falls back to "unknown" instead of making the
    construction of the flagger fail.
    """
    super().__init__(FLAGS)
    self.flag_fields = [FlagFields.FLAG, FlagFields.CAUSE, FlagFields.COMMENT]
    try:
        version = subprocess.check_output(
            ['git', 'describe', '--tags', '--always', '--dirty'],
            stderr=subprocess.DEVNULL)
    except (OSError, subprocess.CalledProcessError):
        # OSError: git executable missing; CalledProcessError: not a repository
        self.project_version = "unknown"
    else:
        self.project_version = version.decode().strip()
def initFlags(self, data, **kwargs):
columns = data.columns if isinstance(data, pd.DataFrame) else [data.name]
......@@ -40,15 +47,20 @@ class DmpFlagger(BaseFlagger):
def setFlag(self, flags, flag=None, cause="", comment="", **kwargs):
    """Set flag, cause and comment on all rows with a lower flag.

    Parameters
    ----------
    flags : pd.DataFrame holding the flag fields of a single variable
    flag : flag to set, defaults to ``self.flags.max()``
    cause : value written to the cause field of the updated rows
    comment : value written to the comment field of the updated rows; the
        placeholder ``Keywords.VERSION`` is replaced by the project version
    **kwargs : ignored

    Returns
    -------
    The values of the updated frame as returned by ``DataFrame.values``.

    Raises
    ------
    TypeError : ``flags`` is not a DataFrame
    ValueError : ``flag`` is not a valid flag (raised by ``_checkFlag``)

    Rows whose flag is not lower than ``flag`` keep their flag, cause and
    comment.
    """
    if not isinstance(flags, pd.DataFrame):
        raise TypeError

    if flag is None:
        flag = self.flags.max()
    else:
        self._checkFlag(flag)

    comment = comment.replace(Keywords.VERSION, self.project_version)

    # NOTE(review): `_reduceColumns` may hand back the passed frame itself,
    # in which case the assignment below modifies the caller's object
    flags = self._reduceColumns(flags)
    mask = flags[FlagFields.FLAG] < flag
    flags.loc[mask, self.flag_fields] = flag, cause, comment
    return flags.values
......@@ -58,6 +70,13 @@ class DmpFlagger(BaseFlagger):
return super().isFlagged(flagcol, flag)
def _reduceColumns(self, flags):
    """Return ``flags`` with plain flag-field column names.

    A frame whose columns already are exactly ``self.flag_fields`` is
    returned unchanged. A DataFrame with a three-column ``MultiIndex`` is
    copied and its columns are replaced by the ``ColumnLevels.FLAGS`` level.

    Raises
    ------
    TypeError : ``flags`` has any other layout
    """
    if set(flags.columns) == set(self.flag_fields):
        return flags

    is_multi_column_frame = (isinstance(flags, pd.DataFrame)
                             and isinstance(flags.columns, pd.MultiIndex)
                             and len(flags.columns) == 3)
    if not is_multi_column_frame:
        raise TypeError

    flags = flags.copy()
    flags.columns = flags.columns.get_level_values(ColumnLevels.FLAGS)
    return flags
......@@ -3,7 +3,8 @@
import numpy as np
import pandas as pd
from lib.tools import valueRange, slidingWindowIndices
from lib.tools import valueRange, slidingWindowIndices, inferFrequency
from dsl import evalExpression
from config import Params
......@@ -24,7 +25,6 @@ def flagDispatch(func_name, *args, **kwargs):
def flagGeneric(data, flags, field, flagger, nodata=np.nan, **flag_params):
expression = flag_params[Params.FUNC]
result = evalExpression(expression, flagger,
data, flags, field,
......@@ -47,7 +47,6 @@ def flagGeneric(data, flags, field, flagger, nodata=np.nan, **flag_params):
def flagConstant(data, flags, field, flagger, eps,
length, thmin=None, **kwargs):
datacol = data[field]
flagcol = flags[field]
......@@ -87,46 +86,23 @@ def flagManual(data, flags, field, flagger, **kwargs):
def flagRange(data, flags, field, flagger, min, max, **kwargs):
    """Flag all values of ``field`` outside the half-open range [min, max).

    Values lower than ``min`` and values greater than or equal to ``max``
    are flagged through ``flagger.setFlag``; ``kwargs`` are passed on.
    Returns the tuple ``(data, flags)``; ``flags`` is modified in place.
    """
    values = data[field].values
    too_low = values < min
    too_high = values >= max
    outside = np.logical_or(too_low, too_high)

    selected = flags.loc[outside, field]
    flags.loc[outside, field] = flagger.setFlag(selected, **kwargs)
    return data, flags
def flagMad(data, flags, field, flagger, length, z, freq=None, **kwargs):
    """Flag outliers of ``field`` by a rolling median absolute deviation.

    For every value the median and the median absolute deviation (MAD) of
    the centered rolling window around it are computed. A value is flagged
    if ``0.6745 * |value - median| > z * MAD`` and the MAD is positive.

    Parameters
    ----------
    data, flags : frames holding the values and flags of ``field``
    field : name of the column to check
    flagger : flagger providing ``setFlag``
    length : size of the rolling window, anything ``pd.to_timedelta`` accepts
    z : threshold of the test
    freq : sampling rate of the data as offset alias, offset object or
        timedelta; inferred from the index if not given
    **kwargs : forwarded to ``flagger.setFlag``

    Returns
    -------
    The tuple ``(data, flags)``; ``flags`` is modified in place.

    Raises
    ------
    ValueError : the frequency cannot be inferred or ``length`` is shorter
        than one sampling interval

    This is the only definition of the name; the earlier sliding-window
    variant was shadowed by it and has been removed.
    """
    d = data[field].copy()
    freq = inferFrequency(d) if freq is None else freq
    if freq is None:
        raise ValueError("freqency cannot inferred, provide `freq` as a param to mad().")

    # accept offset aliases ("10min"), offset objects and timedeltas alike
    step = pd.Timedelta(pd.tseries.frequencies.to_offset(freq))
    winsz = int(pd.to_timedelta(length) / step)
    if winsz < 1:
        raise ValueError(
            f"window length '{length}' is shorter than the sampling rate '{freq}'")

    median = d.rolling(window=winsz, center=True, closed='both').median()
    diff = (d - median).abs()
    mad = diff.rolling(window=winsz, center=True, closed='both').median()
    # a MAD of zero would flag every value deviating from the median
    mask = (mad > 0) & (0.6745 * diff > z * mad)

    flags.loc[mask, field] = flagger.setFlag(flags.loc[mask, field], **kwargs)
    return data, flags
......
......@@ -95,3 +95,8 @@ def broadcastMany(*args: ArrayLike) -> np.ndarray:
target_shape = np.broadcast(*out).shape
return tuple(np.broadcast_to(arr, target_shape) for arr in out)
def inferFrequency(data):
    """Return the frequency of ``data.index`` as an offset object.

    Returns ``None`` if no frequency can be inferred. This includes indexes
    that are too short to infer a frequency from, for which
    ``pd.infer_freq`` raises a ``ValueError``.
    """
    try:
        freq = pd.infer_freq(data.index)
    except ValueError:
        return None
    if freq is None:
        return None
    return pd.tseries.frequencies.to_offset(freq)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from test.common import *
from test.test_core import *
from test.dsl.test_generic import *
from test.dsl.test_evaluator import *
from test.flagger.test_dmpflagger import *
#! /usr/bin/env python
# -*- coding: utf-8 -*-
......@@ -5,8 +5,8 @@ import pytest
import numpy as np
from test.common import initData
from flagger import SimpleFlagger
from dsl import evalExpression
from flagger.simpleflagger import SimpleFlagger
from dsl.evaluator import evalExpression
def test_evaluationBool():
......
......@@ -7,9 +7,10 @@ import pytest
from test.common import initData
from dsl import evalExpression
from flagger import SimpleFlagger
from funcs.functions import flagGeneric, Params
from dsl.evaluator import evalExpression
from flagger.simpleflagger import SimpleFlagger
from funcs.functions import flagGeneric
from config import Params
def test_ismissing():
......@@ -60,14 +61,14 @@ def test_isflaggedArgument():
flags = flagger.initFlags(data)
var1, var2, *_ = data.columns
flags.iloc[::2, 0] = flagger.setFlag(flags.iloc[::2, 0], -9)
flags.iloc[::2, 0] = flagger.setFlag(flags.iloc[::2, 0], 1)
idx = evalExpression("isflagged({:}, -9)".format(var1),
idx = evalExpression("isflagged({:}, 1)".format(var1),
flagger,
data, flags,
var2)
flagged = flagger.isFlagged(flags[var1], -9)
flagged = flagger.isFlagged(flags[var1], 1)
assert (flagged == idx).all
......
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from ..common import initData, initMeta
from test.common import initData, initMeta
from core import runner
from flagger.dmpflagger import DmpFlagger, FlagFields
......@@ -16,7 +16,7 @@ def test_basic():
metastring = f"""
headerout, Flag_1, Flag_2
{var1},"generic, {{func: this < {var1mean}}}","range, {{min: 10, max: 20, comment: saqc}}"
{var1},"generic, {{func: this < {var1mean}, flag: DOUBTFUL}}","range, {{min: 10, max: 20, comment: saqc}}"
{var2},"generic, {{func: this > {var2mean}, cause: error}}"
"""
meta = initMeta(metastring, data)
......@@ -28,10 +28,11 @@ def test_basic():
flags11 = flags.loc[col1 < var1mean, (var1, FlagFields.FLAG)]
flags12 = flags.loc[((col1 < 10) | (col1 > 20)), (var1, FlagFields.COMMENT)]
# flags12 = flags.loc[(col1 >= var1mean) & ((col1 < 10) | (col1 > 20)), (var1, FlagFields.COMMENT)]
flags21 = flags.loc[col2 > var2mean, (var2, FlagFields.CAUSE)]
assert (flags11 >= flagger.flags.min()).all()
assert (flags11 > flagger.flags.min()).all()
assert (flags12 == "saqc").all()
assert (flags21 == "error").all()
......
......@@ -7,7 +7,9 @@ import pandas as pd
from core import runner, flagNext, flagPeriod, prepareMeta
from config import Fields
from flagger import SimpleFlagger, DmpFlagger, PositionalFlagger
from flagger.simpleflagger import SimpleFlagger
from flagger.dmpflagger import DmpFlagger
from flagger.positionalflagger import PositionalFlagger
from test.common import initData
......@@ -125,7 +127,7 @@ def test_flagNext(flagger):
flags.iloc[idx] = flagger.setFlag(flags.iloc[idx])
n = 4
fflags = flagNext(flagger, flags.copy(), 4)
fflags = flagNext(flagger, flags.copy(), flag_values=4)
result_idx = np.unique(np.where(flagger.isFlagged(fflags))[0])
expected_idx = np.arange(min(idx), max(idx) + n + 1)
assert (result_idx == expected_idx).all()
......@@ -142,10 +144,11 @@ def test_flagPeriod(flagger):
idx = [0, 1, 2]
flags.iloc[idx] = flagger.setFlag(flags.iloc[idx])
tdelta = pd.to_timedelta("4h")
flags = flagPeriod(flagger, flags.copy(), tdelta)
period = '4h'
flags = flagPeriod(flagger, flags.copy(), flag_period=period)
expected_dates = set(flags[flagger.isFlagged(flags)].index)
tdelta = pd.to_timedelta(period)
dates = set()
for start in flags.index[idx]:
stop = start + tdelta
......@@ -159,7 +162,6 @@ if __name__ == "__main__":
# NOTE: PositionalFlagger is currently broken, going to fix it when needed
# for flagger in [SimpleFlagger, PositionalFlagger, DmpFlagger]:
for flagger in [SimpleFlagger(), DmpFlagger()]:
# for flagger in [DmpFlagger()]:
test_temporalPartitioning(flagger)
test_flagNext(flagger)
test_flagPeriod(flagger)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment