diff --git a/CHANGELOG.md b/CHANGELOG.md index 713f0baffc097a399288fcaa4bdebf41fb73819c..ccbc82abe8df796f25d77b4931c28842fd56001d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ SPDX-License-Identifier: GPL-3.0-or-later ### Added - added checks and unified error message for common inputs. - added command line `--version` option +- `-ll` CLI option as a shorthand for `--log-level` +- basic json support for CLI config files, which are detected by `.json`-extension. +- `--json-field` CLI option to use a non-root element of a json file. ### Changed - pin pandas to versions >= 2.0 - parameter `fill_na` of `SaQC.flagUniLOF` and `SaQC.assignUniLOF` is now of type diff --git a/saqc/__init__.py b/saqc/__init__.py index d2646a84f97aeb8fd00c6ab357cf8e6f76771d9d..29e0b55c57f390c1c426c31e6da2d7a8c79c8de5 100644 --- a/saqc/__init__.py +++ b/saqc/__init__.py @@ -8,6 +8,7 @@ """The System for automated Quality Control package.""" from saqc.constants import BAD, DOUBTFUL, FILTER_ALL, FILTER_NONE, GOOD, UNFLAGGED +from saqc.exceptions import ParsingError from saqc.core import Flags, DictOfSeries, SaQC from saqc.core.translation import DmpScheme, FloatScheme, PositionalScheme, SimpleScheme from saqc.parsing.reader import fromConfig diff --git a/saqc/__main__.py b/saqc/__main__.py index f77ea9c180b59de201d1b89baf38697110701dcc..9802e046eeca3412ddc611339473a8409ce1eac9 100644 --- a/saqc/__main__.py +++ b/saqc/__main__.py @@ -6,6 +6,9 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + +import json import logging from functools import partial from pathlib import Path @@ -17,18 +20,18 @@ import pyarrow as pa from saqc.core import DictOfSeries from saqc.core.core import TRANSLATION_SCHEMES -from saqc.parsing.reader import fromConfig +from saqc.parsing.reader import _ConfigReader from saqc.version import __version__ logger = logging.getLogger("SaQC") +LOG_FORMAT = "[%(asctime)s][%(name)s][%(levelname)s]: %(message)s" def _setupLogging(loglvl): 
logger.setLevel(loglvl) handler = logging.StreamHandler() - formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s]: %(message)s") - handler.setFormatter(formatter) logger.addHandler(handler) + logging.basicConfig(level=loglvl, format=LOG_FORMAT) def setupIO(nodata): @@ -73,7 +76,8 @@ def writeData(writer_dict, df, fname): "--config", type=click.Path(), required=True, - help="path to the configuration file", + help="Path to a configuration file. Use a '.json' extension to provide a JSON-" + "configuration. Otherwise files are treated as CSV.", ) @click.option( "-d", @@ -81,43 +85,66 @@ "--data", type=click.Path(), multiple=True, required=True, - help="path to the data file", + help="Path to a data file.", ) @click.option( "-o", "--outfile", type=click.Path(exists=False), required=False, - help="path to the output file", + help="Path to an output file.", ) @click.option( "--scheme", default="simple", show_default=True, type=click.Choice(tuple(TRANSLATION_SCHEMES.keys())), - help="the flagging scheme to use", + help="A flagging scheme to use.", +) +@click.option( + "--nodata", default=np.nan, help="Set a custom nodata value.", show_default=True ) -@click.option("--nodata", default=np.nan, help="nodata value") @click.option( "--log-level", + "-ll", default="INFO", show_default=True, type=click.Choice(["DEBUG", "INFO", "WARNING"]), - help="set output verbosity", + help="Set log verbosity.", ) -def main(config, data, scheme, outfile, nodata, log_level): +@click.option( + "--json-field", + default=None, + help="Use the value from the given FIELD from the root object of a json file. The " + "value must hold an array of saqc tests. 
If the option is not given, a passed " + "JSON config is assumed to have an array of saqc tests as root element.", +) +def main( + config: str, + data: str, + scheme: str, + outfile: str, + nodata: str | float, + log_level: str, + json_field: str | None, +): # data is always a list of data files _setupLogging(log_level) reader, writer = setupIO(nodata) - data = [readData(reader, f) for f in data] - saqc = fromConfig( - config, - data=data, - scheme=TRANSLATION_SCHEMES[scheme or "simple"](), - ) + config = str(config) + cr = _ConfigReader(data=data, scheme=scheme) + if config.endswith("json"): + f = None + if json_field is not None: + f = lambda j: j[str(json_field)] + cr = cr.readJson(config, unpack=f) + else: + cr = cr.readCsv(config) + + saqc = cr.run() data_result = saqc.data.to_pandas() flags_result = saqc.flags diff --git a/saqc/exceptions.py b/saqc/exceptions.py new file mode 100644 index 0000000000000000000000000000000000000000..7a628a99449b31792861f5fcc4ff4dde62e72a1f --- /dev/null +++ b/saqc/exceptions.py @@ -0,0 +1,9 @@ +#! /usr/bin/env python +# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ +# SPDX-License-Identifier: GPL-3.0-or-later +# -*- coding: utf-8 -*- +from __future__ import annotations + + +class ParsingError(RuntimeError): + pass diff --git a/saqc/parsing/reader.py b/saqc/parsing/reader.py index 8c8673e31817361b16b64daaf20889ca2335d975..d2a4b284fe3b37010480c1e31b47cf7b50a967aa 100644 --- a/saqc/parsing/reader.py +++ b/saqc/parsing/reader.py @@ -1,105 +1,225 @@ #! 
/usr/bin/env python - # SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ -# # SPDX-License-Identifier: GPL-3.0-or-later - # -*- coding: utf-8 -*- +from __future__ import annotations + import ast import io +import json +import logging +import textwrap from pathlib import Path -from typing import TextIO -from urllib.error import URLError +from typing import Any, Dict, Iterable, List, Sequence, TextIO, Tuple from urllib.request import urlopen import pandas as pd from saqc import SaQC +from saqc.exceptions import ParsingError from saqc.lib.tools import isQuoted from saqc.parsing.visitor import ConfigFunctionParser -COMMENT = "#" -SEPARATOR = ";" - - -def _openFile(fname) -> TextIO: - if isinstance(fname, (str, Path)): - try: - fobj = io.StringIO(urlopen(str(fname)).read().decode("utf-8")) - fobj.seek(0) - except (ValueError, URLError): - fobj = io.open(fname, "r", encoding="utf-8") - else: - fobj = fname - - return fobj - - -def _closeFile(fobj): - try: - fobj.close() - except AttributeError: - pass - - -def readFile(fname) -> pd.DataFrame: - fobj = _openFile(fname) +def _readLines( + it: Iterable[str], column_sep=";", comment_prefix="#", skip=0 +) -> pd.DataFrame: out = [] - for i, line in enumerate(fobj): - row = line.strip().split(COMMENT, 1)[0] - if not row: - # skip over comment line + for i, line in enumerate(it): + if (skip := skip - 1) > 0: continue - - parts = [p.strip() for p in row.split(SEPARATOR)] + if not (row := line.strip().split(comment_prefix, 1)[0]): + continue + parts = [p.strip() for p in row.split(column_sep)] if len(parts) != 2: - raise RuntimeError( - "The configuration format expects exactly two columns, one " - "for the variable name and one for the test to apply, but " - f"in line {i} we got: \n'{line}'" + raise ParsingError( + f"The configuration format expects exactly two " + f"columns, one for the variable name and one for " + f"the tests, but {len(parts)} columns were found " + f"in line {i}.\n\t{line!r}" ) 
out.append([i + 1] + parts) + if not out: + raise ParsingError("Config file is empty") + return pd.DataFrame(out[1:], columns=["lineno", "varname", "test"]).set_index( + "lineno" + ) - _closeFile(fobj) - df = pd.DataFrame( - out[1:], - columns=[ - "row", - ] - + out[0][1:], - ).set_index("row") - return df +def readFile(fname, skip=1) -> pd.DataFrame: + """Read and parse a config file to a DataFrame""" + + def _open(file_or_buf) -> TextIO: + if not isinstance(file_or_buf, (str, Path)): + return file_or_buf + try: + fh = io.open(file_or_buf, "r", encoding="utf-8") + except (OSError, ValueError): + fh = io.StringIO(urlopen(str(file_or_buf)).read().decode("utf-8")) + fh.seek(0) + return fh + + def _close(fh): + try: + fh.close() + except AttributeError: + pass + + # mimic `with open(): ...` + file = _open(fname) + try: + return _readLines(file, skip=skip) + finally: + _close(file) -# Todo: needs a (maybe tiny) docstring! def fromConfig(fname, *args, **func_kwargs): - saqc = SaQC(*args, **func_kwargs) - config = readFile(fname) - - for _, field, expr in config.itertuples(): - regex = False - if isQuoted(field): - fld = field[1:-1] - regex = True - else: - fld = field + return _ConfigReader(*args, **func_kwargs).readCsv(fname).run() + + +class _ConfigReader: + logger: logging.Logger + saqc: SaQC + file: str | None + config: pd.DataFrame | None + parsed: List[Tuple[Any, ...]] | None + regex: bool | None + varname: str | None + lineno: int | None + field: str | None + test: str | None + func: str | None + func_kws: Dict[str, Any] | None + + def __init__(self, *args, **kwargs): + self.logger = logging.getLogger(self.__class__.__name__) + self.saqc = SaQC(*args, **kwargs) + self.file = None + self.config = None + self.parsed = None + self.regex = None + self.varname = None + self.lineno = None + self.field = None + self.test = None + self.func = None + self.func_kws = None + + def readCsv(self, file: str, skip=1): + self.logger.debug(f"opening csv file: {file}") + 
self.config = readFile(file, skip=skip) + self.file = file + return self + + def readRecords(self, seq: Sequence[Dict[str, Any]]): + self.logger.debug(f"read records: {seq}") + df = pd.DataFrame.from_records(seq) + df.columns = ["varname", "func", "kwargs"] + kws = df["kwargs"].apply( + lambda e: ", ".join([f"{k}={v}" for k, v in e.items()]) + ) + df["test"] = df["func"] + "(" + kws + ")" + self.config = df.loc[:, ["varname", "test"]].copy() + return self + + def _readJson(self, d, unpack): + if unpack is not None: + d = unpack(d) + elif isinstance(d, dict): + raise TypeError("parsed json resulted in a dict, but an array/list is needed") + return self.readRecords(d) + + def readJson(self, file: str, unpack: callable | None = None): + self.logger.debug(f"opening json file: {file}") + with open(file, "r") as fh: + d = json.load(fh) + self.file = file + return self._readJson(d, unpack) + + def readJsonString(self, jn: str, unpack: callable | None = None): + self.logger.debug(f"read json string: {jn}") + d = json.loads(jn) + return self._readJson(d, unpack) + + def readString(self, s: str, line_sep="\n", column_sep=";"): + self.logger.debug(f"read config string: {s}") + lines = s.split(line_sep) + self.config = _readLines(lines, column_sep=column_sep) + return self + + def _parseLine(self): + self.logger.debug(f"parse line {self.lineno}: {self.varname!r}; {self.test!r}") + self.regex = isQuoted(self.varname) + self.field = self.varname[1:-1] if self.regex else self.varname try: - tree = ast.parse(expr, mode="eval") - func_name, func_kwargs = ConfigFunctionParser().parse(tree.body) + tree = ast.parse(self.test, mode="eval").body + func, kws = ConfigFunctionParser().parse(tree) except Exception as e: - raise type(e)(f"failed to parse: {field} ; {expr}") from e - - func_kwargs["field" if "field" not in func_kwargs else "target"] = fld + # We raise a NEW exception here, because the + # traceback holds no relevant info for a CLI user. 
+ err = type(e) if isinstance(e, NameError) else ParsingError + meta = self._getFormattedInfo( + "The exception occurred during parsing of a config" + ) + if hasattr(e, "add_note"): # python 3.11+ + e = err(*e.args) + e.add_note(meta) + raise e from None + raise err(str(e) + meta) from None + + if "field" in kws: + kws["target"] = self.field + else: + kws["field"] = self.field + self.func = func + self.func_kws = kws + + def _execLine(self): + self.logger.debug( + f"execute line {self.lineno}: {self.varname!r}; {self.test!r}" + ) + # We explicitly route all function calls through SaQC.__getattr__ + # in order to do a FUNC_MAP lookup. Otherwise, we wouldn't be able + # to overwrite existing test functions with custom register calls. try: - # We explictly route all function calls through SaQC.__getattr__ - # in order to do a FUNC_MAP lookup. Otherwise we wouldn't be able - # to overwrite exsiting test functions with custom register calls. - saqc = saqc.__getattr__(func_name)(regex=regex, **func_kwargs) + self.saqc = self.saqc.__getattr__(self.func)( + regex=self.regex, **self.func_kws + ) except Exception as e: - raise type(e)(f"failed to execute: {field} ; {expr}") from e - - return saqc + # We use a special technique for raising here, because + # we want this raising location to line up in the traceback, + # instead of showing up last (most relevant). Also, we + # want to include some meta information about the config. 
+ meta = self._getFormattedInfo( + "The exception occurred during execution of a config" + ) + if hasattr(e, "add_note"): # python 3.11+ + e.add_note(meta) + raise e + raise type(e)(str(e) + meta).with_traceback(e.__traceback__) from None + + def _getFormattedInfo(self, msg=None, indent=2): + prefix = " " * indent + info = textwrap.indent( + f"file: {self.file!r}\n" + f"line: {self.lineno}\n" + f"varname: {self.varname!r}\n" + f"test: {self.test!r}\n", + prefix, + ) + if msg: + info = textwrap.indent(f"{msg}\n{info}", prefix) + return f"\n{info}" + + def run(self): + """Parse and execute a config line by line.""" + assert self.config is not None + for lineno, varname, test in self.config.itertuples(): + self.lineno = lineno + self.varname = varname + self.test = test + self._parseLine() + self._execLine() + return self.saqc diff --git a/tests/core/test_reader.py b/tests/core/test_reader.py index a7d0f2d6620ee1dba4f7d53ac562779df8a7e12b..9234d0e4829317c95d9948aa161cc721cac502e8 100644 --- a/tests/core/test_reader.py +++ b/tests/core/test_reader.py @@ -9,8 +9,8 @@ import numpy as np import pytest -import saqc.lib.ts_operators as ts_ops from saqc.core import DictOfSeries, Flags, SaQC, flagging +from saqc.exceptions import ParsingError from saqc.parsing.environ import ENVIRONMENT from saqc.parsing.reader import fromConfig, readFile from tests.common import initData, writeIO @@ -108,26 +108,25 @@ def test_configFile(data): fromConfig(writeIO(config), data) -def test_configChecks(data): - var1, _, var3, *_ = data.columns - +@pytest.mark.parametrize( + "test, expected", + [ + (f"var1; min", ParsingError), # not a function call + (f"var3; flagNothing()", NameError), # unknown function + (f"var1; flagFunc(mn=0)", TypeError), # bad argument name + (f"var1; flagFunc()", TypeError), # not enough arguments + ], +) +def test_configChecks(data, test, expected): @flagging() def flagFunc(data, field, flags, arg, opt_arg=None, **kwargs): flags[:, field] = np.nan return data, flags 
header = f"varname;test" - tests = [ - (f"{var1};flagFunc(mn=0)", TypeError), # bad argument name - (f"{var1};flagFunc()", TypeError), # not enough arguments - (f"{var3};flagNothing()", NameError), # unknown function - (f"{var1}; min", TypeError), # not a function call - ] - - for test, expected in tests: - fobj = writeIO(header + "\n" + test) - with pytest.raises(expected): - fromConfig(fobj, data=data) + fobj = writeIO(header + "\n" + test) + with pytest.raises(expected): + fromConfig(fobj, data=data) def test_supportedArguments(data):