From 20caaeb2f1395d571a3b66f489475e57d3f7ee63 Mon Sep 17 00:00:00 2001
From: Bert Palm <bert.palm@ufz.de>
Date: Thu, 23 Feb 2023 17:09:53 +0100
Subject: [PATCH] prepare dios replacement

Replace dios-specific access patterns (positional .iloc/.aloc indexing,
index_of("shared") lookups and itype handling) with plain column-based
pandas operations, and add the getUnionIndex, getSharedIndex and
isAllBoolean helpers to saqc.lib.tools.

---
 saqc/core/translation/dmpscheme.py           |  3 +-
 saqc/core/translation/positionalscheme.py    |  2 +-
 saqc/funcs/curvefit.py                       |  2 +-
 saqc/funcs/drift.py                          | 24 ++++----
 saqc/funcs/generic.py                        | 10 ++--
 saqc/funcs/outliers.py                       | 14 ++---
 saqc/funcs/resampling.py                     |  8 +--
 saqc/funcs/scores.py                         | 15 ++---
 saqc/funcs/tools.py                          |  7 ++-
 saqc/funcs/transformation.py                 |  6 +-
 saqc/lib/tools.py                            | 39 +++++++++++-
 tests/common.py                              |  4 +-
 tests/funcs/test_constants_detection.py      |  3 +-
 tests/funcs/test_functions.py                | 16 ++---
 tests/funcs/test_generic_api_functions.py    | 62 ++++++++++++++------
 tests/funcs/test_generic_config_functions.py |  2 +-
 tests/fuzzy/test_masking.py                  |  2 +-
 17 files changed, 136 insertions(+), 83 deletions(-)

diff --git a/saqc/core/translation/dmpscheme.py b/saqc/core/translation/dmpscheme.py
index bf35b7b89..6b9a9a5d3 100644
--- a/saqc/core/translation/dmpscheme.py
+++ b/saqc/core/translation/dmpscheme.py
@@ -17,6 +17,7 @@ import pandas as pd
 from saqc import BAD, DOUBTFUL, GOOD, UNFLAGGED
 from saqc.core import Flags, History
 from saqc.core.translation.basescheme import BackwardMap, ForwardMap, MappingScheme
+from saqc.lib.tools import getUnionIndex
 
 _QUALITY_CAUSES = [
     "",
@@ -133,7 +134,7 @@ class DmpScheme(MappingScheme):
         tflags = super().toExternal(flags, attrs=attrs)
 
         out = pd.DataFrame(
-            index=reduce(lambda x, y: x.union(y), tflags.indexes).sort_values(),
+            index=getUnionIndex(tflags),
             columns=pd.MultiIndex.from_product([flags.columns, _QUALITY_LABELS]),
         )
 
diff --git a/saqc/core/translation/positionalscheme.py b/saqc/core/translation/positionalscheme.py
index f503b0fe3..fffc98834 100644
--- a/saqc/core/translation/positionalscheme.py
+++ b/saqc/core/translation/positionalscheme.py
@@ -91,7 +91,7 @@ class PositionalScheme(MappingScheme):
         for field in flags.columns:
             thist = flags.history[field].hist.replace(self._BACKWARD).astype(float)
             # concatenate the single flag values
-            ncols = thist.shape[-1]
+            ncols = len(thist.columns)
             init = 9 * 10**ncols
             bases = 10 ** np.arange(ncols - 1, -1, -1)
 
diff --git a/saqc/funcs/curvefit.py b/saqc/funcs/curvefit.py
index c0a86fd51..865550c8b 100644
--- a/saqc/funcs/curvefit.py
+++ b/saqc/funcs/curvefit.py
@@ -212,7 +212,7 @@ def _fitPolynomial(
         window = int(window - 1)
     if min_periods is None:
         min_periods = window
-    if to_fit.shape[0] < 200000:
+    if len(to_fit) < 200000:
         numba = False
     else:
         numba = True
diff --git a/saqc/funcs/drift.py b/saqc/funcs/drift.py
index d3555ac51..2545b7d12 100644
--- a/saqc/funcs/drift.py
+++ b/saqc/funcs/drift.py
@@ -138,12 +138,12 @@ class DriftMixin:
         """
         fields = toSequence(field)
 
-        data_to_flag = self._data[fields].to_df()
-        data_to_flag.dropna(inplace=True)
+        data = self._data[fields].to_df()
+        data.dropna(inplace=True)
 
-        segments = data_to_flag.groupby(pd.Grouper(freq=freq))
+        segments = data.groupby(pd.Grouper(freq=freq))
         for segment in segments:
-            if segment[1].shape[0] <= 1:
+            if len(segment[1]) <= 1:
                 continue
 
             drifters = detectDeviants(
@@ -223,11 +223,11 @@ class DriftMixin:
         if reference not in fields:
             fields.append(reference)
 
-        data_to_flag = self._data[fields].to_df().dropna()
+        data = self._data[fields].to_df().dropna()
 
-        segments = data_to_flag.groupby(pd.Grouper(freq=freq))
+        segments = data.groupby(pd.Grouper(freq=freq))
         for segment in segments:
-            if segment[1].shape[0] <= 1:
+            if len(segment[1]) <= 1:
                 continue
 
             for i in 
range(len(fields)): @@ -346,7 +346,7 @@ class DriftMixin: drift_frame = pd.DataFrame(d, index=to_correct_clean.index) # group the drift frame - for k in range(0, maint_data.shape[0] - 1): + for k in range(0, len(maint_data) - 1): # assign group numbers for the timespans in between one maintenance ending and the beginning of the next # maintenance time itself remains np.nan assigned drift_frame.loc[ @@ -460,13 +460,13 @@ class DriftMixin: first_valid = np.array( [ ~pd.isna(para_dict[unique_successive[i]]).any() - for i in range(0, unique_successive.shape[0]) + for i in range(0, len(unique_successive)) ] ) first_valid = np.where(first_normal & first_valid)[0][0] last_valid = 1 - for k in range(0, unique_successive.shape[0]): + for k in range(0, len(unique_successive)): if unique_successive[k] < 0 & ( not pd.isna(para_dict[unique_successive[k]]).any() ): @@ -757,8 +757,8 @@ def _assignRegimeAnomaly( plateaus = detectDeviants(cluster_dios, metric, spread, frac, method, "samples") if set_flags: - for p in plateaus: - flags[cluster_dios.iloc[:, p].index, field] = flag + for p, cols in zip(plateaus, cluster_dios.columns[plateaus]): + flags[cluster_dios[cols].index, field] = flag if set_cluster: for p in plateaus: diff --git a/saqc/funcs/generic.py b/saqc/funcs/generic.py index e70b4c17e..803edfdaf 100644 --- a/saqc/funcs/generic.py +++ b/saqc/funcs/generic.py @@ -15,7 +15,7 @@ import pandas as pd from saqc import BAD, FILTER_ALL from saqc.core import DictOfSeries, Flags, History, register from saqc.core.register import _maskData -from saqc.lib.tools import isflagged, toSequence +from saqc.lib.tools import isAllBoolean, isflagged, toSequence from saqc.lib.types import GenericFunction, PandasLike from saqc.parsing.environ import ENVIRONMENT @@ -51,7 +51,7 @@ def _prepare( for f in fchunk.columns: fchunk.history[f] = flags.history[f] dchunk, _ = _maskData( - data=data.loc[:, columns].copy(), flags=fchunk, columns=columns, thresh=dfilter + data=data[columns].copy(), flags=fchunk, columns=columns, thresh=dfilter ) return dchunk, fchunk.copy() @@ -169,7 +169,7 @@ class GenericMixin: # update data & flags for i, col in enumerate(targets): - datacol = result.iloc[:, i] + datacol = result[result.columns[i]] self._data[col] = datacol if col not in self._flags: @@ -272,7 +272,7 @@ class GenericMixin: f"the generic function returned {len(result.columns)} field(s), but only {len(targets)} target(s) were given" ) - if not result.empty and not (result.dtypes == bool).all(): + if not result.empty and not isAllBoolean(result): raise TypeError(f"generic expression does not return a boolean array") meta = { @@ -288,7 +288,7 @@ class GenericMixin: # update flags & data for i, col in enumerate(targets): - maskcol = result.iloc[:, i] + maskcol = result[result.columns[i]] # make sure the column exists if col not in self._flags: diff --git a/saqc/funcs/outliers.py b/saqc/funcs/outliers.py index 182205ec3..03c50820f 100644 --- a/saqc/funcs/outliers.py +++ b/saqc/funcs/outliers.py @@ -131,14 +131,14 @@ class OutliersMixin: return self if not window: - window = scores.shape[0] + window = len(scores) if isinstance(window, str): partitions = scores.groupby(pd.Grouper(freq=window)) else: grouper_series = pd.Series( - data=np.arange(0, scores.shape[0]), index=scores.index + data=np.arange(0, len(scores)), index=scores.index ) grouper_series = grouper_series.transform( lambda x: int(np.floor(x / window)) @@ -147,10 +147,10 @@ class OutliersMixin: # calculate flags for every partition for _, partition in partitions: - if 
partition.empty | (partition.shape[0] < min_periods):
+            if partition.empty | (len(partition) < min_periods):
                 continue
 
-            sample_size = partition.shape[0]
+            sample_size = len(partition)
 
             sorted_i = partition.values.argsort()
             resids = partition.values[sorted_i]
@@ -931,7 +931,7 @@ class OutliersMixin:
         # period number defined test intervals
         if isinstance(window, int):
             grouper_series = pd.Series(
-                data=np.arange(0, datcol.shape[0]), index=datcol.index
+                data=np.arange(0, len(datcol)), index=datcol.index
             )
             grouper_series_lagged = grouper_series + (window / 2)
             grouper_series = grouper_series.transform(lambda x: x // window)
@@ -1036,7 +1036,7 @@ class OutliersMixin:
 
         fields = toSequence(field)
 
-        df = self._data[fields].loc[self._data[fields].index_of("shared")].to_df()
+        df = self._data[fields].to_df(how="inner")
 
         if isinstance(method, str):
             if method == "modZscore":
@@ -1265,7 +1265,7 @@ def _evalStrayLabels(
             if reduction_drop_flagged:
                 test_slice = test_slice.drop(to_flag_frame.index, errors="ignore")
 
-            if test_slice.shape[0] < reduction_min_periods:
+            if len(test_slice) < reduction_min_periods:
                 to_flag_frame.loc[index[1], var] = True
                 continue
 
diff --git a/saqc/funcs/resampling.py b/saqc/funcs/resampling.py
index 4d2a59e96..ebc02bd4b 100644
--- a/saqc/funcs/resampling.py
+++ b/saqc/funcs/resampling.py
@@ -270,13 +270,9 @@ class ResamplingMixin:
 
         datcol = self._data[field]
 
-        # workaround for #GL-333
         if datcol.empty:
-            if self._data.itype is None:
-                index = pd.DatetimeIndex([])
-            else:
-                index = self._data.itype.min_pdindex
-            datcol = pd.Series(index=index, dtype=datcol.dtype)
+            # see #GL-374
+            datcol = pd.Series(index=pd.DatetimeIndex([]), dtype=datcol.dtype)
 
         freq = evalFreqStr(freq, freq_check, datcol.index)
 
diff --git a/saqc/funcs/scores.py b/saqc/funcs/scores.py
index 1846cab2b..9e3d0bc4b 100644
--- a/saqc/funcs/scores.py
+++ b/saqc/funcs/scores.py
@@ -180,11 +180,8 @@ class ScoresMixin:
             target = target[0]
 
         fields = toSequence(field)
-        val_frame = self._data[fields].copy()
-        score_index = val_frame.index_of("shared")
-        score_ser = pd.Series(np.nan, index=score_index, name=target)
-
-        val_frame = val_frame.loc[val_frame.index_of("shared")].to_df()
+        val_frame = self._data[fields].copy().to_df(how="inner")
+        score_ser = pd.Series(np.nan, index=val_frame.index, name=target)
 
         val_frame.dropna(inplace=True)
         if val_frame.empty:
@@ -192,23 +189,23 @@ class ScoresMixin:
 
         # partitioning
         if not freq:
-            freq = val_frame.shape[0]
+            freq = len(val_frame.index)
 
         if isinstance(freq, str):
             grouper = pd.Grouper(freq=freq)
         else:
             grouper = pd.Series(
-                data=np.arange(0, val_frame.shape[0]), index=val_frame.index
+                data=np.arange(0, len(val_frame)), index=val_frame.index
             )
             grouper = grouper.transform(lambda x: int(np.floor(x / freq)))
 
         partitions = val_frame.groupby(grouper)
 
         for _, partition in partitions:
-            if partition.empty or (partition.shape[0] < min_periods):
+            if partition.empty or (len(partition) < min_periods):
                 continue
 
-            sample_size = partition.shape[0]
+            sample_size = len(partition)
 
             nn_neighbors = min(n, max(sample_size, 2) - 1)
             dist, *_ = kNN(
                 partition.values, nn_neighbors, algorithm=method, metric=metric, p=p
diff --git a/saqc/funcs/tools.py b/saqc/funcs/tools.py
index 1036b7207..d8307a459 100644
--- a/saqc/funcs/tools.py
+++ b/saqc/funcs/tools.py
@@ -214,13 +214,14 @@ class ToolsMixin:
             mask = periodicMask(datcol_idx, start, end, ~closed)
         elif mode == "selection_field":
             idx = self._data[selection_field].index.intersection(datcol_idx)
-            mask = self._data.loc[idx, selection_field]
+            mask = 
self._data[selection_field].loc[idx] else: raise ValueError( "Keyword passed as masking mode is unknown ({})!".format(mode) ) - self._data.aloc[mask, field] = np.nan + mask = mask.reindex(self._data[field].index, fill_value=False).astype(bool) + self._data[field].loc[mask] = np.nan self._flags[mask, field] = UNFLAGGED return self @@ -301,7 +302,7 @@ class ToolsMixin: level = kwargs.get("flag", UNFLAGGED) if dfilter < np.inf: - data.loc[flags[field] >= dfilter, field] = np.nan + data[field].loc[flags[field] >= dfilter] = np.nan if store_kwargs is None: store_kwargs = {} diff --git a/saqc/funcs/transformation.py b/saqc/funcs/transformation.py index 0fe642131..1e4121b84 100644 --- a/saqc/funcs/transformation.py +++ b/saqc/funcs/transformation.py @@ -52,14 +52,12 @@ class TransformationMixin: val_ser = self._data[field].copy() # partitioning if not freq: - freq = val_ser.shape[0] + freq = len(val_ser) if isinstance(freq, str): grouper = pd.Grouper(freq=freq) else: - grouper = pd.Series( - data=np.arange(0, val_ser.shape[0]), index=val_ser.index - ) + grouper = pd.Series(data=np.arange(0, len(val_ser)), index=val_ser.index) grouper = grouper.transform(lambda x: int(np.floor(x / freq))) partitions = val_ser.groupby(grouper) diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py index 3a454bd18..8bf96b839 100644 --- a/saqc/lib/tools.py +++ b/saqc/lib/tools.py @@ -8,10 +8,11 @@ from __future__ import annotations import collections +import functools import itertools import re import warnings -from typing import Callable, Collection, List, Sequence, TypeVar, Union +from typing import Any, Callable, Collection, List, Sequence, TypeVar, Union import numpy as np import pandas as pd @@ -374,7 +375,9 @@ def detectDeviants( dist_mat = np.zeros((var_num, var_num)) combs = list(itertools.combinations(range(0, var_num), 2)) for i, j in combs: - dist = metric(data.iloc[:, i].values, data.iloc[:, j].values) + d_i = data[data.columns[i]] + d_j = data[data.columns[j]] + dist = metric(d_i.values, d_j.values) dist_mat[i, j] = dist condensed = np.abs(dist_mat[tuple(zip(*combs))]) @@ -386,7 +389,8 @@ def detectDeviants( elif population == "samples": counts = {cluster[j]: 0 for j in range(0, var_num)} for c in range(var_num): - counts[cluster[c]] += data.iloc[:, c].dropna().shape[0] + field = data.columns[c] + counts[cluster[c]] += data[field].dropna().shape[0] pop_num = np.sum(list(counts.values())) else: raise ValueError( @@ -557,3 +561,32 @@ def isflagged(flagscol: A, thresh: float) -> A: return flagscol > UNFLAGGED return flagscol >= thresh + + +def getUnionIndex(obj, default: pd.DatetimeIndex | None = None): + assert hasattr(obj, "columns") + if default is None: + default = pd.DatetimeIndex([]) + indexes = [obj[k].index for k in obj.columns] + if indexes: + return functools.reduce(pd.Index.union, indexes).sort_values() + return default + + +def getSharedIndex(obj, default: pd.DatetimeIndex | None = None): + assert hasattr(obj, "columns") + if default is None: + default = pd.DatetimeIndex([]) + indexes = [obj[k].index for k in obj.columns] + if indexes: + return functools.reduce(pd.Index.intersection, indexes).sort_values() + return default + + +def isAllBoolean(obj: Any): + if not hasattr(obj, "columns"): + return pd.api.types.is_bool_dtype(obj) + for c in obj.columns: + if not pd.api.types.is_bool_dtype(obj[c]): + return False + return True diff --git a/tests/common.py b/tests/common.py index c82b5d2bf..819f5d4f3 100644 --- a/tests/common.py +++ b/tests/common.py @@ -22,14 +22,14 @@ def initData( if rows is None: 
freq = freq or "1h" - di = DictOfSeries(itype="datetime") + di = dict() dates = pd.date_range(start=start_date, end=end_date, freq=freq, periods=rows) dummy = np.arange(len(dates)) for col in range(1, cols + 1): di[f"var{col}"] = pd.Series(data=dummy * col, index=dates) - return di + return DictOfSeries(di) def dummyHistory(hist: pd.DataFrame = None, meta: list = None): diff --git a/tests/funcs/test_constants_detection.py b/tests/funcs/test_constants_detection.py index 29a5a2514..8cbf3290d 100644 --- a/tests/funcs/test_constants_detection.py +++ b/tests/funcs/test_constants_detection.py @@ -19,7 +19,8 @@ def data(): constants_data = initData( 1, start_date="2011-01-01 00:00:00", end_date="2011-01-01 03:00:00", freq="5min" ) - constants_data.iloc[5:25] = 200 + for c in constants_data.columns: + constants_data[c].iloc[5:25] = 200 return constants_data diff --git a/tests/funcs/test_functions.py b/tests/funcs/test_functions.py index 9ae18b532..991481575 100644 --- a/tests/funcs/test_functions.py +++ b/tests/funcs/test_functions.py @@ -53,8 +53,8 @@ def test_flagRange(data, field): def test_flagSesonalRange(data, field): - data.iloc[::2] = 0 - data.iloc[1::2] = 50 + data[field].iloc[::2] = 0 + data[field].iloc[1::2] = 50 nyears = len(data[field].index.year.unique()) tests = [ @@ -89,7 +89,7 @@ def test_flagSesonalRange(data, field): start = f"{test['startmonth']:02}-{test['startday']:02}T00:00:00" end = f"{test['endmonth']:02}-{test['endday']:02}T00:00:00" - qc = qc.copyField(field, field + "_masked") + qc = qc.copyField(field, newfield) qc = qc.selectTime( newfield, mode="periodic", @@ -128,12 +128,12 @@ def test_forceFlags(data, field): def test_flagIsolated(data, field): flags = initFlagsLike(data) - d_len = data.shape[0][0] - data.iloc[1:3, 0] = np.nan - data.iloc[4:5, 0] = np.nan + d_len = len(data[field].index) + data[field].iloc[1:3] = np.nan + data[field].iloc[4:5] = np.nan flags[data[field].index[5:6], field] = BAD - data.iloc[11:13, 0] = np.nan - data.iloc[15:17, 0] = np.nan + data[field].iloc[11:13] = np.nan + data[field].iloc[15:17] = np.nan # data flags # 2016-01-01 0.0 -inf diff --git a/tests/funcs/test_generic_api_functions.py b/tests/funcs/test_generic_api_functions.py index e2181e9bd..18eb41240 100644 --- a/tests/funcs/test_generic_api_functions.py +++ b/tests/funcs/test_generic_api_functions.py @@ -106,20 +106,39 @@ def test_overwriteFieldFlagGeneric(data, fields, func): @pytest.mark.parametrize( - "targets, func", + "data, targets, func, expected_data", [ - (["tmp"], lambda x, y: x + y), - (["tmp1", "tmp2"], lambda x, y: (x + y, y * 2)), + ( + DictOfSeries(dict(a=pd.Series([1.0, 2.0]), b=pd.Series([10.0, 20.0]))), + ["t1"], + lambda x, y: x + y, + DictOfSeries( + dict( + a=pd.Series([1.0, 2.0]), + b=pd.Series([10.0, 20.0]), + t1=pd.Series([11.0, 22.0]), + ) + ), + ), + ( + DictOfSeries(dict(a=pd.Series([1.0, 2.0]), b=pd.Series([10.0, 20.0]))), + ["t1", "t2"], + lambda x, y: (x + y, y * 2), + DictOfSeries( + dict( + a=pd.Series([1.0, 2.0]), + b=pd.Series([10.0, 20.0]), + t1=pd.Series([11.0, 22.0]), + t2=pd.Series([20.0, 40.0]), + ) + ), + ), ], ) -def test_writeTargetProcGeneric(data, targets, func): - fields = ["var1", "var2"] +def test_writeTargetProcGeneric(data, targets, func, expected_data): + fields = data.columns.tolist() dfilter = 128 - expected_data = DictOfSeries( - func(*[data[f] for f in fields]), columns=toSequence(targets) - ).squeeze() - expected_meta = { "func": "procGeneric", "args": (fields, targets), @@ -140,7 +159,7 @@ def test_writeTargetProcGeneric(data, 
targets, func): dfilter=dfilter, label="generic", ) - assert (expected_data == res.data[targets].squeeze()).all(axis=None) + assert (expected_data == res.data).all(axis=None) # check that new histories where created for target in targets: assert res._flags.history[target].hist.iloc[0].isna().all() @@ -148,17 +167,24 @@ def test_writeTargetProcGeneric(data, targets, func): @pytest.mark.parametrize( - "fields, func", + "data, fields, func, expected_data", [ - (["var1"], lambda x: x * 2), - (["var1", "var2"], lambda x, y: (x + y, y * 2)), + ( + DictOfSeries(dict(a=pd.Series([1.0, 2.0]), b=pd.Series([10.0, 20.0]))), + ["a"], + lambda x: x * 2, + DictOfSeries(dict(a=pd.Series([2.0, 4.0]), b=pd.Series([10.0, 20.0]))), + ), + ( + DictOfSeries(dict(a=pd.Series([1.0, 2.0]), b=pd.Series([10.0, 20.0]))), + ["a", "b"], + lambda x, y: (x + y, y * 2), + DictOfSeries(dict(a=pd.Series([11.0, 22.0]), b=pd.Series([20.0, 40.0]))), + ), ], ) -def test_overwriteFieldProcGeneric(data, fields, func): +def test_overwriteFieldProcGeneric(data, fields, func, expected_data): dfilter = 128 - expected_data = DictOfSeries( - func(*[data[f] for f in fields]), columns=fields - ).squeeze() expected_meta = { "func": "procGeneric", @@ -176,7 +202,7 @@ def test_overwriteFieldProcGeneric(data, fields, func): ) res = saqc.processGeneric(field=fields, func=func, dfilter=dfilter, label="generic") - assert (expected_data == res.data[fields].squeeze()).all(axis=None) + assert (expected_data == res.data).all(axis=None) # check that the histories got appended for field in fields: assert (res._flags.history[field].hist[0] == 127.0).all() diff --git a/tests/funcs/test_generic_config_functions.py b/tests/funcs/test_generic_config_functions.py index 0dc9d20ec..194f62d88 100644 --- a/tests/funcs/test_generic_config_functions.py +++ b/tests/funcs/test_generic_config_functions.py @@ -113,7 +113,7 @@ def test_arithmeticOperators(data): def test_nonReduncingBuiltins(data): var1, *_ = data.columns - data = data.iloc[1:10, 0] + data = data[var1].iloc[1:10] flags = Flags({var1: pd.Series(UNFLAGGED, index=data.index)}) tests = [ diff --git a/tests/fuzzy/test_masking.py b/tests/fuzzy/test_masking.py index 472601037..ef7d54e96 100644 --- a/tests/fuzzy/test_masking.py +++ b/tests/fuzzy/test_masking.py @@ -61,7 +61,7 @@ def test_flagsMutationPreventsUnmasking(data_field_flags): data_masked, mask = _maskData(data_in, flags, columns=[field], thresh=UNFLAGGED) flags[:, field] = UNFLAGGED data_out = _unmaskData(data_masked, mask) - assert (data_out.loc[flags[field] == BAD, field].isna()).all(axis=None) + assert (data_out[field].loc[flags[field] == BAD].isna()).all(axis=None) @pytest.mark.slow -- GitLab