Skip to content
Snippets Groups Projects
Commit 8b7947fb authored by David Schäfer's avatar David Schäfer
Browse files

making the harmonization indepedent from the CategoricalFlagger

parent ea43c562
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@ import logging
from saqc.funcs.functions import flagMissing
from saqc.funcs.register import register
from saqc.lib.tools import toSequence
# todo: frequencie estimation function
......@@ -46,10 +47,11 @@ def harmWrapper(harm=True, heap={}):
inter_order=1,
inter_downcast=False,
reshape_agg=max,
reshape_missing_flag_index=-1,
reshape_missing_flag=None,
reshape_shift_comment=False,
outsort_drop_susp=True,
outsort_drop_list=None,
drop_flags=None,
# outsort_drop_susp=True,
# outsort_drop_list=None,
data_missing_value=np.nan,
**kwargs
):
......@@ -72,8 +74,9 @@ def harmWrapper(harm=True, heap={}):
"original_flagger": flagger_merged,
"freq": freq,
"reshape_method": reshape_method,
"drop_susp": outsort_drop_susp,
"drop_list": outsort_drop_list,
"drop_flags": drop_flags,
# "drop_susp": outsort_drop_susp,
# "drop_list": outsort_drop_list,
}
# furthermore we need to memorize the initial timestamp to ensure output format will equal input format.
......@@ -82,12 +85,7 @@ def harmWrapper(harm=True, heap={}):
# now we can manipulate it without loosing information gathered before harmonization
dat_col, flagger_merged_clean = _outsortCrap(
dat_col,
field,
flagger_merged,
drop_suspicious=outsort_drop_susp,
drop_bad=True,
drop_list=outsort_drop_list,
dat_col, field, flagger_merged, drop_flags=drop_flags,
)
# interpolation! (yeah)
......@@ -108,7 +106,7 @@ def harmWrapper(harm=True, heap={}):
ref_index=dat_col.index,
method=reshape_method,
agg_method=reshape_agg,
missing_flag=reshape_missing_flag_index,
missing_flag=reshape_missing_flag,
set_shift_comment=reshape_shift_comment,
**kwargs
)
......@@ -137,8 +135,9 @@ def harmWrapper(harm=True, heap={}):
# get some deharm configuration infos from the heap:
freq = heap[field]["freq"]
redrop_susp = heap[field]["drop_susp"]
redrop_list = heap[field]["drop_list"]
redrop_flags = heap[field]["drop_flags"]
# redrop_susp = heap[field]["drop_susp"]
# redrop_list = heap[field]["drop_list"]
resolve_method = harm_2_deharm[heap[field]["reshape_method"]]
# retrieve data and flags from the merged saqc-conform data frame (and by that get rid of blow-up entries).
......@@ -150,9 +149,9 @@ def harmWrapper(harm=True, heap={}):
dat_col,
field,
heap[field]["original_flagger"],
drop_suspicious=redrop_susp,
drop_bad=True,
drop_list=redrop_list,
# drop_suspicious=redrop_susp,
# drop_bad=True,
drop_flags=redrop_flags,
return_drops=True,
)
......@@ -219,11 +218,12 @@ def _outsortCrap(
data,
field,
flagger,
drop_suspicious=True,
drop_bad=True,
drop_list=None,
# drop_suspicious=True,
# drop_bad=True,
# drop_list=None,
drop_flags=None,
return_drops=False,
**kwargs
# **kwargs
):
"""Harmonization gets the more easy, the more data points we can exclude from crowded sampling intervals.
......@@ -231,7 +231,6 @@ def _outsortCrap(
the data and the flags passed. In deharmonization the function is used to reconstruct original flags field shape.
:param data: pd.Series. ['data'].
:param flags: pd.PandasLike. The flags associated with the data passed.
:param flagger: saqc.flagger.
:param drop_suspicious: Boolean. Default = True. If True, only values that are flagged GOOD or UNFLAGGED get
processed.
......@@ -247,35 +246,15 @@ def _outsortCrap(
drop_mask = pd.Series(data=False, index=data.index)
if drop_bad is True:
drop_flags = toSequence(drop_flags, default=flagger.BAD)
for drop_flag in drop_flags:
drop_mask = drop_mask | flagger.isFlagged(
field, flag=flagger.BAD, comparator="=="
)
if drop_suspicious is True:
drop_mask = drop_mask | ~(
flagger.isFlagged(field, flag=flagger.GOOD, comparator="<=")
field, flag=drop_flag, comparator="=="
)
if drop_list is not None:
for to_drop in drop_list:
if to_drop in flagger.dtype.categories:
drop_mask = drop_mask | flagger.isFlagged(
field, flag=to_drop, comparator="=="
)
else:
logging.warning(
'Cant drop "{}" - flagged data. Its not a flag value, the passed flagger happens to '
"know about.".format(str(to_drop))
)
flagger_out = flagger.getFlagger(loc=~drop_mask)
# data_out = flagger.getFlags(loc=drop_mask) if return_drops else data[~drop]
if return_drops:
# return flag drops at first argument
return flagger.getFlags(loc=drop_mask), flagger_out
# return flags[drop_mask], flags[~drop_mask]
# return data at first argument
return data[~drop_mask], flagger_out
......@@ -537,7 +516,7 @@ def _reshapeFlags(
ref_index,
method="fshift",
agg_method=max,
missing_flag=-1,
missing_flag=None,
set_shift_comment=True,
**kwargs
):
......@@ -581,6 +560,7 @@ def _reshapeFlags(
:return: flags: pd.Series/pd.DataFrame. The reshaped pandas like Flags object, referring to the harmonized data.
"""
missing_flag = missing_flag or flagger.UNFLAGGED
aggregations = ["nearest_agg", "bagg", "fagg"]
shifts = ["fshift", "bshift", "nearest_shift"]
......@@ -608,11 +588,7 @@ def _reshapeFlags(
flags_series = flags.squeeze()
flagger_new = flagger.initFlags(flags=flags).setFlags(
field,
loc=flags_series.isna(),
flag=flagger.dtype.categories[missing_flag],
force=True,
**kwargs
field, loc=flags_series.isna(), flag=missing_flag, force=True, **kwargs
)
if set_shift_comment:
......@@ -654,11 +630,7 @@ def _reshapeFlags(
.squeeze()
.resample(freq_string, closed=closed, label=label, base=base)
# NOTE: breaks for non categorical flaggers
.apply(
lambda x: agg_method(x)
if not x.empty
else flagger.dtype.categories[missing_flag]
)
.apply(lambda x: agg_method(x) if not x.empty else missing_flag)
.astype(flagger.dtype)
.to_frame(name=field)
)
......@@ -808,22 +780,20 @@ def _toMerged(
# trivial case: there is only one variable:
if flags.empty:
data = data_to_insert.reindex(target_index).to_frame(name=fieldname)
flags = flags_to_insert.reindex(
target_index, fill_value=flagger.dtype.categories[0]
)
flags = flags_to_insert.reindex(target_index, fill_value=flagger.UNFLAGGED)
return data, flagger.initFlags(flags=flags)
# annoying case: more than one variables:
# erase nan rows resulting from harmonization but keep/regain those, that were initially present in the data:
new_index = data[mask].index.join(target_index, how="outer")
data = data.reindex(new_index)
flags = flags.reindex(new_index, fill_value=flagger.dtype.categories[0])
flags = flags.reindex(new_index, fill_value=flagger.UNFLAGGED)
data = pd.merge(
data, data_to_insert, how="outer", left_index=True, right_index=True
)
flags = pd.merge(
flags, flags_to_insert, how="outer", left_index=True, right_index=True
)
flags.fillna(flagger.dtype.categories[0], inplace=True)
flags.fillna(flagger.UNFLAGGED, inplace=True)
# internally harmonization memorizes its own manipulation by inserting nan flags -
# those we will now assign the flagger.bad flag by the "missingTest":
......
......@@ -18,23 +18,16 @@ from saqc.funcs.harm_functions import (
)
TESTFLAGGER = TESTFLAGGER[:-1]
RESHAPERS = ["nearest_shift", "fshift", "bshift"]
COFLAGGING = [False, True]
SETSHIFTCOMMENT = [False, True]
INTERPOLATIONS = ["fshift", "bshift", "nearest_shift", "nearest_agg", "bagg"]
INTERPOLATIONS2 = ["fagg", "time", "polynomial"]
FREQS = ["15min", "30min"]
......@@ -317,31 +310,15 @@ def test_outsortCrap(data, flagger):
flagger = flagger.setFlags(field, iloc=slice(5, 7))
drop_index = data.index[5:7]
d, _ = _outsortCrap(data, field, flagger, drop_suspicious=True, drop_bad=False)
assert drop_index.difference(d.index).equals(drop_index)
d, _ = _outsortCrap(data, field, flagger, drop_suspicious=False, drop_bad=True)
d, _ = _outsortCrap(data, field, flagger, drop_flags=flagger.BAD)
assert drop_index.difference(d.index).equals(drop_index)
flagger = flagger.setFlags(field, iloc=slice(0, 1), flag=flagger.GOOD)
drop_index = drop_index.insert(-1, data.index[0])
d, _ = _outsortCrap(
data,
field,
flagger,
drop_suspicious=False,
drop_bad=False,
drop_list=[flagger.BAD, flagger.GOOD],
)
d, _ = _outsortCrap(data, field, flagger, drop_flags=[flagger.BAD, flagger.GOOD],)
assert drop_index.sort_values().difference(d.index).equals(drop_index.sort_values())
f_drop, _ = _outsortCrap(
data,
field,
flagger,
drop_suspicious=False,
drop_bad=False,
drop_list=[flagger.BAD, flagger.GOOD],
return_drops=True,
data, field, flagger, drop_flags=[flagger.BAD, flagger.GOOD], return_drops=True,
)
assert f_drop.index.sort_values().equals(drop_index.sort_values())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment