Skip to content
Snippets Groups Projects
Commit 151a344c authored by Lennart Schmidt's avatar Lennart Schmidt
Browse files

Merge branch 'master' of https://git.ufz.de/rdm/saqc

parents 9330c717 7905c763
No related branches found
No related tags found
No related merge requests found
...@@ -125,7 +125,7 @@ In order to make your test available for the system you need to: ...@@ -125,7 +125,7 @@ In order to make your test available for the system you need to:
(i.e. a column index into `data` and `columns`). (i.e. a column index into `data` and `columns`).
The data and flags for this variable are available via `data[field]` and The data and flags for this variable are available via `data[field]` and
`flags[field]` respectively `flags[field]` respectively
+ `flagger: flagger.CategoricalBaseFlagger`: An instance of the `CategoricalBaseFlagger` class + `flagger: flagger.CategoricalFlagger`: An instance of the `CategoricalFlagger` class
(more likely one of its subclasses). To initialize, create or check (more likely one of its subclasses). To initialize, create or check
against existing flags you should use the respective `flagger`-methods against existing flags you should use the respective `flagger`-methods
(`flagger.empytFlags`, `flagger.isFlagged` and `flagger.setFlag`) (`flagger.empytFlags`, `flagger.isFlagged` and `flagger.setFlag`)
......
...@@ -25,7 +25,10 @@ Main documentation of the implemented functions, their purpose and parameters an ...@@ -25,7 +25,10 @@ Main documentation of the implemented functions, their purpose and parameters an
- [machinelearning](#machinelearning) - [machinelearning](#machinelearning)
- [harmonize](#harmonize) - [harmonize](#harmonize)
- [deharmonize](#deharmonize) - [deharmonize](#deharmonize)
- [harmonize_shift2Grid](#harmonize_shift2Grid) - [harmonize_shift2Grid](#harmonize_shift2grid)
- [harmonize_aggregate2Grid](#harmonize_aggregate2grid)
- [harmonize_linear2Grid](#harmonize_linear2grid)
- [harmonize_interpolate2Grid](#harmonize_interpolate2grid)
## range ## range
...@@ -774,7 +777,7 @@ and then: ...@@ -774,7 +777,7 @@ and then:
(According to the flagging order of the current flagger.) (According to the flagging order of the current flagger.)
## harmonize_shift2Grid ## harmonize_shift2grid
``` ```
harmonize_shift2Grid(freq, shift_method='nearest_shift', drop_flags=None) harmonize_shift2Grid(freq, shift_method='nearest_shift', drop_flags=None)
...@@ -819,7 +822,7 @@ In detail, the process includes: ...@@ -819,7 +822,7 @@ In detail, the process includes:
if there is one available in the succeeding sampling interval. If not, BAD/np.nan - flag gets assigned. if there is one available in the succeeding sampling interval. If not, BAD/np.nan - flag gets assigned.
* `"nearest_shift"`: every grid point gets assigned the closest flag/datapoint in its range. ( range = +/- `freq`/2 ). * `"nearest_shift"`: every grid point gets assigned the closest flag/datapoint in its range. ( range = +/- `freq`/2 ).
## harmonize_aggregate2Grid ## harmonize_aggregate2grid
``` ```
harmonize_aggregate2Grid(freq, agg_func, agg_method='nearest_agg', flag_agg_func=max, drop_flags=None) harmonize_aggregate2Grid(freq, agg_func, agg_method='nearest_agg', flag_agg_func=max, drop_flags=None)
...@@ -876,7 +879,7 @@ In detail, the process includes: ...@@ -876,7 +879,7 @@ In detail, the process includes:
aggregated with the function passed to agg_method and assigned to it. aggregated with the function passed to agg_method and assigned to it.
## harmonize_linear2Grid ## harmonize_linear2grid
``` ```
harmonize_linear2Grid(freq, flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None) harmonize_linear2Grid(freq, flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None)
...@@ -929,7 +932,7 @@ Linear interpolation of an inserted equidistant frequency grid of sampling rate ...@@ -929,7 +932,7 @@ Linear interpolation of an inserted equidistant frequency grid of sampling rate
aggregated with the function passed to `agg_func` and assigned to it. aggregated with the function passed to `agg_func` and assigned to it.
## harmonize_interpolate2Grid ## harmonize_interpolate2grid
``` ```
harmonize_interpolate2Grid(freq, interpolation_method, interpolation_order=1, flag_assignment_method='nearest_agg', harmonize_interpolate2Grid(freq, interpolation_method, interpolation_order=1, flag_assignment_method='nearest_agg',
......
...@@ -9,7 +9,7 @@ from saqc.core.reader import readConfig, prepareConfig, checkConfig ...@@ -9,7 +9,7 @@ from saqc.core.reader import readConfig, prepareConfig, checkConfig
from saqc.core.config import Fields from saqc.core.config import Fields
from saqc.core.evaluator import evalExpression from saqc.core.evaluator import evalExpression
from saqc.lib.plotting import plotHook, plotAllHook from saqc.lib.plotting import plotHook, plotAllHook
from saqc.flagger import BaseFlagger, CategoricalBaseFlagger, SimpleFlagger, DmpFlagger from saqc.flagger import BaseFlagger, CategoricalFlagger, SimpleFlagger, DmpFlagger
def _collectVariables(meta, data): def _collectVariables(meta, data):
...@@ -39,7 +39,7 @@ def _checkInput(data, flags, flagger): ...@@ -39,7 +39,7 @@ def _checkInput(data, flags, flagger):
raise TypeError("the columns of data is not allowed to be a multiindex") raise TypeError("the columns of data is not allowed to be a multiindex")
if not isinstance(flagger, BaseFlagger): if not isinstance(flagger, BaseFlagger):
flaggerlist = [CategoricalBaseFlagger, SimpleFlagger, DmpFlagger] flaggerlist = [CategoricalFlagger, SimpleFlagger, DmpFlagger]
raise TypeError( raise TypeError(
f"flagger must be of type {flaggerlist} or any inherit class from {BaseFlagger}" f"flagger must be of type {flaggerlist} or any inherit class from {BaseFlagger}"
) )
...@@ -63,10 +63,10 @@ def _setup(): ...@@ -63,10 +63,10 @@ def _setup():
pd.set_option("mode.chained_assignment", "warn") pd.set_option("mode.chained_assignment", "warn")
def runner(metafname, flagger, data, flags=None, nodata=np.nan, error_policy="raise"): def runner(config_file, flagger, data, flags=None, nodata=np.nan, error_policy="raise"):
_setup() _setup()
_checkInput(data, flags, flagger) _checkInput(data, flags, flagger)
config = prepareConfig(readConfig(metafname), data) config = prepareConfig(readConfig(config_file), data)
# split config into the test and some 'meta' data # split config into the test and some 'meta' data
tests = config.filter(regex=Fields.TESTS) tests = config.filter(regex=Fields.TESTS)
......
...@@ -90,4 +90,4 @@ def prepareConfig(config_df, data): ...@@ -90,4 +90,4 @@ def prepareConfig(config_df, data):
def readConfig(fname): def readConfig(fname):
return pd.read_csv(fname, delimiter=",", skipinitialspace=True) return pd.read_csv(fname, delimiter=";", skipinitialspace=True)
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from saqc.flagger.baseflagger import BaseFlagger from saqc.flagger.baseflagger import BaseFlagger
from saqc.flagger.categoricalflagger import CategoricalBaseFlagger from saqc.flagger.categoricalflagger import CategoricalFlagger
from saqc.flagger.simpleflagger import SimpleFlagger from saqc.flagger.simpleflagger import SimpleFlagger
from saqc.flagger.dmpflagger import DmpFlagger from saqc.flagger.dmpflagger import DmpFlagger
from saqc.flagger.continuousflagger import ContinuousBaseFlagger from saqc.flagger.continuousflagger import ContinuousFlagger
...@@ -165,8 +165,8 @@ class BaseFlagger(ABC): ...@@ -165,8 +165,8 @@ class BaseFlagger(ABC):
) -> PandasT: ) -> PandasT:
field = field or slice(None) field = field or slice(None)
locator = [l for l in (loc, iloc, slice(None)) if l is not None][0] locator = [l for l in (loc, iloc, slice(None)) if l is not None][0]
flags = self._flags[toSequence(field)] index = self._flags.index
mask = pd.Series(data=np.zeros(len(flags), dtype=bool), index=flags.index) mask = pd.Series(data=np.zeros(len(index), dtype=bool), index=index)
mask[locator] = True mask[locator] = True
return mask return mask
......
...@@ -20,7 +20,7 @@ class Flags(pd.CategoricalDtype): ...@@ -20,7 +20,7 @@ class Flags(pd.CategoricalDtype):
super().__init__(flags, ordered=True) super().__init__(flags, ordered=True)
class CategoricalBaseFlagger(BaseFlagger): class CategoricalFlagger(BaseFlagger):
def __init__(self, flags): def __init__(self, flags):
super().__init__(dtype=Flags(flags)) super().__init__(dtype=Flags(flags))
self._categories = self.dtype.categories self._categories = self.dtype.categories
......
...@@ -8,7 +8,7 @@ import intervals ...@@ -8,7 +8,7 @@ import intervals
from saqc.flagger.baseflagger import BaseFlagger from saqc.flagger.baseflagger import BaseFlagger
class ContinuousBaseFlagger(BaseFlagger): class ContinuousFlagger(BaseFlagger):
def __init__(self, min_=0.0, max_=1.0, unflagged=-1.0): def __init__(self, min_=0.0, max_=1.0, unflagged=-1.0):
assert unflagged < 0 <= min_ < max_ assert unflagged < 0 <= min_ < max_
super().__init__(dtype=np.float64) super().__init__(dtype=np.float64)
......
...@@ -8,7 +8,7 @@ from typing import Union, Sequence ...@@ -8,7 +8,7 @@ from typing import Union, Sequence
import pandas as pd import pandas as pd
from saqc.flagger.categoricalflagger import CategoricalBaseFlagger from saqc.flagger.categoricalflagger import CategoricalFlagger
from saqc.lib.tools import assertDataFrame, toSequence, assertScalar from saqc.lib.tools import assertDataFrame, toSequence, assertScalar
...@@ -30,7 +30,7 @@ class ColumnLevels: ...@@ -30,7 +30,7 @@ class ColumnLevels:
FLAGS = ["NIL", "OK", "DOUBTFUL", "BAD"] FLAGS = ["NIL", "OK", "DOUBTFUL", "BAD"]
class DmpFlagger(CategoricalBaseFlagger): class DmpFlagger(CategoricalFlagger):
def __init__(self): def __init__(self):
super().__init__(FLAGS) super().__init__(FLAGS)
self.flags_fields = [FlagFields.FLAG, FlagFields.CAUSE, FlagFields.COMMENT] self.flags_fields = [FlagFields.FLAG, FlagFields.CAUSE, FlagFields.COMMENT]
......
#! /usr/bin/env python #! /usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
TODO: remove
"""
from saqc.flagger.categoricalflagger import CategoricalBaseFlagger from saqc.flagger.categoricalflagger import CategoricalFlagger
FLAGS = [-1, 0, 1] FLAGS = [-1, 0, 1]
class SimpleFlagger(CategoricalBaseFlagger): class SimpleFlagger(CategoricalFlagger):
def __init__(self): def __init__(self):
super().__init__(FLAGS) super().__init__(FLAGS)
...@@ -21,7 +21,9 @@ def flagGeneric(data, field, flagger, func, **kwargs): ...@@ -21,7 +21,9 @@ def flagGeneric(data, field, flagger, func, **kwargs):
# DmpFlagger.isFlagged does not preserve the name of the column # DmpFlagger.isFlagged does not preserve the name of the column
# it was executed on -> would be nice to overcome this restriction # it was executed on -> would be nice to overcome this restriction
flags_field = func.name if func.name in data.columns else field flags_field = func.name if func.name in data.columns else field
mask = func.squeeze() | flagger.isFlagged(flags_field) mask = func.squeeze()
if flags_field in flagger.getFlags():
mask |= flagger.isFlagged(flags_field)
if np.isscalar(mask): if np.isscalar(mask):
raise TypeError(f"generic expression does not return an array") raise TypeError(f"generic expression does not return an array")
if not np.issubdtype(mask.dtype, np.bool_): if not np.issubdtype(mask.dtype, np.bool_):
......
...@@ -829,7 +829,8 @@ def linear2Grid(data, field, flagger, freq, flag_assignment_method='nearest_agg' ...@@ -829,7 +829,8 @@ def linear2Grid(data, field, flagger, freq, flag_assignment_method='nearest_agg'
@register('harmonize_interpolate2Grid') @register('harmonize_interpolate2Grid')
def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpolation_order=1, flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None, **kwargs): def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpolation_order=1,
flag_assignment_method='nearest_agg', flag_agg_func=max, drop_flags=None, **kwargs):
return harmonize( return harmonize(
data, data,
field, field,
...@@ -842,4 +843,61 @@ def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpola ...@@ -842,4 +843,61 @@ def interpolate2Grid(data, field, flagger, freq, interpolation_method, interpola
drop_flags=drop_flags, drop_flags=drop_flags,
**kwargs) **kwargs)
#def aggregate(data, field, flagger, freq_base, freq_target, agg_func, **kwargs):
def aggregate(data, field, flagger, source_freq, target_freq, agg_func=np.mean, sample_func=np.mean,
              invalid_flags=None, max_invalid=np.inf, **kwargs):
    """Aggregate `field` onto a `target_freq` grid by delegating to `harmonize`.

    A per-chunk aggregator is assembled from `sample_func`, `agg_func` and
    `max_invalid` and handed to `harmonize` as `inter_agg`; both
    `inter_method` and `reshape_method` are fixed to ``'bagg'`` and flags
    are aggregated with `max`.

    Parameters
    ----------
    data, field, flagger :
        Forwarded unchanged to `harmonize`.
    source_freq :
        Frequency each chunk is resampled to before `agg_func` is applied.
        Only used when `sample_func` is not None.
    target_freq :
        Frequency passed to `harmonize` as the target grid.
    agg_func : callable
        Reduces the (optionally resampled) chunk to a single value.
    sample_func : callable or None
        Reducer used while resampling a chunk to `source_freq`. If None, the
        chunk is passed to `agg_func` without resampling. If the callable's
        `__name__` matches an attribute of a pandas resampler object, that
        resampler method is called instead of `.apply(sample_func)`.
    invalid_flags :
        Forwarded to `harmonize` as `drop_flags`.
    max_invalid :
        If finite, a chunk is aggregated only while its NaN count is strictly
        below this value; otherwise `np.nan` is returned for the chunk.
        NOTE(review): the comparison is strict (`<`), so a chunk holding
        exactly `max_invalid` NaNs is rejected -- confirm this is intended.
    **kwargs :
        Forwarded to `harmonize`.

    Returns
    -------
    Whatever `harmonize` returns (presumably the harmonized data together
    with the updated flagger -- verify against `harmonize`).
    """
    # Step 1: decide how a chunk is brought to `source_freq` (if at all).
    if sample_func is None:
        resample = None
    else:
        # Callables such as functools.partial objects carry no `__name__`;
        # they must take the generic `.apply` path instead of raising here.
        sample_func_name = getattr(sample_func, "__name__", None)
        dummy_resampler = pd.Series(np.nan, index=[pd.Timedelta('1min')]).resample('1min')
        if sample_func_name is not None and hasattr(dummy_resampler, sample_func_name):
            # use the resampler's own method of the same name
            def resample(x):
                return getattr(x.resample(source_freq), sample_func_name)()
        else:
            def resample(x):
                return x.resample(source_freq).apply(sample_func)

    # Step 2: decide how a (resampled) chunk is reduced to one value.
    if max_invalid < np.inf:
        def reduce_chunk(y):
            if y.isna().sum() < max_invalid:
                return agg_func(y)
            return np.nan
    else:
        def reduce_chunk(y):
            return agg_func(y)

    # Step 3: compose both steps; the branching is done once, up front, so the
    # aggregator itself carries no per-call configuration checks.
    if resample is None:
        aggregator = reduce_chunk
    else:
        def aggregator(x):
            return reduce_chunk(resample(x))

    return harmonize(
        data,
        field,
        flagger,
        target_freq,
        inter_method='bagg',
        reshape_method='bagg',
        inter_agg=aggregator,
        reshape_agg=max,
        drop_flags=invalid_flags,
        **kwargs)
#! /usr/bin/env python #! /usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# TODO: use the logging module
import logging import logging
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from warnings import warn
__plotvars = [] __plotvars = []
...@@ -24,7 +22,9 @@ def plotHook(data, old, new, varname, do_plot, flag_test, plot_nans=True): ...@@ -24,7 +22,9 @@ def plotHook(data, old, new, varname, do_plot, flag_test, plot_nans=True):
__plotvars.append(varname) __plotvars.append(varname)
# cannot use getFlags here, because if a flag was set (e.g. with force) the # cannot use getFlags here, because if a flag was set (e.g. with force) the
# flag may be the same, but any additional row (e.g. comment-field) would differ # flag may be the same, but any additional row (e.g. comment-field) would differ
mask = (old._flags[varname] != new._flags[varname]).any(axis=1) mask = (old._flags[varname] != new._flags[varname])
if isinstance(mask, pd.DataFrame):
mask = mask.any(axis=1)
_plot(data, new, mask, varname, title=flag_test, plot_nans=plot_nans) _plot(data, new, mask, varname, title=flag_test, plot_nans=plot_nans)
......
...@@ -9,8 +9,8 @@ import pandas as pd ...@@ -9,8 +9,8 @@ import pandas as pd
from saqc.core.core import prepareConfig, readConfig from saqc.core.core import prepareConfig, readConfig
from saqc.flagger import ( from saqc.flagger import (
ContinuousBaseFlagger, ContinuousFlagger,
CategoricalBaseFlagger, CategoricalFlagger,
SimpleFlagger, SimpleFlagger,
DmpFlagger, DmpFlagger,
) )
...@@ -20,10 +20,10 @@ TESTNODATA = (np.nan, -9999) ...@@ -20,10 +20,10 @@ TESTNODATA = (np.nan, -9999)
TESTFLAGGER = ( TESTFLAGGER = (
CategoricalBaseFlagger(["NIL", "GOOD", "BAD"]), CategoricalFlagger(["NIL", "GOOD", "BAD"]),
SimpleFlagger(), SimpleFlagger(),
DmpFlagger(), DmpFlagger(),
ContinuousBaseFlagger(), ContinuousFlagger(),
) )
...@@ -59,6 +59,6 @@ def initMetaDict(config_dict, data): ...@@ -59,6 +59,6 @@ def initMetaDict(config_dict, data):
df = pd.DataFrame(config_dict)[_getKeys(config_dict)] df = pd.DataFrame(config_dict)[_getKeys(config_dict)]
meta = prepareConfig(df, data) meta = prepareConfig(df, data)
fobj = io.StringIO() fobj = io.StringIO()
meta.to_csv(fobj, index=False) meta.to_csv(fobj, index=False, sep=";")
fobj.seek(0) fobj.seek(0)
return fobj, meta return fobj, meta
...@@ -18,7 +18,8 @@ from saqc.funcs.harm_functions import ( ...@@ -18,7 +18,8 @@ from saqc.funcs.harm_functions import (
linear2Grid, linear2Grid,
interpolate2Grid, interpolate2Grid,
shift2Grid, shift2Grid,
aggregate2Grid aggregate2Grid,
aggregate
) )
...@@ -334,20 +335,10 @@ def test_wrapper(data, flagger): ...@@ -334,20 +335,10 @@ def test_wrapper(data, flagger):
field = data.columns[0] field = data.columns[0]
freq = '15min' freq = '15min'
flagger = flagger.initFlags(data) flagger = flagger.initFlags(data)
aggregate(data, field, flagger, '15min', '30min', agg_func=np.sum, sample_func=np.mean)
linear2Grid(data, field, flagger, freq, flag_assignment_method='nearest_agg', flag_agg_func=max, linear2Grid(data, field, flagger, freq, flag_assignment_method='nearest_agg', flag_agg_func=max,
drop_flags=None) drop_flags=None)
aggregate2Grid(data, field, flagger, freq, agg_func=sum, agg_method='nearest_agg', aggregate2Grid(data, field, flagger, freq, agg_func=sum, agg_method='nearest_agg',
flag_agg_func=max, drop_flags=None) flag_agg_func=max, drop_flags=None)
shift2Grid(data, field, flagger, freq, shift_method='nearest_shift', drop_flags=None) shift2Grid(data, field, flagger, freq, shift_method='nearest_shift', drop_flags=None)
if __name__ == "__main__":
dat = data()
flagger = TESTFLAGGER[1]
test_gridInterpolation(data(), 'polynomial')
flagger2 = TESTFLAGGER[2]
flagger = flagger.initFlags(dat)
flagger2 = flagger.initFlags(dat2)
dat_out, flagger = interpolate2Grid(dat, 'data', flagger, '15min', interpolation_method="polynomial", flag_assignment_method='nearest_agg',
flag_agg_func=max, drop_flags=None)
print("stop")
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment