Compare revisions
Commits on Source (6)
@@ -66,6 +66,7 @@ coming soon ...
 ## Features
 - added the data processing module `proc_functions`
 - `flagCrossValidation` implemented
+- CLI: added support for parquet files

 ## Bugfixes
 - `spikes_flagRaise`: fixed overestimation of the average of value courses
......
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-

+import logging
 from functools import partial
+from pathlib import Path

 import click
 import numpy as np
 import pandas as pd
-import logging
+import pyarrow as pa
+import pyarrow.parquet  # pa.parquet is not auto-imported; needed for write_table below

 from saqc.core import SaQC
 from saqc.flagger import CategoricalFlagger
 from saqc.flagger.dmpflagger import DmpFlagger
 import dios

 logger = logging.getLogger("SaQC")

 FLAGGERS = {
     "numeric": CategoricalFlagger([-1, 0, 1]),
     "category": CategoricalFlagger(["NIL", "OK", "BAD"]),
@@ -30,6 +34,35 @@ def _setup_logging(loglvl):
     logger.addHandler(handler)


+def setupIO(nodata):
+    reader = {
+        ".csv": partial(pd.read_csv, index_col=0, parse_dates=True),
+        ".parquet": pd.read_parquet,
+    }
+
+    writer = {
+        ".csv": partial(pd.DataFrame.to_csv, header=True, index=True, na_rep=nodata),
+        ".parquet": lambda df, outfile: pa.parquet.write_table(pa.Table.from_pandas(df), outfile),
+    }
+    return reader, writer
+
+
+def readData(reader_dict, fname):
+    extension = Path(fname).suffix
+    reader = reader_dict.get(extension)
+    if not reader:
+        raise ValueError(f"Unsupported file format '{extension}', use one of {tuple(reader_dict.keys())}")
+    return reader(fname)
+
+
+def writeData(writer_dict, df, fname):
+    extension = Path(fname).suffix
+    writer = writer_dict.get(extension)
+    if not writer:
+        raise ValueError(f"Unsupported file format '{extension}', use one of {tuple(writer_dict.keys())}")
+    writer(df, fname)
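The new helpers dispatch purely on the file suffix, which is what gives the CLI its parquet support. A minimal usage sketch of the three helpers above (file names and the `nodata` value are invented for illustration):

```python
import pandas as pd

# assumes setupIO/readData/writeData from the hunk above
reader, writer = setupIO(nodata=-9999)

df = pd.DataFrame({"temp": [1.2, 2.4]}, index=pd.date_range("2020-01-01", periods=2))
writeData(writer, df, "out.parquet")            # ".parquet" -> pyarrow writer
roundtripped = readData(reader, "out.parquet")  # ".parquet" -> pd.read_parquet

# any other suffix raises:
# readData(reader, "out.xlsx")  -> ValueError: Unsupported file format '.xlsx', ...
```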
 @click.command()
 @click.option(
     "-c", "--config", type=click.Path(exists=True), required=True, help="path to the configuration file",
@@ -49,26 +82,26 @@ def _setup_logging(loglvl):
 def main(config, data, flagger, outfile, nodata, log_level, fail):

     _setup_logging(log_level)
+    reader, writer = setupIO(nodata)

-    data = pd.read_csv(data, index_col=0, parse_dates=True,)
-    data = dios.DictOfSeries(data)
+    data = readData(reader, data)

     saqc = SaQC(flagger=FLAGGERS[flagger], data=data, nodata=nodata, error_policy="raise" if fail else "warn",)

     data_result, flagger_result = saqc.readConfig(config).getResult()
-    return

     if outfile:
         data_result = data_result.to_df()
-        flags = flagger_result.getFlags().to_df()
+        flags = flagger_result.flags.to_df()
         flags_flagged = flagger_result.isFlagged().to_df()

         flags_out = flags.where((flags.isnull() | flags_flagged), flagger_result.GOOD)
         fields = {"data": data_result, "flags": flags_out}

         if isinstance(flagger_result, DmpFlagger):
-            fields["comments"] = flagger_result.comments.to_df()
-            fields["causes"] = flagger_result.causes.to_df()
+            fields["quality_flag"] = fields.pop("flags")
+            fields["quality_comment"] = flagger_result.comments.to_df()
+            fields["quality_cause"] = flagger_result.causes.to_df()

         out = (
             pd.concat(fields.values(), axis=1, keys=fields.keys())
@@ -76,7 +109,7 @@ def main(config, data, flagger, outfile, nodata, log_level, fail):
             .sort_index(axis=1, level=0, sort_remaining=False)
         )

         out.columns = out.columns.rename(["", ""])
-        out.to_csv(outfile, header=True, index=True, na_rep=nodata)
+        writeData(writer, out, outfile)


 if __name__ == "__main__":
......
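For context on the output layout `main` builds: `pd.concat(..., keys=...)` produces two-level columns which are then regrouped per variable. A self-contained sketch of the pandas pattern (the reordering step in the elided lines is approximated here with `swaplevel`):

```python
import pandas as pd

data = pd.DataFrame({"var1": [1.0, 2.0], "var2": [3.0, 4.0]})
flags = pd.DataFrame({"var1": ["OK", "BAD"], "var2": ["OK", "OK"]})
fields = {"data": data, "flags": flags}

out = (
    pd.concat(fields.values(), axis=1, keys=fields.keys())
    .swaplevel(axis=1)                                  # put the variable name on level 0
    .sort_index(axis=1, level=0, sort_remaining=False)  # group columns per variable
)
print(out.columns.tolist())
# [('var1', 'data'), ('var1', 'flags'), ('var2', 'data'), ('var2', 'flags')]
```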
@@ -144,6 +144,15 @@ class SaQC:
             out = out._wrap(func, lineno=lineno, expr=expr)(**kwargs)
         return out

+    def _expandFields(self, func_dump, variables):
+        if not func_dump["regex"]:
+            return [func_dump]
+
+        out = []
+        for field in variables[variables.str.match(func_dump["field"])]:
+            out.append({**func_dump, "field": field})
+        return out
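`_expandFields` turns one queued regex dump into one dump per matching variable. A standalone sketch of the same logic (column names invented for illustration):

```python
import pandas as pd

def expand_fields(func_dump, variables):
    # mirrors SaQC._expandFields, detached from the class for illustration
    if not func_dump["regex"]:
        return [func_dump]
    return [{**func_dump, "field": field}
            for field in variables[variables.str.match(func_dump["field"])]]

dump = {"func_name": "flagDummy", "field": "sensor[12]", "regex": True}
columns = pd.Index(["sensor1", "sensor2", "pressure"])
print([d["field"] for d in expand_fields(dump, columns)])  # ['sensor1', 'sensor2']
```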
     def evaluate(self):
         """
         Realize all the registered calculations and return an updated SaQC Object
@@ -161,38 +170,39 @@
         data, flagger = self._data, self._flagger

         for func_dump in self._to_call:
-            func_name = func_dump['func_name']
-            func_kws = func_dump['func_kws']
-            field = func_dump['field']
-            plot = func_dump["ctrl_kws"]["plot"]
-            logger.debug(f"processing: {field}, {func_name}, {func_kws}")
-
-            try:
-                t0 = timeit.default_timer()
-                data_result, flagger_result = _saqcCallFunc(func_dump, data, flagger)
-            except Exception as e:
-                t1 = timeit.default_timer()
-                logger.debug(f"{func_name} failed after {t1 - t0} sec")
-                _handleErrors(e, func_dump, self._error_policy)
-                continue
-            else:
-                t1 = timeit.default_timer()
-                logger.debug(f"{func_name} finished after {t1 - t0} sec")
-
-            if plot:
-                plotHook(
-                    data_old=data,
-                    data_new=data_result,
-                    flagger_old=flagger,
-                    flagger_new=flagger_result,
-                    sources=[],
-                    targets=[field],
-                    plot_name=func_name,
-                )
-
-            data = data_result
-            flagger = flagger_result
+            for func_dump in self._expandFields(func_dump, data.columns.union(flagger._flags.columns)):
+                func_name = func_dump['func_name']
+                func_kws = func_dump['func_kws']
+                field = func_dump['field']
+                plot = func_dump["ctrl_kws"]["plot"]
+                logger.debug(f"processing: {field}, {func_name}, {func_kws}")
+
+                try:
+                    t0 = timeit.default_timer()
+                    data_result, flagger_result = _saqcCallFunc(func_dump, data, flagger)
+                except Exception as e:
+                    t1 = timeit.default_timer()
+                    logger.debug(f"{func_name} failed after {t1 - t0} sec")
+                    _handleErrors(e, func_dump, self._error_policy)
+                    continue
+                else:
+                    t1 = timeit.default_timer()
+                    logger.debug(f"{func_name} finished after {t1 - t0} sec")
+
+                if plot:
+                    plotHook(
+                        data_old=data,
+                        data_new=data_result,
+                        flagger_old=flagger,
+                        flagger_new=flagger_result,
+                        sources=[],
+                        targets=[field],
+                        plot_name=func_name,
+                    )
+
+                data = data_result
+                flagger = flagger_result

         if any([fdump["ctrl_kws"]["plot"] for fdump in self._to_call]):
             plotAllHook(data, flagger)
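Side note on the loop body: the `try`/`except`/`else` arrangement times both outcomes, since the `else` branch runs only when `_saqcCallFunc` returns normally. The bare pattern, with a stand-in function:

```python
import logging
import timeit

logger = logging.getLogger("SaQC")

def some_step():  # stand-in for _saqcCallFunc
    return "ok"

t0 = timeit.default_timer()
try:
    result = some_step()
except Exception:
    t1 = timeit.default_timer()
    logger.debug(f"failed after {t1 - t0} sec")
else:
    t1 = timeit.default_timer()
    logger.debug(f"finished after {t1 - t0} sec")
```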
@@ -217,7 +227,7 @@ class SaQC:
     def _wrap(self, func_name, lineno=None, expr=None):
         def inner(field: str, *args, regex: bool = False, to_mask=None, plot=False, inplace=False, **kwargs):

-            fields = [field] if not regex else self._data.columns[self._data.columns.str.match(field)]
+            # fields = [field] if not regex else self._data.columns[self._data.columns.str.match(field)]

             kwargs.setdefault('nodata', self._nodata)
@@ -238,13 +248,16 @@
                 "func_args": args,
                 "func_kws": kwargs,
                 "ctrl_kws": ctrl_kws,
                 "field": field,
+                "regex": regex,
             }

             out = self if inplace else self.copy()
+            out._to_call.append(func_dump)

-            for field in fields:
-                dump_copy = {**func_dump, "field": field}
-                out._to_call.append(dump_copy)
+            # for field in fields:
+            #     dump_copy = {**func_dump, "field": field}
+            #     out._to_call.append(dump_copy)
             return out

         return inner
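With the eager expansion commented out, a call such as `saqc.flagDummy("sensor.*", regex=True)` now queues a single dump that carries the raw pattern, and `evaluate` expands it later. Roughly (contents of `ctrl_kws` abbreviated, values illustrative):

```python
# hypothetical queue entry after the change; expansion is deferred to evaluate()
func_dump = {
    "func_name": "flagDummy",
    "func_args": (),
    "func_kws": {"nodata": float("nan")},
    "ctrl_kws": {"plot": False},   # abbreviated
    "field": "sensor.*",           # raw regex, no longer pre-expanded
    "regex": True,
}
```

The practical gain: the pattern is matched against `data.columns.union(flagger._flags.columns)` at evaluation time, so variables created by earlier steps in the config can still be caught.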
......
@@ -105,12 +105,16 @@ class DmpFlagger(CategoricalFlagger):
         else:
             return self._construct_new(flags, causes, comments)

-    def setFlags(self, field, loc=None, flag=None, force=False, comment="", cause="", inplace=False, **kwargs):
+    def setFlags(self, field, loc=None, flag=None, force=False, comment="", cause="OTHER", inplace=False, **kwargs):
         assert "iloc" not in kwargs, "deprecated keyword, iloc"
         assertScalar("field", field, optional=False)

         flag = self.BAD if flag is None else flag
-        comment = json.dumps(dict(comment=comment, commit=self.project_version, test=kwargs.get("func_name", "")))
+        comment = json.dumps(
+            {"comment": comment,
+             "commit": self.project_version,
+             "test": kwargs.get("func_name", "")}
+        )

         if force:
             row_indexer = loc
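For reference, the reformatted `json.dumps` call keeps producing a flat JSON object per flagged value; e.g. (values invented):

```python
import json

comment = json.dumps(
    {"comment": "value exceeds threshold",   # user-supplied comment
     "commit": "v1.4.0-42-gdeadbee",         # stand-in for self.project_version
     "test": "spikes_flagRaise"}             # stand-in for kwargs.get("func_name", "")
)
print(comment)
# {"comment": "value exceeds threshold", "commit": "v1.4.0-42-gdeadbee", "test": "spikes_flagRaise"}
```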
......
@@ -207,10 +207,10 @@ def flagGeneric(data, field, flagger, func, nodata=np.nan, **kwargs):
     if not np.issubdtype(mask.dtype, np.bool_):
         raise TypeError(f"generic expression does not return a boolean array")

-    if flagger.getFlags(field).empty:
-        flagger = flagger.merge(
-            flagger.initFlags(
-                data=pd.Series(name=field, index=mask.index, dtype=np.float64)))
+    # if flagger.getFlags(field).empty:
+    #     flagger = flagger.merge(
+    #         flagger.initFlags(
+    #             data=pd.Series(name=field, index=mask.index, dtype=np.float64)))

     flagger = flagger.setFlags(field, mask, **kwargs)
     return data, flagger
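The dtype guard above rejects generic expressions that do not evaluate to a boolean mask. A quick standalone check of the same `np.issubdtype` test:

```python
import numpy as np
import pandas as pd

x = pd.Series([1.0, 2.0, 3.0])

mask = x > 2.0                                     # comparison -> bool dtype, accepted
print(np.issubdtype(mask.dtype, np.bool_))         # True

not_a_mask = x + 1.0                               # arithmetic -> float dtype, rejected
print(np.issubdtype(not_a_mask.dtype, np.bool_))   # False
```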
......
@@ -47,7 +47,8 @@ def test_variableRegex(data):
     for regex, expected in tests:
         fobj = writeIO(header + "\n" + f"{regex} ; flagDummy()")
         saqc = SaQC(SimpleFlagger(), data).readConfig(fobj)
-        result = [f["field"] for f in saqc._to_call]
+        expansion = saqc._expandFields(saqc._to_call[0], data.columns)
+        result = [f["field"] for f in expansion]
         assert np.all(result == expected)
@@ -104,14 +105,18 @@ def test_configFile(data):
 def test_configChecks(data):

-    var1, var2, var3, *_ = data.columns
+    var1, _, var3, *_ = data.columns
+
+    @register(masking="none")
+    def flagFunc(data, field, flagger, arg, opt_arg=None, **kwargs):
+        return data, flagger

     header = f"{F.VARNAME};{F.TEST}"
     tests = [
-        (f"{var1};flagRange(mn=0)", TypeError),  # bad argument name
-        (f"{var1};flagRange(min=0)", TypeError),  # not enough arguments
+        (f"{var1};flagFunc(mn=0)", TypeError),  # bad argument name
+        (f"{var1};flagFunc()", TypeError),  # not enough arguments
         (f"{var3};flagNothing()", NameError),  # unknown function
-        (";flagRange(min=3)", SyntaxError),  # missing variable
+        (";flagFunc(min=3)", SyntaxError),  # missing variable
         (f"{var1};", SyntaxError),  # missing test
         (f"{var1}; min", TypeError),  # not a function call
     ]
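The two `flagFunc` rows probe argument validation against the registered signature. One way to reproduce the failure mode with the standard library (`inspect.signature` is illustrative here, not necessarily saqc's mechanism):

```python
import inspect

def flagFunc(data, field, flagger, arg, opt_arg=None, **kwargs):
    return data, flagger

sig = inspect.signature(flagFunc)
sig.bind("data", "field", "flagger", arg=0)  # binds fine
try:
    sig.bind("data", "field", "flagger")     # required 'arg' never supplied
except TypeError as err:
    print(err)  # missing a required argument: 'arg'
```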
......