Skip to content
Snippets Groups Projects
Commit 23236f65 authored by Peter Lünenschloß's avatar Peter Lünenschloß
Browse files

simplifications

parent a687105f
No related branches found
No related tags found
1 merge request!256Filter funcs
Pipeline #22796 failed with stage
in 1 minute and 43 seconds
......@@ -12,7 +12,7 @@ from dios import DictOfSeries
from saqc.constants import *
from saqc.core import register, Flags
from saqc.lib.ts_operators import varQC
from saqc.lib.tools import customRoller, getFreqDelta
from saqc.lib.tools import customRoller, getFreqDelta, statPass
from saqc.lib.types import FreqString, ColumnName
......@@ -142,23 +142,9 @@ def flagByVariance(
max_consec_missing = np.inf
min_periods = int(np.ceil(pd.Timedelta(window) / pd.Timedelta(delta)))
to_set = statPass(dataseries, lambda x: varQC(x, max_missing, max_consec_missing),
window, thresh, min_periods=min_periods, comparator='<')
def var_below_thresh(s: pd.Series):
if varQC(s, max_missing, max_consec_missing) <= thresh:
return True
return np.nan
rolling = dataseries.rolling(window=window, min_periods=min_periods)
plateaus = rolling.apply(var_below_thresh, raw=False)
# are there any candidates for beeing flagged plateau-ish
if plateaus.sum() == 0:
return data, flags
plateaus.fillna(method="bfill", limit=min_periods - 1, inplace=True)
# result:
plateaus = (plateaus[plateaus == 1.0]).index
flags[plateaus, field] = flag
flags[to_set[to_set].index, field] = flag
return data, flags
......@@ -3,16 +3,15 @@
import pandas as pd
import numpy as np
from dios import DictOfSeries
from typing import Callable
from saqc.constants import *
from saqc.core import register, Flags
from saqc.lib.types import ColumnName, FreqString, PositiveInt, PositiveFloat
from saqc.lib.tools import getAttrOrApply
from saqc.lib.types import ColumnName, FreqString, PositiveInt, PositiveFloat, Literal
from saqc.lib.tools import statPass
@register(masking='field', module="noise")
def flagByVarianceLowPass(data: DictOfSeries,
def flagByLowPass(data: DictOfSeries,
field: ColumnName,
flags: Flags,
stat: Callable[[np.array, pd.Series], float],
......@@ -23,32 +22,14 @@ def flagByVarianceLowPass(data: DictOfSeries,
min_periods: PositiveInt = None,
flag: float = BAD,
**kwargs):
""""""
datcol = data[field]
if not min_periods:
min_periods = 0
if not sub_thresh:
sub_thresh = thresh
if not sub_wnsz:
sub_wnsz = wnsz
wnsz = pd.Timedelta(wnsz)
sub_wnsz = pd.Timedelta(sub_wnsz)
stat_parent = datcol.rolling(wnsz, min_periods=min_periods)
stat_parent = getAttrOrApply(stat_parent, stat)
exceeding_parent = stat_parent > thresh
stat_sub = datcol.rolling(sub_wnsz)
stat_sub = getAttrOrApply(stat_sub, stat)
min_stat = stat_sub.rolling(wnsz - sub_wnsz, closed='both').min()
exceeding_sub = min_stat > sub_thresh
exceeds = exceeding_sub & exceeding_parent
to_set = pd.Series(False, index=exceeds.index)
for g in exceeds.groupby(by=exceeds.values):
if g[0]:
to_set[g[1].index[0] - wnsz:g[1].index[-1]] = True
flags[exceeds[exceeds].index, field] = flag
to_set = statPass(datcol, stat, wnsz, thresh, sub_wnsz, sub_thresh, min_periods, comparator='>')
flags[to_set[to_set].index, field] = flag
return data, flags
\ No newline at end of file
......@@ -413,7 +413,8 @@ def _shift(
# The last 2 lines left the history in an unstable state, Also we want to
# append a dummy column, that represent the 'shift' in the history.
history.hist.loc[:, :0] = UNFLAGGED
#history.hist.loc[:, :0] = UNFLAGGED
history.hist[0] = UNFLAGGED
dummy = pd.Series(UNTOUCHED, index=datcol.index, dtype=float)
history.append(dummy, force=True)
......
......@@ -3,8 +3,8 @@
import re
import datetime
from typing import Sequence, Union, Any, Iterator
from typing import Sequence, Union, Any, Iterator,Callable
import operator
import itertools
import numpy as np
import numba as nb
......@@ -14,6 +14,8 @@ import logging
import dios
import collections
from scipy.cluster.hierarchy import linkage, fcluster
from saqc.lib.types import ColumnName, FreqString, PositiveInt, PositiveFloat, Literal
from saqc.lib.types import T
......@@ -580,4 +582,47 @@ def getAttrOrApply(in_obj, apply_obj, attr_access='__name__', attr_or='apply'):
except AttributeError:
out = getattr(in_obj, attr_or)(apply_obj)
return out
\ No newline at end of file
return out
def statPass(datcol: pd.Series,
             stat: Callable[[np.array, pd.Series], float],
             wnsz: FreqString,
             thresh: PositiveFloat,
             sub_wnsz: FreqString = None,
             sub_thresh: PositiveFloat = None,
             min_periods: PositiveInt = None,
             comparator: Literal['>', '>=', '==', '<=', '<'] = '<') -> pd.Series:
    """
    Check `datcol` for chunks of (temporal) extension `wnsz` whose statistic
    compares positively against `thresh`, i.e. chunks where

        ``stat(chunk) comparator thresh``

    holds. If `sub_wnsz` is given, a chunk only qualifies if, in addition,
    every (possibly overlapping) sub-chunk of extension `sub_wnsz` satisfies

        ``stat(sub_chunk) comparator sub_thresh``

    Parameters
    ----------
    datcol : pd.Series
        Datetime-indexed data to be checked.
    stat : Callable
        Statistic to evaluate on every (sub-)chunk. If its ``__name__``
        names a built-in rolling method (e.g. ``np.mean`` -> ``mean``),
        the fast C implementation is used, otherwise ``rolling.apply``.
    wnsz : FreqString
        (Temporal) extension of the chunks to test.
    thresh : float
        Threshold the chunk statistic is compared against.
    sub_wnsz : FreqString, optional
        Extension of the sub-chunks. No sub-chunk check if ``None``.
    sub_thresh : float, optional
        Threshold for the sub-chunk statistic.
    min_periods : int, optional
        Minimum number of values a chunk must contain to be evaluated.
    comparator : {'>', '>=', '==', '<=', '<'}
        Comparison operator, applied as ``stat(chunk) comparator thresh``.

    Returns
    -------
    pd.Series
        Boolean series with the index of `datcol`; ``True`` for every
        timestamp lying within a qualifying chunk.
    """
    oper = {'>': operator.gt,
            '>=': operator.ge,
            '==': operator.eq,
            '<=': operator.le,
            '<': operator.lt}[comparator]

    # Timedeltas are required below: frequency *strings* support neither the
    # window difference (`wnsz - sub_wnsz`) nor the timestamp offset
    # arithmetic (`index[0] - wnsz`).
    wnsz = pd.Timedelta(wnsz)

    def _rollingStat(roller):
        # prefer the C-implemented rolling method named after `stat`,
        # fall back to the generic (slower) `apply` otherwise
        try:
            return getattr(roller, stat.__name__)()
        except AttributeError:
            return roller.apply(stat)

    stat_parent = _rollingStat(datcol.rolling(wnsz, min_periods=min_periods))
    exceeds = oper(stat_parent, thresh)

    if sub_wnsz is not None:
        sub_wnsz = pd.Timedelta(sub_wnsz)
        stat_sub = _rollingStat(datcol.rolling(sub_wnsz))
        # minimum over all sub-windows ending inside the parent window;
        # NaN entries compare False and thus never qualify
        min_stat = stat_sub.rolling(wnsz - sub_wnsz, closed='both').min()
        exceeds &= oper(min_stat, sub_thresh)

    # Expand every qualifying window-end mark backwards over the full window
    # extension. Group *consecutive* runs of hits: grouping by the raw
    # boolean values would merge separate runs into one group and also flag
    # the non-qualifying gap between them.
    to_set = pd.Series(False, index=exceeds.index)
    run_ids = (exceeds != exceeds.shift()).cumsum()
    for _, run in exceeds.groupby(run_ids):
        if run.iloc[0]:
            to_set[run.index[0] - wnsz:run.index[-1]] = True
    return to_set
\ No newline at end of file
......@@ -493,3 +493,4 @@ def polynomialInterpolation(data, inter_limit=2, inter_order=2):
return interpolateNANs(
data, "polynomial", inter_limit=inter_limit, order=inter_order
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment