diff --git a/README.md b/README.md
index 174d76513954fdf3eaf320845be25d86a904b2a0..f8d3d32d4ccb3293695b19bd9e57e55a527f3e98 100644
--- a/README.md
+++ b/README.md
@@ -84,7 +84,7 @@ SaQC provides a basic CLI to get you started. As soon as the basic inputs, a dataset and the [configuration file](saqc/docs/ConfigurationFiles.md) are prepared, running SaQC is as simple as:
 ```sh
-python -m saqc \
+saqc \
   --config path_to_configuration.txt \
   --data path_to_data.csv \
   --outfile path_to_output.csv
diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md
index 710d3b0d977aeb0daa76c1c63f022a4e5e5c4fd6..365509b03cb68f37d6b24f5e60f4535884219479 100644
--- a/docs/GettingStarted.md
+++ b/docs/GettingStarted.md
@@ -80,7 +80,7 @@ and paste the following lines into it:
   varname;test;plot
   SM2;range(min=10, max=60);False
-  SM2;spikes_simpleMad(winsz="30d", z=3.5);True
+  SM2;spikes_simpleMad(window="30d", z=3.5);True
 These lines illustrate how different quality control tests can be specified for different variables by following the pattern:
@@ -110,7 +110,7 @@ cd saqc
 From here, you can run saqc and tell it to run the tests from the toy config-file on the toy dataset via the `-c` and `-d` options:
 ```sh
-python -m saqc -c ressources/data/myconfig.csv -d ressources/data/data.csv
+saqc -c ressources/data/myconfig.csv -d ressources/data/data.csv
 ```
 Which will output this plot:
@@ -139,7 +139,7 @@ range-test:
   varname;test;plot
   SM2;range(min=-20, max=60);False
-  SM2;spikes_simpleMad(winsz="30d", z=3.5);True
+  SM2;spikes_simpleMad(window="30d", z=3.5);True
 Rerunning SaQC as above produces the following plot:
@@ -158,8 +158,8 @@ something like this:
   varname;test;plot
   SM1;range(min=10, max=60);False
   SM2;range(min=10, max=60);False
-  SM1;spikes_simpleMad(winsz="15d", z=3.5);True
-  SM2;spikes_simpleMad(winsz="30d", z=3.5);True
+  SM1;spikes_simpleMad(window="15d", z=3.5);True
+  SM2;spikes_simpleMad(window="30d", z=3.5);True
 which gives you separate plots for each line where the plotting option is set to `True` as well as one summary "data plot" that depicts the joint flags from all
@@ -203,7 +203,7 @@ If you want the final results to be saved to a csv-file, you can do so by the use of the `-o` option:
 ```sh
