Skip to content
Snippets Groups Projects
Commit 1583f047 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

fixed the nastiest of all my bugs

parent 02dd7f3c
No related branches found
No related tags found
3 merge requests!271Static expansion of regular expressions,!260Follow-Up Translations,!237Flagger Translations
......@@ -173,7 +173,7 @@ def _overlap_rs(x, freq='1min', fill_value=-np.inf):
# we are appending last regular grid entry (if necessary), to conserve integrity of groups of regularized
# timestamps originating all from the same logger.
try:
x = x.append(pd.Series([-np.inf], index=[end]), verify_integrity=True)
x = x.append(pd.Series([fill_value], index=[end]), verify_integrity=True)
except ValueError:
pass
return x
......
......@@ -19,6 +19,7 @@ from saqc.funcs.interpolation import interpolateIndex
from saqc.lib.tools import getDropMask, evalFreqStr
from saqc.lib.ts_operators import shift2Freq, aggregate2Freq
from saqc.flagger.flags import applyFunctionOnHistory
from saqc.lib.rolling import customRoller
logger = logging.getLogger("SaQC")
......@@ -564,7 +565,9 @@ def resample(
# create a dummys
if all_na_2_empty and datcol.dropna().empty:
# Todo: This needs discussion. It makes possible, that different harmonized variables,
# resulting from the harmonization of the same logger, have differing timestamps!
# (Same holds for starting/ending nan-chunk truncation)
datcol = pd.Series([], index=pd.DatetimeIndex([]), name=field)
flagscol = pd.Series([], index=pd.DatetimeIndex([]), name=field)
......@@ -614,7 +617,24 @@ def resample(
return data, flagger
@register(masking='field', module="resampling")
def _getChunkBounds(target_datcol, flagscol, freq):
chunk_end = target_datcol.reindex(flagscol.index, method='bfill', tolerance=freq)
chunk_start = target_datcol.reindex(flagscol.index, method='ffill', tolerance=freq)
ignore_flags = (chunk_end.isna() | chunk_start.isna())
return ignore_flags
def _inverseInterpolation(target_flagscol, flagscol=None, freq=None):
backprojected = flagscol.reindex(target_flagscol.index, method="bfill", tolerance=freq)
fwrdprojected = flagscol.reindex(target_flagscol.index, method="ffill", tolerance=freq)
b_replacement_mask = (backprojected > target_flagscol) & (backprojected >= fwrdprojected)
f_replacement_mask = (fwrdprojected > target_flagscol) & (fwrdprojected > backprojected)
target_flagscol.loc[b_replacement_mask] = backprojected.loc[b_replacement_mask]
target_flagscol.loc[f_replacement_mask] = fwrdprojected.loc[f_replacement_mask]
return target_flagscol
@register(masking='none', module="resampling")
def reindexFlags(
data: DictOfSeries,
field: str,
......@@ -622,8 +642,7 @@ def reindexFlags(
method: Literal["inverse_fagg", "inverse_bagg", "inverse_nagg", "inverse_fshift", "inverse_bshift", "inverse_nshift"],
source: str,
freq: Optional[str]=None,
to_drop: Optional[Union[Any, Sequence[Any]]]=None,
freq_check: Optional[Literal["check", "auto"]]=None,
to_mask: Optional[Union[Any, Sequence[Any]]]=BAD,
**kwargs
) -> Tuple[DictOfSeries, Flagger]:
......@@ -674,14 +693,9 @@ def reindexFlags(
freq : {None, str},default None
The freq determines the projection range for the projection method. See above description for more details.
Defaultly (None), the sampling frequency of source is used.
to_drop : {None, str, List[str]}, default None
to_mask : {None, str, List[str]}, default BAD
Flags referring to values that are to be dropped before flags projection. Relevant only when projecting with an
inverted shift method. Defaultly, BAD is listed.
freq_check : {None, 'check', 'auto'}, default None
- None: do not validate frequency-string passed to `freq`
- 'check': estimate frequency and log a warning if estimate miss matchs frequency string passed to 'freq', or
if no uniform sampling rate could be estimated
- 'auto': estimate frequency and use estimate. (Ignores `freq` parameter.)
Returns
-------
......@@ -692,37 +706,20 @@ def reindexFlags(
Flags values and shape may have changed relatively to the flagger input.
"""
# TODO: This needs a refactoring
raise NotImplementedError("currently not available - rewrite needed")
if to_mask is None:
to_mask = BAD
flagscol, metacols = flagger.getFlags(source, full=True)
flagscol = flagger[source]
if flagscol.empty:
return data, flagger
target_datcol = data[field]
target_flagscol, target_metacols = flagger.getFlags(field, full=True)
if (freq is None) and (method != "match"):
freq_check = 'auto'
freq = evalFreqStr(freq, freq_check, flagscol.index)
target_flagscol = flagger[field]
if method[-13:] == "interpolation":
backprojected = flagscol.reindex(target_flagscol.index, method="bfill", tolerance=freq)
fwrdprojected = flagscol.reindex(target_flagscol.index, method="ffill", tolerance=freq)
b_replacement_mask = (backprojected > target_flagscol) & (backprojected >= fwrdprojected)
f_replacement_mask = (fwrdprojected > target_flagscol) & (fwrdprojected > backprojected)
target_flagscol.loc[b_replacement_mask] = backprojected.loc[b_replacement_mask]
target_flagscol.loc[f_replacement_mask] = fwrdprojected.loc[f_replacement_mask]
backprojected_meta = {}
fwrdprojected_meta = {}
for meta_key in target_metacols.keys():
backprojected_meta[meta_key] = metacols[meta_key].reindex(target_metacols[meta_key].index, method='bfill',
tolerance=freq)
fwrdprojected_meta[meta_key] = metacols[meta_key].reindex(target_metacols[meta_key].index, method='ffill',
tolerance=freq)
target_metacols[meta_key].loc[b_replacement_mask] = backprojected_meta[meta_key].loc[b_replacement_mask]
target_metacols[meta_key].loc[f_replacement_mask] = fwrdprojected_meta[meta_key].loc[f_replacement_mask]
ignore = _getChunkBounds(target_datcol, flagscol, freq)
flagscol[ignore] = np.nan
target_flagscol = _inverseInterpolation(target_flagscol, flagscol, freq)
if method[-3:] == "agg" or method == "match":
# Aggregation - Inversion
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment