Skip to content
Snippets Groups Projects
Commit 7e0f4bbf authored by Peter Lünenschloß
Browse files

Merge branch 'master' of https://git.ufz.de/rdm/saqc

parents 76f42db6 416fc325
No related branches found
No related tags found
No related merge requests found
......@@ -28,7 +28,7 @@ def flagWindow(flagger, flags, mask, direction='fw', window=0, **kwargs) -> pd.S
fw = f.rolling(window=window, closed='both').sum().astype(bool)
fmask = bw | fw
flags[fmask] = flagger.setFlag(flags[fmask], **kwargs)
flags.loc[fmask] = flagger.setFlag(flags.loc[fmask], **kwargs)
return flags
......@@ -47,7 +47,8 @@ def assignTypeSafe(df, colname, rhs):
all columns are converted to the most generic
of the dtypes
"""
df.loc[:, colname] = rhs
# do not use .loc here, as it fails silently :/
df[colname] = rhs
if isinstance(rhs, pd.Series):
dtypes = rhs.dtypes
else:
......@@ -123,8 +124,8 @@ def runner(metafname, flagger, data, flags=None, nodata=np.nan):
raise NameError(
f"function name {func_name} is not definied (variable '{varname}, 'line: {idx + 1})")
old = flagger.isFlagged(fchunk[varname])
new = flagger.isFlagged(ffchunk[varname])
old = flagger.getFlags(fchunk[varname])
new = flagger.getFlags(ffchunk[varname])
mask = old != new
# flag a timespan after the condition is met
......@@ -136,8 +137,7 @@ def runner(metafname, flagger, data, flags=None, nodata=np.nan):
# flag a certain amount of values after condition is met
if Params.FLAGVALUES in flag_params:
ffchunk = assignTypeSafe(
ffchunk,
varname,
ffchunk, varname,
flagNext(flagger, ffchunk[varname], mask, func_name=func_name, **flag_params))
if flag_params.get(Params.PLOT, False):
......@@ -145,7 +145,7 @@ def runner(metafname, flagger, data, flags=None, nodata=np.nan):
plot(dchunk, ffchunk, mask, varname, flagger, title=flag_test)
data.loc[start_date:end_date] = dchunk
flags[start_date:end_date] = ffchunk.squeeze()
flags.loc[start_date:end_date] = ffchunk.squeeze()
flagger.nextTest()
......
......@@ -77,9 +77,17 @@ class DmpFlagger(BaseFlagger):
return super().isFlagged(flagcol, flag, comparator)
def getFlags(self, flags):
flags = self._reduceColumns(flags)
flagcol = flags.loc[:, FlagFields.FLAG]
return super().getFlags(flagcol)
if isinstance(flags, pd.Series):
return super().getFlags(flags)
elif isinstance(flags, pd.DataFrame):
if isinstance(flags.columns, pd.MultiIndex):
f = flags.xs(FlagFields.FLAG, level=ColumnLevels.FLAGS, axis=1)
else:
f = flags.loc[:, FlagFields.FLAG]
else:
raise TypeError(flags)
return f.squeeze()
def _reduceColumns(self, flags):
if set(flags.columns) == set(self.flag_fields):
......
......@@ -41,7 +41,7 @@ def test_positionalPartitioning(flagger):
vname, start_index, end_index = row[fields]
fchunk = pflags.loc[flagger.isFlagged(pflags[vname]), vname]
assert fchunk.index.min() == start_index, "different start indices"
assert fchunk.index.max() + 1 == end_index, f"different end indices: {fchunk.index.max()} vs. {end_index}"
assert fchunk.index.max() == end_index, f"different end indices: {fchunk.index.max()} vs. {end_index}"
@pytest.mark.parametrize("flagger", TESTFLAGGERS)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment