diff --git a/saqc/flagger/flags.py b/saqc/flagger/flags.py index 175833ff56c4c7bff8922689f29d56c2fc99c1e3..537cf91783b9abfed504e0d8a7ee94fd09e2d19b 100644 --- a/saqc/flagger/flags.py +++ b/saqc/flagger/flags.py @@ -275,7 +275,11 @@ class Flags: return str(self.toDios()).replace('DictOfSeries', type(self).__name__) -def initFlagsLike(reference: Union[pd.Series, DictLike, Flags], initial_value: float = UNFLAGGED) -> Flags: +def initFlagsLike( + reference: Union[pd.Series, DictLike, Flags], + initial_value: float = UNFLAGGED, + name: str = None, +) -> Flags: """ Create empty Flags, from an reference data structure. @@ -287,6 +291,12 @@ def initFlagsLike(reference: Union[pd.Series, DictLike, Flags], initial_value: f initial_value : float, default 0 value to initialize the columns with + name : str, default None + Only respected if `reference` is of type ``pd.Series``. + The column name that is used for the Flags. If ``None`` + the name of the series itself is taken, if this is also + `None`, a ValueError is raised. + Notes ----- Implementation detail: @@ -307,7 +317,13 @@ def initFlagsLike(reference: Union[pd.Series, DictLike, Flags], initial_value: f reference = reference._data if isinstance(reference, pd.Series): - reference = reference.to_frame('f0') + if name is None: + name = reference.name + if name is None: + raise ValueError("Either the passed series must be named or a name must be passed") + if not isinstance(name, str): + raise TypeError(f"name must be str not '{type(name).__name__}'") + reference = reference.to_frame(name=name) for k, item in reference.items(): @@ -327,5 +343,42 @@ def initFlagsLike(reference: Union[pd.Series, DictLike, Flags], initial_value: f return Flags(result) +def applyFunctionOnHistory(flags: Flags, column, hist_func, hist_kws, mask_func, mask_kws, last_column=None): + """ + Apply function on history. + + Two functions must be given. Both are called for each column in the History. One on History.hist, the + other on History.mask. Both take a pd.Series as first arg, which is the column from hist or mask respectively. + + Parameters + ---------- + flags : + column : + hist_func : + hist_kws : + mask_func : + mask_kws : + last_column : + + Returns + ------- + + """ + flags = flags.copy() + history = flags.history[column] + new_history = History() + for pos in history.columns: + new_history.hist[pos] = hist_func(history.hist[pos], **hist_kws) + new_history.mask[pos] = mask_func(history.mask[pos], **mask_kws) + + if last_column is None: + new_history.mask.iloc[:, -1:] = True + else: + new_history.append(last_column, force=True) + + flags.history[column] = new_history + return flags + + # for now we keep this name Flagger = Flags diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py index bf809340ec8207b983053cb2ced153b89d492329..27d44b30dc76e85e2516366615e89dc36ccfab90 100644 --- a/saqc/funcs/resampling.py +++ b/saqc/funcs/resampling.py @@ -13,11 +13,12 @@ from dios import DictOfSeries from saqc.common import * from saqc.core.register import register -from saqc.flagger import Flagger +from saqc.flagger import Flagger, initFlagsLike, History from saqc.funcs.tools import copy, drop, rename from saqc.funcs.interpolation import interpolateIndex from saqc.lib.tools import dropper, evalFreqStr from saqc.lib.ts_operators import shift2Freq, aggregate2Freq +from saqc.flagger.flags import applyFunctionOnHistory logger = logging.getLogger("SaQC") @@ -42,7 +43,7 @@ def aggregate( value_func, flag_func: Callable[[pd.Series], float]=np.nanmax, method: Literal["fagg", "bagg", "nagg"]="nagg", - to_drop: Optional[Union[Any, Sequence[Any]]]=None, + to_drop: Optional[Union[Any, Sequence[Any]]]=None, # todo: rm, use to_mask instead **kwargs ) -> Tuple[DictOfSeries, Flagger]: """ @@ -343,7 +344,7 @@ def shift( method: Literal["fshift", "bshift", "nshift"]="nshift", to_drop: Optional[Union[Any, Sequence[Any]]]=None, empty_intervals_flag: Optional[str]=None, - freq_check: Optional[Literal["check", "auto"]]=None, + freq_check: Optional[Literal["check", "auto"]]=None, # todo: rm, not a user decision **kwargs ) -> Tuple[DictOfSeries, Flagger]: @@ -417,7 +418,7 @@ def _shift( """ data = data.copy() datcol = data[field] - flagscol = flagger.getFlags(field) + flagscol = flagger[field] if empty_intervals_flag is None: empty_intervals_flag = UNFLAGGED @@ -426,20 +427,33 @@ def _shift( drop_mask |= datcol.isna() datcol[drop_mask] = np.nan datcol.dropna(inplace=True) - freq = evalFreqStr(freq, freq_check, datcol.index) + flagscol.drop(drop_mask[drop_mask].index, inplace=True) + + # create a dummys if datcol.empty: - data[field] = datcol - reshaped_flagger = flagger.initFlags(datcol).setFlags(field, flag=flagscol, force=True, inplace=True, **kwargs) - flagger = flagger.slice(drop=field).merge(reshaped_flagger, subset=[field], inplace=True) - return data, flagger + datcol = pd.Series([], index=pd.DatetimeIndex([]), name=field) + flagscol = pd.Series([], index=pd.DatetimeIndex([]), name=field) - flagscol.drop(drop_mask[drop_mask].index, inplace=True) + # clear the past + flagger.history[field] = flagger.history[field].reindex(datcol.index) + flagger[field] = flagscol + + # do the shift, we need to process the history manually + else: + freq = evalFreqStr(freq, freq_check, datcol.index) + datcol = shift2Freq(datcol, method, freq, fill_value=np.nan) + + # after next 3 lines we leave history in unstable state + # but the following append will fix this + history = flagger.history[field] + history.hist = shift2Freq(history.hist, method, freq, fill_value=UNTOUCHED) + history.mask = shift2Freq(history.mask, method, freq, fill_value=False) + + flagscol = shift2Freq(flagscol, method, freq, fill_value=empty_intervals_flag) + history.append(flagscol, force=True) + flagger.history[field] = history - datcol = shift2Freq(datcol, method, freq, fill_value=np.nan) - flagscol = shift2Freq(flagscol, method, freq, fill_value=empty_intervals_flag) data[field] = datcol - reshaped_flagger = flagger.initFlags(datcol).setFlags(field, flag=flagscol, force=True, inplace=True, **kwargs) - flagger = flagger.slice(drop=field).merge(reshaped_flagger, subset=[field], inplace=True) return data, flagger @@ -546,7 +560,7 @@ def resample( data = data.copy() datcol = data[field] - flagscol = flagger.getFlags(field) + flagscol = flagger[field] if empty_intervals_flag is None: empty_intervals_flag = BAD @@ -554,41 +568,56 @@ def resample( datcol.drop(datcol[drop_mask].index, inplace=True) freq = evalFreqStr(freq, freq_check, datcol.index) flagscol.drop(flagscol[drop_mask].index, inplace=True) - if all_na_2_empty: - if datcol.dropna().empty: - datcol = pd.Series([], index=pd.DatetimeIndex([]), name=field) - if datcol.empty: - # for consistency reasons - return empty data/flags column when there is no valid data left - # after filtering. - data[field] = datcol - reshaped_flagger = flagger.initFlags(datcol).setFlags(field, flag=flagscol, force=True, inplace=True, **kwargs) - flagger = flagger.slice(drop=field).merge(reshaped_flagger, subset=[field], inplace=True) - return data, flagger + # create a dummys + if all_na_2_empty and datcol.dropna().empty: + + datcol = pd.Series([], index=pd.DatetimeIndex([]), name=field) + flagscol = pd.Series([], index=pd.DatetimeIndex([]), name=field) + + # clear the past + flagger.history[field] = flagger.history[field].reindex(datcol.index) + flagger[field] = flagscol + + # do the resampling + else: + datcol = aggregate2Freq( + datcol, + method, + freq, + agg_func, + fill_value=np.nan, + max_invalid_total=max_invalid_total_d, + max_invalid_consec=max_invalid_consec_d, + ) - datcol = aggregate2Freq( - datcol, - method, - freq, - agg_func, - fill_value=np.nan, - max_invalid_total=max_invalid_total_d, - max_invalid_consec=max_invalid_consec_d, - ) - flagscol = aggregate2Freq( - flagscol, - method, - freq, - flag_agg_func, - fill_value=empty_intervals_flag, - max_invalid_total=max_invalid_total_f, - max_invalid_consec=max_invalid_consec_f, - ) + flagscol = aggregate2Freq( + flagscol, + method, + freq, + flag_agg_func, + fill_value=empty_intervals_flag, + max_invalid_total=max_invalid_total_f, + max_invalid_consec=max_invalid_consec_f, + ) + + kws = dict( + method=method, + freq=freq, + agg_func=flag_agg_func, + fill_value=UNTOUCHED, + max_invalid_total=max_invalid_total_f, + max_invalid_consec=max_invalid_consec_f, + ) + + flagger = applyFunctionOnHistory( + flagger, field, + hist_func=aggregate2Freq, hist_kws=kws, + mask_func=aggregate2Freq, mask_kws=kws, + last_column=flagscol + ) - # data/flags reshaping: data[field] = datcol - reshaped_flagger = flagger.initFlags(datcol).setFlags(field, flag=flagscol, force=True, inplace=True, **kwargs) - flagger = flagger.slice(drop=field).merge(reshaped_flagger, subset=[field], inplace=True) return data, flagger @@ -671,6 +700,7 @@ def reindexFlags( """ # TODO: This needs a refactoring + raise NotImplementedError("currently not available - rewrite needed") flagscol, metacols = flagger.getFlags(source, full=True) if flagscol.empty: