Skip to content
Snippets Groups Projects
Commit f7670f15 authored by Bert Palm's avatar Bert Palm 🎇
Browse files

Merge branch 'more_input_options' into 'develop'

rewrote config reading and parsing

See merge request !717
parents 60af1eaf 6f20c142
No related branches found
No related tags found
1 merge request!717rewrote config reading and parsing
Pipeline #186540 passed with stages
in 8 minutes and 36 seconds
......@@ -11,6 +11,9 @@ SPDX-License-Identifier: GPL-3.0-or-later
### Added
- added checks and unified error message for common inputs.
- added command line `--version` option
- `-ll` CLI option as a shorthand for `--log-level`
- basic JSON support for CLI config files, which are detected by a `.json` file extension.
- `--json-field` CLI option to use a non-root element of a json file.
### Changed
- pin pandas to versions >= 2.0
- parameter `fill_na` of `SaQC.flagUniLOF` and `SaQC.assignUniLOF` is now of type
......
......@@ -8,6 +8,7 @@
"""The System for automated Quality Control package."""
from saqc.constants import BAD, DOUBTFUL, FILTER_ALL, FILTER_NONE, GOOD, UNFLAGGED
from saqc.exceptions import ParsingError
from saqc.core import Flags, DictOfSeries, SaQC
from saqc.core.translation import DmpScheme, FloatScheme, PositionalScheme, SimpleScheme
from saqc.parsing.reader import fromConfig
......
......@@ -6,6 +6,9 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
import logging
from functools import partial
from pathlib import Path
......@@ -17,18 +20,18 @@ import pyarrow as pa
from saqc.core import DictOfSeries
from saqc.core.core import TRANSLATION_SCHEMES
from saqc.parsing.reader import fromConfig
from saqc.parsing.reader import _ConfigReader
from saqc.version import __version__
logger = logging.getLogger("SaQC")
LOG_FORMAT = "[%(asctime)s][%(name)s][%(levelname)s]: %(message)s"
def _setupLogging(loglvl):
logger.setLevel(loglvl)
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s]: %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logging.basicConfig(level=loglvl, format=LOG_FORMAT)
def setupIO(nodata):
......@@ -73,7 +76,8 @@ def writeData(writer_dict, df, fname):
"--config",
type=click.Path(),
required=True,
help="path to the configuration file",
help="Path to a configuration file. Use a '.json' extension to provide a JSON-"
"configuration. Otherwise files are treated as CSV.",
)
@click.option(
"-d",
......@@ -81,43 +85,66 @@ def writeData(writer_dict, df, fname):
type=click.Path(),
multiple=True,
required=True,
help="path to the data file",
help="Path to a data file.",
)
@click.option(
"-o",
"--outfile",
type=click.Path(exists=False),
required=False,
help="path to the output file",
help="Path to a output file.",
)
@click.option(
"--scheme",
default="simple",
show_default=True,
type=click.Choice(tuple(TRANSLATION_SCHEMES.keys())),
help="the flagging scheme to use",
help="A flagging scheme to use.",
)
@click.option(
"--nodata", default=np.nan, help="Set a custom nodata value.", show_default=True
)
@click.option("--nodata", default=np.nan, help="nodata value")
@click.option(
"--log-level",
"-ll",
default="INFO",
show_default=True,
type=click.Choice(["DEBUG", "INFO", "WARNING"]),
help="set output verbosity",
help="Set log verbosity.",
)
def main(config, data, scheme, outfile, nodata, log_level):
@click.option(
"--json-field",
default=None,
help="Use the value from the given FIELD from the root object of a json file. The "
"value must hold a array of saqc tests. If the option is not given, a passed "
"JSON config is assumed to have an array of saqc tests as root element.",
)
def main(
config: str,
data: str,
scheme: str,
outfile: str,
nodata: str | float,
log_level: str,
json_field: str | None,
):
# data is always a list of data files
_setupLogging(log_level)
reader, writer = setupIO(nodata)
data = [readData(reader, f) for f in data]
saqc = fromConfig(
config,
data=data,
scheme=TRANSLATION_SCHEMES[scheme or "simple"](),
)
config = str(config)
cr = _ConfigReader(data=data, scheme=scheme)
if config.endswith("json"):
f = None
if json_field is not None:
f = lambda j: j[str(json_field)]
cr = cr.readJson(config, unpack=f)
else:
cr = cr.readCsv(config)
saqc = cr.run()
data_result = saqc.data.to_pandas()
flags_result = saqc.flags
......
#! /usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-
from __future__ import annotations
class ParsingError(RuntimeError):
    """Raised when a SaQC configuration cannot be parsed."""
