Skip to content
Snippets Groups Projects
Commit 3e683b30 authored by David Schäfer's avatar David Schäfer
Browse files

refactored (and stripped down) the function flagIsolated

parent 0e531a1b
No related branches found
No related tags found
No related merge requests found
......@@ -88,32 +88,21 @@ Note: Only works for datetime indexed data
isolated(window, group_size=1, continuation_range='1min')
```
| parameter | data type | default value | description |
| --------- | --------- | ------------- | ----------- |
| window | [offset string](docs/ParameterDescriptions.md#offset-strings) | | The range, within there are no valid values allowed for a valuegroup to get flagged isolated. See condition (1) and (2). |
| group_size | integer | `1` | The upper bound for the size of a value group to be considered an isolated group. See condition (3). |
| continuation_range | [offset string](docs/ParameterDescriptions.md#offset-strings) | `"1min"` | The upper bound for the temporal extension of a value group to be considered an isolated group. See condition (4). Only relevant if `group_size` > 1. |
The function flags isolated values / value groups.
Isolated values are values / value groups,
that, in a range of `window`,
are surrounded either by already flagged or missing values only.
The function defaults to flag isolated single values only. But the parameters
allow for detections of more complex isolation definitions, including groups
of isolated values.
A continuous group of timeseries values
$`x_{k}, x_{k+1},...,x_{k+n}`$ is considered to be "isolated", if:
1. There are no values, preceding $`x_{k}`$ within `window` or all the
preceding values within this range are flagged
2. There are no values, succeeding $`x_{k+n}`$, within `window`, or all the
succeeding values within this range are flagged
3. $`n \leq `$ `group_size`
4. $` |y_{k} - y_{n+k}| < `$ `continuation_range`, with $`y `$, denoting the series
of timestamps associated with $`x`$.
| parameter | data type | default value | description |
|--------------|---------------------------------------------------------------|---------------|------------------------------------------------------------------------|
| group_window | [offset string](docs/ParameterDescriptions.md#offset-strings) | | Maximum size of an isolated group, see condition (1). |
| gap_window | [offset string](docs/ParameterDescriptions.md#offset-strings) | | Minimum temporal extension of the gaps separating an isolated group from the rest of the data, see conditions (2) and (3) |
The function flags arbitrarily large groups of values, if they are surrounded by sufficiently
large data gaps. A gap is defined as a group of missing and/or flagged values.
A continuous group of values
$`x_{k}, x_{k+1},...,x_{k+n}`$ with timestamps $`t_{k}, t_{k+1}, ..., t_{k+n}`$
is considered to be isolated, if:
1. $` t_{k+n} - t_{k} \leq `$ `group_window`
2. None of the values $` x_{i}, ..., x_{k-1} `$, with $` t_{k} - t_{i} \leq `$ `gap_window`, is valid and unflagged
3. None of the values $` x_{k+n+1}, ..., x_{j} `$, with $` t_{j} - t_{k+n} \leq `$ `gap_window`, is valid and unflagged
### missing
......
......@@ -4,7 +4,7 @@
import numpy as np
import pandas as pd
from saqc.lib.tools import sesonalMask, flagWindow
from saqc.lib.tools import sesonalMask, flagWindow, groupConsecutives
from saqc.funcs.register import register
def flagIsolated(
    data,
    field,
    flagger,
    gap_window,
    group_window,
    **kwargs,
):
    """
    Flag temporally isolated groups of values.

    A continuous group of values is considered isolated — and gets
    flagged — if both of the following hold:

    1. its temporal extension (last timestamp minus first timestamp) is
       at most ``group_window``, and
    2. on both sides it is separated from the rest of the data by a span
       of at least ``gap_window`` that contains only missing and/or
       already-flagged values.

    Parameters
    ----------
    data : pandas-like container
        The data container; ``data[field]`` must be datetime indexed.
    field : str
        Column to operate on.
    flagger :
        The flagger instance holding the current flags for ``data``.
    gap_window : offset string
        Minimum temporal extension of the gap required on each side of a
        group for it to count as isolated.
    group_window : offset string
        Maximum temporal extension of a group to be considered isolated.

    Returns
    -------
    (data, flagger)
        The unchanged data and a flagger with the isolated groups flagged.
    """
    gap_window = pd.tseries.frequencies.to_offset(gap_window)
    group_window = pd.tseries.frequencies.to_offset(group_window)

    # hide already flagged values, so they count as part of a gap,
    # exactly like missing values do
    col = data[field].mask(flagger.isFlagged(field))
    mask = col.isnull()

    flags = pd.Series(data=False, index=col.index, dtype=bool)
    for srs in groupConsecutives(mask):
        if np.all(~srs):
            # a run of valid, unflagged values
            start = srs.index[0]
            stop = srs.index[-1]
            if stop - start <= group_window:
                # look at the gap_window stretch left of the group;
                # label slicing is inclusive, so drop the group's own
                # first element again
                left = mask[start - gap_window : start].iloc[:-1]
                # NOTE(review): `left.count()` demands at least one data
                # point inside the gap window — a group at the very start
                # of the series is never flagged; confirm this is intended
                if left.count() and left.all():
                    right = mask[stop : stop + gap_window].iloc[1:]
                    if right.count() and right.all():
                        flags[start:stop] = True

    flagger = flagger.setFlags(field, flags, **kwargs)
    return data, flagger
......@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
import numbers
from typing import Sequence, Union, Any
from typing import Sequence, Union, Any, Iterator
import numpy as np
import pandas as pd
......@@ -346,3 +346,35 @@ def funcInput_2_func(func):
return STRING_2_FUNC[func]
else:
raise ValueError("Function input not a callable nor a known key to internal the func dictionary.")
@nb.jit(nopython=True, cache=True)
def otherIndex(values: np.ndarray, start: int = 0) -> int:
    """
    Return the index of the first element, scanning forward from ``start``,
    that differs from ``values[start]``.

    That is, ``values[start:i]`` is the longest homogeneous run beginning
    at ``start``. Returns ``-1`` if all elements from ``start`` onward are
    identical — callers must handle that final run themselves.
    """
    val = values[start]
    for i in range(start, len(values)):
        if values[i] != val:
            return i
    return -1
def groupConsecutives(series: pd.Series) -> Iterator[pd.Series]:
    """
    Split ``series`` into runs of consecutive identical values.

    Yields one sub-series per run, preserving the original index.
    An empty series yields nothing.

    NOTE: fixes three defects of the previous implementation:
    - the trailing homogeneous run was silently dropped (``otherIndex``
      returns -1 for it, which triggered ``break`` before the yield)
    - an empty series raised ``IndexError`` (``values[0]``)
    - the local ``target`` was assigned but never used
    """
    values = series.values
    index = series.index
    n = len(values)
    if n == 0:
        return
    # positions where the value changes, framed by the series bounds
    change_points = np.flatnonzero(values[1:] != values[:-1]) + 1
    edges = np.concatenate(([0], change_points, [n]))
    for lo, hi in zip(edges[:-1], edges[1:]):
        yield pd.Series(data=values[lo:hi], index=index[lo:hi])
......@@ -136,20 +136,20 @@ def test_flagIsolated(data, flagger):
data.iloc[15:17, 0] = np.nan
flagger = flagger.initFlags(data)
flagger = flagger.setFlags(field, iloc=slice(5, 6))
data, flagger = flagIsolated(data, field, flagger, "2.1D")
assert flagger.isFlagged(field)[slice(3, 6, 2)].all()
_, flagger_result = flagIsolated(data, field, flagger, group_window="1D", gap_window="2.1D")
assert flagger_result.isFlagged(field)[slice(3, 6, 2)].all()
flagger = flagger.setFlags(
field, iloc=slice(3, 4), flag=flagger.UNFLAGGED, force=True
)
data, flagger = flagIsolated(
data, flagger_result = flagIsolated(
data,
field,
flagger,
"2.1D",
group_size=2,
flagger_result,
group_window="2D",
gap_window="2.1D",
continuation_range="1.1D",
)
assert flagger.isFlagged(field)[[3, 5, 13, 14]].all()
assert flagger_result.isFlagged(field)[[3, 5, 13, 14]].all()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment