Skip to content
Snippets Groups Projects
Commit 02c1fff6 authored by David Schäfer's avatar David Schäfer
Browse files

cleanups

parent 8b7947fb
No related branches found
No related tags found
No related merge requests found
......@@ -23,19 +23,29 @@ from saqc.lib.tools import toSequence
# todo: accelerated func applies
def harmWrapper(harm=True, heap={}):
class Heap:
INDEX = "initial_ts"
DATA = "original_data"
FLAGGER = "original_flagger"
FREQ = "freq"
METHOD = "reshape_method"
DROP = "drop_flags"
HARM_2_DEHARM = {
"fshift": "invert_fshift",
"bshift": "invert_bshift",
"nearest_shift": "invert_nearest",
"fagg": "invert_fshift",
"bagg": "invert_bshift",
"nearest_agg": "invert_nearest",
}
def harmWrapper(heap={}):
# NOTE:
# (1) - harmonization will ALWAYS flag flagger.BAD all the np.nan values and afterwards DROP ALL
# flagger.BAD flagged values from flags frame for further flagging!!!!!!!!!!!!!!!!!!!!!
harm_2_deharm = {
"fshift": "invert_fshift",
"bshift": "invert_bshift",
"nearest_shift": "invert_nearest",
"fagg": "invert_fshift",
"bagg": "invert_bshift",
"nearest_agg": "invert_nearest",
}
def harmonize(
data,
field,
......@@ -50,8 +60,6 @@ def harmWrapper(harm=True, heap={}):
reshape_missing_flag=None,
reshape_shift_comment=False,
drop_flags=None,
# outsort_drop_susp=True,
# outsort_drop_list=None,
data_missing_value=np.nan,
**kwargs
):
......@@ -65,23 +73,19 @@ def harmWrapper(harm=True, heap={}):
# before sending the current flags and data frame to the future (for backtracking reasons), we clear it
# from merge-nans that just resulted from harmonization of other variables!
dat_col, flagger_merged = _fromMerged(data, flagger, field)
# dat_col, flags_col = _fromMerged(data, flags, flagger, field)
# now we send the flags frame in its current shape to the future:
# heap.update({field: {'original_data': flags_col.assign(data_values=dat_col)}})
heap[field] = {
"original_data": dat_col,
"original_flagger": flagger_merged,
"freq": freq,
"reshape_method": reshape_method,
"drop_flags": drop_flags,
# "drop_susp": outsort_drop_susp,
# "drop_list": outsort_drop_list,
Heap.DATA: dat_col,
Heap.FLAGGER: flagger_merged,
Heap.FREQ: freq,
Heap.METHOD: reshape_method,
Heap.DROP: drop_flags,
}
# furthermore we need to memorize the initial timestamp to ensure output format will equal input format.
if "initial_ts" not in heap.keys():
heap.update({"initial_ts": dat_col.index})
if Heap.INDEX not in heap.keys():
heap.update({Heap.INDEX: dat_col.index})
# now we can manipulate it without loosing information gathered before harmonization
dat_col, flagger_merged_clean = _outsortCrap(
......@@ -95,7 +99,7 @@ def harmWrapper(harm=True, heap={}):
method=inter_method,
order=inter_order,
agg_method=inter_agg,
total_range=(heap["initial_ts"][0], heap["initial_ts"][-1]),
total_range=(heap[Heap.INDEX][0], heap[Heap.INDEX][-1]),
downcast_interpolation=inter_downcast,
)
......@@ -134,24 +138,18 @@ def harmWrapper(harm=True, heap={}):
return data, flagger
# get some deharm configuration infos from the heap:
freq = heap[field]["freq"]
redrop_flags = heap[field]["drop_flags"]
# redrop_susp = heap[field]["drop_susp"]
# redrop_list = heap[field]["drop_list"]
resolve_method = harm_2_deharm[heap[field]["reshape_method"]]
harm_info = heap.pop(field)
resolve_method = HARM_2_DEHARM[harm_info[Heap.METHOD]]
# retrieve data and flags from the merged saqc-conform data frame (and by that get rid of blow-up entries).
dat_col, flagger_merged = _fromMerged(data, flagger, field)
# reconstruct the drops that were performed before harmonization
# drops, pre_flags = _outsortCrap(
drops, flagger_original_clean = _outsortCrap(
dat_col,
field,
heap[field]["original_flagger"],
# drop_suspicious=redrop_susp,
# drop_bad=True,
drop_flags=redrop_flags,
harm_info[Heap.FLAGGER],
drop_flags=harm_info[Heap.DROP],
return_drops=True,
)
......@@ -160,7 +158,7 @@ def harmWrapper(harm=True, heap={}):
flagger_back = _backtrackFlags(
flagger_merged,
flagger_original_clean,
freq,
harm_info[Heap.FREQ],
track_method=resolve_method,
co_flagging=co_flagging,
)
......@@ -172,14 +170,13 @@ def harmWrapper(harm=True, heap={}):
flags_col = flags_col.squeeze(axis=1)
drops = drops.squeeze(axis=1)
flags_col.loc[drops.index] = drops
# but to stick with the policy of always having flags as pd.DataFrames we blow up the flags col again:
if isinstance(flags_col, pd.Series):
flags_col = flags_col.to_frame()
flagger_back_full = flagger.initFlags(flags=flags_col)
dat_col = heap[field]["original_data"].reindex(
flags_col.index, fill_value=np.nan
)
dat_col = harm_info[Heap.DATA].reindex(flags_col.index, fill_value=np.nan)
dat_col.name = field
# transform the result into the form, data travels through saqc:
data, flagger_out = _toMerged(
......@@ -188,42 +185,26 @@ def harmWrapper(harm=True, heap={}):
field,
dat_col,
flagger_back_full,
target_index=heap["initial_ts"],
target_index=heap[Heap.INDEX],
)
# remove weight from the heap:
heap.pop(field)
# clear heap if nessecary:
if (len(heap.keys()) == 1) and (list(heap.keys())[0] == "initial_ts"):
heap.pop("initial_ts")
if len(heap) == 1 and Heap.INDEX in heap:
del heap[Heap.INDEX]
# bye bye data
return data, flagger_out
if harm:
return harmonize
else:
return deharmonize
return harmonize, deharmonize
# make functions public
harmonize = harmWrapper(harm=True)
deharmonize = harmWrapper(harm=False)
# the wrapper needs a special treatment
harmonize, deharmonize = harmWrapper()
register("harmonize")(harmonize)
register("deharmonize")(deharmonize)
# (de-)harmonize helper
def _outsortCrap(
data,
field,
flagger,
# drop_suspicious=True,
# drop_bad=True,
# drop_list=None,
drop_flags=None,
return_drops=False,
# **kwargs
data, field, flagger, drop_flags=None, return_drops=False,
):
"""Harmonization gets the more easy, the more data points we can exclude from crowded sampling intervals.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment