Skip to content
Snippets Groups Projects

Follow-Up Translations

Merged David Schäfer requested to merge translations into develop
Compare and Show latest version
1 file
+ 2
3
Compare changes
  • Side-by-side
  • Inline
+ 76
61
@@ -8,28 +8,25 @@ from __future__ import annotations
import logging
import copy as stdcopy
from typing import List, Tuple, Sequence, Union
from dios.dios import DictOfSeries
from typing import Tuple, Sequence, Union, Optional
from typing_extensions import Literal
import inspect
import pandas as pd
import dios
import numpy as np
import timeit
import inspect
from saqc.constants import *
from dios import DictOfSeries, to_dios
from saqc.core.flags import initFlagsLike, Flags
from saqc.core.lib import APIController, ColumnSelector
from saqc.core.register import FUNC_MAP, SaQCFunction
from saqc.core.modules import FuncModules
from saqc.funcs.tools import copy
from saqc.lib.plotting import plotHook, plotAllHook
from saqc.core.translator import FloatTranslator, Translator
from saqc.lib.types import UserFlag
from saqc.core.translator.basetranslator import Translator, FloatTranslator
from saqc.lib.types import ExternalFlag, CallGraph, MaterializedGraph, PandasLike
from saqc.constants import BAD
logger = logging.getLogger("SaQC")
@@ -59,8 +56,10 @@ def _handleErrors(
# TODO: shouldt the code/function go to Saqc.__init__ ?
def _prepInput(data, flags):
dios_like = (dios.DictOfSeries, pd.DataFrame)
def _prepInput(
data: PandasLike, flags: Optional[Union[DictOfSeries, pd.DataFrame, Flags]]
) -> Tuple[DictOfSeries, Optional[Flags]]:
dios_like = (DictOfSeries, pd.DataFrame)
if isinstance(data, pd.Series):
data = data.to_frame()
@@ -75,7 +74,7 @@ def _prepInput(data, flags):
data.columns, pd.MultiIndex
):
raise TypeError("'data' should not use MultiIndex")
data = dios.to_dios(data)
data = to_dios(data)
if not hasattr(data.columns, "str"):
raise TypeError("expected dataframe columns of type string")
@@ -88,7 +87,7 @@ def _prepInput(data, flags):
):
raise TypeError("'flags' should not use MultiIndex")
if isinstance(flags, (dios.DictOfSeries, pd.DataFrame, Flags)):
if isinstance(flags, (DictOfSeries, pd.DataFrame, Flags)):
# NOTE: only test common columns, data as well as flags could
# have more columns than the respective other.
cols = flags.columns.intersection(data.columns)
@@ -124,26 +123,32 @@ class SaQC(FuncModules):
self,
data,
flags=None,
translator: Translator = None,
scheme: Translator = None,
nodata=np.nan,
to_mask=None,
error_policy="raise",
):
super().__init__(self)
data, flags = _prepInput(data, flags)
self._data = data
self._nodata = nodata
self._to_mask = to_mask
self._flags = self._initFlags(data, flags)
self._error_policy = error_policy
if translator is None:
translator = FloatTranslator()
self._translator = translator
# NOTE: will be filled by calls to `_wrap`
self._to_call: List[Tuple[ColumnSelector, APIController, SaQCFunction]] = []
def _initFlags(self, data, flags: Union[Flags, None]):
"""Init the internal Flags-object.
self._translator = scheme or FloatTranslator()
# NOTE:
# We need two lists to represent the future and the past computations
# on a `SaQC`-Object. Due to the dynamic nature of field expansion
# with regular expressions, we can't just reuse the original execution
# plan to infer all translation related information.
self._planned: CallGraph = [] # will be filled by calls to `_wrap`
self._computed: MaterializedGraph = self._translator.buildGraph(
self._flags
) # will be filled in `evaluate`
@staticmethod
def _initFlags(data: DictOfSeries, flags: Optional[Flags]) -> Flags:
"""
Init the internal Flags-object.
Ensures that all data columns are present and user passed
flags from a frame or an already initialised Flags-object
@@ -154,28 +159,47 @@ class SaQC(FuncModules):
# add columns that are present in data but not in flags
for c in data.columns.difference(flags.columns):
flags[c] = pd.Series(UNFLAGGED, index=data[c].index, dtype=float)
flags[c] = initFlagsLike(data[c])
return flags
def _constructSimple(self) -> SaQC:
return SaQC(
data=dios.DictOfSeries(),
def _construct(self, **injectables) -> SaQC:
"""
Construct a new `SaQC`-Object from `self` and optionally inject
attributes with any chechking and overhead.
Parameters
----------
**injectables: any of the `SaQC` data attributes with name and value
Note
----
For internal usage only! Setting values through `injectables` has
the potential to mess up certain invariants of the constructed object.
"""
out = SaQC(
data=DictOfSeries(),
flags=Flags(),
nodata=self._nodata,
to_mask=self._to_mask,
error_policy=self._error_policy,
scheme=self._translator,
)
for k, v in injectables.items():
if not hasattr(out, k):
raise AttributeError(f"failed to set unknown attribute: {k}")
setattr(out, k, v)
return out
def readConfig(self, fname):
from saqc.core.reader import readConfig
out = stdcopy.deepcopy(self)
out._to_call.extend(readConfig(fname, self._flags, self._nodata))
out._planned.extend(readConfig(fname, self._flags, self._nodata))
return out
@staticmethod
def _expandFields(
self, selector: ColumnSelector, func: SaQCFunction, variables: pd.Index
selector: ColumnSelector, func: SaQCFunction, variables: pd.Index
) -> Sequence[Tuple[ColumnSelector, SaQCFunction]]:
if not selector.regex:
return [(selector, func)]
@@ -196,8 +220,8 @@ class SaQC(FuncModules):
"""
Realize all the registered calculations and return a updated SaQC Object
Paramters
---------
Parameters
----------
Returns
-------
@@ -207,24 +231,26 @@ class SaQC(FuncModules):
# NOTE: It would be nicer to separate the plotting into an own
# method instead of intermingling it with the computation
data, flags = self._data, self._flags
for selector, control, function in self._to_call:
computed: MaterializedGraph = []
for selector, control, function in self._planned:
for sel, func in self._expandFields(
selector, function, data.columns.union(flags.columns)
):
logger.debug(f"processing: {sel.field}, {func.name}, {func.keywords}")
t0 = timeit.default_timer()
try:
data_result, flags_result = _saqcCallFunc(
sel, control, func, data, flags
)
# we check the passed function-kwargs after the actual call,
# because now "hard" errors would already have been raised
# (eg. `TypeError: got multiple values for argument 'data'`,
# when the user pass data=...)
_warnForUnusedKwargs(function, self._translator)
computed.append((sel, func))
except Exception as e:
t1 = timeit.default_timer()
_handleErrors(e, sel.field, control, func, self._error_policy)
continue
else:
t1 = timeit.default_timer()
if control.plot:
plotHook(
@@ -240,14 +266,12 @@ class SaQC(FuncModules):
data = data_result
flags = flags_result
if any([control.plot for _, control, _ in self._to_call]):
if any([control.plot for _, control, _ in self._planned]):
plotAllHook(data, flags)
# This is way faster for big datasets, than to throw everything in the constructor.
# Simply because of _initFlags -> merge() -> mergeDios() over all columns.
new = self._constructSimple()
new._flags, new._data = flags, data
return new
return self._construct(
_flags=flags, _data=data, _computed=self._computed + computed
)
def getResult(
self, raw=False
@@ -266,7 +290,7 @@ class SaQC(FuncModules):
if raw:
return data, flags
return data.to_df(), self._translator.backward(flags, self._to_call)
return data.to_df(), self._translator.backward(flags, realization._computed)
def _wrap(self, func: SaQCFunction):
def inner(
@@ -274,14 +298,13 @@ class SaQC(FuncModules):
*fargs,
target: str = None,
regex: bool = False,
flag: UserFlag = BAD,
flag: ExternalFlag = BAD,
plot: bool = False,
inplace: bool = False,
**fkwargs,
) -> SaQC:
if self._to_mask is not None:
fkwargs.setdefault("to_mask", self._to_mask)
fkwargs.setdefault("to_mask", self._translator.TO_MASK)
control = APIController(plot=plot)
@@ -293,15 +316,11 @@ class SaQC(FuncModules):
partial = func.bind(
*fargs,
**{
"nodata": self._nodata,
"flag": self._translator.forward(flag),
**fkwargs,
},
**{"nodata": self._nodata, "flag": self._translator(flag), **fkwargs},
)
out = self if inplace else self.copy(deep=True)
out._to_call.append((locator, control, partial))
out._planned.append((locator, control, partial))
return out
@@ -339,14 +358,10 @@ def _saqcCallFunc(locator, controller, function, data, flags):
data_result, flags_result = function(data, field, flags)
# we check the passed function-kwargs after the actual call, because now "hard" errors would already have been
# raised (Eg. `TypeError: got multiple values for argument 'data'`, when the user pass data=...)
_warnForUnusedKwargs(function)
return data_result, flags_result
def _warnForUnusedKwargs(func):
def _warnForUnusedKwargs(func, translator: Translator):
"""Warn for unused kwargs, passed to a SaQC.function.
Parameters
@@ -373,7 +388,7 @@ def _warnForUnusedKwargs(func):
# there is no need to check for
# `kw in [KEYWORD_ONLY, VAR_KEYWORD or POSITIONAL_OR_KEYWORD]`
# because this would have raised an error beforehand.
if kw not in sig_kws and kw not in ignore:
if kw not in sig_kws and kw not in ignore and kw not in translator.ARGUMENTS:
missing.append(kw)
if missing:
Loading