diff --git a/docs/funcs/DTW.md b/docs/funcs/DTW.md new file mode 100644 index 0000000000000000000000000000000000000000..b0c5a5a597fc1532226d9cfcb650fc9014540089 --- /dev/null +++ b/docs/funcs/DTW.md @@ -0,0 +1,24 @@ +## DTW + + +## Index +[flagDTW](#flagDTW) + +## flagDTW + +``` +flagDTW(refdatafield='SM1', window = 25, min_distance = 0.25, method_dtw = "fast") +``` + + +| parameter | data type | default value | description | +|-----------------------|---------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------| +| window | int | `25` |The number of datapoints to be included in each comparison window. | +| min_distance | float | `0.5` |The minimum distance of two graphs to be classified as "different". | +| method_dtw | string | `"fast"` |Implementation of DTW algorithm - "exact" for the normal implementation of DTW, "fast" for the fast implementation. | +| ref_datafield | string | |Name of the reference datafield ("correct" values) with which the actual datafield is compared. | + + +This function compares the data with a reference datafield (given in `ref_datafield`) of values we assume to be correct. The comparison is undertaken window-based, i.e. the two data fields are compared window by window, with overlapping windows. The function flags those values that lie in the middle of a window that exceeds a minimum distance value (given in `min_distance`). + +As comparison algorithm, we use the [Dynamic Time Warping (DTW) Algorithm](https://en.wikipedia.org/wiki/Dynamic_time_warping) that accounts for temporal and spacial offsets when calculating the distance. For a demonstration of the DTW, see the Wiki entry "Results for rain data set" in [Pattern Recognition with Wavelets](https://git.ufz.de/rdm-software/saqc/-/wikis/Pattern-Recognition-with-Wavelets#Results). diff --git a/docs/funcs/SoilMoisture.md b/docs/funcs/SoilMoisture.md index 107371e2de4eebd398c984bfcb3d77cb16b50215..705456715f6f2ca78525df44a0bb86ca304c4b76 100644 --- a/docs/funcs/SoilMoisture.md +++ b/docs/funcs/SoilMoisture.md @@ -31,7 +31,7 @@ sm_flagSpikes(raise_factor=0.15, deriv_factor=0.2, | smooth_window | [offset string](docs/ParameterDescriptions.md#offset-strings) | `"3h"` | | smooth_poly_deg | integer | `2` | -The Function is a wrapper around `spikes_flagSpektrumBased` +The function is a wrapper around `spikes_flagSpektrumBased` with a set of default parameters referring to [1]. For a complete description of the algorithm and the available parameters please refer to the documentation of [flagSpikes_spektrumBased](docs/funcs/SpikeDetection.md#spikes_spektrumbased) @@ -63,7 +63,7 @@ sm_flagBreaks(thresh_rel=0.1, thresh_abs=0.01, | smooth_poly_deg | integer | `2` | -The Function is a wrapper around `breaks_flagSpektrumBased` +The function is a wrapper around `breaks_flagSpektrumBased` with a set of default parameters referring to [1]. For a complete description of the algorithm and the available parameters please refer to the documentation of [breaks_spektrumBased](docs/funcs/BreakDetection.md#breaks_spektrumbased). @@ -119,7 +119,7 @@ NOTE: - The time series is expected to be harmonized to an [equidistant frequency grid](docs/funcs/TimeSeriesHarmonization.md) -This Function is based on [1] and all default parameter values are taken from this publication. +This function is based on [1] and all default parameter values are taken from this publication. [1] Dorigo, W. et al: Global Automated Quality Control of In Situ Soil Moisture Data from the international Soil Moisture Network. 2013. Vadoze Zone J. @@ -143,7 +143,7 @@ This function flags soil moisture values if the soil temperature (given in `soil_temp_variable`) drops below `frost_thresh` within a period of +/- `window`. -This Function is an implementation of the soil temperature based flagging +This function is an implementation of the soil temperature based flagging presented in [1] and all default parameter values are taken from this publication. @@ -199,7 +199,7 @@ is flagged, if: $` y_{k-j} + y_{k-j+1} + ... + y_{k} \le `$ `sensor_depth` $`\cdot`$ `sensor_accuracy` $`\cdot`$ `soil_porosity` -This Function is an implementation of the precipitation based flagging +This function is an implementation of the precipitation based flagging presented in [1] and all default parameter values are taken from this publication. @@ -222,7 +222,7 @@ sm_flagRandomForest(references, window_values, window_flags, path) | path | string | | Path to the respective model object, i.e. its name and the respective value of the grouping variable. e.g. "models/model_0.2.pkl" | -This Function uses pre-trained machine-learning model objects for flagging. +This function uses pre-trained machine-learning model objects for flagging. This requires training a model by use of the [training script](../ressources/machine_learning/train_machine_learning.py) provided. For flagging, inputs to the model are the data of the variable of interest, data of reference variables and the automatic flags that were assigned by other diff --git a/saqc/core/core.py b/saqc/core/core.py index b456d6b881de6ce3e74db8e98f57c4be77ab8e37..a0c25c9f1e7a57a6f7cc9906519d3abe87529567 100644 --- a/saqc/core/core.py +++ b/saqc/core/core.py @@ -173,11 +173,17 @@ def run( continue if configrow[Fields.PLOT]: - plotHook( - data_old=data_chunk, data_new=data_chunk_result, - flagger_old=flagger_chunk, flagger_new=flagger_chunk_result, - sources=[], targets=[varname], plot_name=func, - ) + try: + plotHook( + data_old=data_chunk, data_new=data_chunk_result, + flagger_old=flagger_chunk, flagger_new=flagger_chunk_result, + sources=[], targets=[varname], plot_name=func, + ) + except Exception: + logger.exception(f"Plotting failed. \n" + f" config line: {configrow[Fields.LINENUMBER]}\n" + f" expression: {func}\n" + f" variable(s): {[varname]}.") # NOTE: # time slicing support is currently disabled @@ -188,6 +194,11 @@ def run( plotfields = config[Fields.VARNAME][config[Fields.PLOT]] if len(plotfields) > 0: - plotAllHook(data, flagger) + try: + # to only show variables that have set the plot-flag + # use: plotAllHook(data, flagger, targets=plotfields) + plotAllHook(data, flagger) + except Exception: + logger.exception(f"Final plotting failed.") return data, flagger diff --git a/saqc/flagger/baseflagger.py b/saqc/flagger/baseflagger.py index 1817fd991c124b07e3fa444a2c22885cf6b9a20a..d98b63ccb83780c0d02a7c829391b9c801729e4d 100644 --- a/saqc/flagger/baseflagger.py +++ b/saqc/flagger/baseflagger.py @@ -3,15 +3,13 @@ import operator as op from copy import deepcopy -from collections import OrderedDict from abc import ABC, abstractmethod -from typing import TypeVar, Union, Any +from typing import TypeVar, Union, Any, List -import numpy as np import pandas as pd import dios.dios as dios -from saqc.lib.tools import toSequence, assertScalar, assertDictOfSeries +from saqc.lib.tools import assertScalar, mergeDios COMPARATOR_MAP = { "!=": op.ne, @@ -28,6 +26,7 @@ FlagT = Any diosT = dios.DictOfSeries BaseFlaggerT = TypeVar("BaseFlaggerT") PandasT = Union[pd.Series, diosT] +FieldsT = Union[str, List[str]] class BaseFlagger(ABC): @@ -67,7 +66,7 @@ class BaseFlagger(ABC): newflagger._flags = flags.astype(self.dtype) return newflagger - def setFlagger(self, other: BaseFlaggerT): + def setFlagger(self, other: BaseFlaggerT, join: str = "outer"): """ Merge the given flagger 'other' into self """ @@ -75,36 +74,23 @@ class BaseFlagger(ABC): if not isinstance(other, self.__class__): raise TypeError(f"flagger of type '{self.__class__}' needed") - this = self.flags - other = other.flags - - # use dios.merge() as soon as it implemented - # see https://git.ufz.de/rdm/dios/issues/15 - new = this.copy() - cols = this.columns.intersection(other.columns) - for c in cols: - l, r = this[c], other[c] - l = l.align(r, join='outer')[0] - l.loc[r.index] = r - new[c] = l - - newcols = other.columns.difference(new.columns) - for c in newcols: - new[c] = other[c].copy() - - newflagger = self.copy() - newflagger._flags = new + newflagger = self.copy( + flags=mergeDios(self.flags, other.flags, join=join) + ) return newflagger - def getFlagger(self, field: str = None, loc: LocT = None) -> BaseFlaggerT: + def getFlagger(self, field: FieldsT = None, loc: LocT = None, drop: FieldsT = None) -> BaseFlaggerT: """ Return a potentially trimmed down copy of self. """ + if drop is not None: + if field is not None: + raise TypeError("either 'field' or 'drop' can be given, but not both") + field = self._flags.columns.drop(drop, errors="ignore") flags = self.getFlags(field=field, loc=loc) flags = dios.to_dios(flags) - newflagger = self.copy() - newflagger._flags = flags + newflagger = self.copy(flags=flags) return newflagger - def getFlags(self, field: str = None, loc: LocT = None) -> PandasT: + def getFlags(self, field: FieldsT = None, loc: LocT = None) -> PandasT: """ Return a potentially, to `loc`, trimmed down version of flags. Return @@ -172,8 +158,11 @@ class BaseFlagger(ABC): flagged = flags.notna() & cp(flags, flag) return flagged - def copy(self) -> BaseFlaggerT: - return deepcopy(self) + def copy(self, flags=None) -> BaseFlaggerT: + out = deepcopy(self) + if flags is not None: + out._flags = flags + return out def _check_field(self, field): """ Check if (all) field(s) in self._flags. """ @@ -202,22 +191,18 @@ class BaseFlagger(ABC): @abstractmethod def UNFLAGGED(self) -> FlagT: """ Return the flag that indicates unflagged data """ - pass @property @abstractmethod def GOOD(self) -> FlagT: """ Return the flag that indicates the very best data """ - pass @property @abstractmethod def BAD(self) -> FlagT: """ Return the flag that indicates the worst data """ - pass @abstractmethod def isSUSPICIOUS(self, flag: FlagT) -> bool: """ Return bool that indicates if the given flag is valid, but neither UNFLAGGED, BAD, nor GOOD.""" - pass diff --git a/saqc/flagger/dmpflagger.py b/saqc/flagger/dmpflagger.py index def078183359d4eefe6db9b6563973352da901d0..90b356337eaaa67bf7ddb226573fdb93fc845e84 100644 --- a/saqc/flagger/dmpflagger.py +++ b/saqc/flagger/dmpflagger.py @@ -1,13 +1,18 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- + import subprocess import json from copy import deepcopy +from typing import TypeVar import dios.dios as dios from saqc.flagger.categoricalflagger import CategoricalFlagger -from saqc.lib.tools import assertScalar +from saqc.lib.tools import assertScalar, mergeDios + + +DmpFlaggerT = TypeVar("DmpFlaggerT") class Keywords: @@ -60,16 +65,23 @@ class DmpFlagger(CategoricalFlagger): newflagger = super().initFlags(data=data, flags=flags) newflagger._causes = newflagger.flags.astype(str) newflagger._comments = newflagger.flags.astype(str) - newflagger.causes[:], newflagger.comments[:] = "", "" + newflagger._causes[:], newflagger._comments[:] = "", "" return newflagger - def getFlagger(self, field=None, loc=None): - newflagger = super().getFlagger(field=field, loc=loc) + def getFlagger(self, field=None, loc=None, drop=None): + newflagger = super().getFlagger(field=field, loc=loc, drop=drop) flags = newflagger.flags newflagger._causes = self._causes.aloc[flags, ...] newflagger._comments = self._comments.aloc[flags, ...] return newflagger + def setFlagger(self, other: DmpFlaggerT, join: str="outer"): + assert isinstance(other, DmpFlagger) + out = super().setFlagger(other, join) + out._causes = mergeDios(out._causes, other._causes, join=join) + out._comments = mergeDios(out._comments, other._comments, join=join) + return out + def setFlags(self, field, loc=None, flag=None, force=False, comment="", cause="", **kwargs): assert "iloc" not in kwargs, "deprecated keyword, iloc" assertScalar("field", field, optional=False) diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py index 4137b6ea560442412d017cddcc863dd57cce2bf8..9825b39342389deabf2cd7e872b0a428c878050c 100644 --- a/saqc/lib/tools.py +++ b/saqc/lib/tools.py @@ -14,7 +14,7 @@ import dios import inspect # from saqc.flagger import BaseFlagger -from saqc.lib.types import T, PandasLike +from saqc.lib.types import T SAQC_OPERATORS = { "exp": np.exp, @@ -395,7 +395,6 @@ def groupConsecutives(series: pd.Series) -> Iterator[pd.Series]: """ index = series.index values = series.values - target = values[0] start = 0 while True: @@ -404,3 +403,22 @@ def groupConsecutives(series: pd.Series) -> Iterator[pd.Series]: break yield pd.Series(data=values[start:stop], index=index[start:stop]) start = stop + + +def mergeDios(left, right, join="outer"): + # use dios.merge() as soon as it implemented + # see https://git.ufz.de/rdm/dios/issues/15 + merged = left.copy() + cols = left.columns.intersection(right.columns) + for c in cols: + l, r = left[c], right[c] + l = l.align(r, join=join)[0] + l.loc[r.index] = r + merged[c] = l + + newcols = right.columns.difference(merged.columns) + for c in newcols: + merged[c] = right[c].copy() + + return merged + diff --git a/test/flagger/test_dmpflagger.py b/test/flagger/test_dmpflagger.py new file mode 100644 index 0000000000000000000000000000000000000000..eacd2d579372d863289bd382af0c5a8e9549222e --- /dev/null +++ b/test/flagger/test_dmpflagger.py @@ -0,0 +1,97 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- + +import json + +import numpy as np +import pandas as pd +import pytest + +from test.common import initData +from saqc.flagger import DmpFlagger + +@pytest.fixture +def data(): + return initData(cols=1) + + +def parseComments(data): + return np.array([json.loads(v)["comment"] for v in data.to_df().values.flatten()]) + + +def test_initFlags(data): + flagger = DmpFlagger().initFlags(data=data) + assert (flagger._flags == flagger.UNFLAGGED).all(axis=None) + assert (flagger._causes == "").all(axis=None) + assert (flagger._comments == "").all(axis=None) + + +def test_setFlaggerOuter(data): + + flagger = DmpFlagger() + + field = data.columns[0] + + df = data[field].iloc[::2].to_frame() + data_right = pd.DataFrame(data=df.values, columns=[field], index=df.index + pd.Timedelta("1Min")) + data_left = data[field].to_frame() + + left = (flagger + .initFlags(data=data_left) + .setFlags(field=field, flag=flagger.BAD, comment="left", cause="left")) + + right = (flagger + .initFlags(data=data_right) + .setFlags(field, flag=flagger.GOOD, comment="right", cause="right")) + + merged = left.setFlagger(right, join="outer") + + assert (merged._flags.loc[data_right.index] == flagger.GOOD).all(axis=None) + assert (merged._causes.loc[data_right.index] == "right").all(axis=None) + assert np.all(parseComments(merged._comments.loc[data_right.index]) == "right") + + assert (merged._flags.loc[data_left.index] == flagger.BAD).all(axis=None) + assert (merged._causes.loc[data_left.index] == "left").all(axis=None) + assert np.all(parseComments(merged._comments.loc[data_left.index]) == "left") + + +def test_setFlaggerInner(data): + + flagger = DmpFlagger() + + field = data.columns[0] + + data_right = data[field].iloc[::2].to_frame() + data_left = data[field].to_frame() + + left = (flagger + .initFlags(data=data_left) + .setFlags(field=field, flag=flagger.BAD, comment="left", cause="left")) + + right = (flagger + .initFlags(data=data_right) + .setFlags(field, flag=flagger.GOOD, comment="right", cause="right")) + + merged = left.setFlagger(right, join="inner").getFlags().to_df() + assert (merged.index == data_right.index).all(axis=None) + assert (merged == flagger.GOOD).all(axis=None) + + +def test_getFlaggerDrop(data): + flagger = DmpFlagger().initFlags(data) + with pytest.raises(TypeError): + flagger.getFlags(field=data.columns, drop="var") + + field = data.columns[0] + expected = data[data.columns.drop(field)].to_df() + + filtered = flagger.getFlagger(drop=field) + + assert (filtered._flags.columns == expected.columns).all(axis=None) + assert (filtered._comments.columns == expected.columns).all(axis=None) + assert (filtered._causes.columns == expected.columns).all(axis=None) + + assert (filtered._flags.to_df().index== expected.index).all(axis=None) + assert (filtered._comments.to_df().index== expected.index).all(axis=None) + assert (filtered._causes.to_df().index== expected.index).all(axis=None) + diff --git a/test/flagger/test_flagger.py b/test/flagger/test_flagger.py index f91a17f5c6eb70447b11ba36e2abcc5884b2db4d..1112e42c3885bde520456fcd07f649e9ef4811c1 100644 --- a/test/flagger/test_flagger.py +++ b/test/flagger/test_flagger.py @@ -161,6 +161,21 @@ def test_getFlagger(data, flagger): assert check_all_dios_index_length(newflags, data[sl]) +@pytest.mark.parametrize("data", DATASETS) +@pytest.mark.parametrize("flagger", TESTFLAGGER) +def test_getFlaggerDrop(data, flagger): + flagger = flagger.initFlags(data) + with pytest.raises(TypeError): + flagger.getFlags(field=data.columns, drop="var") + + field = data.columns[0] + expected = data.columns.drop(field) + + filtered = flagger.getFlagger(drop=field) + assert (filtered.getFlags().columns == expected).all(axis=None) + assert (filtered.getFlags().to_df().index== data[expected].to_df().index).all(axis=None) + + @pytest.mark.parametrize("data", DATASETS) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_setFlagger(data, flagger):