diff --git a/README.md b/README.md index 454a87a6bcc088fe811862e538ceaf66f2ca2f7b..4233c97a77241a266b55f5779ff1aeaaaaddf9df 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ In order to make your test available for the system you need to: (i.e. a column index into `data` and `columns`). The data and flags for this variable are available via `data[field]` and `flags[field]` respectively - + `flagger: flagger.CategoricalBaseFlagger`: An instance of the `CategoricalBaseFlagger` class + + `flagger: flagger.CategoricalFlagger`: An instance of the `CategoricalFlagger` class (more likely one of its subclasses). To initialize, create or check against existing flags you should use the respective `flagger`-methods (`flagger.empytFlags`, `flagger.isFlagged` and `flagger.setFlag`) diff --git a/docs/FunctionDescriptions.md b/docs/FunctionDescriptions.md index d837345182957bbe16e9c576daad55010430ed40..4936c03b95e81096415f1b42a969026cda646695 100644 --- a/docs/FunctionDescriptions.md +++ b/docs/FunctionDescriptions.md @@ -25,7 +25,10 @@ Main documentation of the implemented functions, their purpose and parameters an - [machinelearning](#machinelearning) - [harmonize](#harmonize) - [deharmonize](#deharmonize) - - [harmonize_shift2Grid](#harmonize_shift2Grid) + - [harmonize_shift2Grid](#harmonize_shift2grid) + - [harmonize_aggregate2Grid](#harmonize_aggregate2grid) + - [harmonize_linear2Grid](#harmonize_linear2grid) + - [harmonize_interpolate2Grid](#harmonize_interpolate2grid) ## range @@ -774,7 +777,7 @@ and than: (According to the flagging order of the current flagger.) -## harmonize_shift2Grid +## harmonize_shift2grid ``` harmonize_shift2Grid(freq, shift_method='nearest_shift', drop_flags=None) @@ -819,7 +822,7 @@ In detail, the process includes: if there is one available in the succeeding sampling interval. If not, BAD/np.nan - flag gets assigned. * `"nearest_shift"`: every grid point gets assigned the closest flag/datapoint in its range. ( range = +/- `freq`/2 ). -## harmonize_aggregate2Grid +## harmonize_aggregate2grid ``` harmonize_aggregate2Grid(freq, agg_func, agg_method='nearest_agg', flag_agg_func=max, drop_flags=None) @@ -876,7 +879,7 @@ In detail, the process includes: aggregated with the function passed to agg_method and assigned to it. -## harmonize_linear2Grid +## harmonize_linear2grid ``` harmonize_linear2Grid(freq, flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None) @@ -929,7 +932,7 @@ Linear interpolation of an inserted equidistant frequency grid of sampling rate aggregated with the function passed to `agg_func` and assigned to it. -## harmonize_interpolate2Grid +## harmonize_interpolate2grid ``` harmonize_interpolate2Grid(freq, interpolation_method, interpolation_order=1, flag_assignment_method='nearest_agg', diff --git a/saqc/core/core.py b/saqc/core/core.py index 5d896ecb8307cb36bc56abbad895da0769d1b8a9..0d0b1efe7c8fe0446a2197bd22a66b563b10a62c 100644 --- a/saqc/core/core.py +++ b/saqc/core/core.py @@ -9,7 +9,7 @@ from saqc.core.reader import readConfig, prepareConfig, checkConfig from saqc.core.config import Fields from saqc.core.evaluator import evalExpression from saqc.lib.plotting import plotHook, plotAllHook -from saqc.flagger import BaseFlagger, CategoricalBaseFlagger, SimpleFlagger, DmpFlagger +from saqc.flagger import BaseFlagger, CategoricalFlagger, SimpleFlagger, DmpFlagger def _collectVariables(meta, data): @@ -39,7 +39,7 @@ def _checkInput(data, flags, flagger): raise TypeError("the columns of data is not allowed to be a multiindex") if not isinstance(flagger, BaseFlagger): - flaggerlist = [CategoricalBaseFlagger, SimpleFlagger, DmpFlagger] + flaggerlist = [CategoricalFlagger, SimpleFlagger, DmpFlagger] raise TypeError( f"flagger must be of type {flaggerlist} or any inherit class from {BaseFlagger}" ) @@ -63,10 +63,10 @@ def _setup(): pd.set_option("mode.chained_assignment", "warn") -def runner(metafname, flagger, data, flags=None, nodata=np.nan, error_policy="raise"): +def runner(config_file, flagger, data, flags=None, nodata=np.nan, error_policy="raise"): _setup() _checkInput(data, flags, flagger) - config = prepareConfig(readConfig(metafname), data) + config = prepareConfig(readConfig(config_file), data) # split config into the test and some 'meta' data tests = config.filter(regex=Fields.TESTS) diff --git a/saqc/core/reader.py b/saqc/core/reader.py index a75aecb5b7eddd046d3eb8f6af2a512dae9b69e5..97f50969b7213e36233fa0191855911ef1436a25 100644 --- a/saqc/core/reader.py +++ b/saqc/core/reader.py @@ -90,4 +90,4 @@ def prepareConfig(config_df, data): def readConfig(fname): - return pd.read_csv(fname, delimiter=",", skipinitialspace=True) + return pd.read_csv(fname, delimiter=";", skipinitialspace=True) diff --git a/saqc/flagger/__init__.py b/saqc/flagger/__init__.py index 0561497bf5dfb061c7e3cae75aed1eacd0e553d3..3c942296fd455c1d1632a5880afa5759f394c787 100644 --- a/saqc/flagger/__init__.py +++ b/saqc/flagger/__init__.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- from saqc.flagger.baseflagger import BaseFlagger -from saqc.flagger.categoricalflagger import CategoricalBaseFlagger +from saqc.flagger.categoricalflagger import CategoricalFlagger from saqc.flagger.simpleflagger import SimpleFlagger from saqc.flagger.dmpflagger import DmpFlagger -from saqc.flagger.continuousflagger import ContinuousBaseFlagger +from saqc.flagger.continuousflagger import ContinuousFlagger diff --git a/saqc/flagger/baseflagger.py b/saqc/flagger/baseflagger.py index 898df1ddaac836fe4c345eb37aa482273cf40790..c0bd22f0856dceb71bbbfa4ab7a129cf4022b962 100644 --- a/saqc/flagger/baseflagger.py +++ b/saqc/flagger/baseflagger.py @@ -165,8 +165,8 @@ class BaseFlagger(ABC): ) -> PandasT: field = field or slice(None) locator = [l for l in (loc, iloc, slice(None)) if l is not None][0] - flags = self._flags[toSequence(field)] - mask = pd.Series(data=np.zeros(len(flags), dtype=bool), index=flags.index) + index = self._flags.index + mask = pd.Series(data=np.zeros(len(index), dtype=bool), index=index) mask[locator] = True return mask diff --git a/saqc/flagger/categoricalflagger.py b/saqc/flagger/categoricalflagger.py index 33784ef8a052c29edd8868df501d7fd3f86ad947..7e36ea563802a5779a4d5ae87ae59fe27a6cedd1 100644 --- a/saqc/flagger/categoricalflagger.py +++ b/saqc/flagger/categoricalflagger.py @@ -20,7 +20,7 @@ class Flags(pd.CategoricalDtype): super().__init__(flags, ordered=True) -class CategoricalBaseFlagger(BaseFlagger): +class CategoricalFlagger(BaseFlagger): def __init__(self, flags): super().__init__(dtype=Flags(flags)) self._categories = self.dtype.categories diff --git a/saqc/flagger/continuousflagger.py b/saqc/flagger/continuousflagger.py index a338dce806623f4fba1782df4e2b33941eadd034..7309308e271e8d1d4e46b8b3e55c2cbc4241d2ab 100644 --- a/saqc/flagger/continuousflagger.py +++ b/saqc/flagger/continuousflagger.py @@ -8,7 +8,7 @@ import intervals from saqc.flagger.baseflagger import BaseFlagger -class ContinuousBaseFlagger(BaseFlagger): +class ContinuousFlagger(BaseFlagger): def __init__(self, min_=0.0, max_=1.0, unflagged=-1.0): assert unflagged < 0 <= min_ < max_ super().__init__(dtype=np.float64) diff --git a/saqc/flagger/dmpflagger.py b/saqc/flagger/dmpflagger.py index 2c0d62d6bc0cc28a1de833441701c799aafc473a..f2042c8ad58f387ad4029823233eec53d30c8165 100644 --- a/saqc/flagger/dmpflagger.py +++ b/saqc/flagger/dmpflagger.py @@ -8,7 +8,7 @@ from typing import Union, Sequence import pandas as pd -from saqc.flagger.categoricalflagger import CategoricalBaseFlagger +from saqc.flagger.categoricalflagger import CategoricalFlagger from saqc.lib.tools import assertDataFrame, toSequence, assertScalar @@ -30,7 +30,7 @@ class ColumnLevels: FLAGS = ["NIL", "OK", "DOUBTFUL", "BAD"] -class DmpFlagger(CategoricalBaseFlagger): +class DmpFlagger(CategoricalFlagger): def __init__(self): super().__init__(FLAGS) self.flags_fields = [FlagFields.FLAG, FlagFields.CAUSE, FlagFields.COMMENT] diff --git a/saqc/flagger/simpleflagger.py b/saqc/flagger/simpleflagger.py index a392aec402225c0f4877091827336ea3ad780a5b..1b0555794eacfb839da37b20be2af6cc03185081 100644 --- a/saqc/flagger/simpleflagger.py +++ b/saqc/flagger/simpleflagger.py @@ -1,13 +1,16 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- +""" +TODO: remove +""" -from saqc.flagger.categoricalflagger import CategoricalBaseFlagger +from saqc.flagger.categoricalflagger import CategoricalFlagger FLAGS = [-1, 0, 1] -class SimpleFlagger(CategoricalBaseFlagger): +class SimpleFlagger(CategoricalFlagger): def __init__(self): super().__init__(FLAGS) diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py index f626cb23eb0a00c401ad5652a4a972d2ff56f97a..55ea5b172999f08faa423f93b4242808ff363ac9 100644 --- a/saqc/funcs/functions.py +++ b/saqc/funcs/functions.py @@ -21,7 +21,9 @@ def flagGeneric(data, field, flagger, func, **kwargs): # DmpFlagger.isFlagged does not preserve the name of the column # it was executed on -> would be nice to overcome this restriction flags_field = func.name if func.name in data.columns else field - mask = func.squeeze() | flagger.isFlagged(flags_field) + mask = func.squeeze() + if flags_field in flagger.getFlags(): + mask |= flagger.isFlagged(flags_field) if np.isscalar(mask): raise TypeError(f"generic expression does not return an array") if not np.issubdtype(mask.dtype, np.bool_): diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py index 9f96141d1ef884d06db4dca6743fd84f5be4a03b..d5bb8f577946ca246b114420b4ff71a62f2059c6 100644 --- a/saqc/funcs/harm_functions.py +++ b/saqc/funcs/harm_functions.py @@ -829,7 +829,8 @@ def linear2Grid(data, field, flagger, freq, flag_assignment_method='nearest_agg' @register('harmonize_interpolate2Grid') -def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpolation_order=1, flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None, **kwargs): +def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpolation_order=1, + flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None, **kwargs): return harmonize( data, field, @@ -842,4 +843,61 @@ def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpola drop_flags=drop_flags, **kwargs) -#def aggregate(data, field, flagger, freq_base, freq_target, agg_func, **kwargs): \ No newline at end of file + +def aggregate(data, field, flagger, source_freq, target_freq, agg_func=np.mean, sample_func=np.mean, + invalid_flags=None, max_invalid=np.inf, **kwargs): + + # define the "fastest possible" aggregator + if sample_func is None: + if max_invalid < np.inf: + + def aggregator(x): + if x.isna().sum() < max_invalid: + return agg_func(x) + else: + return np.nan + else: + + def aggregator(x): + return agg_func(x) + else: + + dummy_resampler = pd.Series(np.nan, index=[pd.Timedelta('1min')]).resample('1min') + if hasattr(dummy_resampler, sample_func.__name__): + + sample_func_name = sample_func.__name__ + if max_invalid < np.inf: + def aggregator(x): + y = getattr(x.resample(source_freq), sample_func_name)() + if y.isna().sum() < max_invalid: + return agg_func(y) + else: + return np.nan + + else: + def aggregator(x): + return agg_func(getattr(x.resample(source_freq), sample_func_name)()) + + else: + if max_invalid < np.inf: + def aggregator(x): + y = x.resample(source_freq).apply(sample_func) + if y.isna().sum() < max_invalid: + return agg_func(y) + else: + return np.nan + else: + def aggregator(x): + return agg_func(x.resample(source_freq).apply(sample_func)) + + return harmonize( + data, + field, + flagger, + target_freq, + inter_method='bagg', + reshape_method='bagg', + inter_agg=aggregator, + reshape_agg=max, + drop_flags=invalid_flags, + **kwargs) diff --git a/saqc/lib/plotting.py b/saqc/lib/plotting.py index 3ea8dea32475651bcbfaf9f12347c873b55b404e..900efcad2d15634bbe209ac77a8c80a50851da23 100644 --- a/saqc/lib/plotting.py +++ b/saqc/lib/plotting.py @@ -1,11 +1,9 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- -# TODO: use the logging module import logging import pandas as pd import numpy as np -from warnings import warn __plotvars = [] @@ -24,7 +22,9 @@ def plotHook(data, old, new, varname, do_plot, flag_test, plot_nans=True): __plotvars.append(varname) # cannot use getFlags here, because if a flag was set (e.g. with force) the # flag may be the same, but any additional row (e.g. comment-field) would differ - mask = (old._flags[varname] != new._flags[varname]).any(axis=1) + mask = (old._flags[varname] != new._flags[varname]) + if isinstance(mask, pd.DataFrame): + mask = mask.any(axis=1) _plot(data, new, mask, varname, title=flag_test, plot_nans=plot_nans) diff --git a/test/common.py b/test/common.py index f0d53d79e71050a15aae1cea1a1aca70f852f1c5..fb86083b3bb37b4ee0906aebb01d110603070c19 100644 --- a/test/common.py +++ b/test/common.py @@ -9,8 +9,8 @@ import pandas as pd from saqc.core.core import prepareConfig, readConfig from saqc.flagger import ( - ContinuousBaseFlagger, - CategoricalBaseFlagger, + ContinuousFlagger, + CategoricalFlagger, SimpleFlagger, DmpFlagger, ) @@ -20,10 +20,10 @@ TESTNODATA = (np.nan, -9999) TESTFLAGGER = ( - CategoricalBaseFlagger(["NIL", "GOOD", "BAD"]), + CategoricalFlagger(["NIL", "GOOD", "BAD"]), SimpleFlagger(), DmpFlagger(), - ContinuousBaseFlagger(), + ContinuousFlagger(), ) @@ -59,6 +59,6 @@ def initMetaDict(config_dict, data): df = pd.DataFrame(config_dict)[_getKeys(config_dict)] meta = prepareConfig(df, data) fobj = io.StringIO() - meta.to_csv(fobj, index=False) + meta.to_csv(fobj, index=False, sep=";") fobj.seek(0) return fobj, meta diff --git a/test/funcs/test_harm_funcs.py b/test/funcs/test_harm_funcs.py index 2419370e319243a54417d18225c9aba99586a48d..825f442651cade79ecdf64184e317bbe34458ef3 100644 --- a/test/funcs/test_harm_funcs.py +++ b/test/funcs/test_harm_funcs.py @@ -18,7 +18,8 @@ from saqc.funcs.harm_functions import ( linear2Grid, interpolate2Grid, shift2Grid, - aggregate2Grid + aggregate2Grid, + aggregate ) @@ -334,20 +335,10 @@ def test_wrapper(data, flagger): field = data.columns[0] freq = '15min' flagger = flagger.initFlags(data) + aggregate(data, field, flagger, '15min', '30min', agg_func=np.sum, sample_func=np.mean) + linear2Grid(data, field, flagger, freq, flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None) aggregate2Grid(data, field, flagger, freq, agg_func=sum, agg_method='nearest_agg', flag_agg_func=max, drop_flags=None) shift2Grid(data, field, flagger, freq, shift_method='nearest_shift', drop_flags=None) - -if __name__ == "__main__": - dat = data() - flagger = TESTFLAGGER[1] - test_gridInterpolation(data(), 'polynomial') - flagger2 = TESTFLAGGER[2] - flagger = flagger.initFlags(dat) - flagger2 = flagger.initFlags(dat2) - dat_out, flagger = interpolate2Grid(dat, 'data', flagger, '15min', interpolation_method="polynomial", flag_assignment_method='nearest_agg', - flag_agg_func=max, drop_flags=None) - - print("stop") \ No newline at end of file