-python -m saqc -c ressources/data/config.csv -d ressources/data/data.csv -o ressources/data/out.csv
+saqc -c ressources/data/config.csv -d ressources/data/data.csv -o ressources/data/out.csv
 ```
 Which saves a dataframe that contains both the original data and the quality
diff --git a/ressources/data/config.csv b/ressources/data/config.csv
index d726175bebf2d890abfaa3024dbfa0ec11adf1f1..260bb3e7c33901b9571b6a6f5adce4a1d35f441d 100644
--- a/ressources/data/config.csv
+++ b/ressources/data/config.csv
@@ -3,4 +3,4 @@ SM2;harmonize_shift2Grid(freq="15Min");True
 SM1;range(min=10, max=60);False
 SM2;missing(nodata=NAN);False
 SM2;range(min=10, max=60);False
-SM2;spikes_simpleMad(winsz="30d", z=3.5);True
\ No newline at end of file
+SM2;spikes_simpleMad(window="30d", z=3.5);True
diff --git a/ressources/machine_learning/train_machine_learning.py b/ressources/machine_learning/train_machine_learning.py
index 17dbc8535f8d9a2232256a2caf8f3fb20499d3ff..abc87f2e66223979ac35c770b446b7fc9df9688a 100644
--- a/ressources/machine_learning/train_machine_learning.py
+++ b/ressources/machine_learning/train_machine_learning.py
@@ -7,11 +7,12 @@ import joblib  # for saving of model objects
 import os
 import time
 import datetime
+
 ###--------------------
 ### EXAMPLE PARAMETRIZATION:
 ###--------------------
-#pd.options.mode.chained_assignment = None # default='warn' +# pd.options.mode.chained_assignment = None # default='warn' # data = pd.read_feather("data/sm/02_data.feather") # data = data.reset_index()#data.index has to be reset as I use row nos only for indexing # @@ -84,10 +85,7 @@ def trainML( outdata[name + "_Dt_1"].rolling(window_values, center=False).mean() ) # mean gradient t to t-window outdata[name + "_Dt" + str(window_values)] = ( - outdata[name + "_Dt_1"] - .iloc[::-1] - .rolling(window_values, center=False) - .mean()[::-1] + outdata[name + "_Dt_1"].iloc[::-1].rolling(window_values, center=False).mean()[::-1] ) # mean gradient t to t+window return outdata @@ -109,29 +107,19 @@ def trainML( ) # draw random sample sensordf.TeTr[index_test] = "Te" # assign test samples - sensordf["flag_bin_t_1"] = sensordf["flag_bin"] - sensordf["flag_bin"].shift( - 1 - ) # Flag at t-1 - sensordf["flag_bin_t1"] = sensordf["flag_bin"] - sensordf["flag_bin"].shift( - -1 - ) # Flag at t+1 + sensordf["flag_bin_t_1"] = sensordf["flag_bin"] - sensordf["flag_bin"].shift(1) # Flag at t-1 + sensordf["flag_bin_t1"] = sensordf["flag_bin"] - sensordf["flag_bin"].shift(-1) # Flag at t+1 sensordf["flag_bin_t_" + str(window_flags)] = ( sensordf["flag_bin"].rolling(window_flags + 1, center=False).sum() ) # n Flags in interval t to t-window_flags sensordf["flag_bin_t" + str(window_flags)] = ( - sensordf["flag_bin"] - .iloc[::-1] - .rolling(window_flags + 1, center=False) - .sum()[::-1] + sensordf["flag_bin"].iloc[::-1].rolling(window_flags + 1, center=False).sum()[::-1] ) # n Flags in interval t to t+window_flags # forward-orientation not possible, so right-orientation on reversed data an reverse result # Add context information for field+references for i in [field] + references: - sensordf = pd.concat( - [sensordf, _refCalc(reference=sensordf[i], window_values=window_values)], - axis=1, - ) + sensordf = pd.concat([sensordf, _refCalc(reference=sensordf[i], window_values=window_values)], axis=1,) # write back into new dataframe traindata = traindata.append(sensordf) @@ -148,7 +136,7 @@ def trainML( # make column in "traindata" to store predictions traindata = traindata.assign(PredMan=0) outinfo_df = [] - resultfile = open(os.path.join(os.getcwd(),path, modelname + "_resultfile.txt"), "w") + resultfile = open(os.path.join(os.getcwd(), path, modelname + "_resultfile.txt"), "w") starttime = time.time() # For each category of groupvar, fit a separate model @@ -158,26 +146,14 @@ def trainML( print("TRAINING MODEL...") # drop unneeded columns groupdata = traindata[traindata[group_field] == groupvar].drop( - columns=[ - "Time", - "RowIndex", - "Flag", - "flag_bin", - "PredMan", - group_field, - sensor_field, - ] - ) - forest = RandomForestClassifier( - n_estimators=500, random_state=randomseed, oob_score=True, n_jobs=-1 + columns=["Time", "RowIndex", "Flag", "flag_bin", "PredMan", group_field, sensor_field,] ) + forest = RandomForestClassifier(n_estimators=500, random_state=randomseed, oob_score=True, n_jobs=-1) X_tr = groupdata.drop(columns=["TeTr", "FlagMan"])[groupdata.TeTr == "Tr"] Y_tr = groupdata.FlagMan[groupdata.TeTr == "Tr"] forest.fit(y=Y_tr, X=X_tr) # save model object - joblib.dump( - forest, os.path.join(path, modelname + "_" + str(groupvar) + ".pkl") - ) + joblib.dump(forest, os.path.join(path, modelname + "_" + str(groupvar) + ".pkl")) # retrieve training predictions print("PREDICTING...") preds_tr = ( @@ -205,23 +181,16 @@ def trainML( ] resultfile.write("TRAINING RECALL:" + "\n") resultfile.write( - 
str(recall_score(groupdata.FlagMan[groupdata.TeTr == "Tr"], preds_tr)) - + "\n" + str(recall_score(groupdata.FlagMan[groupdata.TeTr == "Tr"], preds_tr)) + "\n" ) # Training error (Out-of-Bag) resultfile.write("TEST RECALL:" + "\n") resultfile.write( - str(recall_score(groupdata.FlagMan[groupdata.TeTr == "Te"], preds_te)) - + "\n" - + "\n" + str(recall_score(groupdata.FlagMan[groupdata.TeTr == "Te"], preds_te)) + "\n" + "\n" ) # Test error outinfo_df.append(outinfo) # save back to dataframe - traindata.PredMan[ - (traindata.TeTr == "Tr") & (traindata[group_field] == groupvar) - ] = preds_tr - traindata.PredMan[ - (traindata.TeTr == "Te") & (traindata[group_field] == groupvar) - ] = preds_te + traindata.PredMan[(traindata.TeTr == "Tr") & (traindata[group_field] == groupvar)] = preds_tr + traindata.PredMan[(traindata.TeTr == "Te") & (traindata[group_field] == groupvar)] = preds_te endtime = time.time() print("TIME ELAPSED: " + str(datetime.timedelta(seconds=endtime - starttime)) + " hours") @@ -247,21 +216,10 @@ def trainML( # write results back into original "data" dataframe data = data.assign(PredMan=np.nan) - data.PredMan[ - traindata.RowIndex - ] = traindata.PredMan # based on RowIndex as NAs were created in traindata + data.PredMan[traindata.RowIndex] = traindata.PredMan # based on RowIndex as NAs were created in traindata data.to_feather(os.path.join(path, modelname + "_data_preds.feather")) trainML( - data, - field, - references, - sensor_field, - group_field, - window_values, - window_flags, - path, - modelname, - 0.3, + data, field, references, sensor_field, group_field, window_values, window_flags, path, modelname, 0.3, ) diff --git a/saqc/__main__.py b/saqc/__main__.py index c22ec402027c9ef2a87b175969bb8f599d2544d4..dd7b7acd07734697088fab18f461d5e6952d6453 100644 --- a/saqc/__main__.py +++ b/saqc/__main__.py @@ -10,10 +10,7 @@ from saqc.core import run from saqc.flagger import CategoricalFlagger -FLAGGERS = { - "numeric": CategoricalFlagger([-1, 0, 1]), - "category": CategoricalFlagger(["NIL", "OK", "BAD"]) -} +FLAGGERS = {"numeric": CategoricalFlagger([-1, 0, 1]), "category": CategoricalFlagger(["NIL", "OK", "BAD"])} @click.command() @@ -25,26 +22,19 @@ FLAGGERS = { @click.option("--fail/--no-fail", default=True, help="whether to stop the program run on errors") def main(config, data, flagger, outfile, nodata, fail): - data = pd.read_csv( - data, - index_col=0, - parse_dates=True, - ) + data = pd.read_csv(data, index_col=0, parse_dates=True,) data_result, flagger_result = run( config_file=config, flagger=FLAGGERS[flagger], data=data, nodata=nodata, - error_policy="raise" if fail else "warn" + error_policy="raise" if fail else "warn", ) if outfile: flags = flagger_result.getFlags() - flags_out = flags.where( - (flags.isnull() | flagger_result.isFlagged()), - flagger_result.GOOD - ) + flags_out = flags.where((flags.isnull() | flagger_result.isFlagged()), flagger_result.GOOD) cols_out = sum([[c, c + "_flags"] for c in flags_out], []) data_out = data_result.join(flags_out, rsuffix="_flags") data_out[cols_out].to_csv(outfile, header=True, index=True) diff --git a/saqc/core/core.py b/saqc/core/core.py index bfa64a6cbfb5ea829333b1200ae771105429466a..85f9e1e03a1a9b9df5eb7b68f9523192e12d6bfb 100644 --- a/saqc/core/core.py +++ b/saqc/core/core.py @@ -41,9 +41,7 @@ def _checkInput(data, flags, flagger): if not isinstance(flagger, BaseFlagger): flaggerlist = [CategoricalFlagger, SimpleFlagger, DmpFlagger] - raise TypeError( - f"flagger must be of type {flaggerlist} or any inherit class from 
{BaseFlagger}" - ) + raise TypeError(f"flagger must be of type {flaggerlist} or any inherit class from {BaseFlagger}") if flags is None: return @@ -140,11 +138,7 @@ def run( try: # actually run the tests data_chunk_result, flagger_chunk_result = evalExpression( - func, - data=data_chunk, - field=varname, - flagger=flagger_chunk, - nodata=nodata, + func, data=data_chunk, field=varname, flagger=flagger_chunk, nodata=nodata, ) except Exception as e: _handleErrors(e, configrow, func, error_policy) @@ -152,11 +146,7 @@ def run( if configrow[Fields.PLOT]: plotHook( - data_chunk_result, - flagger_chunk, - flagger_chunk_result, - varname, - func, + data_chunk_result, flagger_chunk, flagger_chunk_result, varname, func, ) # NOTE: @@ -169,4 +159,3 @@ def run( plotAllHook(data, flagger) return data, flagger - diff --git a/saqc/core/evaluator.py b/saqc/core/evaluator.py index 03925b998e8dc0101e4700a95f81040e7b0f7bd1..38b31314f2187c8befc6dffde1493e66075a2de8 100644 --- a/saqc/core/evaluator.py +++ b/saqc/core/evaluator.py @@ -23,13 +23,11 @@ def initLocalEnv(data: pd.DataFrame, field: str, flagger: BaseFlagger, nodata: f "field": field, "this": field, "flagger": flagger, - "NAN": np.nan, "NODATA": nodata, "GOOD": flagger.GOOD, "BAD": flagger.BAD, "UNFLAGGED": flagger.UNFLAGGED, - "ismissing": lambda data: ((data == nodata) | pd.isnull(data)), "isflagged": partial(_dslIsFlagged, flagger), "abs": np.abs, @@ -46,17 +44,25 @@ class DslTransformer(ast.NodeTransformer): SUPPORTED = ( ast.Expression, - ast.UnaryOp, ast.BinOp, - ast.BitOr, ast.BitAnd, + ast.UnaryOp, + ast.BinOp, + ast.BitOr, + ast.BitAnd, ast.Num, ast.Compare, - ast.Add, ast.Sub, - ast.Mult, ast.Div, - ast.Pow, ast.Mod, + ast.Add, + ast.Sub, + ast.Mult, + ast.Div, + ast.Pow, + ast.Mod, ast.USub, - ast.Eq, ast.NotEq, - ast.Gt, ast.Lt, - ast.GtE, ast.LtE, + ast.Eq, + ast.NotEq, + ast.Gt, + ast.Lt, + ast.GtE, + ast.LtE, ast.Invert, ast.Name, ) @@ -65,17 +71,12 @@ class DslTransformer(ast.NodeTransformer): self.environment = environment self.variables = variables - def visit_Call(self, node): func_name = node.func.id if func_name not in self.environment: raise NameError(f"unspported function: '{func_name}'") - return ast.Call( - func=node.func, - args=[self.visit(arg) for arg in node.args], - keywords=[], - ) + return ast.Call(func=node.func, args=[self.visit(arg) for arg in node.args], keywords=[],) def visit_Name(self, node): name = node.id @@ -85,9 +86,7 @@ class DslTransformer(ast.NodeTransformer): if name in self.variables: value = ast.Constant(value=name) return ast.Subscript( - value=ast.Name(id="data", ctx=ast.Load()), - slice=ast.Index(value=value), - ctx=ast.Load(), + value=ast.Name(id="data", ctx=ast.Load()), slice=ast.Index(value=value), ctx=ast.Load(), ) if name in self.environment: return ast.Constant(value=name) @@ -103,16 +102,21 @@ class DslTransformer(ast.NodeTransformer): class ConfigTransformer(ast.NodeTransformer): SUPPORTED_NODES = ( - ast.Call, ast.Num, ast.Str, ast.keyword, - ast.NameConstant, ast.UnaryOp, ast.Name, - ast.Load, ast.Expression, ast.Subscript, - ast.Index, ast.USub + ast.Call, + ast.Num, + ast.Str, + ast.keyword, + ast.NameConstant, + ast.UnaryOp, + ast.Name, + ast.Load, + ast.Expression, + ast.Subscript, + ast.Index, + ast.USub, ) - SUPPORTED_ARGUMENTS = ( - ast.Str, ast.Num, ast.NameConstant, ast.Call, - ast.UnaryOp, ast.USub, ast.Name - ) + SUPPORTED_ARGUMENTS = (ast.Str, ast.Num, ast.NameConstant, ast.Call, ast.UnaryOp, ast.USub, ast.Name) def __init__(self, dsl_transformer, environment, pass_parameter): 
self.dsl_transformer = dsl_transformer @@ -134,9 +138,7 @@ class ConfigTransformer(ast.NodeTransformer): ast.Name(id="flagger", ctx=ast.Load()), ] - node = ast.Call( - func=node.func, args=new_args + node.args, keywords=node.keywords - ) + node = ast.Call(func=node.func, args=new_args + node.args, keywords=node.keywords) return self.generic_visit(node) @@ -150,14 +152,10 @@ class ConfigTransformer(ast.NodeTransformer): raise TypeError(f"unknown function parameter '{node.arg}'") if not isinstance(value, self.SUPPORTED_ARGUMENTS): - raise TypeError( - f"invalid argument type '{type(value)}'" - ) + raise TypeError(f"invalid argument type '{type(value)}'") if isinstance(value, ast.Name) and value.id not in self.environment: - raise NameError( - f"unknown variable: {value.id}" - ) + raise NameError(f"unknown variable: {value.id}") return self.generic_visit(node) diff --git a/saqc/core/reader.py b/saqc/core/reader.py index defc61a721880b075070d26a1bb82e18321ad70e..4ebb53d51b608610c7cfeb2bddfc220af899acf4 100644 --- a/saqc/core/reader.py +++ b/saqc/core/reader.py @@ -25,16 +25,12 @@ def checkConfig(config_df, data, flagger, nodata): var_name = config_row[F.VARNAME] if pd.isnull(config_row[F.VARNAME]) or not var_name: - _raise( - config_row, SyntaxError, f"non-optional column '{F.VARNAME}' is missing or empty" - ) + _raise(config_row, SyntaxError, f"non-optional column '{F.VARNAME}' is missing or empty") test_fields = config_row.filter(regex=F.TESTS).dropna() if test_fields.empty: _raise( - config_row, - SyntaxError, - f"at least one test needs to be given for variable", + config_row, SyntaxError, f"at least one test needs to be given for variable", ) for col, expr in test_fields.iteritems(): @@ -44,10 +40,7 @@ def checkConfig(config_df, data, flagger, nodata): compileExpression(expr, data, var_name, flagger, nodata) except (TypeError, NameError, SyntaxError) as exc: _raise( - config_row, - type(exc), - exc.args[0] + f" (failing statement: '{expr}')", - col, + config_row, type(exc), exc.args[0] + f" (failing statement: '{expr}')", col, ) return config_df @@ -80,12 +73,7 @@ def prepareConfig(config_df, data): # fill nans with default values config_df = config_df.fillna( - { - F.VARNAME: np.nan, - F.START: data.index.min(), - F.END: data.index.max(), - F.PLOT: False, - } + {F.VARNAME: np.nan, F.START: data.index.min(), F.END: data.index.max(), F.PLOT: False,} ) # dtype = np.datetime64 if isinstance(data.index, pd.DatetimeIndex) else int diff --git a/saqc/flagger/baseflagger.py b/saqc/flagger/baseflagger.py index 565ae85940a5459eb45dd825d3d0cb1ad6d156e1..3099996dccbbe208e916fdf6e259d37a61016407 100644 --- a/saqc/flagger/baseflagger.py +++ b/saqc/flagger/baseflagger.py @@ -41,9 +41,7 @@ class BaseFlagger(ABC): self.signature = ("flag",) self._flags: pd.DataFrame - def initFlags( - self, data: pd.DataFrame = None, flags: pd.DataFrame = None - ) -> BaseFlaggerT: + def initFlags(self, data: pd.DataFrame = None, flags: pd.DataFrame = None) -> BaseFlaggerT: """ initialize a flagger based on the given 'data' or 'flags' if 'data' is not None: return a flagger with flagger.UNFALGGED values @@ -53,9 +51,7 @@ class BaseFlagger(ABC): if data is None and flags is None: raise TypeError("either 'data' or 'flags' are required") if data is not None: - flags = pd.DataFrame( - data=self.UNFLAGGED, index=data.index, columns=data.columns - ) + flags = pd.DataFrame(data=self.UNFLAGGED, index=data.index, columns=data.columns) return self._copy(self._assureDtype(flags)) def setFlagger(self, other: BaseFlaggerT): @@ -80,9 
+76,7 @@ class BaseFlagger(ABC): return self._copy(self._assureDtype(flags)) - def getFlagger( - self, field: str = None, loc: LocT = None, iloc: IlocT = None - ) -> BaseFlaggerT: + def getFlagger(self, field: str = None, loc: LocT = None, iloc: IlocT = None) -> BaseFlaggerT: """ return a potentially trimmed down copy of self """ @@ -93,9 +87,7 @@ class BaseFlagger(ABC): flags = flags.to_frame() return self._copy(flags) - def getFlags( - self, field: str = None, loc: LocT = None, iloc: IlocT = None - ) -> PandasT: + def getFlags(self, field: str = None, loc: LocT = None, iloc: IlocT = None) -> PandasT: """ return a copy of a potentially trimmed down 'self._flags' DataFrame """ @@ -106,13 +98,7 @@ class BaseFlagger(ABC): return flags.loc[mask, field] def setFlags( - self, - field: str, - loc: LocT = None, - iloc: IlocT = None, - flag: FlagT = None, - force: bool = False, - **kwargs, + self, field: str, loc: LocT = None, iloc: IlocT = None, flag: FlagT = None, force: bool = False, **kwargs, ) -> BaseFlaggerT: assertScalar("field", field, optional=False) @@ -129,22 +115,12 @@ class BaseFlagger(ABC): out._flags.loc[mask, field] = other[mask] return out - def clearFlags( - self, field: str, loc: LocT = None, iloc: IlocT = None, **kwargs - ) -> BaseFlaggerT: + def clearFlags(self, field: str, loc: LocT = None, iloc: IlocT = None, **kwargs) -> BaseFlaggerT: assertScalar("field", field, optional=False) - return self.setFlags( - field=field, loc=loc, iloc=iloc, flag=self.UNFLAGGED, force=True, **kwargs - ) + return self.setFlags(field=field, loc=loc, iloc=iloc, flag=self.UNFLAGGED, force=True, **kwargs) def isFlagged( - self, - field=None, - loc: LocT = None, - iloc: IlocT = None, - flag: FlagT = None, - comparator: str = ">", - **kwargs, + self, field=None, loc: LocT = None, iloc: IlocT = None, flag: FlagT = None, comparator: str = ">", **kwargs, ) -> PandasT: assertScalar("field", field, optional=True) assertScalar("flag", flag, optional=True) @@ -160,9 +136,7 @@ class BaseFlagger(ABC): out._flags = flags return out - def _locatorMask( - self, field: str = None, loc: LocT = None, iloc: IlocT = None - ) -> PandasT: + def _locatorMask(self, field: str = None, loc: LocT = None, iloc: IlocT = None) -> PandasT: field = field or slice(None) locator = [l for l in (loc, iloc, slice(None)) if l is not None][0] index = self._flags.index diff --git a/saqc/flagger/categoricalflagger.py b/saqc/flagger/categoricalflagger.py index 7e36ea563802a5779a4d5ae87ae59fe27a6cedd1..eb5384d2bdea5a5ff24323ae416fd440474a87f7 100644 --- a/saqc/flagger/categoricalflagger.py +++ b/saqc/flagger/categoricalflagger.py @@ -30,9 +30,7 @@ class CategoricalFlagger(BaseFlagger): not needed here, move out """ if isinstance(flag, pd.Series): - return ( - isinstance(flag.dtype, pd.CategoricalDtype) and flag.dtype == self.dtype - ) + return isinstance(flag.dtype, pd.CategoricalDtype) and flag.dtype == self.dtype return flag in self.dtype.categories @property diff --git a/saqc/flagger/continuousflagger.py b/saqc/flagger/continuousflagger.py index 7309308e271e8d1d4e46b8b3e55c2cbc4241d2ab..37e96508d224f8973eeb61fffbf4068630d439ed 100644 --- a/saqc/flagger/continuousflagger.py +++ b/saqc/flagger/continuousflagger.py @@ -16,27 +16,10 @@ class ContinuousFlagger(BaseFlagger): self._unflagged_flag = unflagged self.signature = ("flag", "factor", "modify") - def setFlags( - self, - field, - loc=None, - iloc=None, - flag=None, - force=False, - factor=1, - modify=False, - **kwargs - ): + def setFlags(self, field, loc=None, iloc=None, flag=None, 
force=False, factor=1, modify=False, **kwargs): # NOTE: incomplete, as the option to # update flags is temporarily gone - return super().setFlags( - field=field, - loc=loc, - iloc=iloc, - flag=self._checkFlag(flag), - force=force, - **kwargs - ) + return super().setFlags(field=field, loc=loc, iloc=iloc, flag=self._checkFlag(flag), force=force, **kwargs) # NOTE: # we should probably override _assureDtype here diff --git a/saqc/flagger/dmpflagger.py b/saqc/flagger/dmpflagger.py index 8e2a648113dcd31d314f8b801cb2118e7feb9614..24843283792093d655beff167f3713274860b444 100644 --- a/saqc/flagger/dmpflagger.py +++ b/saqc/flagger/dmpflagger.py @@ -35,10 +35,7 @@ class DmpFlagger(CategoricalFlagger): super().__init__(FLAGS) self.flags_fields = [FlagFields.FLAG, FlagFields.CAUSE, FlagFields.COMMENT] version = subprocess.run( - "git describe --tags --always --dirty", - shell=True, - check=False, - stdout=subprocess.PIPE, + "git describe --tags --always --dirty", shell=True, check=False, stdout=subprocess.PIPE, ).stdout self.project_version = version.decode().strip() self.signature = ("flag", "comment", "cause", "force") @@ -52,16 +49,12 @@ class DmpFlagger(CategoricalFlagger): """ if data is not None: - flags = pd.DataFrame( - data=self.UNFLAGGED, - columns=self._getColumnIndex(data.columns), - index=data.index, - ) + flags = pd.DataFrame(data=self.UNFLAGGED, columns=self._getColumnIndex(data.columns), index=data.index,) elif flags is not None: if not isinstance(flags.columns, pd.MultiIndex): - flags = flags.T.set_index( - keys=self._getColumnIndex(flags.columns, [FlagFields.FLAG]) - ).T.reindex(columns=self._getColumnIndex(flags.columns)) + flags = flags.T.set_index(keys=self._getColumnIndex(flags.columns, [FlagFields.FLAG])).T.reindex( + columns=self._getColumnIndex(flags.columns) + ) else: raise TypeError("either 'data' or 'flags' are required") @@ -70,9 +63,7 @@ class DmpFlagger(CategoricalFlagger): def getFlagger(self, field=None, loc=None, iloc=None): # NOTE: we need to preserve all indexing levels assertScalar("field", field, optional=True) - variables = (self._flags.columns - .get_level_values(ColumnLevels.VARIABLES) - .drop_duplicates()) + variables = self._flags.columns.get_level_values(ColumnLevels.VARIABLES).drop_duplicates() cols = toSequence(field, variables) out = super().getFlagger(field, loc, iloc) out._flags.columns = self._getColumnIndex(cols) @@ -85,28 +76,12 @@ class DmpFlagger(CategoricalFlagger): flags = self._flags.xs(FlagFields.FLAG, level=ColumnLevels.FLAGS, axis=1).copy() return super()._assureDtype(flags.loc[mask, field]) - def setFlags( - self, - field, - loc=None, - iloc=None, - flag=None, - force=False, - comment="", - cause="", - **kwargs - ): + def setFlags(self, field, loc=None, iloc=None, flag=None, force=False, comment="", cause="", **kwargs): assertScalar("field", field, optional=True) flag = self.BAD if flag is None else self._checkFlag(flag) - comment = json.dumps( - { - "comment": comment, - "commit": self.project_version, - "test": kwargs.get("func_name", ""), - } - ) + comment = json.dumps({"comment": comment, "commit": self.project_version, "test": kwargs.get("func_name", ""),}) this = self.getFlags(field=field) other = self._broadcastFlags(field=field, flag=flag) @@ -123,9 +98,7 @@ class DmpFlagger(CategoricalFlagger): ) -> pd.MultiIndex: cols = toSequence(cols) fields = toSequence(fields, self.flags_fields) - return pd.MultiIndex.from_product( - [cols, fields], names=[ColumnLevels.VARIABLES, ColumnLevels.FLAGS] - ) + return pd.MultiIndex.from_product([cols, 
fields], names=[ColumnLevels.VARIABLES, ColumnLevels.FLAGS]) def _assureDtype(self, flags): # NOTE: building up new DataFrames is significantly diff --git a/saqc/funcs/break_detection.py b/saqc/funcs/break_detection.py index 4224ea1eb6a71114959e299bf8f4f0d62f10e435..38fbeb3cc6728eaae9e03ba71cade78673b1c9b3 100644 --- a/saqc/funcs/break_detection.py +++ b/saqc/funcs/break_detection.py @@ -90,9 +90,7 @@ def flagBreaksSpektrumBased( # relative - change - break criteria testing: abs_change = np.abs(dataseries.shift(+1) - dataseries) - breaks = (abs_change > thresh_abs) & ( - abs_change / dataseries > thresh_rel - ) + breaks = (abs_change > thresh_abs) & (abs_change / dataseries > thresh_rel) breaks = breaks[breaks == True] # First derivative criterion @@ -102,34 +100,21 @@ def flagBreaksSpektrumBased( for brake in breaks.index: # slice out slice-to-be-filtered (with some safety extension of 12 times the data rate) - slice_start = ( - brake - pd.Timedelta(first_der_window) - smoothing_periods * pd.Timedelta(data_rate) - ) - slice_end = ( - brake + pd.Timedelta(first_der_window) + smoothing_periods * pd.Timedelta(data_rate) - ) + slice_start = brake - pd.Timedelta(first_der_window) - smoothing_periods * pd.Timedelta(data_rate) + slice_end = brake + pd.Timedelta(first_der_window) + smoothing_periods * pd.Timedelta(data_rate) data_slice = dataseries[slice_start:slice_end] # obtain first derivative: if smooth is True: first_deri_series = pd.Series( - data=savgol_filter( - data_slice, - window_length=smoothing_periods, - polyorder=smooth_poly_deg, - deriv=1, - ), + data=savgol_filter(data_slice, window_length=smoothing_periods, polyorder=smooth_poly_deg, deriv=1,), index=data_slice.index, ) else: first_deri_series = data_slice.diff() # condition constructing and testing: - test_slice = first_deri_series[ - brake - - pd.Timedelta(first_der_window): brake - + pd.Timedelta(first_der_window) - ] + test_slice = first_deri_series[brake - pd.Timedelta(first_der_window) : brake + pd.Timedelta(first_der_window)] test_sum = abs((test_slice.sum() * first_der_factor) / test_slice.size) @@ -143,10 +128,7 @@ def flagBreaksSpektrumBased( if smooth is True: second_deri_series = pd.Series( data=savgol_filter( - data_slice, - window_length=smoothing_periods, - polyorder=smooth_poly_deg, - deriv=2, + data_slice, window_length=smoothing_periods, polyorder=smooth_poly_deg, deriv=2, ), index=data_slice.index, ) @@ -156,22 +138,11 @@ def flagBreaksSpektrumBased( # criterion evaluation: first_second = ( (1 - scnd_der_ratio_range) - < abs( - ( - second_deri_series.shift(+1)[brake] - / second_deri_series[brake] - ) - ) + < abs((second_deri_series.shift(+1)[brake] / second_deri_series[brake])) < 1 + scnd_der_ratio_range ) - second_second = ( - abs( - second_deri_series[brake] - / second_deri_series.shift(-1)[brake] - ) - > scnd_der_ratio_thresh - ) + second_second = abs(second_deri_series[brake] / second_deri_series.shift(-1)[brake]) > scnd_der_ratio_thresh if (~first_second) | (~second_second): breaks[brake] = False diff --git a/saqc/funcs/constants_detection.py b/saqc/funcs/constants_detection.py index c8d9398999c3efc4ed935a0cf5e0c50bafd99b79..d390c5fa2dcc5b5b2e7d015fa0868962f276f449 100644 --- a/saqc/funcs/constants_detection.py +++ b/saqc/funcs/constants_detection.py @@ -6,12 +6,7 @@ import pandas as pd from saqc.funcs.register import register from saqc.lib.statistic_functions import varQC -from saqc.lib.tools import ( - valueRange, - slidingWindowIndices, - retrieveTrustworthyOriginal, - groupConsecutives -) +from 
saqc.lib.tools import valueRange, slidingWindowIndices, retrieveTrustworthyOriginal, groupConsecutives @register("constant") @@ -36,14 +31,7 @@ def flagConstant(data, field, flagger, thresh, window, **kwargs): @register("constant_varianceBased") def flagConstantVarianceBased( - data, - field, - flagger, - window="12h", - thresh=0.0005, - max_missing=None, - max_consec_missing=None, - **kwargs + data, field, flagger, window="12h", thresh=0.0005, max_missing=None, max_consec_missing=None, **kwargs ): """ @@ -78,20 +66,15 @@ def flagConstantVarianceBased( min_periods = int(np.ceil(pd.Timedelta(window) / pd.Timedelta(data_rate))) - plateaus = dataseries.rolling( - window=window, min_periods=min_periods - ).apply( - lambda x: True - if varQC(x, max_missing, max_consec_missing) <= thresh - else np.nan, - raw=False, + plateaus = dataseries.rolling(window=window, min_periods=min_periods).apply( + lambda x: True if varQC(x, max_missing, max_consec_missing) <= thresh else np.nan, raw=False, ) # are there any candidates for beeing flagged plateau-ish if plateaus.sum() == 0: return data, flagger - plateaus.fillna(method="bfill", limit=min_periods-1, inplace=True) + plateaus.fillna(method="bfill", limit=min_periods - 1, inplace=True) # result: plateaus = (plateaus[plateaus == 1.0]).index diff --git a/saqc/funcs/functions.py b/saqc/funcs/functions.py index abef3bb4c27c2438a25ed984d2ca60a6c3aac5a6..06d0e7d371bae6b5bc0933478332ad46c1ef2bd5 100644 --- a/saqc/funcs/functions.py +++ b/saqc/funcs/functions.py @@ -35,18 +35,14 @@ def flagGeneric(data, field, flagger, func, **kwargs): @register("flagWindowAfterFlag") def flagWindowAfterFlag(data, field, flagger, window, func, **kwargs): data, flagger_new = func - flagger_repeated = flagWindow( - flagger, flagger_new, field, direction="fw", window=window, **kwargs - ) + flagger_repeated = flagWindow(flagger, flagger_new, field, direction="fw", window=window, **kwargs) return data, flagger_repeated @register("flagNextAfterFlag") def flagNextAfterFlag(data, field, flagger, n, func, **kwargs): data, flagger_new = func - flagger_repeated = flagWindow( - flagger, flagger_new, field, direction="fw", window=n, **kwargs - ) + flagger_repeated = flagWindow(flagger, flagger_new, field, direction="fw", window=n, **kwargs) return data, flagger_repeated @@ -72,16 +68,7 @@ def flagMissing(data, field, flagger, nodata=np.nan, **kwargs): @register("sesonalRange") def flagSesonalRange( - data, - field, - flagger, - min, - max, - startmonth=1, - endmonth=12, - startday=1, - endday=31, - **kwargs, + data, field, flagger, min, max, startmonth=1, endmonth=12, startday=1, endday=31, **kwargs, ): smask = sesonalMask(data.index, startmonth, startday, endmonth, endday) @@ -89,9 +76,7 @@ def flagSesonalRange( if d.empty: return data, flagger - _, flagger_range = flagRange( - d, field, flagger.getFlagger(loc=d.index), min=min, max=max, **kwargs - ) + _, flagger_range = flagRange(d, field, flagger.getFlagger(loc=d.index), min=min, max=max, **kwargs) if not flagger_range.isFlagged(field).any(): return data, flagger @@ -114,12 +99,7 @@ def forceFlags(data, field, flagger, flag, **kwargs): @register("isolated") def flagIsolated( - data, - field, - flagger, - gap_window, - group_window, - **kwargs, + data, field, flagger, gap_window, group_window, **kwargs, ): gap_window = pd.tseries.frequencies.to_offset(gap_window) @@ -134,9 +114,9 @@ def flagIsolated( start = srs.index[0] stop = srs.index[-1] if stop - start <= group_window: - left = mask[start-gap_window:start].iloc[:-1] + left = 
mask[start - gap_window : start].iloc[:-1] if left.count() and left.all(): - right = mask[stop:stop+gap_window].iloc[1:] + right = mask[stop : stop + gap_window].iloc[1:] if right.count() and right.all(): flags[start:stop] = True diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py index dc641810a43f3dc2af2f196f3ce2d412ec6006ad..245c91fe8d99a3f78ecf6deb23fe04b2a6836317 100644 --- a/saqc/funcs/harm_functions.py +++ b/saqc/funcs/harm_functions.py @@ -10,7 +10,6 @@ from saqc.funcs.register import register from saqc.lib.tools import toSequence, getFuncFromInput - class Heap: INDEX = "initial_ts" DATA = "original_data" @@ -29,7 +28,7 @@ HARM_2_DEHARM = { "nagg": "invert_nearest", "fagg_no_deharm": "regain", "bagg_no_deharm": "regain", - "nagg_no_deharm": "regain" + "nagg_no_deharm": "regain", } @@ -52,7 +51,7 @@ def harmWrapper(heap={}): reshape_shift_comment=False, drop_flags=None, data_missing_value=np.nan, - **kwargs + **kwargs, ): # get funcs from strings: @@ -61,9 +60,7 @@ def harmWrapper(heap={}): # for some tingle tangle reasons, resolving the harmonization will not be sound, if not all missing/np.nan # values get flagged initially: - data, flagger = flagMissing( - data, field, flagger, nodata=data_missing_value, **kwargs - ) + data, flagger = flagMissing(data, field, flagger, nodata=data_missing_value, **kwargs) # and dropped for harmonization: if drop_flags is not None: if flagger.BAD not in drop_flags: @@ -87,9 +84,7 @@ def harmWrapper(heap={}): heap.update({Heap.INDEX: dat_col.index}) # now we can manipulate it without loosing information gathered before harmonization - dat_col, flagger_merged_clean = _outsortCrap( - dat_col, field, flagger_merged, drop_flags=drop_flags, - ) + dat_col, flagger_merged_clean = _outsortCrap(dat_col, field, flagger_merged, drop_flags=drop_flags,) # interpolation! (yeah) dat_col, chunk_bounds = _interpolateGrid( @@ -112,17 +107,13 @@ def harmWrapper(heap={}): missing_flag=reshape_missing_flag, set_shift_comment=reshape_shift_comment, block_flags=chunk_bounds, - **kwargs + **kwargs, ) # finally we happily blow up the data and flags frame again, # to release them on their ongoing journey through saqc. 
data, flagger_out = _toMerged( - data, - flagger, - field, - data_to_insert=dat_col, - flagger_to_insert=flagger_merged_clean_reshaped, + data, flagger, field, data_to_insert=dat_col, flagger_to_insert=flagger_merged_clean_reshaped, ) return data, flagger_out @@ -146,11 +137,7 @@ def harmWrapper(heap={}): # reconstruct the drops that were performed before harmonization drops, flagger_original_clean = _outsortCrap( - dat_col, - field, - harm_info[Heap.FLAGGER], - drop_flags=harm_info[Heap.DROP], - return_drops=True, + dat_col, field, harm_info[Heap.FLAGGER], drop_flags=harm_info[Heap.DROP], return_drops=True, ) # with reconstructed pre-harmonization flags-frame -> perform the projection of the flags calculated for @@ -179,14 +166,7 @@ def harmWrapper(heap={}): dat_col = harm_info[Heap.DATA].reindex(flags_col.index, fill_value=np.nan) dat_col.name = field # transform the result into the form, data travels through saqc: - data, flagger_out = _toMerged( - data, - flagger, - field, - dat_col, - flagger_back_full, - target_index=heap[Heap.INDEX], - ) + data, flagger_out = _toMerged(data, flagger, field, dat_col, flagger_back_full, target_index=heap[Heap.INDEX],) # clear heap if nessecary: if len(heap) == 1 and Heap.INDEX in heap: del heap[Heap.INDEX] @@ -229,9 +209,7 @@ def _outsortCrap( drop_flags = toSequence(drop_flags, default=flagger.BAD) for drop_flag in drop_flags: - drop_mask = drop_mask | flagger.isFlagged( - field, flag=drop_flag, comparator="==" - ) + drop_mask = drop_mask | flagger.isFlagged(field, flag=drop_flag, comparator="==") flagger_out = flagger.getFlagger(loc=~drop_mask) if return_drops: @@ -267,13 +245,7 @@ def _insertGrid(data, freq): def _interpolateGrid( - data, - freq, - method, - order=1, - agg_method=sum, - total_range=None, - downcast_interpolation=False, + data, freq, method, order=1, agg_method=sum, total_range=None, downcast_interpolation=False, ): """The function calculates grid point values for a passed pd.Series (['data']) by applying the selected interpolation/fill method. (passed to key word 'method'). 
The interpolation will apply for grid points @@ -362,9 +334,9 @@ def _interpolateGrid( seconds_total = pd.Timedelta(freq).total_seconds() seconds_string = str(int(seconds_total)) + "s" # calculate the series of aggregated values - data = data.resample( - seconds_string, base=seconds_total / 2, loffset=pd.Timedelta(freq) / 2 - ).apply(agg_method) + data = data.resample(seconds_string, base=seconds_total / 2, loffset=pd.Timedelta(freq) / 2).apply( + agg_method + ) elif method == "bagg": # all values in a sampling interval get aggregated with agg_method and assigned to the last grid point @@ -398,23 +370,14 @@ def _interpolateGrid( data = _insertGrid(data, freq) data, chunk_bounds = _interpolate( - data, - method, - order=order, - inter_limit=2, - downcast_interpolation=downcast_interpolation, + data, method, order=order, inter_limit=2, downcast_interpolation=downcast_interpolation, ) if total_range is None: data = data.asfreq(freq, fill_value=np.nan) else: - methods = "\n".join( - [", ".join(shifts), - ", ".join(aggregations), - ", ".join(interpolations)] - ) - raise ValueError( - f"Unknown interpolation method: '{method}', please select from:\n{methods}") + methods = "\n".join([", ".join(shifts), ", ".join(aggregations), ", ".join(interpolations)]) + raise ValueError(f"Unknown interpolation method: '{method}', please select from:\n{methods}") if total_range is not None: data = data.reindex(total_index) @@ -442,26 +405,19 @@ def _interpolate(data, method, order=2, inter_limit=2, downcast_interpolation=Fa :return: """ - gap_mask = ( - data.rolling(inter_limit, min_periods=0).apply( - lambda x: np.sum(np.isnan(x)), raw=True - ) - ) != inter_limit + gap_mask = (data.rolling(inter_limit, min_periods=0).apply(lambda x: np.sum(np.isnan(x)), raw=True)) != inter_limit if inter_limit == 2: gap_mask = gap_mask & gap_mask.shift(-1, fill_value=True) else: gap_mask = ( - gap_mask.replace(True, np.nan) - .fillna(method="bfill", limit=inter_limit) - .replace(np.nan, True) - .astype(bool) + gap_mask.replace(True, np.nan).fillna(method="bfill", limit=inter_limit).replace(np.nan, True).astype(bool) ) # start end ending points of interpolation chunks have to be memorized to block their flagging: chunk_switches = gap_mask.astype(int).diff() chunk_starts = chunk_switches[chunk_switches == -1].index chunk_ends = chunk_switches[(chunk_switches.shift(-1) == 1)].index - chunk_bounds = chunk_starts.join(chunk_ends, how='outer', sort=True) + chunk_bounds = chunk_starts.join(chunk_ends, how="outer", sort=True) data = data[gap_mask] @@ -510,7 +466,7 @@ def _reshapeFlags( missing_flag=None, set_shift_comment=True, block_flags=None, - **kwargs + **kwargs, ): """To continue processing flags after harmonization/interpolation, old pre-harm flags have to be distributed onto new grid. 
@@ -575,9 +531,7 @@ def _reshapeFlags( direction = "nearest" tolerance = pd.Timedelta(freq) / 2 - flags = flagger.getFlags().reindex( - ref_index, tolerance=tolerance, method=direction, fill_value=np.nan - ) + flags = flagger.getFlags().reindex(ref_index, tolerance=tolerance, method=direction, fill_value=np.nan) # if you want to keep previous comments - only newly generated missing flags get commented: flags_series = flags.squeeze() @@ -587,9 +541,7 @@ def _reshapeFlags( ) if set_shift_comment: - flagger_new = flagger_new.setFlags( - field, flag=flags_series, force=True, **kwargs - ) + flagger_new = flagger_new.setFlags(field, flag=flags_series, force=True, **kwargs) elif method in aggregations: # prepare resampling keywords @@ -611,10 +563,7 @@ def _reshapeFlags( base = seconds_total / 2 freq_string = str(int(seconds_total)) + "s" i_start = flagger.getFlags().index[0] - if ( - abs(i_start - i_start.ceil(freq)) - <= pd.Timedelta(freq) / 2 - ): + if abs(i_start - i_start.ceil(freq)) <= pd.Timedelta(freq) / 2: shift_correcture = -1 else: shift_correcture = +1 @@ -648,22 +597,18 @@ def _reshapeFlags( else: methods = ", ".join(shifts + ["\n"] + aggregations) raise ValueError( - "Passed reshaping method keyword:'{}', is unknown. Please select from: \n '{}'.".format( - method, methods - ) + "Passed reshaping method keyword:'{}', is unknown. Please select from: \n '{}'.".format(method, methods) ) # block flagging/backtracking of chunk_starts/chunk_ends if block_flags is not None: - flagger_new = flagger_new.setFlags(field, loc=block_flags, - flag=pd.Series(np.nan, index=block_flags).astype(flagger_new.dtype), - force=True) + flagger_new = flagger_new.setFlags( + field, loc=block_flags, flag=pd.Series(np.nan, index=block_flags).astype(flagger_new.dtype), force=True + ) return flagger_new -def _backtrackFlags( - flagger_post, flagger_pre, freq, track_method="invert_fshift", co_flagging=False -): +def _backtrackFlags(flagger_post, flagger_pre, freq, track_method="invert_fshift", co_flagging=False): # in the case of "real" up/downsampling - evaluating the harm flags against the original flags makes no sence! 
if track_method in ["regain"]: @@ -673,10 +618,7 @@ def _backtrackFlags( flags_post = flagger_post.getFlags() flags_pre = flagger_pre.getFlags() flags_header = flags_post.columns - if ( - track_method in ["invert_fshift", "invert_bshift", "invert_nearest"] - and co_flagging is True - ): + if track_method in ["invert_fshift", "invert_bshift", "invert_nearest"] and co_flagging is True: if track_method == "invert_fshift": method = "bfill" tolerance = pd.Timedelta(freq) @@ -689,19 +631,14 @@ def _backtrackFlags( method = "nearest" tolerance = pd.Timedelta(freq) / 2 - flags_post = flags_post.reindex( - flags_pre.index, method=method, tolerance=tolerance - ) + flags_post = flags_post.reindex(flags_pre.index, method=method, tolerance=tolerance) replacement_mask = flags_post.squeeze() > flags_pre.squeeze() # there is a mysterious problem when assigning 1-d-dataframes - so we squeeze: flags_pre = flags_pre.squeeze(axis=1) flags_post = flags_post.squeeze(axis=1) flags_pre.loc[replacement_mask] = flags_post.loc[replacement_mask] - if ( - track_method in ["invert_fshift", "invert_bshift", "invert_nearest"] - and co_flagging is False - ): + if track_method in ["invert_fshift", "invert_bshift", "invert_nearest"] and co_flagging is False: if track_method == "invert_fshift": method = "backward" tolerance = pd.Timedelta(freq) @@ -715,9 +652,7 @@ def _backtrackFlags( flags_post = pd.merge_asof( flags_post, - pd.DataFrame( - flags_pre.index.values, index=flags_pre.index, columns=["pre_index"] - ), + pd.DataFrame(flags_pre.index.values, index=flags_pre.index, columns=["pre_index"]), left_index=True, right_index=True, tolerance=tolerance, @@ -730,15 +665,11 @@ def _backtrackFlags( # restore flag shape flags_post.columns = flags_header - replacement_mask = ( - flags_post.squeeze() > flags_pre.loc[flags_post.index, :].squeeze() - ) + replacement_mask = flags_post.squeeze() > flags_pre.loc[flags_post.index, :].squeeze() # there is a mysterious problem when assigning 1-d-dataframes - so we squeeze: flags_pre = flags_pre.squeeze(axis=1) flags_post = flags_post.squeeze(axis=1) - flags_pre.loc[replacement_mask[replacement_mask].index] = flags_post.loc[ - replacement_mask - ] + flags_pre.loc[replacement_mask[replacement_mask].index] = flags_post.loc[replacement_mask] # sticking to the nomenklatura of always-DF for flags: if isinstance(flags_pre, pd.Series): @@ -753,9 +684,7 @@ def _fromMerged(data, flagger, fieldname): return data.loc[mask, fieldname], flagger.getFlagger(field=fieldname, loc=mask) -def _toMerged( - data, flagger, fieldname, data_to_insert, flagger_to_insert, target_index=None -): +def _toMerged(data, flagger, fieldname, data_to_insert, flagger_to_insert, target_index=None): data = data.copy() flags = flagger.getFlags() @@ -784,12 +713,8 @@ def _toMerged( # if we are not "de-harmonizing": if target_index is None: # erase nan rows in the data, that became redundant because of harmonization and merge with data-to-insert: - data = pd.merge( - data[mask], data_to_insert, how="outer", left_index=True, right_index=True - ) - flags = pd.merge( - flags[mask], flags_to_insert, how="outer", left_index=True, right_index=True - ) + data = pd.merge(data[mask], data_to_insert, how="outer", left_index=True, right_index=True) + flags = pd.merge(flags[mask], flags_to_insert, how="outer", left_index=True, right_index=True) return data, flagger.initFlags(flags=flags) else: @@ -803,35 +728,23 @@ def _toMerged( new_index = data[mask].index.join(target_index, how="outer") data = data.reindex(new_index) flags = 
flags.reindex(new_index, fill_value=flagger.UNFLAGGED) - data = pd.merge( - data, data_to_insert, how="outer", left_index=True, right_index=True - ) - flags = pd.merge( - flags, flags_to_insert, how="outer", left_index=True, right_index=True - ) + data = pd.merge(data, data_to_insert, how="outer", left_index=True, right_index=True) + flags = pd.merge(flags, flags_to_insert, how="outer", left_index=True, right_index=True) # internally harmonization memorizes its own manipulation by inserting nan flags - # those we will now assign the flagger.bad flag by the "missingTest": - return flagMissing( - data, fieldname, flagger.initFlags(flags=flags), nodata=np.nan - ) + return flagMissing(data, fieldname, flagger.initFlags(flags=flags), nodata=np.nan) -@register('harmonize_shift2Grid') -def shift2Grid(data, field, flagger, freq, method='nshift', drop_flags=None, **kwargs): +@register("harmonize_shift2Grid") +def shift2Grid(data, field, flagger, freq, method="nshift", drop_flags=None, **kwargs): return harmonize( - data, - field, - flagger, - freq, - inter_method=method, - reshape_method=method, - drop_flags=drop_flags, - **kwargs) + data, field, flagger, freq, inter_method=method, reshape_method=method, drop_flags=drop_flags, **kwargs + ) -@register('harmonize_aggregate2Grid') -def aggregate2Grid(data, field, flagger, freq, value_func, flag_func="max", method='nagg', drop_flags=None, **kwargs): +@register("harmonize_aggregate2Grid") +def aggregate2Grid(data, field, flagger, freq, value_func, flag_func="max", method="nagg", drop_flags=None, **kwargs): return harmonize( data, field, @@ -842,26 +755,29 @@ def aggregate2Grid(data, field, flagger, freq, value_func, flag_func="max", meth inter_agg=value_func, reshape_agg=flag_func, drop_flags=drop_flags, - **kwargs) + **kwargs, + ) -@register('harmonize_linear2Grid') -def linear2Grid(data, field, flagger, freq, method='nagg', func="max", drop_flags=None, **kwargs): +@register("harmonize_linear2Grid") +def linear2Grid(data, field, flagger, freq, method="nagg", func="max", drop_flags=None, **kwargs): return harmonize( data, field, flagger, freq, - inter_method='time', + inter_method="time", reshape_method=method, reshape_agg=func, drop_flags=drop_flags, - **kwargs) + **kwargs, + ) -@register('harmonize_interpolate2Grid') -def interpolate2Grid(data, field, flagger, freq, method, order=1, - flag_method='nagg', flag_func="max", drop_flags=None, **kwargs): +@register("harmonize_interpolate2Grid") +def interpolate2Grid( + data, field, flagger, freq, method, order=1, flag_method="nagg", flag_func="max", drop_flags=None, **kwargs +): return harmonize( data, field, @@ -872,12 +788,23 @@ def interpolate2Grid(data, field, flagger, freq, method, order=1, reshape_method=flag_method, reshape_agg=flag_func, drop_flags=drop_flags, - **kwargs) + **kwargs, + ) -@register('harmonize_downsample') -def downsample(data, field, flagger, sample_freq, agg_freq, sample_func="mean", agg_func="mean", - invalid_flags=None, max_invalid=None, **kwargs): +@register("harmonize_downsample") +def downsample( + data, + field, + flagger, + sample_freq, + agg_freq, + sample_func="mean", + agg_func="mean", + invalid_flags=None, + max_invalid=None, + **kwargs, +): agg_func = getFuncFromInput(agg_func) @@ -896,17 +823,20 @@ def downsample(data, field, flagger, sample_freq, agg_freq, sample_func="mean", return agg_func(x) else: return np.nan + else: def aggregator(x): return agg_func(x) + else: - dummy_resampler = pd.Series(np.nan, index=[pd.Timedelta('1min')]).resample('1min') + dummy_resampler 
= pd.Series(np.nan, index=[pd.Timedelta("1min")]).resample("1min") if hasattr(dummy_resampler, sample_func.__name__): sample_func_name = sample_func.__name__ if max_invalid < np.inf: + def aggregator(x): y = getattr(x.resample(sample_freq), sample_func_name)() if y.isna().sum() < max_invalid: @@ -915,18 +845,22 @@ def downsample(data, field, flagger, sample_freq, agg_freq, sample_func="mean", return np.nan else: + def aggregator(x): return agg_func(getattr(x.resample(sample_freq), sample_func_name)()) else: if max_invalid < np.inf: + def aggregator(x): y = x.resample(sample_freq).apply(sample_func) if y.isna().sum() < max_invalid: return agg_func(y) else: return np.nan + else: + def aggregator(x): return agg_func(x.resample(sample_freq).apply(sample_func)) @@ -935,9 +869,10 @@ def downsample(data, field, flagger, sample_freq, agg_freq, sample_func="mean", field, flagger, agg_freq, - inter_method='bagg', - reshape_method='bagg_no_deharm', + inter_method="bagg", + reshape_method="bagg_no_deharm", inter_agg=aggregator, reshape_agg="max", drop_flags=invalid_flags, - **kwargs) + **kwargs, + ) diff --git a/saqc/funcs/machine_learning.py b/saqc/funcs/machine_learning.py index ad7152e6553717ba99d63945e33e22e987e07b8d..b13c4718bc0de283e986fd53ab6734823ff0ab9f 100644 --- a/saqc/funcs/machine_learning.py +++ b/saqc/funcs/machine_learning.py @@ -19,25 +19,13 @@ def _refCalc(reference, window_values): outdata[name + "_Dt_1"].rolling(window_values, center=False).mean() ) # mean gradient t to t-window outdata[name + "_Dt" + str(window_values)] = ( - outdata[name + "_Dt_1"] - .iloc[::-1] - .rolling(window_values, center=False) - .mean()[::-1] + outdata[name + "_Dt_1"].iloc[::-1].rolling(window_values, center=False).mean()[::-1] ) # mean gradient t to t+window return outdata @register("machinelearning") -def flagML( - data, - field, - flagger, - references, - window_values: int, - window_flags: int, - path: str, - **kwargs -): +def flagML(data, field, flagger, references, window_values: int, window_flags: int, path: str, **kwargs): """This Function uses pre-trained machine-learning model objects for flagging of a specific variable. The model is supposed to be trained using the script provided in "ressources/machine_learning/train_machine_learning.py". For flagging, Inputs to the model are the timeseries of the respective target at one specific sensors, the automatic flags that were assigned by SaQC as well as multiple reference series. @@ -53,7 +41,6 @@ def flagML( :param path: A string giving the path to the respective model object, i.e. its name and the respective value of the grouping variable. e.g. 
"models/model_0.2.pkl" """ - # Function for moving window calculations # Create custom df for easier processing df = data.loc[:, [field] + references] @@ -75,9 +62,7 @@ def flagML( # Add context information for field+references for i in [field] + references: - df = pd.concat( - [df, _refCalc(reference=df[i], window_values=window_values)], axis=1 - ) + df = pd.concat([df, _refCalc(reference=df[i], window_values=window_values)], axis=1) # remove rows that contain NAs (new ones occured during predictor calculation) df = df.dropna(axis=0, how="any") diff --git a/saqc/funcs/soil_moisture_tests.py b/saqc/funcs/soil_moisture_tests.py index d5b591492451a64b6416aec743c8f2ab277d8d78..cdae3dd7c85a851d48a40e6fc925c55fc7b4e533 100644 --- a/saqc/funcs/soil_moisture_tests.py +++ b/saqc/funcs/soil_moisture_tests.py @@ -91,15 +91,7 @@ def flagSoilMoistureBreaks( @register("soilMoisture_frost") -def flagSoilMoistureBySoilFrost( - data, - field, - flagger, - soil_temp_variable, - window="1h", - frost_thresh=0, - **kwargs -): +def flagSoilMoistureBySoilFrost(data, field, flagger, soil_temp_variable, window="1h", frost_thresh=0, **kwargs): """This Function is an implementation of the soil temperature based Soil Moisture flagging, as presented in: @@ -127,9 +119,9 @@ def flagSoilMoistureBySoilFrost( # retrieve reference series refseries = data[soil_temp_variable].copy() - ref_use = flagger.isFlagged( - soil_temp_variable, flag=flagger.GOOD, comparator="==" - ) | flagger.isFlagged(soil_temp_variable, flag=flagger.UNFLAGGED, comparator="==") + ref_use = flagger.isFlagged(soil_temp_variable, flag=flagger.GOOD, comparator="==") | flagger.isFlagged( + soil_temp_variable, flag=flagger.UNFLAGGED, comparator="==" + ) # drop flagged values: refseries = refseries[ref_use.values] # drop nan values from reference series, since those are values you dont want to refer to. @@ -212,7 +204,6 @@ def flagSoilMoistureByPrecipitationEvents( :param ignore_missing: """ - dataseries, moist_rate = retrieveTrustworthyOriginal(data, field, flagger) # data not hamronized: @@ -223,29 +214,29 @@ def flagSoilMoistureByPrecipitationEvents( if refseries.empty: return data, flagger - refseries = refseries.reindex(refseries.index.join(dataseries.index, how='outer')) + refseries = refseries.reindex(refseries.index.join(dataseries.index, how="outer")) # get 24 h prec. 
monitor
     prec_count = refseries.rolling(window="1D").sum()
 
     # exclude data not signifying a raise::
     if raise_window is None:
         raise_window = 1
     else:
-        raise_window = int(np.ceil(pd.Timedelta(raise_window)/moist_rate))
+        raise_window = int(np.ceil(pd.Timedelta(raise_window) / moist_rate))
 
     # first raise condition:
     raise_mask = dataseries > dataseries.shift(raise_window)
 
     # second raise condition:
-    std_window = int(np.ceil(pd.Timedelta(std_window)/moist_rate))
+    std_window = int(np.ceil(pd.Timedelta(std_window) / moist_rate))
     if ignore_missing:
         std_mask = dataseries.dropna().rolling(std_window).std() < (
-            (dataseries - dataseries.shift(std_window)) / std_factor)
+            (dataseries - dataseries.shift(std_window)) / std_factor
+        )
     else:
-        std_mask = dataseries.rolling(std_window).std() < (
-            (dataseries - dataseries.shift(std_window)) / std_factor)
+        std_mask = dataseries.rolling(std_window).std() < ((dataseries - dataseries.shift(std_window)) / std_factor)
 
     dataseries = dataseries[raise_mask & std_mask]
-    invalid_indices = (prec_count[dataseries.index] <= sensor_depth*sensor_accuracy*soil_porosity)
+    invalid_indices = prec_count[dataseries.index] <= sensor_depth * sensor_accuracy * soil_porosity
     invalid_indices = invalid_indices[invalid_indices]
 
     # set Flags
@@ -286,11 +277,13 @@ def flagSoilMoistureConstant(
 
     # get plateaus:
     _, comp_flagger = flagConstantVarianceBased(
-        data, field, flagger,
+        data,
+        field,
+        flagger,
         window=window,
         thresh=thresh,
         max_missing=max_missing,
-        max_consec_missing=max_consec_missing
+        max_consec_missing=max_consec_missing,
     )
 
     new_plateaus = (comp_flagger.getFlags(field)).eq(flagger.getFlags(field))
@@ -308,20 +301,20 @@ def flagSoilMoistureConstant(
     # get plateau groups:
     group_counter = new_plateaus.cumsum()
     group_counter = group_counter[group_counter.diff() == 0]
-    group_counter.name = 'group_counter'
-    plateau_groups = pd.merge(group_counter, dataseries, left_index=True, right_index=True, how='inner')
+    group_counter.name = "group_counter"
+    plateau_groups = pd.merge(group_counter, dataseries, left_index=True, right_index=True, how="inner")
     # test mean-condition on plateau groups:
-    test_barrier = tolerance*dataseries.max()
-    plateau_group_drops = plateau_groups.groupby('group_counter').filter(lambda x: x[field].mean() <= test_barrier)
+    test_barrier = tolerance * dataseries.max()
+    plateau_group_drops = plateau_groups.groupby("group_counter").filter(lambda x: x[field].mean() <= test_barrier)
     # discard values that didnt pass the test from plateau candidate series:
     new_plateaus[plateau_group_drops.index] = 1
 
     # we extend the plateaus to cover condition testing sets
     # 1: extend backwards (with a technical "one" added):
-    cond1_sets = new_plateaus.replace(1, method='bfill', limit=(precipitation_window + window))
+    cond1_sets = new_plateaus.replace(1, method="bfill", limit=(precipitation_window + window))
     # 2. extend forwards:
     if period_diff > 0:
-        cond1_sets = cond1_sets.replace(1, method='ffill', limit=period_diff)
+        cond1_sets = cond1_sets.replace(1, method="ffill", limit=period_diff)
 
     # get first derivative
     if smooth_window is None:
@@ -331,21 +324,15 @@ def flagSoilMoistureConstant(
         first_derivative = dataseries.diff()
     filter_window_seconds = smooth_window.seconds
     smoothing_periods = int(np.ceil((filter_window_seconds / moist_rate.n)))
-    first_derivate = savgol_filter(
-        dataseries,
-        window_length=smoothing_periods,
-        polyorder=smooth_poly_deg,
-        deriv=1,
-    )
+    first_derivate = savgol_filter(dataseries, window_length=smoothing_periods, polyorder=smooth_poly_deg, deriv=1,)
     first_derivate = pd.Series(data=first_derivate, index=dataseries.index, name=dataseries.name)
 
     # cumsumming to seperate continous plateau groups from each other:
     group_counter = cond1_sets.cumsum()
     group_counter = group_counter[group_counter.diff() == 0]
-    group_counter.name = 'group_counter'
-    group_frame = pd.merge(group_counter, first_derivate, left_index=True, right_index=True, how='inner')
-    group_frame = group_frame.groupby('group_counter')
-    condition_passed = group_frame.filter(
-        lambda x: (x[field].max() >= deriv_max) & (x[field].min() <= deriv_min))
+    group_counter.name = "group_counter"
+    group_frame = pd.merge(group_counter, first_derivate, left_index=True, right_index=True, how="inner")
+    group_frame = group_frame.groupby("group_counter")
+    condition_passed = group_frame.filter(lambda x: (x[field].max() >= deriv_max) & (x[field].min() <= deriv_min))
 
     flagger = flagger.setFlags(field, loc=condition_passed.index, **kwargs)
diff --git a/saqc/funcs/spike_detection.py b/saqc/funcs/spike_detection.py
index 3ba7e6bc94ae28e1ef4d9e49bc8283da733b522e..18ade571ff2820ac20f02fd5a11b295450d35030 100644
--- a/saqc/funcs/spike_detection.py
+++ b/saqc/funcs/spike_detection.py
@@ -19,9 +19,7 @@ from saqc.lib.tools import (
 
 
 @register("spikes_slidingZscore")
-def flagSpikes_slidingZscore(
-    data, field, flagger, window, offset, count=1, polydeg=1, z=3.5, method="modZ", **kwargs
-):
+def flagSpikes_slidingZscore(data, field, flagger, window, offset, count=1, polydeg=1, z=3.5, method="modZ", **kwargs):
     """
     A outlier detection in a sliding window. The method for detection can be a simple Z-score or the more robust
     modified Z-score, as introduced here [1].
@@ -74,9 +72,7 @@ def flagSpikes_slidingZscore(
     if dx_s >= winsz_s and count == 1:
         pass
     elif dx_s >= winsz_s and count > 1:
-        ValueError(
-            "If stepsize `offset` is bigger that the window-size, every value is seen just once, so use count=1"
-        )
+        ValueError("If stepsize `offset` is bigger that the window-size, every value is seen just once, so use count=1")
     elif count > winsz_s // dx_s:
         raise ValueError(
             f"Adjust `offset`, `stepsize` or `window`. A single data point is "
@@ -219,9 +215,7 @@ def flagSpikes_basic(data, field, flagger, thresh=7, tolerance=0, window="15min"
     pre_jumps = dataseries.diff(periods=-1).abs() > thresh
     pre_jumps = pre_jumps[pre_jumps]
     # get all the entries preceeding a significant jump and its successors within "length" range
-    to_roll = pre_jumps.reindex(
-        dataseries.index, method="ffill", tolerance=window, fill_value=False
-    ).dropna()
+    to_roll = pre_jumps.reindex(dataseries.index, method="ffill", tolerance=window, fill_value=False).dropna()
 
     # define spike testing function to roll with:
     def spike_tester(chunk, pre_jumps_index, thresh, tol):
@@ -349,21 +343,16 @@ def flagSpikes_spektrumBased(
     """
 
     dataseries, data_rate = retrieveTrustworthyOriginal(data, field, flagger)
-    noise_func_map = {
-        "covar": pd.Series.var,
-        "rvar": pd.Series.std
-    }
+    noise_func_map = {"covar": pd.Series.var, "rvar": pd.Series.std}
     noise_func = noise_func_map[noise_func.lower()]
 
     if smooth_window is None:
-        smooth_window = 3*pd.Timedelta(data_rate)
+        smooth_window = 3 * pd.Timedelta(data_rate)
     else:
         smooth_window = pd.Timedelta(smooth_window)
 
     quotient_series = dataseries / dataseries.shift(+1)
-    spikes = (quotient_series > (1 + raise_factor)) | (
-        quotient_series < (1 - raise_factor)
-    )
+    spikes = (quotient_series > (1 + raise_factor)) | (quotient_series < (1 - raise_factor))
     spikes = spikes[spikes == True]
 
     # loop through spikes: (loop may sound ugly - but since the number of spikes is supposed to not exceed the
@@ -385,16 +374,11 @@ def flagSpikes_spektrumBased(
 
         end_slice = spike + smooth_window
         scnd_derivate = savgol_filter(
-            dataseries[start_slice:end_slice],
-            window_length=smoothing_periods,
-            polyorder=smooth_ploy_deg,
-            deriv=2,
+            dataseries[start_slice:end_slice], window_length=smoothing_periods, polyorder=smooth_ploy_deg, deriv=2,
         )
 
         length = scnd_derivate.size
-        test_ratio_1 = np.abs(
-            scnd_derivate[int(((length + 1) / 2) - 2)] / scnd_derivate[int(((length + 1) / 2))]
-        )
+        test_ratio_1 = np.abs(scnd_derivate[int(((length + 1) / 2) - 2)] / scnd_derivate[int(((length + 1) / 2))])
 
         if lower_dev_bound < test_ratio_1 < upper_dev_bound:
             # apply noise condition:
diff --git a/saqc/lib/plotting.py b/saqc/lib/plotting.py
index 2373654974d374dc03d42ab5d5685f9281c77d12..9676432069b2cab519b039cc3e82f31f3a246a5a 100644
--- a/saqc/lib/plotting.py
+++ b/saqc/lib/plotting.py
@@ -10,21 +10,24 @@ from saqc.flagger import BaseFlagger
 
 __plotvars = []
 
-_colors = {
-    "unflagged": "silver",
-    "good": "seagreen",
-    "bad": "firebrick",
-    "suspicious": "gold"
-}
+_colors = {"unflagged": "silver", "good": "seagreen", "bad": "firebrick", "suspicious": "gold"}
+
+_figsize = (10, 4)
 
-_figsize = (10,4)
 
 def plotAllHook(data, flagger, plot_nans=False):
     if __plotvars:
         _plot(data, flagger, True, __plotvars, plot_nans=plot_nans)
 
 
-def plotHook(data: pd.DataFrame, flagger_old: BaseFlagger, flagger_new: BaseFlagger, varname: str, flag_test: str, plot_nans: bool = False):
+def plotHook(
+    data: pd.DataFrame,
+    flagger_old: BaseFlagger,
+    flagger_new: BaseFlagger,
+    varname: str,
+    flag_test: str,
+    plot_nans: bool = False,
+):
 
     # NOTE:
     # data might have been harmonized (recognizable by
@@ -53,20 +56,14 @@ def plotHook(data: pd.DataFrame, flagger_old: BaseFlagger, flagger_new: BaseFlag
 
         # harmonization, nothing to see here
         return
 
-    mask = (flags_old != flags_new)
+    mask = flags_old != flags_new
     __plotvars.append(varname)
     _plot(data, flagger_new, mask, varname, title=flag_test, plot_nans=plot_nans)
 
 
 def _plot(
-    data,
-    flagger,
-    flagmask,
-    varname,
-    interactive_backend=True,
-    title="Data Plot",
-    plot_nans=True,
+    data, flagger, flagmask, varname, interactive_backend=True, title="Data Plot", plot_nans=True,
 ):
 
     # todo: try catch warn (once) return
@@ -74,6 +71,7 @@ def _plot(
     import matplotlib as mpl
     import matplotlib.pyplot as plt
     from pandas.plotting import register_matplotlib_converters
+
     # needed for datetime conversion
     register_matplotlib_converters()
 
@@ -102,7 +100,7 @@ def _plot(
 
     plots = len(varnames)
     if plots > 1:
-        fig, axes = plt.subplots(plots, 1, sharex=True,figsize=_figsize)
+        fig, axes = plt.subplots(plots, 1, sharex=True, figsize=_figsize)
         axes[0].set_title(title)
         for i, v in enumerate(varnames):
             _plotByQualityFlag(data, v, flagger, flagmask, axes[i], plot_nans)
@@ -136,17 +134,19 @@ def _plotByQualityFlag(data, varname, flagger, flagmask, ax, plot_nans):
         data,
         # NOTE: no lines to flagged points
         # data.index, np.ma.array(data.values, mask=flagger.isFlagged(varname).values),
-        "-", color="silver", label="data"
+        "-",
+        color="silver",
+        label="data",
     )
 
     # ANY OLD FLAG
     # plot all(!) data that are already flagged in black
-    flagged = flagger.isFlagged(varname, flag=flagger.GOOD, comparator='>=')
+    flagged = flagger.isFlagged(varname, flag=flagger.GOOD, comparator=">=")
     oldflags = flagged & ~flagmask
     ax.plot(data[oldflags], ".", color="black", label="flagged by other test")
 
     if plot_nans:
-        _plotNans(data[oldflags], 'black', ax)
+        _plotNans(data[oldflags], "black", ax)
 
     # now we just want to show data that was flagged
     if flagmask is not True:
@@ -156,25 +156,21 @@ def _plotByQualityFlag(data, varname, flagger, flagmask, ax, plot_nans):
 
         if data.empty:
             return
 
-    plots = [
-        (flagger.UNFLAGGED, _colors["unflagged"]),
-        (flagger.GOOD, _colors['good']),
-        (flagger.BAD, _colors['bad'])
-    ]
+    plots = [(flagger.UNFLAGGED, _colors["unflagged"]), (flagger.GOOD, _colors["good"]), (flagger.BAD, _colors["bad"])]
 
     for flag, color in plots:
-        flagged = flagger.isFlagged(varname, flag=flag, comparator='==')
+        flagged = flagger.isFlagged(varname, flag=flag, comparator="==")
        if not data[flagged].empty:
-            ax.plot(data[flagged], '.', color=color, label=f"flag: {flag}")
+            ax.plot(data[flagged], ".", color=color, label=f"flag: {flag}")
         if plot_nans:
             _plotNans(data[flagged], color, ax)
 
     # plot SUSPICIOS
-    color = _colors['suspicious']
-    flagged = flagger.isFlagged(varname, flag=flagger.GOOD, comparator='>')
-    flagged &= flagger.isFlagged(varname, flag=flagger.BAD, comparator='<')
+    color = _colors["suspicious"]
+    flagged = flagger.isFlagged(varname, flag=flagger.GOOD, comparator=">")
+    flagged &= flagger.isFlagged(varname, flag=flagger.BAD, comparator="<")
     if not data[flagged].empty:
-        ax.plot(data[flagged], '.', color=color, label=f"{flagger.GOOD} < flag < {flagger.BAD}")
+        ax.plot(data[flagged], ".", color=color, label=f"{flagger.GOOD} < flag < {flagger.BAD}")
     if plot_nans:
         _plotNans(data[flagged], color, ax)
diff --git a/saqc/lib/tools.py b/saqc/lib/tools.py
index 29baf036ea7dfc3b14ce21e1f05bb1b6119c3866..9323c56d19e08b055b1ff16763fc8b14e616a9ec 100644
--- a/saqc/lib/tools.py
+++ b/saqc/lib/tools.py
@@ -12,15 +12,16 @@ import numba as nb
 from saqc.lib.types import T, PandasLike
 
 STRING_2_FUNC = {
-    'sum': np.sum,
-    'mean': np.mean,
-    'median': np.median,
-    'min': np.min,
-    'max': np.max,
-    'first': pd.Series(np.nan, index=pd.DatetimeIndex([])).resample('0min').first,
-    'last': pd.Series(np.nan, index=pd.DatetimeIndex([])).resample('0min').last
+    "sum": np.sum,
+    "mean": np.mean,
+    "median": np.median,
+    "min": np.min,
+    "max": np.max,
+    "first": pd.Series(np.nan, index=pd.DatetimeIndex([])).resample("0min").first,
+    "last": pd.Series(np.nan, index=pd.DatetimeIndex([])).resample("0min").last,
 }
 
+
 def assertScalar(name, value, optional=False):
     if (not np.isscalar(value)) and (value is not None) and (optional is True):
         raise ValueError(f"'{name}' needs to be a scalar or 'None'")
@@ -28,9 +29,7 @@ def assertScalar(name, value, optional=False):
         raise ValueError(f"'{name}' needs to be a scalar")
 
 
-def toSequence(
-    value: Union[T, Sequence[T]], default: Union[T, Sequence[T]] = None
-) -> Sequence[T]:
+def toSequence(value: Union[T, Sequence[T]], default: Union[T, Sequence[T]] = None) -> Sequence[T]:
     if value is None:
         value = default
     if np.isscalar(value):
@@ -116,7 +115,7 @@ def inferFrequency(data: PandasLike) -> pd.DateOffset:
     return pd.tseries.frequencies.to_offset(pd.infer_freq(data.index))
 
 
-def combineDataFrames(left: pd.DataFrame, right: pd.DataFrame, fill_value: float=np.nan) -> pd.DataFrame:
+def combineDataFrames(left: pd.DataFrame, right: pd.DataFrame, fill_value: float = np.nan) -> pd.DataFrame:
     """
     Combine the given DataFrames 'left' and 'right' such that, the
     output is union of the indices and the columns of both. In case
@@ -125,7 +124,7 @@ def combineDataFrames(left: pd.DataFrame, right: pd.DataFrame, fill_value: float
     combined = left.reindex(
         index=left.index.union(right.index),
         columns=left.columns.union(right.columns, sort=False),
-        fill_value=fill_value
+        fill_value=fill_value,
     )
 
     for key, values in right.iteritems():
@@ -134,7 +133,7 @@ def combineDataFrames(left: pd.DataFrame, right: pd.DataFrame, fill_value: float
 
     return combined
 
 
-def retrieveTrustworthyOriginal(data: pd.DataFrame, field: str, flagger=None, level: Any=None) -> pd.DataFrame:
+def retrieveTrustworthyOriginal(data: pd.DataFrame, field: str, flagger=None, level: Any = None) -> pd.DataFrame:
     """Columns of data passed to the saqc runner may not be sampled to its original sampling rate - thus
     differenciating between missng value - nans und fillvalue nans is impossible.
@@ -178,7 +177,7 @@ def retrieveTrustworthyOriginal(data: pd.DataFrame, field: str, flagger=None, le
 
     # estimate original data sampling frequencie
     # (the original series sampling rate may not match data-input sample rate):
     seconds_rate = dataseries.index.to_series().diff().min().seconds
-    data_rate = pd.tseries.frequencies.to_offset(str(seconds_rate) + 's')
+    data_rate = pd.tseries.frequencies.to_offset(str(seconds_rate) + "s")
 
     return dataseries.asfreq(data_rate), data_rate
@@ -192,9 +191,7 @@ def offset2seconds(offset):
     return pd.Timedelta.total_seconds(pd.Timedelta(offset))
 
 
-def flagWindow(
-    flagger_old, flagger_new, field, direction="fw", window=0, **kwargs
-) -> pd.Series:
+def flagWindow(flagger_old, flagger_new, field, direction="fw", window=0, **kwargs) -> pd.Series:
 
     if window == 0 or window == "":
         return flagger_new
@@ -305,9 +302,7 @@ def assertSeries(srs: Any, argname: str = "arg") -> None:
 
 
 def assertPandas(pdlike: PandasLike, argname: str = "arg", allow_multiindex: bool = True) -> None:
     if not isinstance(pdlike, pd.Series) and not isinstance(pdlike, pd.DataFrame):
-        raise TypeError(
-            f"{argname} must be of type pd.DataFrame or pd.Series, {type(pdlike)} was given"
-        )
+        raise TypeError(f"{argname} must be of type pd.DataFrame or pd.Series, {type(pdlike)} was given")
     if not allow_multiindex:
         assertSingleColumns(pdlike, argname)
@@ -323,9 +318,7 @@ def assertMultiColumns(dfmi: pd.DataFrame, argname: str = "") -> None:
 
 def assertSingleColumns(df: PandasLike, argname: str = "") -> None:
     if isinstance(df, pd.DataFrame) and isinstance(df.columns, pd.MultiIndex):
-        raise TypeError(
-            f"given pd.DataFrame {argname} is not allowed to have a muliindex on columns"
-        )
+        raise TypeError(f"given pd.DataFrame {argname} is not allowed to have a muliindex on columns")
 
 
 def getFuncFromInput(func):
@@ -377,4 +370,3 @@ def groupConsecutives(series: pd.Series) -> Iterator[pd.Series]:
             break
         yield pd.Series(data=values[start:stop], index=index[start:stop])
         start = stop
-
diff --git a/setup.py b/setup.py
index d919dedcf0736417eb2d1c798be386c8bd135d11..310af649d789fa789467ebdeb2ec14b0e525ee92 100644
--- a/setup.py
+++ b/setup.py
@@ -1,19 +1,19 @@
-import setuptools
+from setuptools import setup, find_packages
 
 with open("README.md", "r") as fh:
     long_description = fh.read()
 
 
-setuptools.setup(
+setup(
     name="saqc",
     version="1.0.0",
     author="Bert Palm, David Schaefer, Peter Luenenschloss, Lennard Schmidt",
-    author_email="bert.palm@ufz.de, david.schaefer@ufz.de, peter.luenenschloss@ufz.de, lennart.schmidt@ufz.de",
+    author_email="david.schaefer@ufz.de",
    description="Data quality checking and processing framework",
     long_description=long_description,
     long_description_content_type="text/markdown",
     url="https://git.ufz.de/rdm-software/saqc",
-    packages=["saqc"],
+    packages=find_packages(),
     install_requires=[
         "numpy",
         "pandas",
@@ -26,7 +26,5 @@ setuptools.setup(
         "python-intervals",
     ],
     license="GPLv3",
-    entry_points = {
-        'console_scripts': ['saqc=saqc.__main__:main'],
-    }
+    entry_points={"console_scripts": ["saqc=saqc.__main__:main"],},
 )