diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py
index 447ea2d9889549deca92985c4551b6919a4b8a95..b2bff99d7c2c790181c87daa01c9719c67f106e1
--- a/saqc/funcs/harm_functions.py
+++ b/saqc/funcs/harm_functions.py
@@ -7,6 +7,7 @@ import logging
 
 from saqc.funcs.functions import flagMissing
 from saqc.funcs.register import register
+from saqc.lib.tools import toSequence
 
 # todo: frequencie estimation function
 
@@ -46,10 +47,11 @@ def harmWrapper(harm=True, heap={}):
         inter_order=1,
         inter_downcast=False,
         reshape_agg=max,
-        reshape_missing_flag_index=-1,
+        reshape_missing_flag=None,
         reshape_shift_comment=False,
-        outsort_drop_susp=True,
-        outsort_drop_list=None,
+        drop_flags=None,
+        # outsort_drop_susp=True,
+        # outsort_drop_list=None,
         data_missing_value=np.nan,
         **kwargs
     ):
@@ -72,8 +74,9 @@ def harmWrapper(harm=True, heap={}):
             "original_flagger": flagger_merged,
             "freq": freq,
             "reshape_method": reshape_method,
-            "drop_susp": outsort_drop_susp,
-            "drop_list": outsort_drop_list,
+            "drop_flags": drop_flags,
+            # "drop_susp": outsort_drop_susp,
+            # "drop_list": outsort_drop_list,
         }
 
         # furthermore we need to memorize the initial timestamp to ensure output format will equal input format.
@@ -82,12 +85,7 @@ def harmWrapper(harm=True, heap={}):
 
         # now we can manipulate it without loosing information gathered before harmonization
         dat_col, flagger_merged_clean = _outsortCrap(
-            dat_col,
-            field,
-            flagger_merged,
-            drop_suspicious=outsort_drop_susp,
-            drop_bad=True,
-            drop_list=outsort_drop_list,
+            dat_col, field, flagger_merged, drop_flags=drop_flags,
         )
 
         # interpolation! (yeah)
@@ -108,7 +106,7 @@ def harmWrapper(harm=True, heap={}):
             ref_index=dat_col.index,
             method=reshape_method,
             agg_method=reshape_agg,
-            missing_flag=reshape_missing_flag_index,
+            missing_flag=reshape_missing_flag,
             set_shift_comment=reshape_shift_comment,
             **kwargs
         )
@@ -137,8 +135,9 @@ def harmWrapper(harm=True, heap={}):
 
         # get some deharm configuration infos from the heap:
         freq = heap[field]["freq"]
-        redrop_susp = heap[field]["drop_susp"]
-        redrop_list = heap[field]["drop_list"]
+        redrop_flags = heap[field]["drop_flags"]
+        # redrop_susp = heap[field]["drop_susp"]
+        # redrop_list = heap[field]["drop_list"]
         resolve_method = harm_2_deharm[heap[field]["reshape_method"]]
 
         # retrieve data and flags from the merged saqc-conform data frame (and by that get rid of blow-up entries).
@@ -150,9 +149,9 @@ def harmWrapper(harm=True, heap={}):
             dat_col,
             field,
             heap[field]["original_flagger"],
-            drop_suspicious=redrop_susp,
-            drop_bad=True,
-            drop_list=redrop_list,
+            # drop_suspicious=redrop_susp,
+            # drop_bad=True,
+            drop_flags=redrop_flags,
             return_drops=True,
         )
 
@@ -219,11 +218,12 @@ def _outsortCrap(
     data,
     field,
     flagger,
-    drop_suspicious=True,
-    drop_bad=True,
-    drop_list=None,
+    # drop_suspicious=True,
+    # drop_bad=True,
+    # drop_list=None,
+    drop_flags=None,
     return_drops=False,
-    **kwargs
+    # **kwargs
 ):
 
     """Harmonization gets the more easy, the more data points we can exclude from crowded sampling intervals.
@@ -231,7 +231,6 @@
     the data and the flags passed. In deharmonization the function is used to reconstruct original flags field shape.
 
     :param data: pd.Series. ['data'].
-    :param flags: pd.PandasLike. The flags associated with the data passed.
     :param flagger: saqc.flagger.
    :param drop_suspicious: Boolean. Default = True. If True, only values that are flagged GOOD or UNFLAGGED get processed.
@@ -247,35 +246,15 @@ def _outsortCrap(
 
     drop_mask = pd.Series(data=False, index=data.index)
 
-    if drop_bad is True:
+    drop_flags = toSequence(drop_flags, default=flagger.BAD)
+    for drop_flag in drop_flags:
         drop_mask = drop_mask | flagger.isFlagged(
-            field, flag=flagger.BAD, comparator="=="
-        )
-
-    if drop_suspicious is True:
-        drop_mask = drop_mask | ~(
-            flagger.isFlagged(field, flag=flagger.GOOD, comparator="<=")
+            field, flag=drop_flag, comparator="=="
         )
 
-    if drop_list is not None:
-        for to_drop in drop_list:
-            if to_drop in flagger.dtype.categories:
-                drop_mask = drop_mask | flagger.isFlagged(
-                    field, flag=to_drop, comparator="=="
-                )
-            else:
-                logging.warning(
-                    'Cant drop "{}" - flagged data. Its not a flag value, the passed flagger happens to '
-                    "know about.".format(str(to_drop))
-                )
-
     flagger_out = flagger.getFlagger(loc=~drop_mask)
-    # data_out = flagger.getFlags(loc=drop_mask) if return_drops else data[~drop]
     if return_drops:
-        # return flag drops at first argument
         return flagger.getFlags(loc=drop_mask), flagger_out
-    # return flags[drop_mask], flags[~drop_mask]
-    # return data at first argument
     return data[~drop_mask], flagger_out
 
 
@@ -537,7 +516,7 @@ def _reshapeFlags(
     ref_index,
     method="fshift",
     agg_method=max,
-    missing_flag=-1,
+    missing_flag=None,
     set_shift_comment=True,
     **kwargs
 ):
@@ -581,6 +560,7 @@
     :return: flags: pd.Series/pd.DataFrame. The reshaped pandas like Flags object, referring to the harmonized data.
     """
+    missing_flag = missing_flag or flagger.UNFLAGGED
 
     aggregations = ["nearest_agg", "bagg", "fagg"]
     shifts = ["fshift", "bshift", "nearest_shift"]
@@ -608,11 +588,7 @@
         flags_series = flags.squeeze()
 
         flagger_new = flagger.initFlags(flags=flags).setFlags(
-            field,
-            loc=flags_series.isna(),
-            flag=flagger.dtype.categories[missing_flag],
-            force=True,
-            **kwargs
+            field, loc=flags_series.isna(), flag=missing_flag, force=True, **kwargs
         )
 
         if set_shift_comment:
@@ -654,11 +630,7 @@
             .squeeze()
             .resample(freq_string, closed=closed, label=label, base=base)
             # NOTE: breaks for non categorical flaggers
-            .apply(
-                lambda x: agg_method(x)
-                if not x.empty
-                else flagger.dtype.categories[missing_flag]
-            )
+            .apply(lambda x: agg_method(x) if not x.empty else missing_flag)
             .astype(flagger.dtype)
             .to_frame(name=field)
         )
@@ -808,22 +780,20 @@ def _toMerged(
     # trivial case: there is only one variable:
     if flags.empty:
         data = data_to_insert.reindex(target_index).to_frame(name=fieldname)
-        flags = flags_to_insert.reindex(
-            target_index, fill_value=flagger.dtype.categories[0]
-        )
+        flags = flags_to_insert.reindex(target_index, fill_value=flagger.UNFLAGGED)
         return data, flagger.initFlags(flags=flags)
     # annoying case: more than one variables:
     # erase nan rows resulting from harmonization but keep/regain those, that were initially present in the data:
     new_index = data[mask].index.join(target_index, how="outer")
     data = data.reindex(new_index)
-    flags = flags.reindex(new_index, fill_value=flagger.dtype.categories[0])
+    flags = flags.reindex(new_index, fill_value=flagger.UNFLAGGED)
     data = pd.merge(
         data, data_to_insert, how="outer", left_index=True, right_index=True
     )
     flags = pd.merge(
         flags, flags_to_insert, how="outer", left_index=True, right_index=True
    )
-    flags.fillna(flagger.dtype.categories[0], inplace=True)
+    flags.fillna(flagger.UNFLAGGED, inplace=True)
 
     # internally harmonization memorizes its own manipulation by inserting nan flags -
     # those we will now assign the flagger.bad flag by the "missingTest":
diff --git a/test/funcs/test_harm_funcs.py b/test/funcs/test_harm_funcs.py
index 2f9accf33a2ba30c3b32939605543c40fa3538f0..e8944b6feb634160feb392a03ac652d279f5c5f8
--- a/test/funcs/test_harm_funcs.py
+++ b/test/funcs/test_harm_funcs.py
@@ -18,23 +18,16 @@ from saqc.funcs.harm_functions import (
 )
 
 
-TESTFLAGGER = TESTFLAGGER[:-1]
-
-
 RESHAPERS = ["nearest_shift", "fshift", "bshift"]
-
 COFLAGGING = [False, True]
-
 SETSHIFTCOMMENT = [False, True]
-
 INTERPOLATIONS = ["fshift", "bshift", "nearest_shift", "nearest_agg", "bagg"]
 INTERPOLATIONS2 = ["fagg", "time", "polynomial"]
-
 FREQS = ["15min", "30min"]
@@ -317,31 +310,15 @@ def test_outsortCrap(data, flagger):
     flagger = flagger.setFlags(field, iloc=slice(5, 7))
     drop_index = data.index[5:7]
-    d, _ = _outsortCrap(data, field, flagger, drop_suspicious=True, drop_bad=False)
-    assert drop_index.difference(d.index).equals(drop_index)
-
-    d, _ = _outsortCrap(data, field, flagger, drop_suspicious=False, drop_bad=True)
+    d, _ = _outsortCrap(data, field, flagger, drop_flags=flagger.BAD)
     assert drop_index.difference(d.index).equals(drop_index)
 
     flagger = flagger.setFlags(field, iloc=slice(0, 1), flag=flagger.GOOD)
     drop_index = drop_index.insert(-1, data.index[0])
-    d, _ = _outsortCrap(
-        data,
-        field,
-        flagger,
-        drop_suspicious=False,
-        drop_bad=False,
-        drop_list=[flagger.BAD, flagger.GOOD],
-    )
-
+    d, _ = _outsortCrap(data, field, flagger, drop_flags=[flagger.BAD, flagger.GOOD],)
     assert drop_index.sort_values().difference(d.index).equals(drop_index.sort_values())
+
     f_drop, _ = _outsortCrap(
-        data,
-        field,
-        flagger,
-        drop_suspicious=False,
-        drop_bad=False,
-        drop_list=[flagger.BAD, flagger.GOOD],
-        return_drops=True,
+        data, field, flagger, drop_flags=[flagger.BAD, flagger.GOOD], return_drops=True,
     )
     assert f_drop.index.sort_values().equals(drop_index.sort_values())
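
For reference, a minimal standalone sketch of the drop_flags semantics this patch introduces in _outsortCrap: a scalar or a list of flag values is normalized to a sequence (defaulting to the BAD flag) and folded into a boolean drop mask. The names to_sequence and build_drop_mask below are hypothetical illustrations working on plain string flags, not part of SaQC; the actual patch goes through saqc.lib.tools.toSequence and flagger.isFlagged on the flagger's categorical flags.

import pandas as pd


def to_sequence(value, default):
    # None -> [default]; scalar -> [scalar]; sequence -> list(sequence)
    # (hypothetical stand-in for saqc.lib.tools.toSequence)
    if value is None:
        value = default
    if isinstance(value, (list, tuple, set, pd.Index)):
        return list(value)
    return [value]


def build_drop_mask(flags, drop_flags=None, bad_flag="BAD"):
    # Mirrors the reworked _outsortCrap loop: by default drop only BAD-flagged
    # values, otherwise drop every value whose flag is listed in drop_flags.
    drop_flags = to_sequence(drop_flags, default=bad_flag)
    mask = pd.Series(False, index=flags.index)
    for flag in drop_flags:
        mask = mask | (flags == flag)
    return mask


flags = pd.Series(["UNFLAGGED", "BAD", "GOOD", "BAD"])
print(build_drop_mask(flags).tolist())                   # [False, True, False, True]
print(build_drop_mask(flags, ["BAD", "GOOD"]).tolist())  # [False, True, True, True]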