#! /usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-
from __future__ import annotations
import ast
import io
import json
import logging
import textwrap
from pathlib import Path
from typing import TextIO
from urllib.error import URLError
from typing import Any, Dict, Iterable, List, Sequence, TextIO, Tuple
from urllib.request import urlopen
import pandas as pd
from saqc import SaQC
from saqc.exceptions import ParsingError
from saqc.lib.tools import isQuoted
from saqc.parsing.visitor import ConfigFunctionParser
COMMENT = "#"
SEPARATOR = ";"
def _openFile(fname) -> TextIO:
if isinstance(fname, (str, Path)):
try:
fobj = io.StringIO(urlopen(str(fname)).read().decode("utf-8"))
fobj.seek(0)
except (ValueError, URLError):
fobj = io.open(fname, "r", encoding="utf-8")
else:
fobj = fname
return fobj
def _closeFile(fobj):
try:
fobj.close()
except AttributeError:
pass
def readFile(fname) -> pd.DataFrame:
fobj = _openFile(fname)
def _readLines(
it: Iterable[str], column_sep=";", comment_prefix="#", skip=0
) -> pd.DataFrame:
out = []
for i, line in enumerate(fobj):
row = line.strip().split(COMMENT, 1)[0]
if not row:
# skip over comment line
for i, line in enumerate(it):
if (skip := skip - 1) > 0:
continue
parts = [p.strip() for p in row.split(SEPARATOR)]
if not (row := line.strip().split(comment_prefix, 1)[0]):
continue
parts = [p.strip() for p in row.split(column_sep)]
if len(parts) != 2:
raise RuntimeError(
"The configuration format expects exactly two columns, one "
"for the variable name and one for the test to apply, but "
f"in line {i} we got: \n'{line}'"
raise ParsingError(
f"The configuration format expects exactly two "
f"columns, one for the variable name and one for "
f"the tests, but {len(parts)} columns were found "
f"in line {i}.\n\t{line!r}"
)
out.append([i + 1] + parts)
if not out:
raise ParsingError("Config file is empty")
return pd.DataFrame(out[1:], columns=["lineno", "varname", "test"]).set_index(
"lineno"
)
_closeFile(fobj)
df = pd.DataFrame(
out[1:],
columns=[
"row",
]
+ out[0][1:],
).set_index("row")
return df
def readFile(fname, skip=1) -> pd.DataFrame:
    """Read and parse a config file to a DataFrame"""

    def _open(file_or_buf) -> TextIO:
        # pass through anything that is already file-like
        if not isinstance(file_or_buf, (str, Path)):
            return file_or_buf
        try:
            return io.open(file_or_buf, "r", encoding="utf-8")
        except (OSError, ValueError):
            # not a local file -- try it as a URL instead
            handle = io.StringIO(urlopen(str(file_or_buf)).read().decode("utf-8"))
            handle.seek(0)
            return handle

    def _close(handle):
        try:
            handle.close()
        except AttributeError:
            pass

    # mimic `with open(): ...`
    handle = _open(fname)
    try:
        return _readLines(handle, skip=skip)
    finally:
        _close(handle)
def fromConfig(fname, *args, **func_kwargs):
    """Run a SaQC session from a CSV configuration file.

    Parameters
    ----------
    fname :
        Path, URL or file-like object holding a CSV config.
    *args, **func_kwargs :
        Forwarded to the ``SaQC`` constructor (e.g. ``data=``, ``scheme=``).

    Returns
    -------
    SaQC
        The SaQC object after all configured tests were executed.
    """
    return _ConfigReader(*args, **func_kwargs).readCsv(fname).run()
class _ConfigReader:
logger: logging.Logger
saqc: SaQC
file: str | None
config: pd.DataFrame | None
parsed: List[Tuple[Any, ...]] | None
regex: bool | None
varname: str | None
lineno: int | None
field: str | None
test: str | None
func: str | None
func_kws: Dict[str, Any] | None
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(self.__class__.__name__)
self.saqc = SaQC(*args, **kwargs)
self.file = None
self.config = None
self.parsed = None
self.regex = None
self.varname = None
self.lineno = None
self.field = None
self.test = None
self.func = None
self.func_kws = None
def readCsv(self, file: str, skip=1):
self.logger.debug(f"opening csv file: {file}")
self.config = readFile(file, skip=skip)
self.file = file
return self
def readRecords(self, seq: Sequence[Dict[str, Any]]):
self.logger.debug(f"read records: {seq}")
df = pd.DataFrame.from_records(seq)
df.columns = ["varname", "func", "kwargs"]
kws = df["kwargs"].apply(
lambda e: ", ".join([f"{k}={v}" for k, v in e.items()])
)
df["test"] = df["func"] + "(" + kws + ")"
self.config = df.loc[:, ["varname", "test"]].copy()
return self
def _readJson(self, d, unpack):
if unpack is not None:
d = unpack(d)
elif isinstance(d, dict):
raise TypeError("parsed json resulted in a dict, but a array/list is need")
return self.readRecords(d)
def readJson(self, file: str, unpack: callable | None = None):
self.logger.debug(f"opening json file: {file}")
with open(file, "r") as fh:
d = json.load(fh)
self.file = file
return self._readJson(d, unpack)
def readJsonString(self, jn: str, unpack: callable | None = None):
self.logger.debug(f"read json string: {jn}")
d = json.loads(jn)
return self._readJson(d, unpack)
def readString(self, s: str, line_sep="\n", column_sep=";"):
self.logger.debug(f"read config string: {s}")
lines = s.split(line_sep)
self.config = _readLines(lines, column_sep=column_sep)
return self
def _parseLine(self):
self.logger.debug(f"parse line {self.lineno}: {self.varname!r}; {self.test!r}")
self.regex = isQuoted(self.varname)
self.field = self.varname[1:-1] if self.regex else self.varname
try:
tree = ast.parse(expr, mode="eval")
func_name, func_kwargs = ConfigFunctionParser().parse(tree.body)
tree = ast.parse(self.test, mode="eval").body
func, kws = ConfigFunctionParser().parse(tree)
except Exception as e:
raise type(e)(f"failed to parse: {field} ; {expr}") from e
func_kwargs["field" if "field" not in func_kwargs else "target"] = fld
# We raise a NEW exception here, because the
# traceback hold no relevant info for a CLI user.
err = type(e) if isinstance(e, NameError) else ParsingError
meta = self._getFormattedInfo(
"The exception occurred during parsing of a config"
)
if hasattr(e, "add_note"): # python 3.11+
e = err(*e.args)
e.add_note(meta)
raise e from None
raise err(str(e) + meta) from None
if "field" in kws:
kws["target"] = self.field
else:
kws["field"] = self.field
self.func = func
self.func_kws = kws
def _execLine(self):
self.logger.debug(
f"execute line {self.lineno}: {self.varname!r}; {self.test!r}"
)
# We explicitly route all function calls through SaQC.__getattr__
# in order to do a FUNC_MAP lookup. Otherwise, we wouldn't be able
# to overwrite existing test functions with custom register calls.
try:
# We explictly route all function calls through SaQC.__getattr__
# in order to do a FUNC_MAP lookup. Otherwise we wouldn't be able
# to overwrite exsiting test functions with custom register calls.
saqc = saqc.__getattr__(func_name)(regex=regex, **func_kwargs)
self.saqc = self.saqc.__getattr__(self.func)(
regex=self.regex, **self.func_kws
)
except Exception as e:
raise type(e)(f"failed to execute: {field} ; {expr}") from e
return saqc
# We use a special technique for raising here, because
# we want this location of rising, line up in the traceback,
# instead of showing up at last (most relevant). Also, we
# want to include some meta information about the config.
meta = self._getFormattedInfo(
"The exception occurred during execution of a config"
)
if hasattr(e, "add_note"): # python 3.11+
e.add_note(meta)
raise e
raise type(e)(str(e) + meta).with_traceback(e.__traceback__) from None
def _getFormattedInfo(self, msg=None, indent=2):
prefix = " " * indent
info = textwrap.indent(
f"file: {self.file!r}\n"
f"line: {self.lineno}\n"
f"varname: {self.varname!r}\n"
f"test: {self.test!r}\n",
prefix,
)
if msg:
info = textwrap.indent(f"{msg}\n{info}", prefix)
return f"\n{info}"
def run(self):
"""Parse and execute a config line by line."""
assert self.config is not None
for lineno, varname, test in self.config.itertuples():
self.lineno = lineno
self.varname = varname
self.test = test
self._parseLine()
self._execLine()
return self.saqc
......@@ -9,8 +9,8 @@
import numpy as np
import pytest
import saqc.lib.ts_operators as ts_ops
from saqc.core import DictOfSeries, Flags, SaQC, flagging
from saqc.exceptions import ParsingError
from saqc.parsing.environ import ENVIRONMENT
from saqc.parsing.reader import fromConfig, readFile
from tests.common import initData, writeIO
......@@ -108,26 +108,25 @@ def test_configFile(data):
fromConfig(writeIO(config), data)
@pytest.mark.parametrize(
    "test, expected",
    [
        (f"var1; min", ParsingError),  # not a function call
        (f"var3; flagNothing()", NameError),  # unknown function
        (f"var1; flagFunc(mn=0)", TypeError),  # bad argument name
        (f"var1; flagFunc()", TypeError),  # not enough arguments
    ],
)
def test_configChecks(data, test, expected):
    """Each malformed config line must raise its matching exception."""

    @flagging()
    def flagFunc(data, field, flags, arg, opt_arg=None, **kwargs):
        flags[:, field] = np.nan
        return data, flags

    header = f"varname;test"
    fobj = writeIO(header + "\n" + test)
    with pytest.raises(expected):
        fromConfig(fobj, data=data)
def test_supportedArguments(data):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment