diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py index b2bff99d7c2c790181c87daa01c9719c67f106e1..5fd514ca6baf567f9a6d924f7863f7c15144d19d 100644 --- a/saqc/funcs/harm_functions.py +++ b/saqc/funcs/harm_functions.py @@ -23,19 +23,29 @@ from saqc.lib.tools import toSequence # todo: accelerated func applies -def harmWrapper(harm=True, heap={}): +class Heap: + INDEX = "initial_ts" + DATA = "original_data" + FLAGGER = "original_flagger" + FREQ = "freq" + METHOD = "reshape_method" + DROP = "drop_flags" + + +HARM_2_DEHARM = { + "fshift": "invert_fshift", + "bshift": "invert_bshift", + "nearest_shift": "invert_nearest", + "fagg": "invert_fshift", + "bagg": "invert_bshift", + "nearest_agg": "invert_nearest", +} + + +def harmWrapper(heap={}): # NOTE: # (1) - harmonization will ALWAYS flag flagger.BAD all the np.nan values and afterwards DROP ALL # flagger.BAD flagged values from flags frame for further flagging!!!!!!!!!!!!!!!!!!!!! - harm_2_deharm = { - "fshift": "invert_fshift", - "bshift": "invert_bshift", - "nearest_shift": "invert_nearest", - "fagg": "invert_fshift", - "bagg": "invert_bshift", - "nearest_agg": "invert_nearest", - } - def harmonize( data, field, @@ -50,8 +60,6 @@ def harmWrapper(harm=True, heap={}): reshape_missing_flag=None, reshape_shift_comment=False, drop_flags=None, - # outsort_drop_susp=True, - # outsort_drop_list=None, data_missing_value=np.nan, **kwargs ): @@ -65,23 +73,19 @@ def harmWrapper(harm=True, heap={}): # before sending the current flags and data frame to the future (for backtracking reasons), we clear it # from merge-nans that just resulted from harmonization of other variables! dat_col, flagger_merged = _fromMerged(data, flagger, field) - # dat_col, flags_col = _fromMerged(data, flags, flagger, field) # now we send the flags frame in its current shape to the future: - # heap.update({field: {'original_data': flags_col.assign(data_values=dat_col)}}) heap[field] = { - "original_data": dat_col, - "original_flagger": flagger_merged, - "freq": freq, - "reshape_method": reshape_method, - "drop_flags": drop_flags, - # "drop_susp": outsort_drop_susp, - # "drop_list": outsort_drop_list, + Heap.DATA: dat_col, + Heap.FLAGGER: flagger_merged, + Heap.FREQ: freq, + Heap.METHOD: reshape_method, + Heap.DROP: drop_flags, } # furthermore we need to memorize the initial timestamp to ensure output format will equal input format. - if "initial_ts" not in heap.keys(): - heap.update({"initial_ts": dat_col.index}) + if Heap.INDEX not in heap.keys(): + heap.update({Heap.INDEX: dat_col.index}) # now we can manipulate it without loosing information gathered before harmonization dat_col, flagger_merged_clean = _outsortCrap( @@ -95,7 +99,7 @@ def harmWrapper(harm=True, heap={}): method=inter_method, order=inter_order, agg_method=inter_agg, - total_range=(heap["initial_ts"][0], heap["initial_ts"][-1]), + total_range=(heap[Heap.INDEX][0], heap[Heap.INDEX][-1]), downcast_interpolation=inter_downcast, ) @@ -134,24 +138,18 @@ def harmWrapper(harm=True, heap={}): return data, flagger # get some deharm configuration infos from the heap: - freq = heap[field]["freq"] - redrop_flags = heap[field]["drop_flags"] - # redrop_susp = heap[field]["drop_susp"] - # redrop_list = heap[field]["drop_list"] - resolve_method = harm_2_deharm[heap[field]["reshape_method"]] + harm_info = heap.pop(field) + resolve_method = HARM_2_DEHARM[harm_info[Heap.METHOD]] # retrieve data and flags from the merged saqc-conform data frame (and by that get rid of blow-up entries). dat_col, flagger_merged = _fromMerged(data, flagger, field) # reconstruct the drops that were performed before harmonization - # drops, pre_flags = _outsortCrap( drops, flagger_original_clean = _outsortCrap( dat_col, field, - heap[field]["original_flagger"], - # drop_suspicious=redrop_susp, - # drop_bad=True, - drop_flags=redrop_flags, + harm_info[Heap.FLAGGER], + drop_flags=harm_info[Heap.DROP], return_drops=True, ) @@ -160,7 +158,7 @@ def harmWrapper(harm=True, heap={}): flagger_back = _backtrackFlags( flagger_merged, flagger_original_clean, - freq, + harm_info[Heap.FREQ], track_method=resolve_method, co_flagging=co_flagging, ) @@ -172,14 +170,13 @@ def harmWrapper(harm=True, heap={}): flags_col = flags_col.squeeze(axis=1) drops = drops.squeeze(axis=1) flags_col.loc[drops.index] = drops + # but to stick with the policy of always having flags as pd.DataFrames we blow up the flags col again: if isinstance(flags_col, pd.Series): flags_col = flags_col.to_frame() flagger_back_full = flagger.initFlags(flags=flags_col) - dat_col = heap[field]["original_data"].reindex( - flags_col.index, fill_value=np.nan - ) + dat_col = harm_info[Heap.DATA].reindex(flags_col.index, fill_value=np.nan) dat_col.name = field # transform the result into the form, data travels through saqc: data, flagger_out = _toMerged( @@ -188,42 +185,26 @@ def harmWrapper(harm=True, heap={}): field, dat_col, flagger_back_full, - target_index=heap["initial_ts"], + target_index=heap[Heap.INDEX], ) - # remove weight from the heap: - heap.pop(field) # clear heap if nessecary: - if (len(heap.keys()) == 1) and (list(heap.keys())[0] == "initial_ts"): - heap.pop("initial_ts") + if len(heap) == 1 and Heap.INDEX in heap: + del heap[Heap.INDEX] + # bye bye data return data, flagger_out - if harm: - return harmonize - else: - return deharmonize - + return harmonize, deharmonize -# make functions public -harmonize = harmWrapper(harm=True) -deharmonize = harmWrapper(harm=False) -# the wrapper needs a special treatment +harmonize, deharmonize = harmWrapper() register("harmonize")(harmonize) register("deharmonize")(deharmonize) # (de-)harmonize helper def _outsortCrap( - data, - field, - flagger, - # drop_suspicious=True, - # drop_bad=True, - # drop_list=None, - drop_flags=None, - return_drops=False, - # **kwargs + data, field, flagger, drop_flags=None, return_drops=False, ): """Harmonization gets the more easy, the more data points we can exclude from crowded sampling intervals.