Commit a78ac3ab authored by David Schäfer

initial commit

config.py 0 → 100644
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from funcs import funcs
import numpy as np
class Fields:
VARNAME = "headerout"
STARTDATE = "date start"
ENDDATE = "date end"
FLAGS = "Flag*"
class Params:
NAME = "name"
FUNC = "func"
FLAGPERIOD = "flag_period"
FLAGVALUES = "flag_values"
FLAG = "flag"
FUNCMAP = {
"maintenance": funcs.flagMaintenance,
"man_flag": funcs.flagManual,
"MAD": funcs.flagMad,
"constant": funcs.flagConstant
}
NODATA = np.nan
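# NOTE:
# FUNCMAP maps the test names used in the meta file's 'Flag*' columns
# onto their implementations; core.flaggingRunner dispatches on it and
# falls back to the 'generic' DSL test for everything else.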
core.py 0 → 100644
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from math import ceil, isnan
from typing import TypeVar
import numpy as np
import pandas as pd
from config import Fields, FUNCMAP, Params, NODATA
from dsl.evaluator import evalCondition
from dsl.parser import parseFlag
from flagger import PositionalFlagger
DataSeq = TypeVar("DataSeq", np.ndarray, pd.Series, pd.DataFrame)
def _inferFrequency(data):
return pd.tseries.frequencies.to_offset(pd.infer_freq(data.index))
def _periodToTicks(period, freq):
return int(ceil(pd.to_timedelta(period)/pd.to_timedelta(freq)))
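# A quick sketch of the period-to-ticks conversion (illustrative values,
# assuming the regular 10 minute grid produced by readData below):
#
#   >>> _periodToTicks("1h", "10min")
#   6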
def _flagNext(to_flag: DataSeq, n: int) -> DataSeq:
"""
    to_flag: Union[np.ndarray[bool], pd.Series[bool]]
    additionally flag the n values following every True entry in 'to_flag'
"""
    idx = np.nonzero(to_flag)[0]
for nn in range(n + 1):
nn_idx = np.clip(idx + nn, a_min=None, a_max=len(to_flag) - 1)
to_flag[nn_idx] = True
return to_flag
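# Illustration of _flagNext on a hypothetical input: every True entry
# additionally flags the n following positions, clipped at the bounds:
#
#   >>> _flagNext(np.array([False, True, False, False, False]), 2)
#   array([False,  True,  True,  True, False])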
def flagGeneric(data, flags, field, flagger, flag_params):
to_flag = evalCondition(
flag_params[Params.FUNC],
data, flags, field, NODATA=NODATA)
# flag a timespan after the condition is met,
# duration given in 'flag_period'
flag_period = flag_params.pop(Params.FLAGPERIOD, None)
if flag_period:
flag_params[Params.FLAGVALUES] = _periodToTicks(flag_period,
data.index.freq)
# flag a certain amount of values after condition is met,
# number given in 'flag_values'
flag_values = flag_params.pop(Params.FLAGVALUES, None)
if flag_values:
to_flag = _flagNext(to_flag, flag_values)
# flag to set might be given in 'flag'
flag_value = flag_params.get(Params.FLAG, flagger.critical_flag)
flags.loc[to_flag, field] = flagger.setFlag(
flags=flags.loc[to_flag, field],
flag=flag_value)
return flags
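# Example: on a 10 minute grid, a config entry like
# "generic; func: this > 10; flag_period: 1h" (values illustrative)
# translates into flag_values=6, i.e. the six ticks following every
# match of the condition are flagged as well.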
def flaggingRunner(meta, data, flags, flagger):
# TODO:
# - flags should be optional
# NOTE:
# we need an index frequency in order to calculate ticks
# from given periods further down the road
data.index.freq = _inferFrequency(data)
    assert data.index.freq, "no frequency deducible from timeseries"
# the required meta data columns
fields = [Fields.VARNAME, Fields.STARTDATE, Fields.ENDDATE]
# NOTE:
# the outer loop runs over the flag tests, the inner one over the
# variables. Switching the loop order would complicate the
# reference to flags from other variables within the dataset
flag_fields = meta.columns.to_series().filter(regex=Fields.FLAGS)
for flag_pos, flag_field in enumerate(flag_fields):
# NOTE: just an optimization
if meta[flag_field].dropna().empty:
continue
for _, configrow in meta.iterrows():
flag_test = configrow[flag_field]
if pd.isnull(flag_test):
continue
varname, start_date, end_date = configrow[fields]
if varname not in data:
continue
dchunk = data.loc[start_date:end_date]
if dchunk.empty:
continue
            # NOTE:
            # within the activation period of a variable, initialize
            # still-unset (NaN) flags with the flagger's no-flag value
fchunk = (flags
.loc[start_date:end_date]
.fillna({varname: flagger.no_flag}))
flag_params = parseFlag(flag_test)
flag_name = flag_params[Params.NAME]
            # NOTE: higher flags might be overwritten by lower ones
func = FUNCMAP.get(flag_name, None)
if func:
dchunk, fchunk = func(dchunk, fchunk, varname,
flagger, **flag_params)
elif flag_name == "generic":
fchunk = flagGeneric(dchunk, fchunk, varname,
flagger, flag_params)
else:
raise RuntimeError(
"Malformed flag field ('{:}') for variable: {:}"
.format(flag_test, varname))
flagger.nextTest()
data.loc[start_date:end_date] = dchunk
flags.loc[start_date:end_date] = fchunk
return data, flags
def prepareMeta(meta, data):
    # NOTE: an option is needed to pass only the tests within a file
    #       and to deduce everything else from the data
# no dates given, fall back to the available date range
if Fields.STARTDATE not in meta:
meta = meta.assign(**{Fields.STARTDATE: np.nan})
if Fields.ENDDATE not in meta:
meta = meta.assign(**{Fields.ENDDATE: np.nan})
    meta = meta.fillna(
        {Fields.ENDDATE: data.index.max(),
         Fields.STARTDATE: data.index.min()})
meta = meta.dropna(subset=[Fields.VARNAME])
meta[Fields.STARTDATE] = pd.to_datetime(meta[Fields.STARTDATE])
meta[Fields.ENDDATE] = pd.to_datetime(meta[Fields.ENDDATE])
return meta
def readData(fname):
data = pd.read_csv(
fname, index_col="Date Time", parse_dates=True,
na_values=["-9999", "-9999.0"], low_memory=False)
data.columns = [c.split(" ")[0] for c in data.columns]
data = data.reindex(
pd.date_range(data.index.min(), data.index.max(), freq="10min"))
return data
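# NOTE:
# readData expects a CSV with a "Date Time" index column, -9999 as the
# missing-value marker and the variable name before the first blank of
# each column header (a hypothetical "var1 [m/s]" becomes "var1"); the
# index is then completed to a regular 10 minute grid.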
if __name__ == "__main__":
datafname = "resources/data.csv"
metafname = "resources/meta.csv"
data = readData(datafname)
meta = prepareMeta(pd.read_csv(metafname), data)
flags = pd.DataFrame(columns=data.columns, index=data.index)
flagger = PositionalFlagger()
pdata, pflags = flaggingRunner(meta, data, flags, flagger)
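# A minimal 'resources/meta.csv' sketch matching the Fields layout above
# (variable names, dates and test strings are illustrative, not taken
# from the actual resource files):
#
#   headerout,date start,date end,Flag_1
#   var1,2017-01-01,2017-06-30,constant; eps: 0.1; length: 1h
#   var2,,,generic; func: this > 100; flag_period: 1h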
dsl/evaluator.py 0 → 100644
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import ast
import copy
import numbers
import operator as op
import numpy as np
import pandas as pd
# supported operators
OPERATORS = {
ast.Add: op.add, ast.Sub: op.sub,
ast.Mult: op.mul, ast.Div: op.truediv,
ast.Pow: op.pow,
ast.USub: op.neg,
ast.NotEq: op.ne, ast.Eq: op.eq,
ast.Gt: op.gt, ast.GtE: op.ge,
ast.Lt: op.lt, ast.LtE: op.le,
ast.BitAnd: op.and_, ast.BitOr: op.or_, ast.BitXor: op.xor,
}
FUNCTIONS = {
"abs": (abs, "data"),
"max": (max, "data"),
"min": (min, "data"),
"len": (len, "data"),
"mean": (np.mean, "data"),
"sum": (np.sum, "data"),
"std": (np.std, "data"),
# "maxflag": (getMaxflags, "flags")
}
def setKey(d, key, value):
out = copy.copy(d)
out[key] = value
return out
def _raiseNameError(name):
    raise NameError("name '{:}' is not defined"
                    .format(name))
def evalCondition(expr: str, data: pd.DataFrame, flags: pd.DataFrame,
field: str, **namespace: dict) -> np.ndarray:
# type: (...) -> np.ndarray[bool]
def _eval(node, namespace):
# type: (ast.Node, dict) -> None
        # the namespace dictionary should provide the data frames for the
        # device being processed and any additional variables (e.g. NODATA,
        # this)
if isinstance(node, ast.Num): # <number>
return node.n
elif isinstance(node, ast.UnaryOp):
return OPERATORS[type(node.op)](
_eval(node.operand, namespace))
elif isinstance(node, ast.BinOp):
return OPERATORS[type(node.op)](
_eval(node.left, namespace),
_eval(node.right, namespace))
elif isinstance(node, ast.Compare):
# NOTE: chained comparison not supported yet
op = OPERATORS[node.ops[0].__class__]
out = op(_eval(node.left, namespace),
_eval(node.comparators[0], namespace))
return out
elif isinstance(node, ast.Call):
            # only the functions whitelisted in FUNCTIONS are callable
            # kwargs are not supported yet
try:
func, target = FUNCTIONS[node.func.id]
except KeyError:
_raiseNameError(node.func.id)
namespace = setKey(namespace, "target", target)
args = [_eval(n, namespace) for n in node.args]
return func(*args)
elif isinstance(node, ast.Name): # <variable>
field = namespace.get(node.id, node.id)
if isinstance(field, numbers.Number):
                # name is not referring to a DataFrame field
return field
try:
flagcol = namespace["flags"][field]
datacol = namespace["data"][field]
except KeyError:
_raiseNameError(field)
if namespace.get("target") == "flags":
out = flagcol
else:
out = datacol # .mask(flagger.isFlagged(flagcol))
return out.values
else:
raise TypeError(node)
if not (data.columns == flags.columns).all():
raise TypeError(
"arguments 'data' and 'flags' need the same column names")
namespace = {**namespace, **{"data": data, "flags": flags, "this": field}}
return _eval(ast.parse(expr, mode='eval').body, namespace)
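# Usage sketch with hypothetical frames; names in the expression resolve
# to columns of 'data'/'flags', 'this' to the field under test:
#
#   >>> data = pd.DataFrame({"x": [1.0, 12.0, 3.0]})
#   >>> flags = pd.DataFrame({"x": [0, 0, 0]})
#   >>> evalCondition("this > 10", data, flags, "x")
#   array([False,  True, False])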
dsl/parser.py 0 → 100644
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import re
def parseFlag(params):
out = {}
    for i, part in enumerate(re.split(r";\s*", params)):
        if ":" in part:
            k, v = [s.strip() for s in part.split(":", 1)]
try:
out[k] = int(v)
except ValueError:
try:
out[k] = float(v)
except ValueError:
out[k] = v
elif i == 0:
out["name"] = part
return out
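# Parsing example: the first ';'-separated token becomes the test name,
# 'key: value' pairs are coerced to int/float where possible:
#
#   >>> parseFlag("constant; length: 1h; eps: 0.5")
#   {'name': 'constant', 'length': '1h', 'eps': 0.5}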
flagger/__init__.py 0 → 100644
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from .positionalflagger import PositionalFlagger
from .abstractflagger import AbstractFlagger
flagger/abstractflagger.py 0 → 100644
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from abc import ABC, abstractproperty
from numbers import Number
import pandas as pd
class AbstractFlagger(ABC):
@abstractproperty
def no_flag(self):
raise NotImplementedError
@abstractproperty
def critical_flag(self):
raise NotImplementedError
def setFlag(self, flags: pd.DataFrame, flag: Number) -> pd.DataFrame:
flags.loc[:] = flag
return flags
def isFlagged(self, flags: pd.DataFrame) -> pd.DataFrame:
return flags != self.no_flag
def nextTest(self):
pass
def firstTest(self):
pass
flagger/positionalflagger.py 0 → 100644
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from numbers import Number
from typing import Union, Sequence
import numpy as np
import pandas as pd
from config import NODATA
from .abstractflagger import AbstractFlagger
from lib.tools import numpyfy, broadcastMany
class PositionalFlagger(AbstractFlagger):
def __init__(self, no_flag=9, critical_flag=2):
self._flag_pos = 1
self._initial_flag_pos = 1
self._no_flag = no_flag
self._critical_flag = critical_flag
@property
def critical_flag(self):
return self._critical_flag
@property
def no_flag(self):
return self._no_flag
def firstTest(self):
self._flag_pos = self._initial_flag_pos
def nextTest(self):
self._flag_pos += 1
def setFlag(self, flags: pd.DataFrame, flag: Number) -> pd.DataFrame:
return self._setFlags(flags, flag, self._flag_pos)
    def isFlagged(self, flags: pd.DataFrame):
        # a value counts as flagged as soon as the highest flag over all
        # test positions reaches the critical flag
        return self._getMaxflags(flags) == self.critical_flag
def _getMaxflags(self, flags: pd.DataFrame,
exclude: Union[int, Sequence] = 0) -> pd.DataFrame:
flagmax = np.max(np.array(flags))
ndigits = int(np.ceil(np.log10(flagmax)))
exclude = set(np.array(exclude).ravel())
out = np.zeros_like(flags)
for pos in range(ndigits):
if pos not in exclude:
out = np.maximum(out, self._getFlags(flags, pos))
return out
def _getFlags(self, flags: pd.DataFrame, pos: int) -> pd.DataFrame:
flags = self._prepFlags(flags)
pos = np.broadcast_to(np.atleast_1d(pos), flags.shape)
        ndigits = np.floor(np.log10(flags)).astype(int)
idx = np.where(ndigits >= pos)
out = np.zeros_like(flags)
out[idx] = flags[idx] // 10**(ndigits[idx]-pos[idx]) % 10
return out
def _prepFlags(self, flags: pd.DataFrame) -> pd.DataFrame:
out = numpyfy(flags)
out[~np.isfinite(out)] = NODATA
return out
def _setFlags(self, flags: pd.DataFrame,
values: Union[pd.DataFrame, int], pos: int) -> pd.DataFrame:
flags, pos, values = broadcastMany(flags, pos, values)
out = flags.astype(np.float64)
        # right-pad 'flags' with zeros to ensure the
        # desired flag position is available
        ndigits = np.floor(np.log10(out)).astype(int)
        idx = (ndigits < pos)
        out[idx] *= 10**(pos[idx]-ndigits[idx])
        # NOTE: only the newly padded positions receive 'values'; digits
        #       already present at 'pos' are left untouched
        ndigits = np.floor(np.log10(out)).astype(int)
        out[idx] += 10**(ndigits[idx]-pos[idx]) * values[idx]
return out.astype(np.int64)
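# A rough sketch of the positional encoding (hypothetical values): every
# test occupies one decimal digit, starting from the initial no-flag
# digit 9; setting the critical flag 2 at position 1 turns 9 into 92 and
# _getFlags recovers the digit again:
#
#   >>> flagger = PositionalFlagger()
#   >>> flagger._setFlags(np.array([9]), values=2, pos=1)
#   array([92])
#   >>> flagger._getFlags(np.array([92]), pos=1)
#   array([2])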
funcs/funcs.py 0 → 100644
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from lib.tools import valueRange, slidingWindowIndices
def flagMaintenance(data, flags, field, flagger, **kwargs):
return data, flags
def flagConstant(data, flags, field, flagger, eps,
                 length, thmin=None, **kwargs):
    datacol = data[field]
    flagcol = flags[field]
    length = ((pd.to_timedelta(length) - data.index.freq)
              .to_timedelta64()
              .astype(np.int64))
    # mask values below the threshold and missing values before
    # searching for constant periods
    invalid = datacol.isnull()
    if thmin is not None:
        invalid |= (datacol < thmin)
    values = datacol.mask(invalid).values
    dates = datacol.index.values.astype(np.int64)
    mask = np.isfinite(values)
    for start_idx, end_idx in slidingWindowIndices(datacol.index, length):
        mask_chunk = mask[start_idx:end_idx]
        values_chunk = values[start_idx:end_idx][mask_chunk]
        dates_chunk = dates[start_idx:end_idx][mask_chunk]
        # we might have removed dates from the start/end of the
        # chunk, resulting in a period shorter than 'length'
        if valueRange(dates_chunk) < length:
            continue
        if valueRange(values_chunk) < eps:
            flagcol[start_idx:end_idx] = flagger.setFlag(
                flags=flagcol[start_idx:end_idx],
                flag=flagger.critical_flag)
    data[field] = datacol
    flags[field] = flagcol
    return data, flags
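# Example: with length="1h" on the regular 10 minute grid, every window
# spans one hour minus one tick; whenever the spread (max - min) of the
# valid values within a window stays below 'eps', the whole window is
# flagged critical.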
def flagManual(data, flags, field, flagger, **kwargs):
return data, flags
def flagMad(data, flags, field, flagger, **kwargs):
return data, flags
lib/tools.py 0 → 100644
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import numbers
from typing import Union
import numpy as np
import pandas as pd
import numba as nb
@nb.jit(nopython=True, cache=True)
def findIndex(iterable, value, start):
i = start
while i < len(iterable):
v = iterable[i]
if v >= value:
# if v == value:
# include the end_date if present
# return i + 1
return i
i = i + 1
return -1
@nb.jit(nopython=True, cache=True)
def valueRange(iterable):
minval = iterable[0]
maxval = minval
for v in iterable:
if v < minval:
minval = v
elif v > maxval:
maxval = v
return maxval - minval
def slidingWindowIndices(dates, window_size, iter_delta=None):
# lets work on numpy data structures for performance reasons
if isinstance(dates, pd.DataFrame):
dates = dates.index
dates = np.array(dates, dtype=np.int64)
    if np.any(np.diff(dates) <= 0):
        raise ValueError("strictly monotonic index needed")
    window_size = pd.to_timedelta(window_size).to_timedelta64().astype(np.int64)
    if iter_delta:
        iter_delta = pd.to_timedelta(iter_delta).to_timedelta64().astype(np.int64)
start_date = dates[0]
last_date = dates[-1]
start_idx = 0
end_idx = start_idx
while True:
end_date = start_date + window_size
if (end_date > last_date) or (start_idx == -1) or (end_idx == -1):
break
end_idx = findIndex(dates, end_date, end_idx)
yield start_idx, end_idx
if iter_delta:
start_idx = findIndex(dates, start_date+iter_delta, start_idx)
else:
start_idx += 1
start_date = dates[start_idx]
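# Windowing example on hypothetical hourly stamps: the generator yields
# [start_idx, end_idx) pairs and, without 'iter_delta', advances one
# tick per iteration:
#
#   >>> dates = pd.date_range("2017-01-01", periods=5, freq="1h")
#   >>> list(slidingWindowIndices(dates, "2h"))
#   [(0, 2), (1, 3), (2, 4)]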
def numpyfy(arg: Union[pd.DataFrame,
                       pd.Series,
                       np.ndarray,
                       numbers.Number]) -> np.ndarray:
    try:
        # pandas object (DataFrame/Series)
        return arg.values
except AttributeError:
try:
# numpy array
return arg.copy()
except AttributeError:
# scalar
return np.atleast_1d(arg)
def broadcastMany(*args):
arrays = [np.atleast_1d(a) for a in args]
target_ndim = max(arr.ndim for arr in arrays)
out = []
for arr in arrays:
out.append(arr[(slice(None),) + (None,) * (target_ndim - arr.ndim)])
target_shape = np.broadcast(*out).shape
return tuple(np.broadcast_to(arr, target_shape) for arr in out)
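# Shape example with hypothetical values: scalars are promoted to arrays
# and broadcast against the largest argument:
#
#   >>> x, p = broadcastMany(np.array([9, 92]), 1)
#   >>> p
#   array([1, 1])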
This diff is collapsed.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from core import flaggingRunner
from config import Fields
from flagger import AbstractFlagger
from .testfuncs import initData, initEmptyFlags, printFailed, printSuccess
class TestFlagger(AbstractFlagger):
@property
def no_flag(self):
return 0
@property
def critical_flag(self):
return 2
def initMeta(data):
dates = data.index
variables = data.columns
randg = np.random.randint
    start_dates = [dates[randg(0, (len(dates)//2)-1)] for _ in variables]
    end_dates = [dates[randg(len(dates)//2, len(dates)-1)] for _ in variables]
tests = ["generic; func: abs(this) + 1 > 0"] * len(variables)
return pd.DataFrame({Fields.VARNAME: data.columns,
Fields.STARTDATE: start_dates,
Fields.ENDDATE: end_dates,
Fields.FLAGS: tests})
def testTemporalPartitioning():
data = initData()
flags = initEmptyFlags(data)
meta = initMeta(data)
flagger = TestFlagger()
pdata, pflags = flaggingRunner(meta, data, flags, flagger)
fields = [Fields.VARNAME, Fields.STARTDATE, Fields.ENDDATE]
for _, row in meta.iterrows():
vname, start_date, end_date = row[fields]
fchunk = pflags[vname].dropna()
assert fchunk.index.min() == start_date, printFailed("equal start dates")
assert fchunk.index.max() == end_date, printFailed("equal end dates")
printSuccess()
if __name__ == "__main__":
testTemporalPartitioning()
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
from .testfuncs import printSuccess, printFailed, initData, initEmptyFlags
from dsl.evaluator import evalCondition
def testConditions():
data = initData()
flags = initEmptyFlags(data)
tests = [
("this > 100",
data["var1"] > 100),
("var2 < 100",
data["var2"] < 100),
("abs(var2 - var1)/2 > 100",
abs(data["var2"] - data["var1"])/2 > 100),
("mean(var2) > max(var1)",
np.mean(data["var2"]) > np.max(data["var1"])),
("sum(var2)/len(var2) > max(var1)",
np.mean(data["var2"]) > np.max(data["var1"]))]
for test, expected in tests:
idx = evalCondition(test, data, flags, data.columns[0])
assert (idx == expected).all(), printFailed(test)
printSuccess()
def testMissingVariable():
data = initData()
flags = initEmptyFlags(data)
try:
evalCondition("var3 < 5", data, flags, data.columns[0])
except NameError:
return printSuccess()
    printFailed("var3 < 5")
def testMissingIdentifier():
data = initData()
flags = initEmptyFlags(data)
tests = ["func(var2) < 5", "var3 != NODATA"]
for test in tests:
try:
evalCondition(test, data, flags, data.columns[0])
except NameError:
continue
else:
printFailed(test)
printSuccess()
if __name__ == "__main__":
    testConditions()
    testMissingVariable()
    testMissingIdentifier()
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
def printSuccess():
print("all tests passed")
def printFailed(test):
print("test failed: '{:}'".format(test))
def initData(start_date="2017-01-01", end_date="2017-12-31", freq="1h"):
    dates = pd.date_range(start=start_date, end=end_date, freq=freq)
data = pd.DataFrame(
data={"var1": np.arange(len(dates)),
"var2": np.arange(len(dates), len(dates)*2)},
index=dates)
return data
def initEmptyFlags(data):
return pd.DataFrame(index=data.index, columns=data.columns)