Skip to content
Snippets Groups Projects
Commit ba659772 authored by David Schäfer's avatar David Schäfer
Browse files

Annotated float scheme

parent dadbe200
No related branches found
No related tags found
1 merge request: !842 Annotated float scheme
......@@ -21,6 +21,7 @@ SPDX-License-Identifier: GPL-3.0-or-later
- `setFlags`: function to replace `flagManual`
- `flagUniLOF`: added a correction, applied by default, to mitigate overflagging at relatively steep data value slopes (parameter `slope_correct`).
- `History`: added option to change aggregation behavior
- Translation scheme `FloatScheme`
### Changed
- `flagPattern` uses *fastdtw* package now to compute timeseries distances
- `SaQC.flags` always returns a `DictOfSeries`
......
......@@ -5,11 +5,8 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-
from saqc.core.translation.basescheme import (
FloatScheme,
MappingScheme,
TranslationScheme,
)
from saqc.core.translation.basescheme import MappingScheme, TranslationScheme
from saqc.core.translation.dmpscheme import DmpScheme
from saqc.core.translation.floatscheme import AnnotatedFloatScheme, FloatScheme
from saqc.core.translation.positionalscheme import PositionalScheme
from saqc.core.translation.simplescheme import SimpleScheme
......@@ -215,31 +215,3 @@ class MappingScheme(TranslationScheme):
out = self._translate(flags, self._backward)
out.attrs = attrs or {}
return out
class FloatScheme(TranslationScheme):
    """
    Acts as the default Translator, provides a changeable subset of the
    internal float flags
    """

    DFILTER_DEFAULT: float = FILTER_ALL

    def __call__(self, flag: float | int) -> float:
        """
        Translate a single flag into its float representation.

        Parameters
        ----------
        flag : value to convert with ``float``

        Returns
        -------
        The flag as a ``float``.

        Raises
        ------
        ValueError
            If ``float(flag)`` fails with a ``TypeError``, ``ValueError``
            or ``OverflowError``. The original exception is attached as
            ``__cause__``.
        """
        try:
            return float(flag)
        except (TypeError, ValueError, OverflowError) as err:
            # chain explicitly so the traceback names the conversion
            # failure as the direct cause
            raise ValueError(
                f"invalid flag, expected a numerical value, got: {flag}"
            ) from err

    def toInternal(self, flags: pd.DataFrame | DictOfSeries) -> Flags:
        """
        Translate external flags into a ``Flags`` object.

        Parameters
        ----------
        flags : external flags, cast as a whole via ``astype(float)``

        Returns
        -------
        ``Flags`` built from the float-cast input.

        Raises
        ------
        ValueError
            If the cast or the ``Flags`` construction fails with a
            ``TypeError``, ``ValueError`` or ``OverflowError``. The original
            exception is attached as ``__cause__``.
        """
        try:
            return Flags(flags.astype(float))
        except (TypeError, ValueError, OverflowError) as err:
            raise ValueError(
                f"invalid flag(s), expected a collection of numerical values, got: {flags}"
            ) from err

    def toExternal(self, flags: Flags, attrs: dict | None = None) -> DictOfSeries:
        """
        Translate internal flags into a ``DictOfSeries``.

        Parameters
        ----------
        flags : internal flags
        attrs : stored on the result as ``attrs``; an empty dict is used
            when ``attrs`` is ``None`` or empty

        Returns
        -------
        ``DictOfSeries`` constructed from ``flags``.
        """
        out = DictOfSeries(flags)
        out.attrs = attrs or {}
        return out
