Skip to content
Snippets Groups Projects
Commit 6a684dbb authored by Bert Palm 🎇
Browse files

fixed resampling.py, added History changing function to flags.py

parent 441acd0d
No related branches found
No related tags found
4 merge requests!271Static expansion of regular expressions,!260Follow-Up Translations,!237Flagger Translations,!232WIP: Fuzzy testing
......@@ -275,7 +275,11 @@ class Flags:
return str(self.toDios()).replace('DictOfSeries', type(self).__name__)
def initFlagsLike(reference: Union[pd.Series, DictLike, Flags], initial_value: float = UNFLAGGED) -> Flags:
def initFlagsLike(
reference: Union[pd.Series, DictLike, Flags],
initial_value: float = UNFLAGGED,
name: str = None,
) -> Flags:
"""
Create empty Flags, from an reference data structure.
......@@ -287,6 +291,12 @@ def initFlagsLike(reference: Union[pd.Series, DictLike, Flags], initial_value: f
initial_value : float, default 0
value to initialize the columns with
name : str, default None
Only respected if `reference` is of type ``pd.Series``.
The column name that is used for the Flags. If ``None``
the name of the series itself is taken, if this is also
`None`, a ValueError is raised.
Notes
-----
Implementation detail:
......@@ -307,7 +317,13 @@ def initFlagsLike(reference: Union[pd.Series, DictLike, Flags], initial_value: f
reference = reference._data
if isinstance(reference, pd.Series):
reference = reference.to_frame('f0')
if name is None:
name = reference.name
if name is None:
raise ValueError("Either the passed series must be named or a name must be passed")
if not isinstance(name, str):
raise TypeError(f"name must be str not '{type(name).__name__}'")
reference = reference.to_frame(name=name)
for k, item in reference.items():
......@@ -327,5 +343,42 @@ def initFlagsLike(reference: Union[pd.Series, DictLike, Flags], initial_value: f
return Flags(result)
def applyFunctionOnHistory(flags: Flags, column, hist_func, hist_kws, mask_func, mask_kws, last_column=None):
    """
    Apply functions on a column's History and return the resulting Flags.

    Two functions must be given. Both are called once for every column in the
    History: ``hist_func`` on the columns of ``History.hist`` and ``mask_func``
    on the columns of ``History.mask``. Each receives the respective column as
    a ``pd.Series`` first argument, plus its keyword dict.

    Parameters
    ----------
    flags : Flags
        The Flags object holding the History to transform (not modified;
        a copy is returned).
    column : str
        Name of the column whose History is processed.
    hist_func : callable
        Applied to every column of ``History.hist``.
    hist_kws : dict
        Keyword arguments passed to ``hist_func`` on every call.
    mask_func : callable
        Applied to every column of ``History.mask``.
    mask_kws : dict
        Keyword arguments passed to ``mask_func`` on every call.
    last_column : pd.Series, optional
        If given, it is appended (with ``force=True``) as the newest column of
        the new History. If ``None``, the last mask column is set to all-True
        instead, so the most recent flags stay visible.

    Returns
    -------
    Flags
        A copy of ``flags`` with the transformed History at ``column``.
    """
    result = flags.copy()
    src_history = result.history[column]
    transformed = History()

    for idx in src_history.columns:
        transformed.hist[idx] = hist_func(src_history.hist[idx], **hist_kws)
        transformed.mask[idx] = mask_func(src_history.mask[idx], **mask_kws)

    if last_column is not None:
        transformed.append(last_column, force=True)
    else:
        # no new column given: ensure the freshest transformed flags win
        transformed.mask.iloc[:, -1:] = True

    result.history[column] = transformed
    return result
# for now we keep this name: backward-compatible alias, `Flagger` is the
# older name for what is now the `Flags` class — presumably kept so existing
# callers keep working during the transition (TODO: confirm removal plan)
Flagger = Flags
......@@ -13,11 +13,12 @@ from dios import DictOfSeries
from saqc.common import *
from saqc.core.register import register
from saqc.flagger import Flagger
from saqc.flagger import Flagger, initFlagsLike, History
from saqc.funcs.tools import copy, drop, rename
from saqc.funcs.interpolation import interpolateIndex
from saqc.lib.tools import dropper, evalFreqStr
from saqc.lib.ts_operators import shift2Freq, aggregate2Freq
from saqc.flagger.flags import applyFunctionOnHistory
logger = logging.getLogger("SaQC")
......@@ -42,7 +43,7 @@ def aggregate(
value_func,
flag_func: Callable[[pd.Series], float]=np.nanmax,
method: Literal["fagg", "bagg", "nagg"]="nagg",
to_drop: Optional[Union[Any, Sequence[Any]]]=None,
to_drop: Optional[Union[Any, Sequence[Any]]]=None, # todo: rm, use to_mask instead
**kwargs
) -> Tuple[DictOfSeries, Flagger]:
"""
......@@ -343,7 +344,7 @@ def shift(
method: Literal["fshift", "bshift", "nshift"]="nshift",
to_drop: Optional[Union[Any, Sequence[Any]]]=None,
empty_intervals_flag: Optional[str]=None,
freq_check: Optional[Literal["check", "auto"]]=None,
freq_check: Optional[Literal["check", "auto"]]=None, # todo: rm, not a user decision
**kwargs
) -> Tuple[DictOfSeries, Flagger]:
......@@ -417,7 +418,7 @@ def _shift(
"""
data = data.copy()
datcol = data[field]
flagscol = flagger.getFlags(field)
flagscol = flagger[field]
if empty_intervals_flag is None:
empty_intervals_flag = UNFLAGGED
......@@ -426,20 +427,33 @@ def _shift(
drop_mask |= datcol.isna()
datcol[drop_mask] = np.nan
datcol.dropna(inplace=True)
freq = evalFreqStr(freq, freq_check, datcol.index)
flagscol.drop(drop_mask[drop_mask].index, inplace=True)
# create a dummys
if datcol.empty:
data[field] = datcol
reshaped_flagger = flagger.initFlags(datcol).setFlags(field, flag=flagscol, force=True, inplace=True, **kwargs)
flagger = flagger.slice(drop=field).merge(reshaped_flagger, subset=[field], inplace=True)
return data, flagger
datcol = pd.Series([], index=pd.DatetimeIndex([]), name=field)
flagscol = pd.Series([], index=pd.DatetimeIndex([]), name=field)
flagscol.drop(drop_mask[drop_mask].index, inplace=True)
# clear the past
flagger.history[field] = flagger.history[field].reindex(datcol.index)
flagger[field] = flagscol
# do the shift, we need to process the history manually
else:
freq = evalFreqStr(freq, freq_check, datcol.index)
datcol = shift2Freq(datcol, method, freq, fill_value=np.nan)
# after next 3 lines we leave history in unstable state
# but the following append will fix this
history = flagger.history[field]
history.hist = shift2Freq(history.hist, method, freq, fill_value=UNTOUCHED)
history.mask = shift2Freq(history.mask, method, freq, fill_value=False)
flagscol = shift2Freq(flagscol, method, freq, fill_value=empty_intervals_flag)
history.append(flagscol, force=True)
flagger.history[field] = history
datcol = shift2Freq(datcol, method, freq, fill_value=np.nan)
flagscol = shift2Freq(flagscol, method, freq, fill_value=empty_intervals_flag)
data[field] = datcol
reshaped_flagger = flagger.initFlags(datcol).setFlags(field, flag=flagscol, force=True, inplace=True, **kwargs)
flagger = flagger.slice(drop=field).merge(reshaped_flagger, subset=[field], inplace=True)
return data, flagger
......@@ -546,7 +560,7 @@ def resample(
data = data.copy()
datcol = data[field]
flagscol = flagger.getFlags(field)
flagscol = flagger[field]
if empty_intervals_flag is None:
empty_intervals_flag = BAD
......@@ -554,41 +568,56 @@ def resample(
datcol.drop(datcol[drop_mask].index, inplace=True)
freq = evalFreqStr(freq, freq_check, datcol.index)
flagscol.drop(flagscol[drop_mask].index, inplace=True)
if all_na_2_empty:
if datcol.dropna().empty:
datcol = pd.Series([], index=pd.DatetimeIndex([]), name=field)
if datcol.empty:
# for consistency reasons - return empty data/flags column when there is no valid data left
# after filtering.
data[field] = datcol
reshaped_flagger = flagger.initFlags(datcol).setFlags(field, flag=flagscol, force=True, inplace=True, **kwargs)
flagger = flagger.slice(drop=field).merge(reshaped_flagger, subset=[field], inplace=True)
return data, flagger
# create a dummys
if all_na_2_empty and datcol.dropna().empty:
datcol = pd.Series([], index=pd.DatetimeIndex([]), name=field)
flagscol = pd.Series([], index=pd.DatetimeIndex([]), name=field)
# clear the past
flagger.history[field] = flagger.history[field].reindex(datcol.index)
flagger[field] = flagscol
# do the resampling
else:
datcol = aggregate2Freq(
datcol,
method,
freq,
agg_func,
fill_value=np.nan,
max_invalid_total=max_invalid_total_d,
max_invalid_consec=max_invalid_consec_d,
)
datcol = aggregate2Freq(
datcol,
method,
freq,
agg_func,
fill_value=np.nan,
max_invalid_total=max_invalid_total_d,
max_invalid_consec=max_invalid_consec_d,
)
flagscol = aggregate2Freq(
flagscol,
method,
freq,
flag_agg_func,
fill_value=empty_intervals_flag,
max_invalid_total=max_invalid_total_f,
max_invalid_consec=max_invalid_consec_f,
)
flagscol = aggregate2Freq(
flagscol,
method,
freq,
flag_agg_func,
fill_value=empty_intervals_flag,
max_invalid_total=max_invalid_total_f,
max_invalid_consec=max_invalid_consec_f,
)
kws = dict(
method=method,
freq=freq,
agg_func=flag_agg_func,
fill_value=UNTOUCHED,
max_invalid_total=max_invalid_total_f,
max_invalid_consec=max_invalid_consec_f,
)
flagger = applyFunctionOnHistory(
flagger, field,
hist_func=aggregate2Freq, hist_kws=kws,
mask_func=aggregate2Freq, mask_kws=kws,
last_column=flagscol
)
# data/flags reshaping:
data[field] = datcol
reshaped_flagger = flagger.initFlags(datcol).setFlags(field, flag=flagscol, force=True, inplace=True, **kwargs)
flagger = flagger.slice(drop=field).merge(reshaped_flagger, subset=[field], inplace=True)
return data, flagger
......@@ -671,6 +700,7 @@ def reindexFlags(
"""
# TODO: This needs a refactoring
raise NotImplementedError("currently not available - rewrite needed")
flagscol, metacols = flagger.getFlags(source, full=True)
if flagscol.empty:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment