Commit fb3e8f05 authored by David Schäfer

Merge branch 'reader' into 'master'

reimplementation of the reader functionality

See merge request !9
Parents: 45504221 2ebbefd1
Pipeline #2392 passed in 6 minutes and 43 seconds
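For context, a minimal usage sketch of the reworked reader, pieced together from the diff below. The column names ("varname", "test", "plot") are assumed placeholders for the Fields constants in saqc.core.config and may differ from the real values; the separator ";" and comment marker "#" are the readConfig defaults.

    from io import StringIO
    import pandas as pd
    from saqc.core.reader import readConfig

    data = pd.DataFrame(
        {"var1": [1.0, 2.0], "var2": [3.0, 4.0]},
        index=pd.date_range("2020-01-01", periods=2),
    )
    # a header row followed by one rule per line; "*" expands to every column of `data`
    config = StringIO(
        "varname;test;plot\n"
        "# full-line comments are skipped\n"
        "var1;flagAll();False  # inline comments are stripped\n"
        "*;flagAll();False\n"
    )
    config_df = readConfig(config, data)  # one row per (expanded) variable

readConfig now takes the data frame directly and returns the fully prepared config, replacing the former prepareConfig(readConfig(...), data) two-step (see the first hunk below).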
@@ -5,7 +5,7 @@ import logging
import numpy as np
import pandas as pd
from saqc.core.reader import readConfig, prepareConfig, checkConfig
from saqc.core.reader import readConfig, checkConfig
from saqc.core.config import Fields
from saqc.core.evaluator import evalExpression
from saqc.lib.plotting import plotHook, plotAllHook
@@ -85,7 +85,7 @@ def run(
_setup()
_checkInput(data, flags, flagger)
config = prepareConfig(readConfig(config_file), data)
config = readConfig(config_file, data)
# split config into the test and some 'meta' data
tests = config.filter(regex=Fields.TESTS)
......
@@ -4,11 +4,29 @@
import re
from typing import Dict, List, Any, Optional, Union
from contextlib import contextmanager
from io import StringIO, TextIOWrapper
import numpy as np
import pandas as pd
from saqc.core.config import Fields as F
from saqc.core.evaluator import compileExpression
from saqc.flagger import BaseFlagger
ConfigList = List[Dict[str, Any]]
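# cast functions applied to each raw config value in _castRow below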
CONFIG_TYPES = {
F.VARNAME: str,
F.START: pd.to_datetime,
F.END: pd.to_datetime,
F.TESTS: str,
F.PLOT: lambda v: str(v).lower() != "false",
F.LINENUMBER: int,
}
def _raise(config_row, exc, msg, field=None):
@@ -20,7 +38,79 @@ def _raise(config_row, exc, msg, field=None):
raise exc(msg)
def checkConfig(config_df, data, flagger, nodata):
@contextmanager
def _open(fname: str) -> Union[StringIO, TextIOWrapper]:
if isinstance(fname, StringIO):
yield fname
else:
        # close the file even if the caller raises inside the with block
        f = open(fname)
        try:
            yield f
        finally:
            f.close()
def _parseRow(row: str, sep: str, comment: str) -> List[str]:
"""
    Remove in-column comments, mainly needed to allow end-of-line comments
"""
return [c.split(comment)[0].strip() for c in row.split(sep)]
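# e.g.: _parseRow("pre2;flagAll() # test;False # test", ";", "#")
#       -> ["pre2", "flagAll()", "False"]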
def _castRow(row: Dict[str, str]) -> Dict[str, Any]:
"""
    Cast the row values to the data types given in CONFIG_TYPES
"""
out = {}
for k, v in row.items():
try:
out[k] = CONFIG_TYPES[k](v)
        except (KeyError, TypeError, ValueError):
            _raise(row, ValueError, f"invalid value: {v}")
return out
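# e.g.: a raw PLOT value of "False" becomes the boolean False, while START/END
#       strings are parsed into pandas timestamps via pd.to_datetime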
def _expandVarnameWildcards(config: ConfigList, data: pd.DataFrame) -> ConfigList:
new = []
for row in config:
varname = row[F.VARNAME]
if varname and varname not in data:
if varname == "*":
varname = ".*"
expansion = data.columns[data.columns.str.match(varname)]
if not len(expansion):
expansion = [varname]
for var in expansion:
new.append({**row, F.VARNAME: var})
else:
new.append(row)
return new
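# e.g.: a varname of "*" (rewritten to the regex ".*") is expanded into one row
#       per column of `data`; names matching no column are kept unchanged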
def readConfig(fname: str, data: pd.DataFrame, sep: str = ";", comment: str = "#") -> pd.DataFrame:
defaults = {F.VARNAME: "", F.START: data.index.min(), F.END: data.index.max(), F.PLOT: False}
with _open(fname) as f:
content = f.readlines()
    header: Optional[List[str]] = None
config: ConfigList = []
for i, line in enumerate(content):
line = line.strip()
if line.startswith(comment) or not line:
continue
row = _parseRow(line, sep, comment)
if header is None:
header = row
continue
values = dict(zip(header, row))
values = {**defaults, **values, F.LINENUMBER: i + 1}
config.append(_castRow(values))
expanded = _expandVarnameWildcards(config, data)
return pd.DataFrame(expanded)
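# the returned frame holds one type-cast row per expanded variable, each tagged
# with the line number of its originating config entry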
def checkConfig(config_df: pd.DataFrame, data: pd.DataFrame, flagger: BaseFlagger, nodata: float) -> pd.DataFrame:
for _, config_row in config_df.iterrows():
var_name = config_row[F.VARNAME]
@@ -43,69 +133,3 @@ def checkConfig(config_df, data, flagger, nodata):
config_row, type(exc), exc.args[0] + f" (failing statement: '{expr}')", col,
)
return config_df
def prepareConfig(config_df, data):
# ensure column-names are lowercase and have no trailing whitespaces
config_df.columns = [c.lstrip().lower() for c in config_df.columns]
# add line numbers and remove comments
config_df[F.LINENUMBER] = np.arange(len(config_df)) + 2
try:
comment_mask = ~config_df.iloc[:, 0].str.startswith("#")
except AttributeError:
comment_mask = np.ones(len(config_df), dtype=np.bool)
config_df = config_df[comment_mask]
if config_df.empty:
raise SyntaxWarning("config file is empty or all lines are #commented")
# NOTE:
# time slicing support is currently disabled
# fill missing columns
# for field in [F.VARNAME, F.START, F.END, F.PLOT]:
for field in [F.VARNAME, F.PLOT]:
if field not in config_df:
config_df = config_df.assign(**{field: np.nan})
for field in [F.START, F.END]:
config_df = config_df.assign(**{field: np.nan})
# fill nans with default values
config_df = config_df.fillna(
{F.VARNAME: np.nan, F.START: data.index.min(), F.END: data.index.max(), F.PLOT: False,}
)
# dtype = np.datetime64 if isinstance(data.index, pd.DatetimeIndex) else int
# config_df[F.START] = config_df[F.START].astype(dtype)
# config_df[F.END] = config_df[F.END].astype(dtype)
config_df = _expandVarnameWildcards(config_df, data)
return config_df
def _expandVarnameWildcards(config_df, data):
new = []
for idx, row in config_df.iterrows():
varname = row[F.VARNAME]
if varname and not pd.isnull(varname) and varname not in data:
if varname == "*":
varname = ".*"
try:
variables = data.columns[data.columns.str.match(varname)]
if variables.empty:
variables = [varname]
for var in variables:
row = row.copy()
row[F.VARNAME] = var
new.append(row)
except re.error:
pass
else:
new.append(row)
return pd.DataFrame(new).reset_index(drop=True)
def readConfig(fname):
return pd.read_csv(fname, delimiter=";", skipinitialspace=True)
@@ -7,7 +7,7 @@ import re
import numpy as np
import pandas as pd
from saqc.core.core import prepareConfig, readConfig
from saqc.core.core import readConfig
from saqc.flagger import (
ContinuousFlagger,
CategoricalFlagger,
@@ -44,10 +44,10 @@ def initMetaString(metastring, data):
cleaned = re.sub(
r"\s*,\s*", r",", re.sub(r"\|", r";", re.sub(r"\n[ \t]+", r"\n", metastring))
)
fobj = io.StringIO(cleaned)
meta = prepareConfig(readConfig(fobj), data)
fobj = io.StringIO(cleaned.strip())
config = readConfig(fobj, data)
fobj.seek(0)
return fobj, meta
return fobj, config
def _getKeys(metadict):
@@ -61,8 +61,9 @@ def _getKeys(metadict):
def initMetaDict(config_dict, data):
df = pd.DataFrame(config_dict)[_getKeys(config_dict)]
meta = prepareConfig(df, data)
fobj = io.StringIO()
meta.to_csv(fobj, index=False, sep=";")
df.fillna("").to_csv(fobj, index=False, sep=";")
fobj.seek(0)
return fobj, meta
config = readConfig(fobj, data)
fobj.seek(0)
return fobj, config
@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import pytest
import numpy as np
from saqc.core.reader import checkConfig
from saqc.core.config import Fields as F
@@ -40,6 +41,37 @@ def test_configPreparation(data):
assert result == expected
def test_variableWildcards(data):
tests = [
("*", ".*"),
(".*", ".*"),
("var(1|2)", "var(1|2)"),
("(.*3)", "(.*3)")
]
for config_wc, expected_wc in tests:
_, config = initMetaDict(
[{F.VARNAME: config_wc, F.TESTS: "flagAll()"}],
data
)
expected = data.columns[data.columns.str.match(expected_wc)]
assert np.all(config[F.VARNAME] == expected)
def test_inlineComments(data):
"""
    addresses issue #3
"""
config = f"""
{F.VARNAME}|{F.TESTS}|{F.PLOT}
pre2|flagAll() # test|False # test
"""
_, meta_frame = initMetaString(config, data)
assert meta_frame.loc[0, F.PLOT] == False
assert meta_frame.loc[0, F.TESTS] == "flagAll()"
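# the trailing "# test" comments are stripped during parsing, so F.PLOT still
# evaluates to False and F.TESTS to the bare "flagAll()"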
def test_configReaderLineNumbers(data):
config = f"""
{F.VARNAME}|{F.TESTS}
......