diff --git a/config.py b/config.py index 340db537db5546c6555186a85dfe42f0cfc221c6..92db940d1ff7c660277cdc1e116ae04e531d6222 100644 --- a/config.py +++ b/config.py @@ -1,9 +1,6 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- -# import funcs -import numpy as np - class Fields: VARNAME = "headerout" @@ -18,11 +15,4 @@ class Params: FLAGPERIOD = "flag_period" FLAGVALUES = "flag_values" FLAG = "flag" - - -# FUNCMAP = { -# "manflag": funcs.flagManual, -# "mad": funcs.flagMad, -# "constant": funcs.flagConstant, -# "generic": funcs.flagGeneric -# } + PLOT = "plot" diff --git a/core.py b/core.py index d2c127fd5e3cb968e0d22a334bd384e3c21b94e2..f3712adabe63e4f97c251c0d016a1cb872694bd6 100644 --- a/core.py +++ b/core.py @@ -3,39 +3,46 @@ import numpy as np import pandas as pd +import matplotlib as mpl +from warnings import warn from config import Fields, Params from funcs import flagDispatch from dsl import parseFlag -from flagger import PositionalFlagger, BaseFlagger -class FlagParams: - FLAG = "flag" - PERIODE = "flag_period" - VALUES = "flag_values" - ASSIGN = "assign" +def flagWindow(flagger, flags, mask, direction='fw', window=0, **kwargs) -> pd.Series: + fw = False + bw = False + f = flagger.isFlagged(flags) & mask + + if isinstance(window, int): + x = f.rolling(window=window + 1).sum() + if direction in ['fw', 'both']: + fw = x.fillna(method='bfill').astype(bool) + if direction in ['bw', 'both']: + bw = x.shift(-window).fillna(method='bfill').astype(bool) + else: + # time-based windows + if direction in ['bw', 'both']: + raise NotImplementedError + fw = f.rolling(window=window, closed='both').sum().astype(bool) + + fmask = bw | fw + flags[fmask] = flagger.setFlag(flags[fmask], **kwargs) + return flags -def flagPeriod(flagger: BaseFlagger, flags: pd.Series, freq: str) -> pd.Series: - out = flags.copy() - for start in flags.index[flagger.isFlagged(flags)]: - stop = start + pd.to_timedelta(freq) - out.loc[start:stop] = flagger.setFlag(flags.loc[start:stop], - *np.atleast_1d(flags.loc[start])) - return out +def flagPeriod(flagger, flags, mask=True, flag_period=0, **kwargs) -> pd.Series: + return flagWindow(flagger, flags, mask, 'fw', window=flag_period, **kwargs) -def flagNext(flagger: BaseFlagger, flags: pd.Series, n: int) -> pd.Series: - idx = np.where(flagger.isFlagged(flags))[0] - for nn in range(1, n + 1): - nn_idx = np.clip(idx + nn, a_min=None, a_max=len(flags) - 1) - nn_idx_unflagged = nn_idx[~flagger.isFlagged(flags.iloc[nn_idx])] - flags.loc[flags.index[nn_idx_unflagged]] = flags.iloc[nn_idx_unflagged - nn].values - return flags +def flagNext(flagger, flags, mask=True, flag_values=0, **kwargs) -> pd.Series: + return flagWindow(flagger, flags, mask, 'fw', window=flag_values, **kwargs) def runner(meta, flagger, data, flags=None, nodata=np.nan): + plotvars = [] if flags is None: flags = pd.DataFrame(index=data.index) @@ -49,7 +56,7 @@ def runner(meta, flagger, data, flags=None, nodata=np.nan): for idx, configrow in meta.iterrows(): varname, _, _, assign = configrow[fields] if varname not in flags and \ - (varname in data or varname not in data and assign is True): + (varname in data or varname not in data and assign is True): col_flags = flagger.initFlags(pd.DataFrame(index=data.index, columns=[varname])) flags = col_flags if flags.empty else flags.join(col_flags) @@ -77,44 +84,145 @@ def runner(meta, flagger, data, flags=None, nodata=np.nan): if varname not in data and varname not in flags: continue - dchunk = data.loc[start_date:end_date] + dchunk = data.loc[start_date:end_date].copy() if dchunk.empty: continue - fchunk = flags.loc[start_date:end_date] + fchunk = flags.loc[start_date:end_date].copy() try: - dchunk, fchunk = flagDispatch(func_name, - dchunk, fchunk, varname, - flagger, nodata=nodata, - **flag_params) + dchunk, ffchunk = flagDispatch(func_name, + dchunk, fchunk.copy(), varname, + flagger, nodata=nodata, + **flag_params) except NameError: raise NameError( f"function name {func_name} is not definied (variable '{varname}, 'line: {idx + 1})") - # flag a timespan after the condition is met, - # duration given in 'flag_period' - flag_period = flag_params.pop(Params.FLAGPERIOD, None) - if flag_period: - fchunk[varname] = flagPeriod(flagger, - fchunk[varname], - data.index.freq) - - # flag a certain amount of values after condition is met, - # number given in 'flag_values' - flag_values = flag_params.pop(Params.FLAGVALUES, None) - if flag_values: - fchunk[varname] = flagNext(flagger, - fchunk[varname], - flag_values) + old = flagger.isFlagged(fchunk[varname]) + new = flagger.isFlagged(ffchunk[varname]) + mask = old != new + + # flag a timespan after the condition is met + if Params.FLAGPERIOD in flag_params: + ffchunk[varname] = flagPeriod(flagger, ffchunk[varname], mask, **flag_params) + + # flag a certain amount of values after condition is met + if Params.FLAGVALUES in flag_params: + ffchunk[varname] = flagNext(flagger, ffchunk[varname], mask, **flag_params) + + if Params.FLAGPERIOD in flag_params or Params.FLAGVALUES in flag_params: + # hack as assignments above don't preserve categorical type + ffchunk = ffchunk.astype({ + c: flagger.flags for c in ffchunk.columns if flagger.flag_fields[0] in c}) + + if flag_params.get(Params.PLOT, False): + plotvars.append(varname) + plot(dchunk, ffchunk, mask, varname, flagger, title=flag_test) data.loc[start_date:end_date] = dchunk - flags[start_date:end_date] = fchunk.squeeze() + flags[start_date:end_date] = ffchunk.squeeze() flagger.nextTest() + + # plot all together + if plotvars: + plot(data, flags, True, set(plotvars), flagger) + return data, flags +def plot(data, flags, flagmask, varname, flagger, interactive_backend=True, title="Data Plot"): + # the flagmask is True for flags to be shown False otherwise + if not interactive_backend: + # Import plot libs without interactivity, if not needed. This ensures that this can + # produce an plot.png even if tkinter is not installed. E.g. if one want to run this + # on machines without X-Server aka. graphic interface. + mpl.use('Agg') + else: + mpl.use('TkAgg') + from matplotlib import pyplot as plt + # needed for datetime conversion + from pandas.plotting import register_matplotlib_converters + register_matplotlib_converters() + + if not isinstance(varname, (list, set)): + varname = set([varname]) + + tmp = [] + for var in varname: + if var not in data.columns: + warn(f"Cannot plot column '{var}' that is not present in data.", UserWarning) + else: + tmp.append(var) + if tmp: + varname = tmp + else: + return + + def plot_vline(plt, points, color='blue'): + # workaround for ax.vlines() as this work unexpected + for point in points: + plt.axvline(point, color=color, linestyle=':') + + def _plot(varname, ax): + x = data.index + y = data[varname] + flags_ = flags[varname] + nrofflags = len(flagger.flags.categories) + ax.plot(x, y, '-',markersize=1, color='silver') + if nrofflags == 3: + colors = {0:'silver', 1:'lime', 2:'red'} + if nrofflags == 4: + colors = {0:'silver', 1:'lime', 2:'yellow', 3:'red'} + + # plot (all) data in silver + ax.plot(x, y, '-', color='silver', label='data') + # plot (all) missing data in silver + nans = y.isna() + ylim = plt.ylim() + flagged = flagger.isFlagged(flags_) + idx = y.index[nans & ~flagged] + # ax.vlines(idx, *ylim, linestyles=':', color='silver', label="missing") + plot_vline(ax, idx, color='silver') + + # plot all flagged data in black + ax.plot(x[flagged], y[flagged], '.', color='black', label="flagged by other test") + # plot all flagged missing data (flagged before) in black + idx = y.index[nans & flagged & ~flagmask] + # ax.vlines(idx, *ylim, linestyles=':', color='black') + plot_vline(ax, idx, color='black') + ax.set_ylabel(varname) + + # plot currently flagged data in color of flag + for i, f in enumerate(flagger.flags): + if i == 0: + continue + flagged = flagger.isFlagged(flags_, flag=f) & flagmask + label = f"flag: {f}" if i else 'data' + ax.plot(x[flagged], y[flagged], '.', color=colors[i], label=label) + idx = y.index[nans & flagged] + # ax.vlines(idx, *ylim, linestyles=':', color=colors[i]) + plot_vline(ax, idx, color=colors[i]) + + plots = len(varname) + if plots > 1: + fig, axes = plt.subplots(plots, 1, sharex=True) + axes[0].set_title(title) + for i, v in enumerate(varname): + _plot(v, axes[i]) + else: + fig, ax = plt.subplots() + plt.title(title) + _plot(varname.pop(), ax) + + plt.xlabel('time') + # dummy plot for label `missing` see plot_vline for more info + plt.plot([], [], ':', color='silver', label="missing data") + plt.legend() + plt.show() + + def prepareMeta(meta, data): # NOTE: an option needed to only pass tests within an file and deduce # everything else from data @@ -150,7 +258,7 @@ def readData(fname, index_col, nans): if __name__ == "__main__": - + from flagger import DmpFlagger datafname = "resources/data.csv" metafname = "resources/meta.csv" @@ -159,5 +267,5 @@ if __name__ == "__main__": nans=["-9999", "-9999.0"]) meta = prepareMeta(pd.read_csv(metafname), data) - flagger = PositionalFlagger() + flagger = DmpFlagger() pdata, pflags = runner(meta, flagger, data) diff --git a/flagger/baseflagger.py b/flagger/baseflagger.py index 37907eb50baa86354aff9dbe6f77e344c76f4461..3c8a9312154896721783f96341b5cb4892447bef 100644 --- a/flagger/baseflagger.py +++ b/flagger/baseflagger.py @@ -14,8 +14,11 @@ class Flags(pd.CategoricalDtype): assert len(flags) > 2 super().__init__(flags, ordered=True) + def unflagged(self): + return self[0] + def min(self): - return self[2] + return self[1] def max(self): return self[-1] @@ -39,20 +42,28 @@ class BaseFlagger: in assignments, especially if a multi column index is used """ if flag is None: - flag = self.flags[-1] + flag = self.flags.max() + else: + self._checkFlag(flag) + flags = flags.copy() flags[flags < flag] = flag return flags.values def initFlags(self, data: pd.DataFrame) -> pd.DataFrame: - # out = data.copy() # .astype(self) out = data.copy().astype(self.flags) out.loc[:] = self.flags[0] return out def isFlagged(self, flags: ArrayLike, flag: T = None) -> ArrayLike: if flag is None: - return (pd.notnull(flags) & (flags > self.flags[1])) + return pd.notnull(flags) & (flags > self.flags[0]) + self._checkFlag(flag) return flags == flag + def _checkFlag(self, flag): + if flag not in self.flags: + raise ValueError(f"Invalid flag '{flag}'. " + f"Possible choices are {list(self.flags.categories)[1:]}") + def nextTest(self): pass diff --git a/flagger/dmpflagger.py b/flagger/dmpflagger.py index a518df7552f0fe68bbc5c3223677eb50d944c648..06fc567d953350bab0b4a1e70e428b171c27686a 100644 --- a/flagger/dmpflagger.py +++ b/flagger/dmpflagger.py @@ -1,10 +1,15 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- +import subprocess import pandas as pd from .baseflagger import BaseFlagger +class Keywords: + VERSION = "$version" + + class FlagFields: FLAG = "quality_flag" CAUSE = "quality_cause" @@ -24,6 +29,8 @@ class DmpFlagger(BaseFlagger): def __init__(self): super().__init__(FLAGS) self.flag_fields = [FlagFields.FLAG, FlagFields.CAUSE, FlagFields.COMMENT] + version = subprocess.check_output('git describe --tags --always --dirty'.split()) + self.project_version = version.decode().strip() def initFlags(self, data, **kwargs): columns = data.columns if isinstance(data, pd.DataFrame) else [data.name] @@ -40,15 +47,20 @@ class DmpFlagger(BaseFlagger): def setFlag(self, flags, flag=None, cause="", comment="", **kwargs): + if not isinstance(flags, pd.DataFrame): + raise TypeError + if flag is None: flag = self.flags.max() - assert flag in self.flags + else: + self._checkFlag(flag) - flags = self._reduceColumns(flags) - flags.loc[flags[FlagFields.FLAG] < flag, FlagFields.FLAG] = flag + if Keywords.VERSION in comment: + comment = comment.replace(Keywords.VERSION, self.project_version) - for field, f in [(FlagFields.CAUSE, cause), (FlagFields.COMMENT, comment)]: - flags.loc[:, field] = f + flags = self._reduceColumns(flags) + mask = flags[FlagFields.FLAG] < flag + flags.loc[mask, self.flag_fields] = flag, cause, comment return flags.values @@ -58,6 +70,13 @@ class DmpFlagger(BaseFlagger): return super().isFlagged(flagcol, flag) def _reduceColumns(self, flags): - if isinstance(flags.columns, pd.MultiIndex): + if set(flags.columns) == set(self.flag_fields): + pass + elif isinstance(flags, pd.DataFrame) \ + and isinstance(flags.columns, pd.MultiIndex) \ + and (len(flags.columns) == 3): + flags = flags.copy() flags.columns = flags.columns.get_level_values(ColumnLevels.FLAGS) + else: + raise TypeError return flags diff --git a/funcs/functions.py b/funcs/functions.py index 1ddd95af41aa310300531b937e15eb3f93eb6fc9..92ba4681020f502150699173da4d01b65f040855 100644 --- a/funcs/functions.py +++ b/funcs/functions.py @@ -3,7 +3,8 @@ import numpy as np import pandas as pd -from lib.tools import valueRange, slidingWindowIndices + +from lib.tools import valueRange, slidingWindowIndices, inferFrequency from dsl import evalExpression from config import Params @@ -24,7 +25,6 @@ def flagDispatch(func_name, *args, **kwargs): def flagGeneric(data, flags, field, flagger, nodata=np.nan, **flag_params): - expression = flag_params[Params.FUNC] result = evalExpression(expression, flagger, data, flags, field, @@ -47,7 +47,6 @@ def flagGeneric(data, flags, field, flagger, nodata=np.nan, **flag_params): def flagConstant(data, flags, field, flagger, eps, length, thmin=None, **kwargs): - datacol = data[field] flagcol = flags[field] @@ -87,46 +86,23 @@ def flagManual(data, flags, field, flagger, **kwargs): def flagRange(data, flags, field, flagger, min, max, **kwargs): - datacol = data[field].values mask = (datacol < min) | (datacol >= max) flags.loc[mask, field] = flagger.setFlag(flags.loc[mask, field], **kwargs) return data, flags -def flagMad(data, flags, field, flagger, length, z, deriv, **kwargs): - - def _flagMad(data: np.ndarray, z: int, deriv: int) -> np.ndarray: - # NOTE: numpy is at least twice as fast as numba.jit(nopython) - # median absolute deviation - for i in range(deriv): - data[i+1:] = np.diff(data[i:]) - data[i] = np.nan - median = np.nanmedian(data) - mad = np.nanmedian(np.abs(data-median)) - tresh = mad * (z/0.6745) - with np.errstate(invalid="ignore"): - return (data < (median - tresh)) | (data > (median + tresh)) - - datacol = data[field] - flagcol = flags[field] - - values = (datacol - .mask(flagger.isFlagged(flagcol)) - .values) - - window = pd.to_timedelta(length) - pd.to_timedelta(data.index.freq) - mask = np.zeros_like(values, dtype=bool) - - for start_idx, end_idx in slidingWindowIndices(datacol.index, window, "1D"): - mad_flags = _flagMad(values[start_idx:end_idx], z, deriv) - # reset the mask - mask[:] = False - mask[start_idx:end_idx] = mad_flags - flagcol[mask] = flagger.setFlag(flagcol[mask], **kwargs) - - flags[field] = flagcol - +def flagMad(data, flags, field, flagger, length, z, freq=None, **kwargs): + d = data[field].copy() + freq = inferFrequency(d) if freq is None else freq + if freq is None: + raise ValueError("freqency cannot inferred, provide `freq` as a param to mad().") + winsz = int(pd.to_timedelta(length) / freq) + median = d.rolling(window=winsz, center=True, closed='both').median() + diff = abs(d - median) + mad = diff.rolling(window=winsz, center=True, closed='both').median() + mask = (mad > 0) & (0.6745 * diff > z * mad) + flags.loc[mask, field] = flagger.setFlag(flags.loc[mask, field], **kwargs) return data, flags diff --git a/lib/tools.py b/lib/tools.py index 23bb2548e7493e485024c8ae53f7c877d1fb80fa..5377baaa2c90cf48c0a99f5581e9bbfe81f505bd 100644 --- a/lib/tools.py +++ b/lib/tools.py @@ -95,3 +95,8 @@ def broadcastMany(*args: ArrayLike) -> np.ndarray: target_shape = np.broadcast(*out).shape return tuple(np.broadcast_to(arr, target_shape) for arr in out) + +def inferFrequency(data): + return pd.tseries.frequencies.to_offset(pd.infer_freq(data.index)) + + diff --git a/test/__init__.py b/test/__init__.py index e1f7e6e8bf04d3d59cff68b83d91fbf791f0faa2..25d6b51a610480acedc679e13118cf5420e7028c 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -1,2 +1,8 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- + +from test.common import * +from test.test_core import * +from test.dsl.test_generic import * +from test.dsl.test_evaluator import * +from test.flagger.test_dmpflagger import * diff --git a/test/dsl/__init__.py b/test/dsl/__init__.py deleted file mode 100644 index e1f7e6e8bf04d3d59cff68b83d91fbf791f0faa2..0000000000000000000000000000000000000000 --- a/test/dsl/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -#! /usr/bin/env python -# -*- coding: utf-8 -*- diff --git a/test/dsl/test_evaluator.py b/test/dsl/test_evaluator.py index 0ff60652da41a056072f01ff7a15575d7c73a2c0..1bcf5f89bb2e3a5f8fbcd929af4ee3ce41d4f58e 100644 --- a/test/dsl/test_evaluator.py +++ b/test/dsl/test_evaluator.py @@ -5,8 +5,8 @@ import pytest import numpy as np from test.common import initData -from flagger import SimpleFlagger -from dsl import evalExpression +from flagger.simpleflagger import SimpleFlagger +from dsl.evaluator import evalExpression def test_evaluationBool(): diff --git a/test/dsl/test_generic.py b/test/dsl/test_generic.py index 1736a7e05443879529d635799472f58c9957c63f..c0b9d832295c495c677d1676bc9f271b220b1703 100644 --- a/test/dsl/test_generic.py +++ b/test/dsl/test_generic.py @@ -7,9 +7,10 @@ import pytest from test.common import initData -from dsl import evalExpression -from flagger import SimpleFlagger -from funcs.functions import flagGeneric, Params +from dsl.evaluator import evalExpression +from flagger.simpleflagger import SimpleFlagger +from funcs.functions import flagGeneric +from config import Params def test_ismissing(): @@ -60,14 +61,14 @@ def test_isflaggedArgument(): flags = flagger.initFlags(data) var1, var2, *_ = data.columns - flags.iloc[::2, 0] = flagger.setFlag(flags.iloc[::2, 0], -9) + flags.iloc[::2, 0] = flagger.setFlag(flags.iloc[::2, 0], 1) - idx = evalExpression("isflagged({:}, -9)".format(var1), + idx = evalExpression("isflagged({:}, 1)".format(var1), flagger, data, flags, var2) - flagged = flagger.isFlagged(flags[var1], -9) + flagged = flagger.isFlagged(flags[var1], 1) assert (flagged == idx).all diff --git a/test/flagger/__init__.py b/test/flagger/__init__.py deleted file mode 100644 index e1f7e6e8bf04d3d59cff68b83d91fbf791f0faa2..0000000000000000000000000000000000000000 --- a/test/flagger/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -#! /usr/bin/env python -# -*- coding: utf-8 -*- diff --git a/test/flagger/test_dmpflagger.py b/test/flagger/test_dmpflagger.py index 72b063f25a456a83466fe03864686e1a241b3816..d410f06044dff452b9b63004f61981ddd6a27174 100644 --- a/test/flagger/test_dmpflagger.py +++ b/test/flagger/test_dmpflagger.py @@ -1,7 +1,7 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- -from ..common import initData, initMeta +from test.common import initData, initMeta from core import runner from flagger.dmpflagger import DmpFlagger, FlagFields @@ -16,7 +16,7 @@ def test_basic(): metastring = f""" headerout, Flag_1, Flag_2 - {var1},"generic, {{func: this < {var1mean}}}","range, {{min: 10, max: 20, comment: saqc}}" + {var1},"generic, {{func: this < {var1mean}, flag: DOUBTFUL}}","range, {{min: 10, max: 20, comment: saqc}}" {var2},"generic, {{func: this > {var2mean}, cause: error}}" """ meta = initMeta(metastring, data) @@ -28,10 +28,11 @@ def test_basic(): flags11 = flags.loc[col1 < var1mean, (var1, FlagFields.FLAG)] flags12 = flags.loc[((col1 < 10) | (col1 > 20)), (var1, FlagFields.COMMENT)] + # flags12 = flags.loc[(col1 >= var1mean) & ((col1 < 10) | (col1 > 20)), (var1, FlagFields.COMMENT)] flags21 = flags.loc[col2 > var2mean, (var2, FlagFields.CAUSE)] - assert (flags11 >= flagger.flags.min()).all() + assert (flags11 > flagger.flags.min()).all() assert (flags12 == "saqc").all() assert (flags21 == "error").all() diff --git a/test/test_core.py b/test/test_core.py index 1f32eef28eb1fd7085f37e5ffde0f3be09acdf4c..2e0607140620ece6d47221ebebba35a65ea9b32e 100644 --- a/test/test_core.py +++ b/test/test_core.py @@ -7,7 +7,9 @@ import pandas as pd from core import runner, flagNext, flagPeriod, prepareMeta from config import Fields -from flagger import SimpleFlagger, DmpFlagger, PositionalFlagger +from flagger.simpleflagger import SimpleFlagger +from flagger.dmpflagger import DmpFlagger +from flagger.positionalflagger import PositionalFlagger from test.common import initData @@ -125,7 +127,7 @@ def test_flagNext(flagger): flags.iloc[idx] = flagger.setFlag(flags.iloc[idx]) n = 4 - fflags = flagNext(flagger, flags.copy(), 4) + fflags = flagNext(flagger, flags.copy(), flag_values=4) result_idx = np.unique(np.where(flagger.isFlagged(fflags))[0]) expected_idx = np.arange(min(idx), max(idx) + n + 1) assert (result_idx == expected_idx).all() @@ -142,10 +144,11 @@ def test_flagPeriod(flagger): idx = [0, 1, 2] flags.iloc[idx] = flagger.setFlag(flags.iloc[idx]) - tdelta = pd.to_timedelta("4h") - flags = flagPeriod(flagger, flags.copy(), tdelta) + period = '4h' + flags = flagPeriod(flagger, flags.copy(), flag_period=period) expected_dates = set(flags[flagger.isFlagged(flags)].index) + tdelta = pd.to_timedelta(period) dates = set() for start in flags.index[idx]: stop = start + tdelta @@ -159,7 +162,6 @@ if __name__ == "__main__": # NOTE: PositionalFlagger is currently broken, going to fix it when needed # for flagger in [SimpleFlagger, PositionalFlagger, DmpFlagger]: for flagger in [SimpleFlagger(), DmpFlagger()]: - # for flagger in [DmpFlagger()]: test_temporalPartitioning(flagger) test_flagNext(flagger) test_flagPeriod(flagger)