Skip to content
Snippets Groups Projects
core.py 5.57 KiB
Newer Older
David Schäfer's avatar
David Schäfer committed
#! /usr/bin/env python
# -*- coding: utf-8 -*-

David Schäfer's avatar
David Schäfer committed
from math import ceil
David Schäfer's avatar
David Schäfer committed

import numpy as np
import pandas as pd

from config import Fields, Params
from funcs import flagDispatch
David Schäfer's avatar
David Schäfer committed
from dsl import parseFlag
from flagger import PositionalFlagger, BaseFlagger
class FlagParams:
    """String keys of the generic parameters a flag test can carry.

    ``runner`` looks ``ASSIGN`` up in the parameter dict returned by
    ``parseFlag``.
    """

    FLAG = "flag"
    PERIODE = "flag_period"
    VALUES = "flag_values"
    # NOTE(review): runner() reads FlagParams.ASSIGN, so the attribute has
    # to exist. The key string "assign" is an assumption -- confirm it
    # against the keyword used in the test configuration.
    ASSIGN = "assign"
David Schäfer's avatar
David Schäfer committed
def _inferFrequency(data):
    return pd.tseries.frequencies.to_offset(pd.infer_freq(data.index))


def _periodToTicks(period, freq):
    return int(ceil(pd.to_timedelta(period)/pd.to_timedelta(freq)))


David Schäfer's avatar
David Schäfer committed
def flagNext(flagger: BaseFlagger, flags: pd.Series, n: int) -> pd.Series:
    """Propagate every flag onto the `n` values following it.

    Positions that are already flagged keep their flag; only unflagged
    positions within `n` steps after an originally flagged position
    receive a copy of that flag.

    Args:
        flagger: provides ``isFlagged``, called with a flag series and
            expected to return a boolean mask of the same length.
        flags: the flag series; it is modified in place.
        n: number of subsequent positions to flag.

    Returns:
        The (modified) `flags` series.
    """
    # positions flagged on entry; only these act as flag sources
    flagged_pos = np.where(flagger.isFlagged(flags))[0]
    size = len(flags)
    for offset in range(1, n + 1):
        target = flagged_pos + offset
        # drop targets running past the end of the series
        target = target[target < size]
        # never overwrite a position that carries a flag already
        unflagged = ~np.asarray(flagger.isFlagged(flags.iloc[target]), dtype=bool)
        target = target[unflagged]
        if target.size:
            # assign through .iloc: the array returned by `.values` is not
            # guaranteed to be a writable view of the series
            flags.iloc[target] = flags.iloc[target - offset].to_numpy()
    return flags


David Schäfer's avatar
David Schäfer committed
def runner(meta, flagger, data, flags=None, nodata=np.nan):
    """Apply the flag tests configured in `meta` to `data`.

    Every column of `meta` whose name matches ``Fields.FLAGS`` holds one
    test per variable row. Each test is parsed with ``parseFlag`` and run
    through ``flagDispatch`` on the slice of `data` and `flags` between the
    row's start and end date.

    Args:
        meta: configuration table; the columns ``Fields.VARNAME``,
            ``Fields.STARTDATE`` and ``Fields.ENDDATE`` are read from
            every row, next to the flag test columns.
        flagger: object providing ``emptyFlags``, ``no_flag`` and
            ``nextTest`` (and ``isFlagged`` for ``flagNext``).
        data: the dataset. Its index frequency is set and the test
            results are written back into this object.
        flags: optional existing flags; their columns must equal those
            of ``flagger.emptyFlags``. Created from `data` when omitted.
        nodata: forwarded to ``flagDispatch``.

    Returns:
        The tuple ``(data, flags)``.

    Raises:
        TypeError: if the columns of `flags` do not match the flagger's
            layout or no frequency can be inferred from `data`.
        NameError: if ``flagDispatch`` raises ``NameError`` for a
            configured function name.
    """
    if flags is None:
        flags = flagger.emptyFlags(data)
    elif not all(flags.columns == flagger.emptyFlags(data.iloc[0]).columns):
        raise TypeError("structure of given flag does not "
                        "correspond to flagger requirements")

    # NOTE:
    # We need an index frequency in order to calculate ticks
    # from given periods further down the road. Maybe this
    # restriction should only be enforced when we really
    # need a time series...
    data.index.freq = _inferFrequency(data)
    if not data.index.freq:
        raise TypeError("cannot infer time frequency from dataset")

    # the required meta data columns
    fields = [Fields.VARNAME, Fields.STARTDATE, Fields.ENDDATE]

    # NOTE:
    # the outer loop runs over the flag tests, the inner one over the
    # variables. Switching the loop order would complicate the
    # reference to flags from other variables within the dataset
    flag_fields = meta.columns.to_series().filter(regex=Fields.FLAGS)
    for flag_field in flag_fields:

        # NOTE: just an optimization
        if meta[flag_field].dropna().empty:
            continue

        for idx, configrow in meta.iterrows():

            flag_test = configrow[flag_field]
            if pd.isnull(flag_test):
                continue

            varname, start_date, end_date = configrow[fields]
            func_name, flag_params = parseFlag(flag_test)

            # NOTE(review): relies on FlagParams exposing an ASSIGN key --
            # confirm it is defined
            if flag_params.get(FlagParams.ASSIGN):
                dummy = pd.DataFrame(index=data.index, columns=[varname])
                flags[varname] = flagger.emptyFlags(dummy)

            # NOTE(review): this branch had no body; skipping the row for
            # variables missing from the dataset is assumed -- confirm
            if varname not in data:
                continue

            dchunk = data.loc[start_date:end_date]
            if dchunk.empty:
                continue

            # NOTE:
            # within the activation period of a variable, the flag will
            # be initialized if necessary
            fchunk = (flags
                      .loc[start_date:end_date]
                      .fillna({varname: flagger.no_flag}))

            try:
                dchunk, fchunk = flagDispatch(func_name,
                                              dchunk, fchunk, varname,
                                              flagger, nodata=nodata,
                                              **flag_params)
            except NameError as err:
                raise NameError(
                    f"function name {func_name} is not defined "
                    f"(variable '{varname}', line: {idx + 1})") from err

            # flag a timespan after the condition is met,
            # duration given in 'flag_period'
            flag_period = flag_params.pop(Params.FLAGPERIOD, None)
            if flag_period:
                # the period is translated into a number of values and
                # handled by the 'flag_values' branch below
                flag_params[Params.FLAGVALUES] = _periodToTicks(flag_period, data.index.freq)

            # flag a certain amount of values after condition is met,
            # number given in 'flag_values'
            flag_values = flag_params.pop(Params.FLAGVALUES, None)
            if flag_values:
                fchunk[varname] = flagNext(flagger, fchunk[varname], flag_values)

            # write the results of this test back into the full frames
            data.loc[start_date:end_date] = dchunk
            flags.loc[start_date:end_date] = fchunk

        # called once per processed flag test column
        flagger.nextTest()

    return data, flags


def prepareMeta(meta, data):
    """Complete the date bounds of `meta` and drop rows without a variable.

    Missing ``Fields.STARTDATE`` / ``Fields.ENDDATE`` columns are added and
    missing values in them are filled with the first / last index value of
    `data`. Rows lacking ``Fields.VARNAME`` are removed and both date
    columns are converted with ``pd.to_datetime``.

    Args:
        meta: the configuration table.
        data: the dataset whose index provides the fallback date range.

    Returns:
        The prepared configuration table.
    """
    # NOTE: an option is needed to only pass tests within a file and deduce
    #       everything else from data

    # no dates given, fall back to the available date range
    if Fields.STARTDATE not in meta:
        meta = meta.assign(**{Fields.STARTDATE: np.nan})
    if Fields.ENDDATE not in meta:
        meta = meta.assign(**{Fields.ENDDATE: np.nan})
    meta = meta.fillna(
        {Fields.ENDDATE: data.index.max(),
         Fields.STARTDATE: data.index.min()})

    # rows without a variable name don't help much
    meta = meta.dropna(subset=[Fields.VARNAME])
    meta[Fields.STARTDATE] = pd.to_datetime(meta[Fields.STARTDATE])
    meta[Fields.ENDDATE] = pd.to_datetime(meta[Fields.ENDDATE])
    return meta


def readData(fname, index_col, nans, freq="10min"):
    """Read a CSV file and put it onto a regular datetime grid.

    Args:
        fname: path or file-like object handed to ``pd.read_csv``.
        index_col: column used as index; it is parsed as dates.
        nans: additional strings to treat as missing values.
        freq: spacing of the regular grid the data is reindexed to;
            defaults to the formerly hard-coded ``"10min"``.

    Returns:
        The data reindexed to a range from the first to the last
        timestamp in steps of `freq`; grid points absent from the file
        become rows of missing values. Column names are cut at their
        first blank.
    """
    data = pd.read_csv(
        fname, index_col=index_col, parse_dates=True,
        na_values=nans, low_memory=False)
    # keep only the part of each column name in front of the first blank
    data.columns = [c.split(" ")[0] for c in data.columns]
    data = data.reindex(
        pd.date_range(data.index.min(), data.index.max(), freq=freq))
    return data


if __name__ == "__main__":

    # example run against the bundled resource files
    datafname = "resources/data.csv"
    metafname = "resources/meta.csv"

    data = readData(datafname, index_col="Date Time", nans=["-9999", "-9999.0"])
    meta = prepareMeta(pd.read_csv(metafname), data)
    flagger = PositionalFlagger()
    pdata, pflags = runner(meta, flagger, data)