Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (6)
@@ -66,6 +66,7 @@ coming soon ...
 ## Features
 - added the data processing module `proc_functions`
 - `flagCrossValidation` implemented
+- CLI: added support for parquet files

 ## Bugfixes
 - `spikes_flagRaise` - overestimation of value courses average fixed
......
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-

+import logging
+from functools import partial
+from pathlib import Path
+
 import click
 import numpy as np
 import pandas as pd
-import logging
+import pyarrow as pa

 from saqc.core import SaQC
 from saqc.flagger import CategoricalFlagger
 from saqc.flagger.dmpflagger import DmpFlagger
-import dios

 logger = logging.getLogger("SaQC")

 FLAGGERS = {
     "numeric": CategoricalFlagger([-1, 0, 1]),
     "category": CategoricalFlagger(["NIL", "OK", "BAD"]),
@@ -30,6 +34,35 @@ def _setup_logging(loglvl):
     logger.addHandler(handler)


+def setupIO(nodata):
+    reader = {
+        ".csv"     : partial(pd.read_csv, index_col=0, parse_dates=True),
+        ".parquet" : pd.read_parquet
+    }
+
+    writer = {
+        ".csv"     : partial(pd.DataFrame.to_csv, header=True, index=True, na_rep=nodata),
+        ".parquet" : lambda df, outfile: pa.parquet.write_table(pa.Table.from_pandas(df), outfile)
+    }
+    return reader, writer
+
+
+def readData(reader_dict, fname):
+    extension = Path(fname).suffix
+    reader = reader_dict.get(extension)
+    if not reader:
+        raise ValueError(f"Unsupported file format '{extension}', use one of {tuple(reader_dict.keys())}")
+    return reader(fname)
+
+
+def writeData(writer_dict, df, fname):
+    extension = Path(fname).suffix
+    writer = writer_dict.get(extension)
+    if not writer:
+        raise ValueError(f"Unsupported file format '{extension}', use one of {tuple(writer_dict.keys())}")
+    writer(df, fname)
+
+
 @click.command()
 @click.option(
     "-c", "--config", type=click.Path(exists=True), required=True, help="path to the configuration file",
@@ -49,26 +82,26 @@ def _setup_logging(loglvl):
 def main(config, data, flagger, outfile, nodata, log_level, fail):

     _setup_logging(log_level)
+    reader, writer = setupIO(nodata)

-    data = pd.read_csv(data, index_col=0, parse_dates=True,)
-    data = dios.DictOfSeries(data)
+    data = readData(reader, data)

     saqc = SaQC(flagger=FLAGGERS[flagger], data=data, nodata=nodata, error_policy="raise" if fail else "warn",)

     data_result, flagger_result = saqc.readConfig(config).getResult()
-    return

     if outfile:
         data_result = data_result.to_df()
-        flags = flagger_result.getFlags().to_df()
+        flags = flagger_result.flags.to_df()
         flags_flagged = flagger_result.isFlagged().to_df()

         flags_out = flags.where((flags.isnull() | flags_flagged), flagger_result.GOOD)
         fields = {"data": data_result, "flags": flags_out}

         if isinstance(flagger_result, DmpFlagger):
-            fields["comments"] = flagger_result.comments.to_df()
-            fields["causes"] = flagger_result.causes.to_df()
+            fields["quality_flag"] = fields.pop("flags")
+            fields["quality_comment"] = flagger_result.comments.to_df()
+            fields["quality_cause"] = flagger_result.causes.to_df()

         out = (
             pd.concat(fields.values(), axis=1, keys=fields.keys())
@@ -76,7 +109,7 @@ def main(config, data, flagger, outfile, nodata, log_level, fail):
             .sort_index(axis=1, level=0, sort_remaining=False)
         )
         out.columns = out.columns.rename(["", ""])
-        out.to_csv(outfile, header=True, index=True, na_rep=nodata)
+        writeData(writer, out, outfile)


 if __name__ == "__main__":
......
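The CLI no longer hard-codes CSV: setupIO maps file extensions to reader/writer callables, and readData/writeData dispatch on Path(fname).suffix. Below is a minimal standalone sketch of that dispatch pattern; it mirrors the idea rather than the saqc code itself (it uses pandas' own to_parquet instead of going through pyarrow, and the DataFrame and file names are invented).

# Sketch of the extension-based IO dispatch introduced above; pandas (and a
# parquet engine such as pyarrow) is assumed to be installed.
from functools import partial
from pathlib import Path

import pandas as pd


def setup_io(nodata):
    # map extensions to callables, mirroring setupIO() in the MR
    reader = {
        ".csv": partial(pd.read_csv, index_col=0, parse_dates=True),
        ".parquet": pd.read_parquet,
    }
    writer = {
        ".csv": partial(pd.DataFrame.to_csv, header=True, index=True, na_rep=nodata),
        ".parquet": pd.DataFrame.to_parquet,  # simplified; the MR writes via pyarrow
    }
    return reader, writer


reader, writer = setup_io(nodata=-9999)
df = pd.DataFrame({"x": [1.0, 2.0]}, index=pd.date_range("2020-01-01", periods=2))

# both calls pick the right callable purely from the file extension
writer[Path("demo.parquet").suffix](df, "demo.parquet")
roundtripped = reader[Path("demo.parquet").suffix]("demo.parquet")

With this layout, supporting another output format is a one-entry change to the reader and writer dictionaries.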
@@ -144,6 +144,15 @@ class SaQC:
         out = out._wrap(func, lineno=lineno, expr=expr)(**kwargs)
         return out

+    def _expandFields(self, func_dump, variables):
+        if not func_dump["regex"]:
+            return [func_dump]
+
+        out = []
+        for field in variables[variables.str.match(func_dump["field"])]:
+            out.append({**func_dump, "field": field})
+        return out
+
     def evaluate(self):
         """
         Realize all the registered calculations and return a updated SaQC Object
@@ -161,38 +170,39 @@ class SaQC:
         data, flagger = self._data, self._flagger

         for func_dump in self._to_call:
-            func_name = func_dump['func_name']
-            func_kws = func_dump['func_kws']
-            field = func_dump['field']
-            plot = func_dump["ctrl_kws"]["plot"]
-            logger.debug(f"processing: {field}, {func_name}, {func_kws}")
-
-            try:
-                t0 = timeit.default_timer()
-                data_result, flagger_result = _saqcCallFunc(func_dump, data, flagger)
-
-            except Exception as e:
-                t1 = timeit.default_timer()
-                logger.debug(f"{func_name} failed after {t1 - t0} sec")
-                _handleErrors(e, func_dump, self._error_policy)
-                continue
-            else:
-                t1 = timeit.default_timer()
-                logger.debug(f"{func_name} finished after {t1 - t0} sec")
-
-            if plot:
-                plotHook(
-                    data_old=data,
-                    data_new=data_result,
-                    flagger_old=flagger,
-                    flagger_new=flagger_result,
-                    sources=[],
-                    targets=[field],
-                    plot_name=func_name,
-                )
-
-            data = data_result
-            flagger = flagger_result
+            for func_dump in self._expandFields(func_dump, data.columns.union(flagger._flags.columns)):
+                func_name = func_dump['func_name']
+                func_kws = func_dump['func_kws']
+                field = func_dump['field']
+                plot = func_dump["ctrl_kws"]["plot"]
+                logger.debug(f"processing: {field}, {func_name}, {func_kws}")
+
+                try:
+                    t0 = timeit.default_timer()
+                    data_result, flagger_result = _saqcCallFunc(func_dump, data, flagger)
+
+                except Exception as e:
+                    t1 = timeit.default_timer()
+                    logger.debug(f"{func_name} failed after {t1 - t0} sec")
+                    _handleErrors(e, func_dump, self._error_policy)
+                    continue
+                else:
+                    t1 = timeit.default_timer()
+                    logger.debug(f"{func_name} finished after {t1 - t0} sec")
+
+                if plot:
+                    plotHook(
+                        data_old=data,
+                        data_new=data_result,
+                        flagger_old=flagger,
+                        flagger_new=flagger_result,
+                        sources=[],
+                        targets=[field],
+                        plot_name=func_name,
+                    )
+
+                data = data_result
+                flagger = flagger_result

         if any([fdump["ctrl_kws"]["plot"] for fdump in self._to_call]):
             plotAllHook(data, flagger)
@@ -217,7 +227,7 @@ class SaQC:

     def _wrap(self, func_name, lineno=None, expr=None):
         def inner(field: str, *args, regex: bool = False, to_mask=None, plot=False, inplace=False, **kwargs):
-            fields = [field] if not regex else self._data.columns[self._data.columns.str.match(field)]
+            # fields = [field] if not regex else self._data.columns[self._data.columns.str.match(field)]

             kwargs.setdefault('nodata', self._nodata)
@@ -238,13 +248,16 @@ class SaQC:
                 "func_args": args,
                 "func_kws": kwargs,
                 "ctrl_kws": ctrl_kws,
+                "field": field,
+                "regex": regex,
             }

             out = self if inplace else self.copy()
+            out._to_call.append(func_dump)

-            for field in fields:
-                dump_copy = {**func_dump, "field": field}
-                out._to_call.append(dump_copy)
+            # for field in fields:
+            #     dump_copy = {**func_dump, "field": field}
+            #     out._to_call.append(dump_copy)
             return out
         return inner
......
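With this change, _wrap no longer expands regex field patterns at registration time; it stores the raw field and the regex flag in the func_dump, and evaluate() expands them lazily via _expandFields against the union of data and flag columns. The following standalone sketch (not the SaQC class itself, just the same expansion logic on a plain dict and a pandas Index; the column names are made up) shows how one queued call fans out per matching column.

# Standalone sketch of the deferred regex expansion done by SaQC._expandFields;
# the dict layout mirrors the MR's func_dump, the column names are invented.
import pandas as pd


def expand_fields(func_dump, variables):
    # no regex -> the dump is used as-is, exactly once
    if not func_dump["regex"]:
        return [func_dump]
    # regex -> one copy of the dump per column matching the pattern in "field"
    return [{**func_dump, "field": field}
            for field in variables[variables.str.match(func_dump["field"])]]


columns = pd.Index(["temp_1", "temp_2", "precip"])
dump = {"func_name": "flagDummy", "field": "temp_.*", "regex": True}

print([d["field"] for d in expand_fields(dump, columns)])
# ['temp_1', 'temp_2']

Because the expansion now runs inside evaluate(), the pattern is matched against the columns actually present at evaluation time, including variables created by earlier processing steps.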
@@ -105,12 +105,16 @@ class DmpFlagger(CategoricalFlagger):
         else:
             return self._construct_new(flags, causes, comments)

-    def setFlags(self, field, loc=None, flag=None, force=False, comment="", cause="", inplace=False, **kwargs):
+    def setFlags(self, field, loc=None, flag=None, force=False, comment="", cause="OTHER", inplace=False, **kwargs):
         assert "iloc" not in kwargs, "deprecated keyword, iloc"
         assertScalar("field", field, optional=False)

         flag = self.BAD if flag is None else flag
-        comment = json.dumps(dict(comment=comment, commit=self.project_version, test=kwargs.get("func_name", "")))
+        comment = json.dumps(
+            {"comment": comment,
+             "commit": self.project_version,
+             "test": kwargs.get("func_name", "")}
+        )

         if force:
             row_indexer = loc
......
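setFlags now defaults the cause to "OTHER" and serializes the comment as a small JSON object carrying the free-text comment, the project version, and the name of the test that set the flag. A quick illustration of the resulting payload follows; the concrete values are placeholders, not output of a real saqc run.

# Shape of the comment string DmpFlagger.setFlags stores per flagged value.
import json

comment = json.dumps(
    {"comment": "raise detected",
     "commit": "saqc-1.3.0",        # stands in for self.project_version
     "test": "spikes_flagRaise"}    # taken from kwargs.get("func_name", "")
)
print(comment)
# {"comment": "raise detected", "commit": "saqc-1.3.0", "test": "spikes_flagRaise"}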
@@ -207,10 +207,10 @@ def flagGeneric(data, field, flagger, func, nodata=np.nan, **kwargs):
     if not np.issubdtype(mask.dtype, np.bool_):
         raise TypeError(f"generic expression does not return a boolean array")

-    if flagger.getFlags(field).empty:
-        flagger = flagger.merge(
-            flagger.initFlags(
-                data=pd.Series(name=field, index=mask.index, dtype=np.float64)))
+    # if flagger.getFlags(field).empty:
+    #     flagger = flagger.merge(
+    #         flagger.initFlags(
+    #             data=pd.Series(name=field, index=mask.index, dtype=np.float64)))
     flagger = flagger.setFlags(field, mask, **kwargs)
     return data, flagger
......
@@ -47,7 +47,8 @@ def test_variableRegex(data):

     for regex, expected in tests:
         fobj = writeIO(header + "\n" + f"{regex} ; flagDummy()")
         saqc = SaQC(SimpleFlagger(), data).readConfig(fobj)
-        result = [f["field"] for f in saqc._to_call]
+        expansion = saqc._expandFields(saqc._to_call[0], data.columns)
+        result = [f["field"] for f in expansion]
         assert np.all(result == expected)
@@ -104,14 +105,18 @@ def test_configFile(data):

 def test_configChecks(data):

-    var1, var2, var3, *_ = data.columns
+    var1, _, var3, *_ = data.columns
+
+    @register(masking="none")
+    def flagFunc(data, field, flagger, arg, opt_arg=None, **kwargs):
+        return data, flagger

     header = f"{F.VARNAME};{F.TEST}"
     tests = [
-        (f"{var1};flagRange(mn=0)", TypeError),  # bad argument name
-        (f"{var1};flagRange(min=0)", TypeError),  # not enough arguments
+        (f"{var1};flagFunc(mn=0)", TypeError),  # bad argument name
+        (f"{var1};flagFunc()", TypeError),  # not enough arguments
         (f"{var3};flagNothing()", NameError),  # unknown function
-        (";flagRange(min=3)", SyntaxError),  # missing variable
+        (";flagFunc(min=3)", SyntaxError),  # missing variable
         (f"{var1};", SyntaxError),  # missing test
         (f"{var1}; min", TypeError),  # not a function call
     ]
......