diff --git a/saqc/flagger/baseflagger.py b/saqc/flagger/baseflagger.py index d98b63ccb83780c0d02a7c829391b9c801729e4d..d4229502658a9d7e8207aed3967d7d7ce1d2d22d 100644 --- a/saqc/flagger/baseflagger.py +++ b/saqc/flagger/baseflagger.py @@ -66,7 +66,7 @@ class BaseFlagger(ABC): newflagger._flags = flags.astype(self.dtype) return newflagger - def setFlagger(self, other: BaseFlaggerT, join: str = "outer"): + def setFlagger(self, other: BaseFlaggerT, join: str = "merge"): """ Merge the given flagger 'other' into self """ diff --git a/saqc/flagger/dmpflagger.py b/saqc/flagger/dmpflagger.py index 90b356337eaaa67bf7ddb226573fdb93fc845e84..832a9684d1cdfedfc3fd8789aa00ca9b3412fd08 100644 --- a/saqc/flagger/dmpflagger.py +++ b/saqc/flagger/dmpflagger.py @@ -75,7 +75,7 @@ class DmpFlagger(CategoricalFlagger): newflagger._comments = self._comments.aloc[flags, ...] return newflagger - def setFlagger(self, other: DmpFlaggerT, join: str="outer"): + def setFlagger(self, other: DmpFlaggerT, join: str="merge"): assert isinstance(other, DmpFlagger) out = super().setFlagger(other, join) out._causes = mergeDios(out._causes, other._causes, join=join) diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py index 70c32da8f297bd3beebcf1c4f493fc3894043914..472c2339e57cc89a10f6871171e12fe948f9febe 100644 --- a/saqc/lib/tools.py +++ b/saqc/lib/tools.py @@ -405,17 +405,25 @@ def groupConsecutives(series: pd.Series) -> Iterator[pd.Series]: yield pd.Series(data=values[start:stop], index=index[start:stop]) start = stop - -def mergeDios(left, right, join="outer"): +def mergeDios(left, right, join="merge"): # use dios.merge() as soon as it implemented # see https://git.ufz.de/rdm/dios/issues/15 + merged = left.copy() - cols = left.columns.intersection(right.columns) - for c in cols: + shared_cols = left.columns.intersection(right.columns) + for c in shared_cols: l, r = left[c], right[c] - l = l.align(r, join=join)[0] - l.loc[r.index] = r - merged[c] = l + if join == "merge": + # NOTE: + # our merge behavior is nothing more than an + # outer join, where the right join argument + # overwrites the left at the shared indices, + # while on a normal outer join common indices + # hold the values from the left join argument + r, l = l.align(r, join="outer") + else: + l, r= l.align(r, join=join) + merged[c] = l.combine_first(r) newcols = right.columns.difference(merged.columns) for c in newcols: diff --git a/test/core/test_core.py b/test/core/test_core.py index 0b263c7288f05a7d2b06353236b774480dc48028..a27abb5d9aabac6d9cd427e8f232b1227e89020f 100644 --- a/test/core/test_core.py +++ b/test/core/test_core.py @@ -6,7 +6,6 @@ import logging import pytest import numpy as np import pandas as pd -import dios.dios as dios from saqc.funcs import register, flagRange from saqc.core.core import run diff --git a/test/flagger/test_dmpflagger.py b/test/flagger/test_dmpflagger.py index eacd2d579372d863289bd382af0c5a8e9549222e..fb6c612867548b07885321a759d63011bd9231ab 100644 --- a/test/flagger/test_dmpflagger.py +++ b/test/flagger/test_dmpflagger.py @@ -32,28 +32,34 @@ def test_setFlaggerOuter(data): field = data.columns[0] - df = data[field].iloc[::2].to_frame() - data_right = pd.DataFrame(data=df.values, columns=[field], index=df.index + pd.Timedelta("1Min")) - data_left = data[field].to_frame() + data_left = data + + data_right = data.to_df() + dates = data_right.index.to_series() + dates[len(dates)//2:] += pd.Timedelta("1Min") + data_right.index = dates + data_right = data_right.to_dios() left = (flagger .initFlags(data=data_left) - .setFlags(field=field, flag=flagger.BAD, comment="left", cause="left")) + .setFlags(field=field, flag=flagger.BAD, cause="SaQCLeft", comment="testLeft")) + right = (flagger .initFlags(data=data_right) - .setFlags(field, flag=flagger.GOOD, comment="right", cause="right")) + .setFlags(field=field, flag=flagger.GOOD, cause="SaQCRight", comment="testRight")) merged = left.setFlagger(right, join="outer") - assert (merged._flags.loc[data_right.index] == flagger.GOOD).all(axis=None) - assert (merged._causes.loc[data_right.index] == "right").all(axis=None) - assert np.all(parseComments(merged._comments.loc[data_right.index]) == "right") - - assert (merged._flags.loc[data_left.index] == flagger.BAD).all(axis=None) - assert (merged._causes.loc[data_left.index] == "left").all(axis=None) - assert np.all(parseComments(merged._comments.loc[data_left.index]) == "left") + right_index = data_right[field].index.difference(data_left[field].index) + assert (merged._flags.loc[right_index] == flagger.GOOD).all(axis=None) + assert (merged._causes.loc[right_index] == "SaQCRight").all(axis=None) + assert np.all(parseComments(merged._comments.loc[right_index]) == "testRight") + left_index = data_left[field].index + assert (merged._flags.loc[left_index] == flagger.BAD).all(axis=None) + assert (merged._causes.loc[left_index] == "SaQCLeft").all(axis=None) + assert np.all(parseComments(merged._comments.loc[left_index]) == "testLeft") def test_setFlaggerInner(data): @@ -61,20 +67,27 @@ def test_setFlaggerInner(data): field = data.columns[0] - data_right = data[field].iloc[::2].to_frame() - data_left = data[field].to_frame() + data_left = data + data_right = data.iloc[::2] left = (flagger .initFlags(data=data_left) - .setFlags(field=field, flag=flagger.BAD, comment="left", cause="left")) + .setFlags(field=field, flag=flagger.BAD, cause="SaQCLeft", comment="testLeft")) + right = (flagger .initFlags(data=data_right) - .setFlags(field, flag=flagger.GOOD, comment="right", cause="right")) + .setFlags(field=field, flag=flagger.GOOD, cause="SaQCRight", comment="testRight")) + + merged = left.setFlagger(right, join="inner") + + assert (merged._flags[field].index == data_right[field].index).all() + assert (merged._causes[field].index == data_right[field].index).all() + assert (merged._comments[field].index == data_right[field].index).all() - merged = left.setFlagger(right, join="inner").getFlags().to_df() - assert (merged.index == data_right.index).all(axis=None) - assert (merged == flagger.GOOD).all(axis=None) + assert (merged._flags[field] == flagger.BAD).all() + assert (merged._causes[field] == "SaQCLeft").all(axis=None) + assert np.all(parseComments(merged._comments) == "testLeft") def test_getFlaggerDrop(data): diff --git a/test/flagger/test_flagger.py b/test/flagger/test_flagger.py index 1112e42c3885bde520456fcd07f649e9ef4811c1..8bbb92949d8dfb4db885df788e1445894683b829 100644 --- a/test/flagger/test_flagger.py +++ b/test/flagger/test_flagger.py @@ -310,6 +310,78 @@ def test_setFlaggerIndexDiff(data, flagger): assert (r[both] == o[both]).all() +@pytest.mark.parametrize("data", DATASETS) +@pytest.mark.parametrize("flagger", TESTFLAGGER) +def test_setFlaggerOuter(data, flagger): + + + field = data.columns[0] + + data_left = data + data_right = data.iloc[::2] + + left = (flagger + .initFlags(data=data_left) + .setFlags(field=field, flag=flagger.BAD)) + + right = (flagger + .initFlags(data=data_right) + .setFlags(field, flag=flagger.GOOD)) + + merged = left.setFlagger(right, join="outer") + + loc = data_right[field].index.difference(data_left[field].index) + assert (merged.getFlags(field, loc=loc) == flagger.GOOD).all(axis=None) + assert (merged.getFlags(field, loc=data_left[field].index) == flagger.BAD).all(axis=None) + + +@pytest.mark.parametrize("data", DATASETS) +@pytest.mark.parametrize("flagger", TESTFLAGGER) +def test_setFlaggerInner(data, flagger): + + + field = data.columns[0] + + data_left = data + data_right = data.iloc[::2] + + left = (flagger + .initFlags(data=data_left) + .setFlags(field=field, flag=flagger.BAD)) + + right = (flagger + .initFlags(data=data_right) + .setFlags(field, flag=flagger.GOOD)) + + merged = left.setFlagger(right, join="inner") + + assert (merged.getFlags(field).index == data_right[field].index).all() + assert (merged.getFlags(field) == flagger.BAD).all() + + +@pytest.mark.parametrize("data", DATASETS) +@pytest.mark.parametrize("flagger", TESTFLAGGER) +def test_setFlaggerMerge(data, flagger): + + field = data.columns[0] + data_left = data + data_right = data.iloc[::2] + + left = (flagger + .initFlags(data=data_left) + .setFlags(field=field, flag=flagger.BAD)) + + right = (flagger + .initFlags(data=data_right) + .setFlags(field, flag=flagger.GOOD)) + + merged = left.setFlagger(right, join="merge") + + loc = data_left[field].index.difference(data_right[field].index) + assert (merged.getFlags(field, loc=data_right[field].index) == flagger.GOOD).all(axis=None) + assert (merged.getFlags(field,loc=loc) == flagger.BAD).all(axis=None) + + @pytest.mark.parametrize("data", DATASETS) @pytest.mark.parametrize("flagger", TESTFLAGGER) def test_isFlaggedDios(data, flagger):