Skip to content
Snippets Groups Projects
Commit 961bd4b5 authored by David Schäfer's avatar David Schäfer
Browse files

Merge branch 'concatFlags-auto' into 'develop'

Option to automatically infer inversion method in concatFlags

See merge request !659
parents 51b55cda 4c521bbf
No related branches found
No related tags found
3 merge requests!685Release 2.4,!684Release 2.4,!659Option to automatically infer inversion method in concatFlags
Pipeline #164373 passed with stages
in 6 minutes and 51 seconds
......@@ -17,9 +17,9 @@ from typing_extensions import Literal
from saqc.constants import UNFLAGGED
from saqc.core import register
from saqc.funcs.interpolation import _SUPPORTED_METHODS
from saqc.core.history import History
from saqc.lib.tools import filterKwargs, getFreqDelta, isflagged
from saqc.lib.ts_operators import aggregate2Freq, shift2Freq
from saqc.lib.ts_operators import aggregate2Freq
if TYPE_CHECKING:
from saqc import SaQC
......@@ -233,7 +233,7 @@ class ResamplingMixin:
def concatFlags(
self: "SaQC",
field: str,
target: str,
target: str | None = None,
method: Literal[
"inverse_fagg",
"inverse_bagg",
......@@ -243,6 +243,7 @@ class ResamplingMixin:
"inverse_nshift",
"inverse_interpolation",
"match",
"auto",
] = "match",
freq: str | None = None,
drop: bool = False,
......@@ -251,8 +252,8 @@ class ResamplingMixin:
**kwargs,
) -> "SaQC":
"""
Append the flags/history of ``field`` to ``target``. If necessary the flags are
projected to the ``target`` frequency grid.
Project flags/history of ``field`` to ``target`` and adjust to the frequeny grid
of ``target`` by 'undoing' former interpolation, shifting or resampling operations
Note
----
......@@ -262,15 +263,16 @@ class ResamplingMixin:
Parameters
----------
field : str
field:
Fieldname of flags history to append.
target : str
target:
Field name of flags history to append to.
method : {'inverse_fagg', 'inverse_bagg', 'inverse_nagg', 'inverse_fshift', 'inverse_bshift', 'inverse_nshift', 'match'}, default 'match'
method:
Method to project the flags of ``field`` the flags to ``target``:
* 'auto': inverse the last alignment/resampling operations
* 'inverse_nagg': project a flag of ``field`` to all timestamps of ``target`` within the range +/- ``freq``/2.
* 'inverse_bagg': project a flag of ``field`` to all preceeding timestamps of ``target`` within the range ``freq``
* 'inverse_fagg': project a flag of ``field`` to all succeeding timestamps of ``target`` within the range ``freq``
......@@ -296,6 +298,10 @@ class ResamplingMixin:
-------
saqc.SaQC
"""
if target is None:
target = field
flagscol = self._flags[field]
target_datcol = self._data[target]
target_flagscol = self._flags[target]
......@@ -313,18 +319,36 @@ class ResamplingMixin:
"pass custom projection range to freq parameter."
)
if method[-13:] == "interpolation":
if method == "auto":
stack = []
for meta in self._flags.history[field].meta:
func = meta["func"]
meth = meta["kwargs"].get("method")
if func in ("align", "resample"):
if meth[1:] in ("agg", "shift"):
stack.append(f"inverse_{meth}")
else:
stack.append("inverse_interpolation")
elif func == "concatFlags":
stack.pop()
if not stack:
raise ValueError(
"unable to derive an inversion method, please specify an appropiate 'method'"
)
method = stack[-1]
if method.endswith("interpolation"):
ignore = _getChunkBounds(target_datcol, flagscol, freq)
func = _inverseInterpolation
func_kws = dict(freq=freq, chunk_bounds=ignore, target=dummy)
elif method[-3:] == "agg":
elif method.endswith("agg"):
projection_method = METHOD2ARGS[method][0]
tolerance = METHOD2ARGS[method][1](freq)
func = _inverseAggregation
func_kws = dict(freq=tolerance, method=projection_method, target=dummy)
elif method[-5:] == "shift":
elif method.endswith("shift"):
drop_mask = target_datcol.isna() | isflagged(
target_flagscol, kwargs["dfilter"]
)
......@@ -347,30 +371,34 @@ class ResamplingMixin:
raise ValueError(f"unknown method {method}")
history = self._flags.history[field].apply(dummy.index, func, func_kws)
if overwrite is False:
mask = isflagged(self._flags[target], thresh=kwargs["dfilter"])
history._hist[mask] = np.nan
# append a dummy column
meta = {
"func": f"concatFlags",
"args": (),
"kwargs": {
"field": field,
"target": target,
"method": method,
"freq": freq,
"drop": drop,
"squeeze": squeeze,
"overwrite": overwrite,
**kwargs,
},
}
if squeeze:
history = history.squeeze(raw=True)
meta = {
"func": f"concatFlags",
"args": (field,),
"kwargs": {
"target": target,
"method": method,
"freq": freq,
"drop": drop,
"squeeze": squeeze,
"overwrite": overwrite,
**kwargs,
},
}
self._flags.history[target].append(history, meta)
flags = history.squeeze(raw=True)
history = History(index=history.index)
else:
self._flags.history[target].append(history)
flags = pd.Series(np.nan, index=history.index, dtype=float)
history.append(flags, meta)
self._flags.history[target].append(history)
if drop:
return self.dropField(field=field)
......
......@@ -375,7 +375,7 @@ def detectDeviants(
return [i for i, x in enumerate(cluster) if x != norm_cluster]
def getFreqDelta(index):
def getFreqDelta(index: pd.Index) -> None | pd.Timedelta:
"""
Function checks if the passed index is regularly sampled.
......
......@@ -10,7 +10,7 @@ import numpy as np
import pandas as pd
import pytest
from saqc import BAD, UNFLAGGED, SaQC
from saqc import SaQC
from saqc.core import DictOfSeries, initFlagsLike
from tests.common import checkInvariants
......@@ -298,5 +298,97 @@ def test_concatFlags(data, overwrite, expected_col0, expected_col1):
"data_", target="data", overwrite=overwrite, squeeze=True
)
hist_concat = qc_concat._flags.history["data"].hist.astype(float)
meta_concat = qc_concat._flags.history["data"].meta
assert hist_concat[0].equals(pd.Series(expected_col0, index=data["data"].index))
assert hist_concat[1].equals(pd.Series(expected_col1, index=data["data"].index))
assert meta_concat[-1]["func"] == "concatFlags"
@pytest.mark.parametrize(
"method, inversion_method, freq",
[
("linear", "inverse_interpolation", "15min"),
("bshift", "inverse_bshift", "15Min"),
("fshift", "inverse_fshift", "15Min"),
("nshift", "inverse_nshift", "15min"),
("pad", "inverse_interpolation", "15min"),
],
)
def test_alignAutoInvert(data, method, inversion_method, freq):
flags = initFlagsLike(data)
field = data.columns[0]
field_aligned = f"{field}_aligned"
qc = SaQC(data, flags)
qc = qc.align(field=field, target=field_aligned, method=method, freq=freq)
qc = qc.flagDummy(field=field_aligned)
qc_expected = qc.concatFlags(
field=field_aligned, target=field, method=inversion_method
)
qc_got = qc.concatFlags(field=field_aligned, target=field, method="auto")
_assertEqual(qc_expected, qc_got)
def test_alignMultiAutoInvert(data):
flags = initFlagsLike(data)
field = data.columns[0]
field_aligned = f"{field}_aligned"
qc = SaQC(data, flags)
qc = qc.align(field=field, target=field_aligned, method="fshift", freq="30Min")
qc = qc.align(field=field_aligned, method="time", freq="10Min")
qc = qc.flagDummy(field=field_aligned)
# resolve the last alignment operation
_assertEqual(
qc.concatFlags(field=field_aligned, target=field, method="auto"),
qc.concatFlags(
field=field_aligned, target=field, method="inverse_interpolation"
),
)
# resolve the first alignment operation
_assertEqual(
(
qc.concatFlags(field=field_aligned, method="auto").concatFlags(
field=field_aligned, target=field, method="auto"
)
),
(
qc.concatFlags(
field=field_aligned, method="inverse_interpolation"
).concatFlags(field=field_aligned, target=field, method="inverse_fshift")
),
)
def _assertEqual(left: SaQC, right: SaQC):
for field in left.data.columns:
assert left._data[field].equals(right._data[field])
assert left._flags[field].equals(right._flags[field])
assert left._flags.history[field].hist.equals(right._flags.history[field].hist)
assert left._flags.history[field].meta == right._flags.history[field].meta
@pytest.mark.parametrize(
"method, inversion_method, freq",
[
("bagg", "inverse_bagg", "15Min"),
("fagg", "inverse_fagg", "15Min"),
("nagg", "inverse_nagg", "15min"),
],
)
def test_resampleAutoInvert(data, method, inversion_method, freq):
flags = initFlagsLike(data)
field = data.columns[0]
field_aligned = f"{field}_aligned"
qc = SaQC(data, flags)
qc = qc.resample(field=field, target=field_aligned, method=method, freq=freq)
qc = qc.flagRange(field=field_aligned, min=0, max=100)
qc_expected = qc.concatFlags(
field=field_aligned, target=field, method=inversion_method
)
qc_got = qc.concatFlags(field=field_aligned, target=field, method="auto")
_assertEqual(qc_got, qc_expected)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment