From ba659772419f8707c1b6f74b592429ea8015ecc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Sch=C3=A4fer?= <david.schaefer@ufz.de> Date: Mon, 15 Apr 2024 21:11:20 +0200 Subject: [PATCH] Annotated float scheme --- CHANGELOG.md | 1 + saqc/core/translation/__init__.py | 7 +- saqc/core/translation/basescheme.py | 28 -------- saqc/core/translation/dmpscheme.py | 11 ++- saqc/core/translation/floatscheme.py | 86 +++++++++++++++++++++++ saqc/core/translation/positionalscheme.py | 3 +- tests/core/test_translator.py | 25 ++++++- 7 files changed, 121 insertions(+), 40 deletions(-) create mode 100644 saqc/core/translation/floatscheme.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 967ac8053..6d811ff07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ SPDX-License-Identifier: GPL-3.0-or-later - `setFlags`: function to replace `flagManual` - `flagUniLOF`: added defaultly applied correction to mitigate phenomenon of overflagging at relatively steep data value slopes. (parameter `slope_correct`). - `History`: added option to change aggregation behavior +- Translation scheme `FloatScheme` ### Changed - `flagPattern` uses *fastdtw* package now to compute timeseries distances - `SaQC.flags` always returns a `DictOfSeries` diff --git a/saqc/core/translation/__init__.py b/saqc/core/translation/__init__.py index fe2d85790..7549e43c5 100644 --- a/saqc/core/translation/__init__.py +++ b/saqc/core/translation/__init__.py @@ -5,11 +5,8 @@ # SPDX-License-Identifier: GPL-3.0-or-later # -*- coding: utf-8 -*- -from saqc.core.translation.basescheme import ( - FloatScheme, - MappingScheme, - TranslationScheme, -) +from saqc.core.translation.basescheme import MappingScheme, TranslationScheme from saqc.core.translation.dmpscheme import DmpScheme +from saqc.core.translation.floatscheme import AnnotatedFloatScheme, FloatScheme from saqc.core.translation.positionalscheme import PositionalScheme from saqc.core.translation.simplescheme import SimpleScheme diff --git a/saqc/core/translation/basescheme.py b/saqc/core/translation/basescheme.py index 66f9cb8db..56bfa4cb2 100644 --- a/saqc/core/translation/basescheme.py +++ b/saqc/core/translation/basescheme.py @@ -215,31 +215,3 @@ class MappingScheme(TranslationScheme): out = self._translate(flags, self._backward) out.attrs = attrs or {} return out - - -class FloatScheme(TranslationScheme): - """ - Acts as the default Translator, provides a changeable subset of the - internal float flags - """ - - DFILTER_DEFAULT: float = FILTER_ALL - - def __call__(self, flag: float | int) -> float: - try: - return float(flag) - except (TypeError, ValueError, OverflowError): - raise ValueError(f"invalid flag, expected a numerical value, got: {flag}") - - def toInternal(self, flags: pd.DataFrame | DictOfSeries) -> Flags: - try: - return Flags(flags.astype(float)) - except (TypeError, ValueError, OverflowError): - raise ValueError( - f"invalid flag(s), expected a collection of numerical values, got: {flags}" - ) - - def toExternal(self, flags: Flags, attrs: dict | None = None) -> DictOfSeries: - out = DictOfSeries(flags) - out.attrs = attrs or {} - return out diff --git a/saqc/core/translation/dmpscheme.py b/saqc/core/translation/dmpscheme.py index ad80c0a9f..fb68713d5 100644 --- a/saqc/core/translation/dmpscheme.py +++ b/saqc/core/translation/dmpscheme.py @@ -76,6 +76,9 @@ class DmpScheme(MappingScheme): history = History(flags.index) for (flag, cause, comment), values in flags.groupby(_QUALITY_LABELS): + if cause == "" and comment == "": + continue + try: comment = json.loads(comment) except json.decoder.JSONDecodeError: @@ -105,6 +108,9 @@ class DmpScheme(MappingScheme): Flags object """ + if isinstance(flags, pd.DataFrame): + flags = DictOfSeries(flags) + self.validityCheck(flags) data = {} @@ -112,7 +118,7 @@ class DmpScheme(MappingScheme): if isinstance(flags, pd.DataFrame): fields = flags.columns.get_level_values(0).drop_duplicates() else: - fields = flags.columns + fields = flags.keys() for field in fields: data[str(field)] = self.toHistory(flags[field]) @@ -172,7 +178,7 @@ class DmpScheme(MappingScheme): return out @classmethod - def validityCheck(cls, flags: pd.DataFrame | DictOfSeries) -> None: + def validityCheck(cls, flags: DictOfSeries) -> None: """ Check wether the given causes and comments are valid. @@ -180,7 +186,6 @@ class DmpScheme(MappingScheme): ---------- df : external flags """ - for df in flags.values(): if not df.columns.isin(_QUALITY_LABELS).all(axis=None): diff --git a/saqc/core/translation/floatscheme.py b/saqc/core/translation/floatscheme.py new file mode 100644 index 000000000..55b19b6a0 --- /dev/null +++ b/saqc/core/translation/floatscheme.py @@ -0,0 +1,86 @@ +#! /usr/bin/env python + +# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ +# +# SPDX-License-Identifier: GPL-3.0-or-later + +# -*- coding: utf-8 -*- + +from __future__ import annotations + +import numpy as np +import pandas as pd + +from saqc.constants import FILTER_ALL, UNFLAGGED +from saqc.core.flags import Flags +from saqc.core.frame import DictOfSeries +from saqc.core.history import History +from saqc.core.translation.basescheme import TranslationScheme + + +class FloatScheme(TranslationScheme): + """ + Acts as the default Translator, provides a changeable subset of the + internal float flags + """ + + DFILTER_DEFAULT: float = FILTER_ALL + + def __call__(self, flag: float | int) -> float: + try: + return float(flag) + except (TypeError, ValueError, OverflowError): + raise ValueError(f"invalid flag, expected a numerical value, got: {flag}") + + def toInternal(self, flags: pd.DataFrame | DictOfSeries) -> Flags: + try: + return Flags(flags.astype(float)) + except (TypeError, ValueError, OverflowError): + raise ValueError( + f"invalid flag(s), expected a collection of numerical values, got: {flags}" + ) + + def toExternal(self, flags: Flags, attrs: dict | None = None) -> DictOfSeries: + out = DictOfSeries(flags) + out.attrs = attrs or {} + return out + + +class AnnotatedFloatScheme(FloatScheme): + def toExternal(self, flags: Flags, attrs: dict | None = None) -> DictOfSeries: + + tflags = super().toExternal(flags, attrs=attrs) + + out = DictOfSeries() + for field in tflags.columns: + df = pd.DataFrame( + { + "flag": tflags[field], + "func": "", + "parameters": "", + } + ) + + history = flags.history[field] + + for col in history.columns: + valid = (history.hist[col] != UNFLAGGED) & history.hist[col].notna() + meta = history.meta[col] + df.loc[valid, "func"] = meta["func"] + df.loc[valid, "parameters"] = str(meta["kwargs"]) + out[field] = df + + return out + + def toInternal(self, flags: DictOfSeries) -> Flags: + data = {} + for key, frame in flags.items(): + history = History(index=frame.index) + for (flag, func, kwargs), values in frame.groupby( + ["flag", "func", "parameters"] + ): + column = pd.Series(np.nan, index=frame.index) + column.loc[values.index] = self(flag) + history.append(column, meta={"func": func, "kwargs": kwargs}) + data[key] = history + return Flags(data) diff --git a/saqc/core/translation/positionalscheme.py b/saqc/core/translation/positionalscheme.py index 67cef5181..2d12adfa5 100644 --- a/saqc/core/translation/positionalscheme.py +++ b/saqc/core/translation/positionalscheme.py @@ -69,9 +69,8 @@ class PositionalScheme(MappingScheme): fflags = super()._translate(df, self._FORWARD) field_history = History(field_flags.index) for _, s in fflags.items(): - field_history.append(s) + field_history.append(s.replace(UNFLAGGED, np.nan)) data[str(field)] = field_history - return Flags(data) def toExternal(self, flags: Flags, **kwargs) -> DictOfSeries: diff --git a/tests/core/test_translator.py b/tests/core/test_translator.py index 28741da0e..3d4ca3784 100644 --- a/tests/core/test_translator.py +++ b/tests/core/test_translator.py @@ -13,9 +13,10 @@ import numpy as np import pandas as pd import pytest -from saqc import BAD, DOUBTFUL, FILTER_NONE, UNFLAGGED, SaQC -from saqc.core import Flags +from saqc.constants import BAD, DOUBTFUL, FILTER_NONE, UNFLAGGED +from saqc.core import Flags, SaQC from saqc.core.translation import DmpScheme, MappingScheme, PositionalScheme +from saqc.core.translation.floatscheme import AnnotatedFloatScheme from tests.common import initData @@ -277,3 +278,23 @@ def test_positionalMulitcallsPreserveState(): expected = tflags1[k].str.slice(start=1) * 2 got = tflags2[k].str.slice(start=1) assert expected.equals(got) + + +def test_annotatedFloatScheme(): + data = initData(1) + col = data.columns[0] + + scheme = AnnotatedFloatScheme() + saqc = SaQC(data=data, scheme=scheme) + saqc = saqc.setFlags(col, data=data[col].index[::4], flag=DOUBTFUL).flagRange( + col, min=3, max=10, flag=BAD + ) + flags = saqc.flags + + assert flags[col]["flag"].isin({DOUBTFUL, BAD, UNFLAGGED}).all(axis=None) + assert flags[col]["func"].isin({"", "setFlags", "flagRange"}).all(axis=None) + + round_trip = scheme.toExternal(scheme.toInternal(flags)) + assert tuple(round_trip.keys()) == tuple(flags.keys()) + for key in flags.keys(): + assert round_trip[key].equals(flags[key]) -- GitLab