diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9120d74be275a9fcb709f60ecd752c3facfbe1b6..a5a67c9c984708a1e52628c87c4959d342dc97ad 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -66,6 +66,7 @@ coming soon ...
 ## Features
 - added the data processing module `proc_functions`
 - `flagCrossValidation` implemented
+- CLI: added support for parquet files
 
 ## Bugfixes
 - `spikes_flagRaise` - overestimation of value courses average fixed
diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md
index d2b6c90afe0af08bbbf435068792faeee668eab1..c034992821df5a9fa9f310748d801ccfeeb3c7f3 100644
--- a/docs/GettingStarted.md
+++ b/docs/GettingStarted.md
@@ -26,31 +26,34 @@ practice, you will first want to create a new virtual environment that you
 install SaQC into by typing the following in your console:
 
+##### On Unix/Mac-systems
 
-```sh
+```sh
 # if you have not installed venv yet, do so:
-python -m pip install --user virtualenv
+python3 -m pip install --user virtualenv
 
 # move to the directory where you want to create your virtual environment
 cd YOURDIR
 
 # create virtual environment called "env_saqc"
-python -m venv env_saqc
-
-```
-To activate your virtual environment, you need to type the following:
-
-
-##### On Unix/Mac-systems
+python3 -m venv env_saqc
 
-```sh
 # activate the virtual environment
 source env_saqc/bin/activate
 ```
 
-##### On Windows
+##### On Windows-systems
 
 ```sh
+# if you have not installed venv yet, do so:
+py -3 -m pip install --user virtualenv
+
+# move to the directory where you want to create your virtual environment
+cd YOURDIR
+
+# create virtual environment called "env_saqc"
+py -3 -m venv env_saqc
+
 # move to the Scripts directory in "env_saqc"
 cd env_saqc/Scripts
@@ -62,10 +65,20 @@ cd env_saqc/Scripts
 
 ### Via PyPI
 
-Type
+Type the following:
+
+##### On Unix/Mac-systems
+
 ```sh
-python -m pip install saqc
+python3 -m pip install saqc
+```
+
+##### On Windows-systems
+
+```sh
+py -3 -m pip install saqc
 ```
@@ -156,10 +169,16 @@ cd saqc
 From here, you can run saqc and tell it to run the tests from the toy
 config-file on the toy dataset via the `-c` and `-d` options:
 
+##### On Unix/Mac-systems
 ```sh
-python -m saqc -c ressources/data/myconfig.csv -d ressources/data/data.csv
+python3 -m saqc -c ressources/data/myconfig.csv -d ressources/data/data.csv
 ```
 
-If you installed saqc via PYPi, you can omit "python -m".
+##### On Windows-systems
+```sh
+py -3 -m saqc -c ressources/data/myconfig.csv -d ressources/data/data.csv
+```
+
+If you installed saqc via PyPI, you can omit the `python3 -m` (respectively `py -3 -m`) prefix.
 
 The command will output this plot:
diff --git a/requirements.txt b/requirements.txt
index 45132e2397cf607c5b17e106d94eed3a0b0ce753..c68f7d45f0e9971bc27530c8228c7eaee6bed4c9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,7 @@ importlib-metadata==1.7.0
 joblib==0.16.0
 llvmlite==0.34.0
 mlxtend==0.17.3
-matplotlib==3.3.1
+matplotlib==3.3.2
 more-itertools==8.5.0
 numba==0.51.2
 numpy==1.19.2
diff --git a/saqc/__main__.py b/saqc/__main__.py
index e8acb5aff3864117f723be35a8ee3a7f2bc13260..7f32f6a0c65f1e7e7cacb08d7bf1c137664aa021 100644
--- a/saqc/__main__.py
+++ b/saqc/__main__.py
@@ -1,20 +1,25 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
+import logging
+from functools import partial
+from pathlib import Path
+
 import click
 import numpy as np
 import pandas as pd
-import logging
+import pyarrow as pa
+import pyarrow.parquet  # noqa -- "pa.parquet" is not loaded by "import pyarrow" alone
 
 from saqc.core import SaQC
 from saqc.flagger import CategoricalFlagger
 from saqc.flagger.dmpflagger import DmpFlagger
-import dios
 
 logger = logging.getLogger("SaQC")
 
+
 FLAGGERS = {
     "numeric": CategoricalFlagger([-1, 0, 1]),
     "category": CategoricalFlagger(["NIL", "OK", "BAD"]),
@@ -30,6 +35,35 @@ def _setup_logging(loglvl):
     logger.addHandler(handler)
 
 
+def setupIO(nodata):
+    reader = {
+        ".csv" : partial(pd.read_csv, index_col=0, parse_dates=True),
+        ".parquet" : pd.read_parquet
+    }
+
+    writer = {
+        ".csv" : partial(pd.DataFrame.to_csv, header=True, index=True, na_rep=nodata),
+        ".parquet" : lambda df, outfile: pa.parquet.write_table(pa.Table.from_pandas(df), outfile)
+    }
+    return reader, writer
+
+
+def readData(reader_dict, fname):
+    extension = Path(fname).suffix
+    reader = reader_dict.get(extension)
+    if not reader:
+        raise ValueError(f"Unsupported file format '{extension}', use one of {tuple(reader_dict.keys())}")
+    return reader(fname)
+
+
+def writeData(writer_dict, df, fname):
+    extension = Path(fname).suffix
+    writer = writer_dict.get(extension)
+    if not writer:
+        raise ValueError(f"Unsupported file format '{extension}', use one of {tuple(writer_dict.keys())}")
+    writer(df, fname)
+
+
 @click.command()
 @click.option(
     "-c", "--config", type=click.Path(exists=True), required=True, help="path to the configuration file",
@@ -49,26 +83,26 @@ def _setup_logging(loglvl):
 def main(config, data, flagger, outfile, nodata, log_level, fail):
 
     _setup_logging(log_level)
+    reader, writer = setupIO(nodata)
 
-    data = pd.read_csv(data, index_col=0, parse_dates=True,)
-    data = dios.DictOfSeries(data)
+    data = readData(reader, data)
 
     saqc = SaQC(flagger=FLAGGERS[flagger], data=data, nodata=nodata, error_policy="raise" if fail else "warn",)
 
     data_result, flagger_result = saqc.readConfig(config).getResult()
-    return
 
     if outfile:
         data_result = data_result.to_df()
-        flags = flagger_result.getFlags().to_df()
+        flags = flagger_result.flags.to_df()
         flags_flagged = flagger_result.isFlagged().to_df()
 
         flags_out = flags.where((flags.isnull() | flags_flagged), flagger_result.GOOD)
         fields = {"data": data_result, "flags": flags_out}
 
         if isinstance(flagger_result, DmpFlagger):
-            fields["comments"] = flagger_result.comments.to_df()
-            fields["causes"] = flagger_result.causes.to_df()
+            fields["quality_flag"] = fields.pop("flags")
+            fields["quality_comment"] = flagger_result.comments.to_df()
+            fields["quality_cause"] = flagger_result.causes.to_df()
 
         out = (
             pd.concat(fields.values(), axis=1, keys=fields.keys())
@@ -76,7 +110,7 @@ def main(config, data, flagger, outfile, nodata, log_level, fail):
             .sort_index(axis=1, level=0, sort_remaining=False)
        )
         out.columns = out.columns.rename(["", ""])
-        out.to_csv(outfile, header=True, index=True, na_rep=nodata)
+        writeData(writer, out, outfile)
 
 
 if __name__ == "__main__":
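The new IO layer dispatches on the file extension. A minimal usage sketch of that dispatch, assuming the patched module is importable; the file names are illustrative and not part of the patch:

```python
from saqc.__main__ import setupIO, readData, writeData

reader, writer = setupIO(nodata=-9999)

df = readData(reader, "data.parquet")  # ".parquet" dispatches to pd.read_parquet
# ... quality control happens here ...
writeData(writer, df, "out.csv")       # ".csv" dispatches to the partial'ed DataFrame.to_csv

# any other extension raises, e.g.:
# readData(reader, "data.xlsx")
# -> ValueError: Unsupported file format '.xlsx', use one of ('.csv', '.parquet')
```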
diff --git a/saqc/core/core.py b/saqc/core/core.py
index d00752a8edd058b8b44a955bdc41153f448d78de..0d86af504539443cca2ef59c7b6d384d677cc9b7 100644
--- a/saqc/core/core.py
+++ b/saqc/core/core.py
@@ -144,6 +144,15 @@ class SaQC:
             out = out._wrap(func, lineno=lineno, expr=expr)(**kwargs)
         return out
 
+    def _expandFields(self, func_dump, variables):
+        if not func_dump["regex"]:
+            return [func_dump]
+
+        out = []
+        for field in variables[variables.str.match(func_dump["field"])]:
+            out.append({**func_dump, "field": field})
+        return out
+
     def evaluate(self):
         """
         Realize all the registered calculations and return a updated SaQC Object
@@ -161,38 +170,39 @@ class SaQC:
         data, flagger = self._data, self._flagger
 
         for func_dump in self._to_call:
-            func_name = func_dump['func_name']
-            func_kws = func_dump['func_kws']
-            field = func_dump['field']
-            plot = func_dump["ctrl_kws"]["plot"]
-            logger.debug(f"processing: {field}, {func_name}, {func_kws}")
-
-            try:
-                t0 = timeit.default_timer()
-                data_result, flagger_result = _saqcCallFunc(func_dump, data, flagger)
-
-            except Exception as e:
-                t1 = timeit.default_timer()
-                logger.debug(f"{func_name} failed after {t1 - t0} sec")
-                _handleErrors(e, func_dump, self._error_policy)
-                continue
-            else:
-                t1 = timeit.default_timer()
-                logger.debug(f"{func_name} finished after {t1 - t0} sec")
-
-            if plot:
-                plotHook(
-                    data_old=data,
-                    data_new=data_result,
-                    flagger_old=flagger,
-                    flagger_new=flagger_result,
-                    sources=[],
-                    targets=[field],
-                    plot_name=func_name,
-                )
-
-            data = data_result
-            flagger = flagger_result
+            for func_dump in self._expandFields(func_dump, data.columns.union(flagger._flags.columns)):
+                func_name = func_dump['func_name']
+                func_kws = func_dump['func_kws']
+                field = func_dump['field']
+                plot = func_dump["ctrl_kws"]["plot"]
+                logger.debug(f"processing: {field}, {func_name}, {func_kws}")
+
+                try:
+                    t0 = timeit.default_timer()
+                    data_result, flagger_result = _saqcCallFunc(func_dump, data, flagger)
+
+                except Exception as e:
+                    t1 = timeit.default_timer()
+                    logger.debug(f"{func_name} failed after {t1 - t0} sec")
+                    _handleErrors(e, func_dump, self._error_policy)
+                    continue
+                else:
+                    t1 = timeit.default_timer()
+                    logger.debug(f"{func_name} finished after {t1 - t0} sec")
+
+                if plot:
+                    plotHook(
+                        data_old=data,
+                        data_new=data_result,
+                        flagger_old=flagger,
+                        flagger_new=flagger_result,
+                        sources=[],
+                        targets=[field],
+                        plot_name=func_name,
+                    )
+
+                data = data_result
+                flagger = flagger_result
 
         if any([fdump["ctrl_kws"]["plot"] for fdump in self._to_call]):
             plotAllHook(data, flagger)
@@ -217,7 +227,7 @@ class SaQC:
 
     def _wrap(self, func_name, lineno=None, expr=None):
         def inner(field: str, *args, regex: bool = False, to_mask=None, plot=False, inplace=False, **kwargs):
-            fields = [field] if not regex else self._data.columns[self._data.columns.str.match(field)]
+            # fields = [field] if not regex else self._data.columns[self._data.columns.str.match(field)]
 
             kwargs.setdefault('nodata', self._nodata)
@@ -238,13 +248,16 @@ class SaQC:
                 "func_args": args,
                 "func_kws": kwargs,
                 "ctrl_kws": ctrl_kws,
+                "field": field,
+                "regex": regex,
             }
 
             out = self if inplace else self.copy()
+            out._to_call.append(func_dump)
 
-            for field in fields:
-                dump_copy = {**func_dump, "field": field}
-                out._to_call.append(dump_copy)
+            # for field in fields:
+            #     dump_copy = {**func_dump, "field": field}
+            #     out._to_call.append(dump_copy)
             return out
 
         return inner
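What `_expandFields` does for a regex field, sketched on toy column names (the names and the `flagDummy` entry are made up for illustration):

```python
import pandas as pd

variables = pd.Index(["temp_1", "temp_2", "pressure"])
func_dump = {"func_name": "flagDummy", "field": "temp_.*", "regex": True}

# same expansion core as SaQC._expandFields:
expanded = [{**func_dump, "field": f}
            for f in variables[variables.str.match(func_dump["field"])]]

# -> [{..., 'field': 'temp_1'}, {..., 'field': 'temp_2'}]
# with regex=False the dump would be passed through unchanged
```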
diff --git a/saqc/flagger/dmpflagger.py b/saqc/flagger/dmpflagger.py
index 0a0acd9e37fe6b7a5b52dc5382c1303fda61dd74..5df4d9de146bd2f86fd301d3d5c0df0befa9f3c6 100644
--- a/saqc/flagger/dmpflagger.py
+++ b/saqc/flagger/dmpflagger.py
@@ -105,12 +105,16 @@ class DmpFlagger(CategoricalFlagger):
         else:
             return self._construct_new(flags, causes, comments)
 
-    def setFlags(self, field, loc=None, flag=None, force=False, comment="", cause="", inplace=False, **kwargs):
+    def setFlags(self, field, loc=None, flag=None, force=False, comment="", cause="OTHER", inplace=False, **kwargs):
         assert "iloc" not in kwargs, "deprecated keyword, iloc"
         assertScalar("field", field, optional=False)
 
         flag = self.BAD if flag is None else flag
-        comment = json.dumps(dict(comment=comment, commit=self.project_version, test=kwargs.get("func_name", "")))
+        comment = json.dumps(
+            {"comment": comment,
+             "commit": self.project_version,
+             "test": kwargs.get("func_name", "")}
+        )
 
         if force:
             row_indexer = loc
diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py
index e13e6e38e834d5616c06ec600bf8c651c58a8d7b..1bac19c57ca43dc6f00c91952ebb4e3070bcde5a 100644
--- a/saqc/funcs/functions.py
+++ b/saqc/funcs/functions.py
@@ -212,10 +212,10 @@ def flagGeneric(data, field, flagger, func, nodata=np.nan, **kwargs):
     if not np.issubdtype(mask.dtype, np.bool_):
         raise TypeError(f"generic expression does not return a boolean array")
 
-    if flagger.getFlags(field).empty:
-        flagger = flagger.merge(
-            flagger.initFlags(
-                data=pd.Series(name=field, index=mask.index, dtype=np.float64)))
+    # if flagger.getFlags(field).empty:
+    #     flagger = flagger.merge(
+    #         flagger.initFlags(
+    #             data=pd.Series(name=field, index=mask.index, dtype=np.float64)))
     flagger = flagger.setFlags(field, mask, **kwargs)
     return data, flagger
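The reformatted `comment` serializes to a JSON object; the values below are illustrative stand-ins for the real call:

```python
import json

comment = json.dumps(
    {"comment": "value range violated",
     "commit": "v1.4",      # DmpFlagger.project_version in the real call
     "test": "flagRange"}   # taken from kwargs["func_name"] if present
)
# '{"comment": "value range violated", "commit": "v1.4", "test": "flagRange"}'
```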
diff --git a/saqc/funcs/spikes_detection.py b/saqc/funcs/spikes_detection.py
index fbfb4289be2ae51c46ced5e6017526961da18ed4..65ddbd58cbebe3010af966b3a3bf9a9346bd969d 100644
--- a/saqc/funcs/spikes_detection.py
+++ b/saqc/funcs/spikes_detection.py
@@ -16,7 +16,8 @@ from saqc.lib.tools import (
     offset2seconds,
     slidingWindowIndices,
     findIndex,
-    toSequence
+    toSequence,
+    customRolling
 )
 from outliers import smirnov_grubbs
@@ -870,7 +871,7 @@ def spikes_flagMad(data, field, flagger, window, z=3.5, **kwargs):
 
 
 @register(masking='field')
-def spikes_flagBasic(data, field, flagger, thresh=7, tolerance=0, window="15min", **kwargs):
+def spikes_flagBasic(data, field, flagger, thresh, tolerance, window, numba_kickin=200000, **kwargs):
     """
     A basic outlier test that is designed to work for harmonized and not harmonized data.
@@ -902,6 +903,11 @@ def spikes_flagBasic(data, field, flagger, thresh=7, tolerance=0, window="15min"
         Maximum difference between pre-spike and post-spike values. See condition (2)
     window : str, default '15min'
         Maximum length of "spiky" value courses. See condition (3)
+    numba_kickin : int, default 200000
+        When more than `numba_kickin` potential spikes are detected, the
+        pandas.rolling part of the computation gets "jitted" with numba.
+        The default value has proven to lie around the break-even point between jit costs and jit boost.
+
 
     Returns
     -------
@@ -922,54 +928,45 @@ def spikes_flagBasic(data, field, flagger, thresh=7, tolerance=0, window="15min"
     dataseries = data[field].dropna()
     # get all the entries preceding a significant jump
-    pre_jumps = dataseries.diff(periods=-1).abs() > thresh
-    pre_jumps = pre_jumps[pre_jumps]
-    if pre_jumps.empty:
+    post_jumps = dataseries.diff().abs() > thresh
+    post_jumps = post_jumps[post_jumps]
+    if post_jumps.empty:
         return data, flagger
     # get all the entries preceeding a significant jump and its successors within "length" range
-    to_roll = pre_jumps.reindex(dataseries.index, method="ffill", tolerance=window, fill_value=False).dropna()
+    to_roll = post_jumps.reindex(dataseries.index, method="bfill", tolerance=window, fill_value=False).dropna()
 
     # define spike testing function to roll with:
-    def spike_tester(chunk, pre_jumps_index, thresh, tol):
-        if not chunk.index[-1] in pre_jumps_index:
+    def spike_tester(chunk, thresh=thresh, tol=tolerance):
+        # signum change!!!
+        chunk_stair = (np.abs(chunk - chunk[-1]) < thresh)[::-1].cumsum()
+        initial = np.searchsorted(chunk_stair, 2)
+        if initial == len(chunk):
             return 0
+        if np.abs(chunk[- initial - 1] - chunk[-1]) < tol:
+            return initial - 1
         else:
-            # signum change!!!
-            chunk_stair = (abs(chunk - chunk[-1]) < thresh)[::-1].cumsum()
-            first_return = chunk_stair[(chunk_stair == 2)]
-            if first_return.sum() == 0:
-                return 0
-            if abs(chunk[first_return.index[0]] - chunk[-1]) < tol:
-                return (chunk_stair == 1).sum() - 1
-            else:
-                return 0
+            return 0
 
-    # since .rolling does neither support windows, defined by left starting points, nor rolling over monotonically
-    # decreasing indices, we have to trick the method by inverting the timeseries and transforming the resulting index
-    # to pseudo-increase.
     to_roll = dataseries[to_roll]
-    original_index = to_roll.index
-    to_roll = to_roll[::-1]
-    pre_jump_reversed_index = to_roll.index[0] - pre_jumps.index
-    to_roll.index = to_roll.index[0] - to_roll.index
-
-    # now lets roll:
-    to_roll = (
-        to_roll.rolling(window, closed="both")
-        .apply(spike_tester, args=(pre_jump_reversed_index, thresh, tolerance), raw=False)
-        .astype(int)
-    )
-    # reconstruct original index and sequence
-    to_roll = to_roll[::-1]
-    to_roll.index = original_index
-    to_write = to_roll[to_roll != 0]
-    to_flag = pd.Index([])
-    # here comes a loop...):
-    for row in to_write.iteritems():
-        loc = to_roll.index.get_loc(row[0])
-        to_flag = to_flag.append(to_roll.iloc[loc + 1 : loc + row[1] + 1].index)
-
-    to_flag = to_flag.drop_duplicates(keep="first")
+    roll_mask = pd.Series(False, index=to_roll.index)
+    roll_mask[post_jumps.index] = True
+
+    engine = None
+    if roll_mask.sum() > numba_kickin:
+        engine = 'numba'
+    result = customRolling(to_roll, window, spike_tester, roll_mask, closed='both', engine=engine)
+    group_col = np.nancumsum(result)
+    group_frame = pd.DataFrame({'group_col': group_col[:-1],
+                                'diff_col': np.diff(group_col).astype(int)},
+                               index=result.index[:-1])
+
+    groups = group_frame.groupby('group_col')
+
+    def g_func(x):
+        r = np.array([False] * x.shape[0])
+        r[-x[-1]:] = True
+        return r
+
+    to_flag = groups['diff_col'].transform(g_func)
 
     flagger = flagger.setFlags(field, to_flag, **kwargs)
     return data, flagger
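How the rewritten candidate selection works, sketched on assumed toy data (not from the patch): `diff()` marks the entries right after a jump, and the backward-filled reindex marks everything up to `window` before them:

```python
import pandas as pd

s = pd.Series([0., 0., 100., 100., 0., 0.],
              index=pd.date_range("2020-01-01", periods=6, freq="10min"))

post_jumps = s.diff().abs() > 60     # True right after each jump: 00:20 and 00:40
post_jumps = post_jumps[post_jumps]

# mark all entries lying at most 20min before a post-jump entry:
to_roll = post_jumps.reindex(
    s.index, method="bfill", tolerance="20min", fill_value=False
).dropna()
```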
diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py
index d8383172f5db724609445f0ad2b07ea55fae0a9d..8d21d8dcd95b2ebce770f2c329cdacb9211b5fa8 100644
--- a/saqc/lib/tools.py
+++ b/saqc/lib/tools.py
@@ -9,6 +9,8 @@ import numba as nb
 import pandas as pd
 import logging
 import dios
+from pandas.api.indexers import BaseIndexer
+from pandas._libs.window.indexers import calculate_variable_window_bounds
 
 # from saqc.flagger import BaseFlagger
@@ -357,3 +359,94 @@ def mutateIndex(index, old_name, new_name):
     index = index.insert(pos, new_name)
     return index
 
+
+class FreqIndexer(BaseIndexer):
+    def get_window_bounds(self, num_values, min_periods, center, closed):
+        start, end = calculate_variable_window_bounds(num_values, self.window_size, min_periods, center, closed,
+                                                      self.index_array)
+        end[~self.win_points] = 0
+        start[~self.win_points] = 0
+        return start, end
+
+
+class PeriodsIndexer(BaseIndexer):
+    def get_window_bounds(self, num_values, min_periods, center, closed):
+        start_s = np.zeros(self.window_size, dtype="int64")
+        start_e = (
+            np.arange(self.window_size, num_values, dtype="int64")
+            - self.window_size
+            + 1
+        )
+        start = np.concatenate([start_s, start_e])[:num_values]
+
+        end_s = np.arange(self.window_size, dtype="int64") + 1
+        end_e = start_e + self.window_size
+        end = np.concatenate([end_s, end_e])[:num_values]
+        start[~self.win_points] = 0
+        end[~self.win_points] = 0
+        return start, end
+
+
+def customRolling(to_roll, winsz, func, roll_mask, min_periods=1, center=False, closed=None, raw=True, engine=None):
+    """
+    A wrapper around pandas.rolling.apply() that allows skipping the application of
+    `func` on arbitrary selections of windows.
+
+    Parameters
+    ----------
+    to_roll : pandas.Series
+        Timeseries to be "rolled over".
+    winsz : {int, str}
+        Gets passed on to the window-size parameter of pandas.Rolling.
+    func : Callable
+        Function to be rolled with.
+    roll_mask : numpy.array[bool]
+        A mask indicating the rolling windows `func` shall be applied on.
+        Has to be of same length as `to_roll`.
+        roll_mask[i] = False indicates that the window with right endpoint to_roll.index[i] shall
+        be skipped.
+    min_periods : int, default 1
+        Gets passed on to the min_periods parameter of pandas.Rolling.
+        (Note that rolling with a frequency-string window size and `min_periods=None`
+        results in nothing being computed, due to inconsistencies in the interplay of
+        pandas.rolling and its indexer.)
+    center : bool, default False
+        Gets passed on to the center parameter of pandas.Rolling.
+    closed : {None, 'left', 'right', 'both'}, default None
+        Gets passed on to the closed parameter of pandas.Rolling.
+    raw : bool, default True
+        Gets passed on to the raw parameter of pandas.Rolling.apply.
+    engine : {None, 'numba'}, default None
+        Gets passed on to the engine parameter of pandas.Rolling.apply.
+
+    Returns
+    -------
+    result : pandas.Series
+        The result of the rolling application.
+
+    """
+    i_roll = to_roll.copy()
+    i_roll.index = np.arange(to_roll.shape[0])
+    if isinstance(winsz, str):
+        winsz = int(pd.Timedelta(winsz).total_seconds()*10**9)
+        indexer = FreqIndexer(window_size=winsz,
+                              win_points=roll_mask,
+                              index_array=to_roll.index.to_numpy(int),
+                              center=center,
+                              closed=closed)
+
+    elif isinstance(winsz, int):
+        indexer = PeriodsIndexer(window_size=winsz,
+                                 win_points=roll_mask,
+                                 center=center,
+                                 closed=closed)
+
+    i_roll = i_roll.rolling(indexer,
+                            min_periods=min_periods,
+                            center=center,
+                            closed=closed).apply(func, raw=raw, engine=engine)
+
+    return pd.Series(i_roll.values, index=to_roll.index)
+
+
+
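A usage sketch for `customRolling` on assumed toy data: only windows whose right endpoint is marked in `roll_mask` are evaluated, the rest are skipped by zeroing their window bounds:

```python
import numpy as np
import pandas as pd

from saqc.lib.tools import customRolling

s = pd.Series(np.arange(10, dtype=float),
              index=pd.date_range("2020-01-01", periods=10, freq="1min"))

# evaluate only the windows ending at every third observation
roll_mask = np.zeros(len(s), dtype=bool)
roll_mask[::3] = True

result = customRolling(s, "3min", np.nansum, roll_mask)
# masked-out positions got start == end == 0 bounds, i.e. empty windows,
# and come back as NaN (min_periods=1 is not satisfied there)
```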
diff --git a/sphinx-doc/requirements_sphinx.txt b/sphinx-doc/requirements_sphinx.txt
index 8c284aedaad5c1ca6bc481b58ff649dbae265ec0..2480e078d388b2b7c069e83be7490652d71707ad 100644
--- a/sphinx-doc/requirements_sphinx.txt
+++ b/sphinx-doc/requirements_sphinx.txt
@@ -22,7 +22,7 @@ kiwisolver==1.1.0
 llvmlite==0.34.0
 Markdown==3.2.2
 MarkupSafe==1.1.1
-matplotlib==3.3.1
+matplotlib==3.3.2
 mlxtend==0.17.2
 more-itertools==8.5.0
 numba==0.51.2
diff --git a/test/core/test_reader.py b/test/core/test_reader.py
index fffb6418b5bacf56e26be7b1c6f04bce8a438fa7..996429274d57badfc00e18f45438ad86339fc998 100644
--- a/test/core/test_reader.py
+++ b/test/core/test_reader.py
@@ -47,7 +47,8 @@ def test_variableRegex(data):
     for regex, expected in tests:
         fobj = writeIO(header + "\n" + f"{regex} ; flagDummy()")
         saqc = SaQC(SimpleFlagger(), data).readConfig(fobj)
-        result = [f["field"] for f in saqc._to_call]
+        expansion = saqc._expandFields(saqc._to_call[0], data.columns)
+        result = [f["field"] for f in expansion]
         assert np.all(result == expected)
 
 
@@ -104,14 +105,18 @@ def test_configFile(data):
 
 def test_configChecks(data):
 
-    var1, var2, var3, *_ = data.columns
+    var1, _, var3, *_ = data.columns
+
+    @register(masking="none")
+    def flagFunc(data, field, flagger, arg, opt_arg=None, **kwargs):
+        return data, flagger
 
     header = f"{F.VARNAME};{F.TEST}"
     tests = [
-        (f"{var1};flagRange(mn=0)", TypeError),  # bad argument name
-        (f"{var1};flagRange(min=0)", TypeError),  # not enough arguments
+        (f"{var1};flagFunc(mn=0)", TypeError),  # bad argument name
+        (f"{var1};flagFunc()", TypeError),  # not enough arguments
         (f"{var3};flagNothing()", NameError),  # unknown function
-        (";flagRange(min=3)", SyntaxError),  # missing variable
+        (";flagFunc(min=3)", SyntaxError),  # missing variable
         (f"{var1};", SyntaxError),  # missing test
         (f"{var1}; min", TypeError),  # not a function call
     ]
diff --git a/test/funcs/test_spikes_detection.py b/test/funcs/test_spikes_detection.py
index 7487cc4a4c45338f82cb396f67820e4fe4ab4a6d..cfdeb79b0a6a5f612f3b2c5a88cdd1e8fdaa61c6 100644
--- a/test/funcs/test_spikes_detection.py
+++ b/test/funcs/test_spikes_detection.py
@@ -76,7 +76,7 @@ def test_flagSpikesBasic(spiky_data, flagger):
     data = spiky_data[0]
     field, *_ = data.columns
     flagger = flagger.initFlags(data)
-    data, flagger_result = spikes_flagBasic(data, field, flagger, thresh=60, tolerance=10, window_size="20min")
+    data, flagger_result = spikes_flagBasic(data, field, flagger, thresh=60, tolerance=10, window="20min")
     flag_result = flagger_result.getFlags(field)
     test_sum = (flag_result[spiky_data[1]] == flagger.BAD).sum()
     assert test_sum == len(spiky_data[1])