......@@ -76,6 +76,9 @@ class DmpScheme(MappingScheme):
history = History(flags.index)
for (flag, cause, comment), values in flags.groupby(_QUALITY_LABELS):
if cause == "" and comment == "":
continue
try:
comment = json.loads(comment)
except json.decoder.JSONDecodeError:
......@@ -105,6 +108,9 @@ class DmpScheme(MappingScheme):
Flags object
"""
if isinstance(flags, pd.DataFrame):
flags = DictOfSeries(flags)
self.validityCheck(flags)
data = {}
......@@ -112,7 +118,7 @@ class DmpScheme(MappingScheme):
if isinstance(flags, pd.DataFrame):
fields = flags.columns.get_level_values(0).drop_duplicates()
else:
fields = flags.columns
fields = flags.keys()
for field in fields:
data[str(field)] = self.toHistory(flags[field])
......@@ -172,7 +178,7 @@ class DmpScheme(MappingScheme):
return out
@classmethod
def validityCheck(cls, flags: pd.DataFrame | DictOfSeries) -> None:
def validityCheck(cls, flags: DictOfSeries) -> None:
"""
Check wether the given causes and comments are valid.
......@@ -180,7 +186,6 @@ class DmpScheme(MappingScheme):
----------
df : external flags
"""
for df in flags.values():
if not df.columns.isin(_QUALITY_LABELS).all(axis=None):
......
#! /usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-
from __future__ import annotations
import numpy as np
import pandas as pd
from saqc.constants import FILTER_ALL, UNFLAGGED
from saqc.core.flags import Flags
from saqc.core.frame import DictOfSeries
from saqc.core.history import History
from saqc.core.translation.basescheme import TranslationScheme
class FloatScheme(TranslationScheme):
    """
    Acts as the default Translator, provides a changeable subset of the
    internal float flags
    """

    DFILTER_DEFAULT: float = FILTER_ALL

    def __call__(self, flag: float | int) -> float:
        """
        Translate a single flag into its float representation.

        Parameters
        ----------
        flag : value to convert with ``float``

        Returns
        -------
        The flag as a ``float``.

        Raises
        ------
        ValueError
            If ``float(flag)`` fails with a ``TypeError``, ``ValueError``
            or ``OverflowError``. The original exception is attached as
            ``__cause__``.
        """
        try:
            return float(flag)
        except (TypeError, ValueError, OverflowError) as err:
            # chain explicitly so the traceback names the conversion
            # failure as the direct cause
            raise ValueError(
                f"invalid flag, expected a numerical value, got: {flag}"
            ) from err

    def toInternal(self, flags: pd.DataFrame | DictOfSeries) -> Flags:
        """
        Translate external flags into a ``Flags`` object.

        Parameters
        ----------
        flags : external flags, cast as a whole via ``astype(float)``

        Returns
        -------
        ``Flags`` built from the float-cast input.

        Raises
        ------
        ValueError
            If the cast or the ``Flags`` construction fails with a
            ``TypeError``, ``ValueError`` or ``OverflowError``. The original
            exception is attached as ``__cause__``.
        """
        try:
            return Flags(flags.astype(float))
        except (TypeError, ValueError, OverflowError) as err:
            raise ValueError(
                f"invalid flag(s), expected a collection of numerical values, got: {flags}"
            ) from err

    def toExternal(self, flags: Flags, attrs: dict | None = None) -> DictOfSeries:
        """
        Translate internal flags into a ``DictOfSeries``.

        Parameters
        ----------
        flags : internal flags
        attrs : stored on the result as ``attrs``; an empty dict is used
            when ``attrs`` is ``None`` or empty

        Returns
        -------
        ``DictOfSeries`` constructed from ``flags``.
        """
        out = DictOfSeries(flags)
        out.attrs = attrs or {}
        return out
class AnnotatedFloatScheme(FloatScheme):
    """
    Float scheme whose external representation is one frame per field with
    the columns ``flag``, ``func`` and ``parameters``.

    ``func`` and ``parameters`` are taken from the ``func`` and ``kwargs``
    entries of the history meta data.
    """

    def toExternal(self, flags: Flags, attrs: dict | None = None) -> DictOfSeries:
        """
        Translate internal flags into annotated frames.

        Parameters
        ----------
        flags : internal flags
        attrs : stored on the result as ``attrs``; an empty dict is used
            when ``attrs`` is ``None`` or empty

        Returns
        -------
        ``DictOfSeries`` holding, per field, a frame with the columns
        ``flag``, ``func`` and ``parameters``. Rows without a matching
        history entry keep the empty string in ``func`` and ``parameters``.
        """
        tflags = super().toExternal(flags, attrs=attrs)

        out = DictOfSeries()
        # `out` is a fresh container, so the attrs the parent attached to
        # `tflags` have to be set on it explicitly.
        out.attrs = attrs or {}

        for field in tflags.columns:
            df = pd.DataFrame(
                {
                    "flag": tflags[field],
                    "func": "",
                    "parameters": "",
                }
            )

            history = flags.history[field]
            # Columns are visited in the order given by `history.columns`;
            # where several columns qualify for the same row, the one
            # visited last determines the annotation.
            for col in history.columns:
                column = history.hist[col]
                valid = (column != UNFLAGGED) & column.notna()
                meta = history.meta[col]
                df.loc[valid, "func"] = meta["func"]
                df.loc[valid, "parameters"] = str(meta["kwargs"])

            # Assigned once per field, after the history walk, so that a
            # field whose history has no columns is part of the result too.
            out[field] = df

        return out

    def toInternal(self, flags: DictOfSeries) -> Flags:
        """
        Translate annotated frames into a ``Flags`` object.

        Parameters
        ----------
        flags : mapping from field name to a frame with the columns
            ``flag``, ``func`` and ``parameters``

        Returns
        -------
        ``Flags`` built from one ``History`` per field. Each distinct
        ``(flag, func, parameters)`` combination of a frame becomes one
        history column.
        """
        data = {}
        for key, frame in flags.items():
            history = History(index=frame.index)
            for (flag, func, kwargs), values in frame.groupby(
                ["flag", "func", "parameters"]
            ):
                # NaN everywhere except on the rows of the current group
                column = pd.Series(np.nan, index=frame.index)
                column.loc[values.index] = self(flag)
                # `kwargs` is the raw content of the `parameters` column
                # (a string when the frame was produced by `toExternal`).
                history.append(column, meta={"func": func, "kwargs": kwargs})
            data[key] = history
        return Flags(data)
......@@ -69,9 +69,8 @@ class PositionalScheme(MappingScheme):
fflags = super()._translate(df, self._FORWARD)
field_history = History(field_flags.index)
for _, s in fflags.items():
field_history.append(s)
field_history.append(s.replace(UNFLAGGED, np.nan))
data[str(field)] = field_history
return Flags(data)
def toExternal(self, flags: Flags, **kwargs) -> DictOfSeries:
......
......@@ -13,9 +13,10 @@ import numpy as np
import pandas as pd
import pytest
from saqc import BAD, DOUBTFUL, FILTER_NONE, UNFLAGGED, SaQC
from saqc.core import Flags
from saqc.constants import BAD, DOUBTFUL, FILTER_NONE, UNFLAGGED
from saqc.core import Flags, SaQC
from saqc.core.translation import DmpScheme, MappingScheme, PositionalScheme
from saqc.core.translation.floatscheme import AnnotatedFloatScheme
from tests.common import initData
......@@ -277,3 +278,23 @@ def test_positionalMulitcallsPreserveState():
expected = tflags1[k].str.slice(start=1) * 2
got = tflags2[k].str.slice(start=1)
assert expected.equals(got)
def test_annotatedFloatScheme():
    """
    Flag one column with `setFlags` and `flagRange` under the
    `AnnotatedFloatScheme`, check the annotated output and verify that
    translating it to the internal representation and back reproduces it.
    """
    data = initData(1)
    col = data.columns[0]
    scheme = AnnotatedFloatScheme()

    qc = SaQC(data=data, scheme=scheme)
    # flag every fourth timestamp manually, then run a range check
    qc = qc.setFlags(col, data=data[col].index[::4], flag=DOUBTFUL)
    qc = qc.flagRange(col, min=3, max=10, flag=BAD)
    flags = qc.flags

    annotated = flags[col]
    assert annotated["flag"].isin({DOUBTFUL, BAD, UNFLAGGED}).all(axis=None)
    assert annotated["func"].isin({"", "setFlags", "flagRange"}).all(axis=None)

    internal = scheme.toInternal(flags)
    round_trip = scheme.toExternal(internal)
    assert tuple(round_trip.keys()) == tuple(flags.keys())
    for key in flags.keys():
        assert round_trip[key].equals(flags[key])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment