Skip to content
Snippets Groups Projects
Commit e6adfd39 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

implemented flagging back projection method

parent 39864a13
No related branches found
No related tags found
3 merge requests!193Release 1.4,!188Release 1.4,!49Dataprocessing features
Pipeline #4655 passed with stage
in 6 minutes and 54 seconds
......@@ -7,6 +7,13 @@ from saqc.core.register import register
from saqc.lib.ts_operators import interpolateNANs, aggregate2Freq, shift2Freq
from saqc.lib.tools import toSequence
def _half_period(freq):
    """Return half of the period described by ``freq`` as a ``pd.Timedelta``."""
    return pd.Timedelta(freq) / 2


# Lookup table for inverting a shift/aggregation when projecting flags back.
# Each inversion method maps to a pair ``(projection_method, tolerance_func)``:
#   - ``projection_method`` is the direction / fill method used for the projection,
#   - ``tolerance_func`` turns the frequency into the tolerance of the projection
#     (the full period for directed methods, half of it for the "nearest" ones).
METHOD2ARGS = {
    "inverse_fshift": ("backward", pd.Timedelta),
    "inverse_bshift": ("forward", pd.Timedelta),
    "inverse_nshift": ("nearest", _half_period),
    "inverse_fagg": ("bfill", pd.Timedelta),
    "inverse_bagg": ("ffill", pd.Timedelta),
    "inverse_nagg": ("nearest", _half_period),
}
@register
def proc_interpolateMissing(data, field, flagger, method, inter_order=2, inter_limit=2, interpol_flag='UNFLAGGED',
......@@ -70,6 +77,8 @@ def proc_interpolateGrid(data, field, flagger, freq, method, inter_order=2, drop
grid_index = pd.date_range(start=datcol.index[0].floor(freq), end=datcol.index[-1].ceil(freq), freq=freq,
name=datcol.index.name)
aligned_start = datcol.index[0] == grid_index[0]
aligned_end = datcol.index[-1] == grid_index[-1]
datcol = datcol.reindex(
datcol.index.join(grid_index, how="outer", )
)
......@@ -105,10 +114,19 @@ def proc_interpolateGrid(data, field, flagger, freq, method, inter_order=2, drop
# ...hack done
# we might miss the flag for interpolated data grids last entry (if we miss it - the datapoint is always nan
# - just settling a convention here):
# - just settling a convention here(resulting GRID should start BEFORE first valid data entry and range to AFTER
# last valid data)):
if inter_data.shape[0] > flagscol.shape[0]:
flagscol = flagscol.append(pd.Series(empty_intervals_flag, index=[datcol.index[-1]]))
# Additional consistency operation: we have to block the flags of the first/last interpolated data points, since
# they very likely represent chunk starts/ends (unless the data's start and/or end timestamps were already
# grid-aligned before the grid interpolation).
if np.isnan(inter_data[0]) and not aligned_start:
chunk_bounds = chunk_bounds.insert(0, inter_data.index[0])
if np.isnan(inter_data[-1]) and not aligned_end:
chunk_bounds = chunk_bounds.append(pd.DatetimeIndex([inter_data.index[-1]]))
chunk_bounds = chunk_bounds.unique()
flagger_new = flagger.initFlags(inter_data).setFlags(field, flag=flagscol, force=True, **kwargs)
# block chunk ends of interpolation
......@@ -130,8 +148,6 @@ def proc_resample(data, field, flagger, freq, func=np.mean, max_invalid_total_d=
if empty_intervals_flag is None:
empty_intervals_flag = flagger.BAD
datcol = aggregate2Freq(datcol, method, freq, func, fill_value=np.nan,
max_invalid_total=max_invalid_total_d, max_invalid_consec=max_invalid_consec_d)
flagscol = aggregate2Freq(flagscol, method, freq, flag_agg_func, fill_value=empty_intervals_flag,
......@@ -146,7 +162,7 @@ def proc_resample(data, field, flagger, freq, func=np.mean, max_invalid_total_d=
@register
def proc_shift(data, field, flagger, freq, method, drop_flags=None, empty_intervals_flag=None, **kwargs):
# Note: all data nans get excluded defaultly from shifting. I drop_flags is None - all BAD flagged values get
# Note: all data NaNs are excluded from shifting by default. If drop_flags is None, all BAD flagged values are
# excluded as well.
data = data.copy()
datcol = data[field]
......@@ -182,5 +198,71 @@ def proc_transform(data, field, flagger, func, **kwargs):
data[field] = new_col
return data, flagger
#@register
#def proc_projectFlags(data, field, flagger, target_field, **kwargs):
\ No newline at end of file
@register
def proc_projectFlags(data, field, flagger, target, method, freq=None, drop_flags=None, **kwargs):
    """
    Project the flags of ``field`` back onto the flags of ``target``.

    The flags of ``field`` are mapped onto the index of ``target`` by inverting an
    aggregation (``method`` ends with "agg") or a shift (``method`` ends with "shift").
    Flags are only ever raised: a flag of ``target`` is replaced only where the projected
    flag compares greater than the flag currently present.

    Parameters
    ----------
    data :
        Container holding the series ``data[field]`` and ``data[target]``.
    field : str
        Name of the variable whose flags get projected.
    flagger :
        Flagger object providing ``getFlags``, ``isFlagged``, ``setFlags`` and ``BAD``.
    target : str
        Name of the variable receiving the projected flags.
    method : str
        Key of ``METHOD2ARGS`` selecting projection direction and tolerance
        (e.g. "inverse_fshift", "inverse_nagg").
    freq : optional
        Frequency the tolerance of the projection is derived from. If ``None``, the
        frequency attached to the index of ``data[field]`` is used.
    drop_flags : optional
        Flag or sequence of flags. Only used for shift inversion: entries of ``target``
        carrying one of these flags (or having a NaN data value) are excluded from the
        projection and keep their flags. Defaults to ``flagger.BAD``.
    **kwargs :
        Accepted for interface compatibility; not used by this function.

    Returns
    -------
    data, flagger
        The unchanged ``data`` and the flagger returned by ``flagger.setFlags``.

    Raises
    ------
    ValueError
        If ``freq`` is None and the index of ``data[field]`` carries no frequency.
    KeyError
        If ``method`` ends with "agg" or "shift" but is not a key of ``METHOD2ARGS``.
    """
    datcol = data[field]
    target_datcol = data[target]
    flagscol = flagger.getFlags(field)
    target_flagscol = flagger.getFlags(target)

    if freq is None:
        # pd.Timedelta(None) evaluates to NaT, which signals a missing index frequency
        freq = pd.Timedelta(datcol.index.freq)
        if freq is pd.NaT:
            raise ValueError(
                "Nor is {} a frequency regular timeseries, neither was a frequency passed to parameter 'freq'. "
                "Dont know what to do.".format(field)
            )

    if method[-3:] == "agg":
        # Aggregation - Inversion
        projection_method = METHOD2ARGS[method][0]
        tolerance = METHOD2ARGS[method][1](freq)
        flagscol = flagscol.reindex(target_flagscol.index, method=projection_method,
                                    tolerance=tolerance)
        # only overwrite where the projected flag is "worse" than the present one
        replacement_mask = flagscol > target_flagscol
        target_flagscol.loc[replacement_mask] = flagscol.loc[replacement_mask]

    if method[-5:] == "shift":
        # NOTE: although inverting a simple shift seems to be the less complex operation, it needs more code
        # than inverting an aggregation. That is because BAD/invalid values block a proper shift inversion:
        # they have to be sorted out before the inversion and re-inserted afterwards.
        #
        # starting with the dropping and its memorization:
        if drop_flags is None:
            drop_flags = flagger.BAD
        drop_flags = toSequence(drop_flags)

        drop_mask = pd.Series(False, index=target_datcol.index)
        for f in drop_flags:
            # the mask lives on the index of ``target`` and is applied to the flags of ``target`` -
            # so the flags checked here have to be the ones of ``target`` as well.
            drop_mask |= flagger.isFlagged(target, flag=f)
        drop_mask |= target_datcol.isna()
        target_flagscol_drops = target_flagscol[drop_mask]
        target_flagscol.drop(drop_mask[drop_mask].index, inplace=True)

        # shift inversion
        projection_method = METHOD2ARGS[method][0]
        tolerance = METHOD2ARGS[method][1](freq)
        flags_merged = pd.merge_asof(
            flagscol,
            pd.Series(target_flagscol.index.values,
                      index=target_flagscol.index,
                      name="pre_index"),
            left_index=True,
            right_index=True,
            tolerance=tolerance,
            direction=projection_method,
        )
        flags_merged.dropna(subset=["pre_index"], inplace=True)
        # squeeze the column axis only: a single matched row has to stay a Series, not collapse to a scalar
        flags_merged = flags_merged.set_index(["pre_index"]).squeeze(axis=1)

        # write flags to target
        replacement_mask = flags_merged > target_flagscol.loc[flags_merged.index]
        target_flagscol.loc[replacement_mask[replacement_mask].index] = flags_merged.loc[replacement_mask]

        # reinsert drops
        target_flagscol = target_flagscol.reindex(target_flagscol.index.join(target_flagscol_drops.index, how='outer'))
        target_flagscol.loc[target_flagscol_drops.index] = target_flagscol_drops.values

    flagger = flagger.setFlags(field=target, flag=target_flagscol.values)
    return data, flagger
\ No newline at end of file
......@@ -265,7 +265,7 @@ def aggregate2Freq(data, method, freq, agg_func, fill_value=np.nan, max_invalid_
# - resample AND groupBy do insert value zero for empty intervals if resampling with any kind of "sum" application -
# we want "fill_value" to be inserted
# - we are aggregating data and flags with this function and empty intervals usually would get assigned flagger.BAD
# flag (where resample inserts np.nan)
# flag (where resample inserts np.nan or 0)
data_resampler = data.resample(freq_string, base=base, closed=closed,
label=label)
......@@ -273,7 +273,7 @@ def aggregate2Freq(data, method, freq, agg_func, fill_value=np.nan, max_invalid_
empty_intervals = data_resampler.count() == 0
data = data_resampler.apply(agg_func)
# since loffset keyword of pandas "discharges" after one use of the resampler (pandas logic) - we correct the
# since loffset keyword of pandas.resample "discharges" after one use of the resampler (pandas logic) - we correct the
# resampled labels offset manually, if necessary.
if method == "nagg":
data.index = data.index.shift(freq=pd.Timedelta(freq) / 2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment