Skip to content
Snippets Groups Projects
Commit 08ddc91b authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

major dios implied simplifications of harmonization module carried out

parent 368663be
No related branches found
No related tags found
3 merge requests!193Release 1.4,!188Release 1.4,!24Dios integration
Pipeline #3298 passed with stage
in 7 minutes and 6 seconds
......@@ -60,14 +60,6 @@ def harmWrapper(heap={}):
inter_agg = getFuncFromInput(inter_agg)
reshape_agg = getFuncFromInput(reshape_agg)
# for some tingle tangle reasons, resolving the harmonization will not be sound, if not all missing/np.nan
# values get flagged initially:
data, flagger = flagMissing(data, field, flagger, nodata=data_missing_value, **kwargs)
# and dropped for harmonization:
if drop_flags is not None:
if flagger.BAD not in drop_flags:
drop_flags.append(flagger.BAD)
# get data of variable
flagger_merged = flagger.getFlagger(field=field)
dat_col = data[field]
......@@ -82,7 +74,7 @@ def harmWrapper(heap={}):
}
# now we can manipulate it without loosing information gathered before harmonization
dat_col, flagger_merged_clean = _outsortCrap(dat_col, field, flagger_merged, drop_flags=drop_flags,)
dat_col, flagger_merged_clean, _ = _outsortCrap(dat_col, field, flagger_merged, drop_flags=drop_flags)
# interpolation! (yeah)
dat_col, chunk_bounds = _interpolateGrid(
......@@ -134,31 +126,27 @@ def harmWrapper(heap={}):
resolve_method = HARM_2_DEHARM[harm_info[Heap.METHOD]]
# retrieve data and flags from the merged saqc-conform data frame (and by that get rid of blow-up entries).
flagger_merged = flagger.getFlagger(field=field)
flagger_harmony = flagger.getFlagger(field=field)
dat_col = data[field]
# reconstruct the drops that were performed before harmonization
drops, flagger_original_clean = _outsortCrap(
dat_col, field, harm_info[Heap.FLAGGER], drop_flags=harm_info[Heap.DROP], return_drops=True,
_, flagger_original_clean, drop_mask = _outsortCrap(
dat_col, field, harm_info[Heap.FLAGGER], drop_flags=harm_info[Heap.DROP]
)
drops = flagger.getFlags(field=field, loc=drop_mask)
# with reconstructed pre-harmonization flags-frame -> perform the projection of the flags calculated for
# the harmonized timeseries, onto the original timestamps
flagger_back = _backtrackFlags(
flagger_merged,
flagger_harmony,
flagger_original_clean,
harm_info[Heap.FLAGGER],
harm_info[Heap.FREQ],
track_method=resolve_method,
co_flagging=co_flagging,
)
flags_back = flagger_back.getFlags(field)
# now: re-insert the pre-harmonization-drops
flags_col = flags_back.reindex(flags_back.index.join(drops.index, how="outer"))
flags_col.loc[drops.index] = drops
# but to stick with the policy of always having flags as dios.DictOfSeriess we blow up the flags col again:
# flagger_back_full = flagger.initFlags(flags=flags_col)
flags_col = flagger_back.getFlags(field)
dat_col = harm_info[Heap.DATA].reindex(flags_col.index, fill_value=np.nan)
dat_col.name = field
......@@ -188,11 +176,6 @@ def _outsortCrap(
Depending on passed key word options the function will remove nan entries and as-suspicious-flagged values from
the data and the flags passed. In deharmonization the function is used to reconstruct original flags field shape.
<<<<<<< HEAD
FIXME: parameter
=======
FIXME: documentation deprecated
>>>>>>> 742f14d5386df2036f646b20bb39c4cbfeb0dd1f
:param data: pd.Series. ['data'].
:param flagger: saqc.flagger.
......@@ -216,9 +199,7 @@ def _outsortCrap(
drop_mask = drop_mask | flagger.isFlagged(field, flag=drop_flag, comparator="==")
flagger_out = flagger.getFlagger(loc=~drop_mask)
if return_drops:
return flagger.getFlags(field=field, loc=drop_mask), flagger_out
return data[~drop_mask], flagger_out
return data[~drop_mask], flagger_out, drop_mask
def _makeGrid(t0, t1, freq, name=None):
......@@ -631,22 +612,24 @@ def _reshapeFlags(
return flagger_new
def _backtrackFlags(flagger_post, flagger_pre, freq, track_method="invert_fshift", co_flagging=False):
def _backtrackFlags(flagger_harmony, flagger_original_clean, flagger_original, freq, track_method="invert_fshift", co_flagging=False):
# in the case of "real" up/downsampling - evaluating the harm flags against the original flags makes no sence!
if track_method in ["regain"]:
return flagger_pre
return flagger_original_clean
flags_post = flagger_post.getFlags()
flags_pre = flagger_pre.getFlags()
flags_harmony = flagger_harmony.getFlags()
flags_original_clean = flagger_original_clean.getFlags()
flags_original = flagger_original.getFlags()
flags_header = flags_post.columns
flags_header = flags_harmony.columns
assert len(flags_header) == 1
flags_pre = flags_pre.squeeze()
flags_post = flags_post.squeeze()
assert isinstance(flags_post, pd.Series)
assert isinstance(flags_pre, pd.Series)
flags_original_clean = flags_original_clean.squeeze()
flags_original = flags_original.squeeze()
flags_harmony = flags_harmony.squeeze()
assert isinstance(flags_harmony, pd.Series)
assert isinstance(flags_original_clean, pd.Series)
if track_method in ["invert_fshift", "invert_bshift", "invert_nearest"] and co_flagging is True:
if track_method == "invert_fshift":
......@@ -661,9 +644,9 @@ def _backtrackFlags(flagger_post, flagger_pre, freq, track_method="invert_fshift
method = "nearest"
tolerance = pd.Timedelta(freq) / 2
flags_post = flags_post.reindex(flags_pre.index, method=method, tolerance=tolerance)
replacement_mask = flags_post > flags_pre
flags_pre.loc[replacement_mask] = flags_post.loc[replacement_mask]
flags_harmony = flags_harmony.reindex(flags_original.index, method=method, tolerance=tolerance)
replacement_mask = flags_harmony > flags_original
flags_original.loc[replacement_mask] = flags_harmony.loc[replacement_mask]
if track_method in ["invert_fshift", "invert_bshift", "invert_nearest"] and co_flagging is False:
if track_method == "invert_fshift":
......@@ -677,25 +660,31 @@ def _backtrackFlags(flagger_post, flagger_pre, freq, track_method="invert_fshift
method = "nearest"
tolerance = pd.Timedelta(freq) / 2
flags_post = pd.merge_asof(
flags_post.to_frame(),
pd.DataFrame(flags_pre.index.values, index=flags_pre.index, columns=["pre_index"]),
flags_harmony = pd.merge_asof(
flags_harmony.to_frame(),
pd.DataFrame(flags_original_clean.index.values,
index=flags_original_clean.index,
columns=["pre_index"]),
left_index=True,
right_index=True,
tolerance=tolerance,
direction=method,
)
flags_post.dropna(subset=["pre_index"], inplace=True)
flags_post.set_index(["pre_index"], inplace=True)
flags_harmony.dropna(subset=["pre_index"], inplace=True)
flags_harmony.set_index(["pre_index"], inplace=True)
# get rid of Dataframe (not dios !) , that we needed for the merge_asof()-method
flags_post = flags_post.squeeze()
flags_harmony = flags_harmony.squeeze()
replacement_mask = flags_harmony > flags_original_clean.loc[flags_harmony.index]
flags_original_clean.loc[replacement_mask[replacement_mask].index] = flags_harmony.loc[replacement_mask]
replacement_mask = flags_post > flags_pre.loc[flags_post.index]
flags_pre.loc[replacement_mask[replacement_mask].index] = flags_post.loc[replacement_mask]
drops_index = flags_original.index.difference(flags_original_clean.index)
flags_original = flags_original_clean.reindex(flags_original_clean.index.join(drops_index, how='outer'))
flags_original.loc[drops_index] = flags_original[drops_index]
res = dios.DictOfSeries(flags_pre, columns=flags_header)
return flagger_pre.initFlags(flags=res)
res = dios.DictOfSeries(flags_original, columns=flags_header)
return flagger_original.initFlags(flags=res)
def _fromMerged(data, flagger, fieldname):
......
......@@ -6,7 +6,7 @@ import pytest
import numpy as np
import pandas as pd
import from dios import dios
from dios import dios
from test.common import TESTFLAGGER
from saqc.funcs.harm_functions import (
......@@ -107,17 +107,15 @@ def test_outsortCrap(data, flagger):
drop_index = s.index[5:7]
flagger = flagger.setFlags(field, loc=drop_index)
res, _ = _outsortCrap(s, field, flagger, drop_flags=flagger.BAD)
res, *_ = _outsortCrap(s, field, flagger, drop_flags=flagger.BAD)
assert drop_index.difference(res.index).equals(drop_index)
flagger = flagger.setFlags(field, loc=s.iloc[0:1], flag=flagger.GOOD)
drop_index = drop_index.insert(-1, s.index[0])
to_drop = [flagger.BAD, flagger.GOOD]
res, _ = _outsortCrap(s, field, flagger, drop_flags=to_drop)
res, *_ = _outsortCrap(s, field, flagger, drop_flags=to_drop)
assert drop_index.sort_values().difference(res.index).equals(drop_index.sort_values())
res, _ = _outsortCrap(s, field, flagger, drop_flags=to_drop, return_drops=True)
assert res.index.sort_values().equals(drop_index.sort_values())
@pytest.mark.parametrize("flagger", TESTFLAGGER)
......@@ -149,7 +147,7 @@ def test_harmSingleVarIntermediateFlagging(data, flagger, reshaper, co_flagging)
if co_flagging is False:
assert (
flagger.isFlagged().squeeze()
== [False, False, False, False, True, False, True, False, False]
== [False, False, False, False, True, False, False, False, False]
).all()
if reshaper == "bshift":
if co_flagging is True:
......@@ -159,18 +157,17 @@ def test_harmSingleVarIntermediateFlagging(data, flagger, reshaper, co_flagging)
if co_flagging is False:
assert (
flagger.isFlagged().squeeze()
== [False, False, False, False, False, True, True, False, False]
== [False, False, False, False, False, True, False, False, False]
).all()
if reshaper == "fshift":
if co_flagging is True:
assert flagger.isFlagged(loc=d.index[3:5]).squeeze().all()
assert flagger.isFlagged(loc=d.index[6:7]).squeeze().all()
assert (~flagger.isFlagged(loc=d.index[0:3]).squeeze()).all()
assert (~flagger.isFlagged(loc=d.index[7:]).squeeze()).all()
assert (~flagger.isFlagged(loc=d.index[5:]).squeeze()).all()
if co_flagging is False:
assert (
flagger.isFlagged().squeeze()
== [False, False, False, False, True, False, True, False, False]
== [False, False, False, False, True, False, False, False, False]
).all()
flags = flagger.getFlags()
......@@ -328,3 +325,9 @@ def test_wrapper(data, flagger):
flag_func="max", method='nagg', drop_flags=None)
shift2Grid(data, field, flagger, freq, method='nshift', drop_flags=None)
interpolate2Grid(data, field, flagger, freq, method="spline")
if __name__ == "__main__":
flagger=TESTFLAGGER[2]
dat = data()
field, *_ = dat.columns
test_harmSingleVarIntermediateFlagging(dat, flagger, 'fshift', True)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment