diff --git a/src/finam/data/tools.py b/src/finam/data/tools.py index 7989a63a159420d5dca43b82c23eb2e31da53417..08273bd0a64549a1c678cebdde6fb982cef27d51 100644 --- a/src/finam/data/tools.py +++ b/src/finam/data/tools.py @@ -20,7 +20,7 @@ UNITS = pint.application_registry _UNIT_PAIRS_CACHE = {} -def prepare(data, info, time_entries=1, force_copy=False): +def prepare(data, info, time_entries=1, force_copy=False, report_conversion=False): """ Prepares data in FINAM's internal transmission format. @@ -39,17 +39,26 @@ def prepare(data, info, time_entries=1, force_copy=False): Forces the result to be a copy of the passed data. Default `False`. If not used, the result is a view of the data if no units conversion needs to be done. + report_conversion : bool, optional + If true, returns a tuple with the second element indicating the unit conversion if it was required. Returns ------- - pint.Quantity + pint.Quantity or tuple(pint.Quantity, tuple(pint.Unit, pint.Unit) or None) The prepared data as a numpy array, wrapped into a :class:`pint.Quantity`. + If ``report_conversion`` is ``True``, a tuple is returned with the second element + indicating the unit conversion if it was required. + + The second element is ``None`` if no conversion was required, + and a tuple of two :class:`pint.Unit` objects otherwise. + Raises ------ FinamDataError If the data doesn't match its info. """ + units_converted = None units = info.units if isinstance(data, pint.Quantity): if not compatible_units(data.units, units): @@ -58,25 +67,32 @@ def prepare(data, info, time_entries=1, force_copy=False): f"Got {data.units}, expected {units}." ) if not isinstance(data.magnitude, np.ndarray): - data = np.asarray(data.magnitude) * data.units + if force_copy: + data = copy.copy(data.magnitude) + else: + data = data.magnitude + + data = UNITS.Quantity(np.asarray(data), data.units) if not equivalent_units(data.units, units): + units_converted = data.units, units data = data.to(units) elif force_copy: data = data.copy() else: if isinstance(data, np.ndarray): if force_copy: - data = data * units - else: - data = UNITS.Quantity(data, units) + data = np.copy(data) + data = UNITS.Quantity(data, units) else: if force_copy: - data = np.asarray(data) * units - else: - data = UNITS.Quantity(np.asarray(data), units) + data = copy.copy(data) + data = UNITS.Quantity(np.asarray(data), units) data = _check_input_shape(data, info, time_entries) + + if report_conversion: + return data, units_converted return data @@ -272,7 +288,7 @@ def get_dimensionality(xdata): return xdata.dimensionality -def to_units(xdata, units, check_equivalent=False): +def to_units(xdata, units, check_equivalent=False, report_conversion=False): """ Convert data to given units. @@ -284,20 +300,34 @@ def to_units(xdata, units, check_equivalent=False): Desired units. check_equivalent : bool, optional Checks for equivalent units and simply re-assigns if possible. + report_conversion : bool, optional + If true, returns a tuple with the second element indicating the unit conversion if it was required. Returns ------- - pint.Quantity - Converted data. + pint.Quantity or tuple(pint.Quantity, tuple(pint.Unit, pint.Unit) or None) + The converted data. + + If ``report_conversion`` is ``True``, a tuple is returned with the second element + indicating the unit conversion if it was required. + + The second element is ``None`` if no conversion was required, + and a tuple of two :class:`pint.Unit` objects otherwise. """ check_quantified(xdata, "to_units") units = _get_pint_units(units) units2 = xdata.units - if units == units2: - return xdata - if check_equivalent and equivalent_units(units, units2): - return UNITS.Quantity(xdata.magnitude, units) - return xdata.to(units) + conversion = None + if units != units2: + if check_equivalent and equivalent_units(units, units2): + xdata = UNITS.Quantity(xdata.magnitude, units) + else: + xdata = xdata.to(units) + conversion = units2, units + + if report_conversion: + return xdata, conversion + return xdata def full_like(xdata, value): diff --git a/src/finam/schedule.py b/src/finam/schedule.py index 1b98b6750a9a4c05d7b6bb16016a2573e6a135af..171c5291f000a86913849c3c11de9b7d946fcec9 100644 --- a/src/finam/schedule.py +++ b/src/finam/schedule.py @@ -132,7 +132,7 @@ class Composition(Loggable): After the call, module inputs and outputs are available for linking. """ - self.logger.debug("init composition") + self.logger.info("init composition") if self.is_initialized: raise FinamStatusError("Composition was already initialized.") @@ -203,7 +203,7 @@ class Composition(Loggable): self._connect_components(start_time) - self.logger.debug("validate components") + self.logger.info("validate components") for mod in self.modules: mod.validate() self._check_status(mod, [ComponentStatus.VALIDATED]) @@ -245,7 +245,7 @@ class Composition(Loggable): if not self.is_connected: self.connect(start_time) - self.logger.debug("running composition") + self.logger.info("run composition") while len(time_modules) > 0: sort_modules = list(time_modules) sort_modules.sort(key=lambda m: m.time) @@ -324,7 +324,7 @@ class Composition(Loggable): def _validate_composition(self): """Validates the coupling setup by checking for dangling inputs and disallowed branching connections.""" - self.logger.debug("validate composition") + self.logger.info("validate composition") for mod in self.modules: with ErrorLogger(mod.logger if is_loggable(mod) else self.logger): for inp in mod.inputs.values(): @@ -338,7 +338,7 @@ class Composition(Loggable): _check_missing_modules(self.modules) def _connect_components(self, time): - self.logger.debug("connect components") + self.logger.info("connect components") counter = 0 while True: self.logger.debug("connect iteration %d", counter) @@ -380,7 +380,7 @@ class Composition(Loggable): counter += 1 def _finalize_components(self): - self.logger.debug("finalize components") + self.logger.info("finalize components") for mod in self.modules: self._check_status( mod, @@ -404,7 +404,7 @@ class Composition(Loggable): ada.finalize() def _finalize_composition(self): - self.logger.debug("finalize composition") + self.logger.info("finalize composition") handlers = self.logger.handlers[:] for handler in handlers: self.logger.removeHandler(handler) diff --git a/src/finam/sdk/adapter.py b/src/finam/sdk/adapter.py index 14bcb03d0bb03e822a6653363307214ee00b0fb7..fdf86ff71068ffc3fef59ef2e957e2dc7383ab43 100644 --- a/src/finam/sdk/adapter.py +++ b/src/finam/sdk/adapter.py @@ -90,7 +90,7 @@ class Adapter(IAdapter, Input, Output, ABC): time : :class:`datetime <datetime.datetime>` Simulation time of the notification. """ - self.logger.debug("source changed") + self.logger.debug("source updated") if time is not None and not isinstance(time, datetime): with ErrorLogger(self.logger): raise ValueError("Time must be of type datetime") @@ -107,7 +107,7 @@ class Adapter(IAdapter, Input, Output, ABC): time : :class:`datetime <datetime.datetime>` Simulation time of the simulation. """ - self.logger.debug("notify targets") + self.logger.trace("notify targets") if time is not None and not isinstance(time, datetime): with ErrorLogger(self.logger): raise ValueError("Time must be of type datetime") @@ -154,7 +154,12 @@ class Adapter(IAdapter, Input, Output, ABC): data = self._get_data(time, target) with ErrorLogger(self.logger): - return tools.prepare(data, self._output_info) + xdata, conv = tools.prepare(data, self._output_info, report_conversion=True) + if conv is not None: + self.logger.profile( + "converted units from %s to %s (%d entries)", *conv, xdata.size + ) + return xdata def _get_data(self, time, target): """Get the transformed data of this adapter. @@ -195,7 +200,7 @@ class Adapter(IAdapter, Input, Output, ABC): FinamNoDataError Raises the error if no info is available """ - self.logger.debug("get info") + self.logger.trace("get info") self._output_info = self._get_info(info) return self._output_info @@ -234,7 +239,7 @@ class Adapter(IAdapter, Input, Output, ABC): Info delivered parameters """ - self.logger.debug("exchanging info") + self.logger.trace("exchanging info") with ErrorLogger(self.logger): if info is None: raise FinamMetaDataError("No metadata provided") @@ -255,7 +260,7 @@ class Adapter(IAdapter, Input, Output, ABC): source : source output or adapter """ - self.logger.debug("set source") + self.logger.trace("set source") # fix to set base-logger for adapters derived from Input source logger if self.uses_base_logger_name and not is_loggable(source): with ErrorLogger(self.logger): @@ -326,7 +331,12 @@ class TimeDelayAdapter(Adapter, ITimeDelayAdapter, ABC): self._pulled(time) with ErrorLogger(self.logger): - return tools.prepare(data, self._output_info) + xdata, conv = tools.prepare(data, self._output_info, report_conversion=True) + if conv is not None: + self.logger.profile( + "converted units from %s to %s (%d entries)", *conv, xdata.size + ) + return xdata def _get_data(self, time, target): """Get the output's data-set for the given time. @@ -377,7 +387,7 @@ class TimeDelayAdapter(Adapter, ITimeDelayAdapter, ABC): FinamNoDataError Raises the error if no info is available """ - self.logger.debug("get info") + self.logger.trace("get info") self._output_info = self._get_info(info) self.initial_time = self._output_info.time return self._output_info diff --git a/src/finam/sdk/component.py b/src/finam/sdk/component.py index 24c1fd7316c02a3aca50a9dd24b141bbd70b0e4a..aa4d3a1e62590f07d30ae227548d79e621f2bf8e 100644 --- a/src/finam/sdk/component.py +++ b/src/finam/sdk/component.py @@ -146,9 +146,10 @@ class Component(IComponent, Loggable, ABC): After the method call, the component should have :attr:`.status` :attr:`.ComponentStatus.UPDATED` or :attr:`.ComponentStatus.FINISHED`. """ - self.logger.debug("update") if isinstance(self, ITimeComponent): - self.logger.debug("current time: %s", self.time) + self.logger.debug("update - current time: %s", self.time) + else: + self.logger.debug("update") self._update() @@ -335,7 +336,7 @@ class Component(IComponent, Loggable, ABC): Rules are evaluated in the given order. Later rules can overwrite attributes set by earlier rules. """ - self.logger.debug("create connector") + self.logger.trace("create connector") self._connector = ConnectHelper( self.logger_name, self.inputs, @@ -373,7 +374,7 @@ class Component(IComponent, Loggable, ABC): push_data : dict of [str, array-like] currently or newly available output data by output name """ - self.logger.debug("try connect") + self.logger.trace("try connect") if self._connector is None: raise FinamStatusError( @@ -386,7 +387,7 @@ class Component(IComponent, Loggable, ABC): push_infos=push_infos, push_data=push_data, ) - self.logger.debug("try_connect status is %s", self.status) + self.logger.trace("try_connect status is %s", self.status) def __getitem__(self, name): """Get an input or output by name. Implements access through square brackets. diff --git a/src/finam/sdk/input.py b/src/finam/sdk/input.py index 2a8a951ac184448011a600a8eee08c66c6f429fd..ec9ba53621e7ea95a231f18c7a86b25f7d872e6e 100644 --- a/src/finam/sdk/input.py +++ b/src/finam/sdk/input.py @@ -57,7 +57,7 @@ class Input(IInput, Loggable): source : :class:`.IOutput` source output or adapter """ - self.logger.debug("set source") + self.logger.trace("set source") with ErrorLogger(self.logger): if self.source is not None: @@ -88,7 +88,7 @@ class Input(IInput, Loggable): time : :class:`datetime <datetime.datetime>` Simulation time of the notification. """ - self.logger.debug("source changed") + self.logger.trace("source changed") def pull_data(self, time, target=None): """Retrieve the data from the input's source. @@ -108,7 +108,7 @@ class Input(IInput, Loggable): :class:`pint.Quantity` Data set for the given simulation time. """ - self.logger.debug("pull data") + self.logger.trace("pull data") if time is not None and not isinstance(time, datetime): with ErrorLogger(self.logger): @@ -116,15 +116,27 @@ class Input(IInput, Loggable): if self.is_static: if self._cached_data is None: - self._cached_data = self.source.get_data(time, target or self) + with ErrorLogger(self.logger): + data = self.source.get_data(time, target or self) + self._cached_data = self._convert_and_check(data) + data = self._cached_data else: data = self.source.get_data(time, target or self) + with ErrorLogger(self.logger): + data = self._convert_and_check(data) - with ErrorLogger(self.logger): - data = tools.to_units(data, self._input_info.units, check_equivalent=True) - tools.check(data, self._input_info) + return data + def _convert_and_check(self, data): + (data, conv) = tools.to_units( + data, self._input_info.units, check_equivalent=True, report_conversion=True + ) + if conv is not None: + self.logger.profile( + "converted units from %s to %s (%d entries)", *conv, data.size + ) + tools.check(data, self._input_info) return data def ping(self): @@ -147,7 +159,7 @@ class Input(IInput, Loggable): dict delivered parameters """ - self.logger.debug("exchanging info") + self.logger.trace("exchanging info") with ErrorLogger(self.logger): if self._in_info_exchanged: @@ -234,7 +246,7 @@ class CallbackInput(Input): time : :class:`datetime <datetime.datetime>` Simulation time of the notification. """ - self.logger.debug("source changed") + self.logger.trace("source changed") if time is not None and not isinstance(time, datetime): with ErrorLogger(self.logger): raise ValueError("Time must be of type datetime") diff --git a/src/finam/sdk/output.py b/src/finam/sdk/output.py index bc1250e9665cdfc4a9965ed75f7c5c1df498bf4d..cb288815a0ff5f39c5b41aff1616a24401d0ef8e 100644 --- a/src/finam/sdk/output.py +++ b/src/finam/sdk/output.py @@ -116,7 +116,7 @@ class Output(IOutput, Loggable): target : :class:`.IInput` The target to add. """ - self.logger.debug("add target") + self.logger.trace("add target") if not isinstance(target, IInput): with ErrorLogger(self.logger): raise ValueError("Only IInput can added as target for IOutput") @@ -154,12 +154,12 @@ class Output(IOutput, Loggable): time : :class:`datetime <datetime.datetime>` Simulation time of the data set. """ - self.logger.debug("push data") - if not self.has_targets: - self.logger.debug("skipping push to unconnected output") + self.logger.trace("skipping push to unconnected output") return + self.logger.trace("push data") + with ErrorLogger(self.logger): _check_time(time, self.is_static) @@ -174,19 +174,23 @@ class Output(IOutput, Loggable): time = None with ErrorLogger(self.logger): - xdata = tools.prepare(data, self.info) + xdata, conv = tools.prepare(data, self.info, report_conversion=True) if len(self.data) > 0 and not isinstance(self.data[-1][1], str): d = self.data[-1][1] if np.may_share_memory(d.data, xdata.data): raise FinamDataError( "Received data that shares memory with previously received data." ) + if conv is not None: + self.logger.profile( + "converted units from %s to %s (%d entries)", *conv, xdata.size + ) xdata = self._pack(xdata) self.data.append((time, xdata)) self._time = time - self.logger.debug("data cache: %d", len(self.data)) + self.logger.trace("data cache: %d", len(self.data)) self.notify_targets(time) @@ -198,7 +202,7 @@ class Output(IOutput, Loggable): info : :class:`.Info` Delivered data info """ - self.logger.debug("push info") + self.logger.trace("push info") if not isinstance(info, Info): with ErrorLogger(self.logger): raise FinamMetaDataError("Metadata must be of type Info") @@ -212,7 +216,7 @@ class Output(IOutput, Loggable): time : :class:`datetime <datetime.datetime>` Simulation time of the simulation. """ - self.logger.debug("notify targets") + self.logger.trace("notify targets") with ErrorLogger(self.logger): _check_time(time, self.is_static) @@ -240,7 +244,7 @@ class Output(IOutput, Loggable): FinamNoDataError Raises the error if no data is available """ - self.logger.debug("get data") + self.logger.trace("get data") with ErrorLogger(self.logger): _check_time(time, self.is_static) @@ -264,7 +268,7 @@ class Output(IOutput, Loggable): self._clear_data(time, target) if len(self.data) < data_count: - self.logger.debug( + self.logger.trace( "reduced data cache: %d -> %d", data_count, len(self.data) ) @@ -278,7 +282,7 @@ class Output(IOutput, Loggable): fn = os.path.join( self.memory_location or "", f"{id(self)}-{self._mem_counter}.npy" ) - self.logger.debug( + self.logger.profile( "dumping data to file %s (total RAM %0.2f MB)", fn, self._total_mem / 1048576, @@ -288,14 +292,14 @@ class Output(IOutput, Loggable): return fn self._total_mem += data_size - self.logger.debug( + self.logger.trace( "keeping data in RAM (total RAM %0.2f MB)", self._total_mem / 1048576 ) return data def _unpack(self, where): if isinstance(where, str): - self.logger.debug("reading data from file %s", where) + self.logger.profile("reading data from file %s", where) data = np.load(where, allow_pickle=True) return tools.UNITS.Quantity(data, self.info.units) @@ -366,7 +370,7 @@ class Output(IOutput, Loggable): FinamNoDataError Raises the error if no info is available """ - self.logger.debug("get info") + self.logger.trace("get info") if self._output_info is None: raise FinamNoDataError("No data info available") @@ -427,7 +431,7 @@ class Output(IOutput, Loggable): :class:`.IOutput` The last element of the chain. """ - self.logger.debug("chain") + self.logger.trace("chain") self.add_target(other) other.set_source(self) return other @@ -499,7 +503,7 @@ class CallbackOutput(Output): FinamNoDataError Raises the error if no data is available """ - self.logger.debug("source changed") + self.logger.trace("get data") with ErrorLogger(self.logger): _check_time(time, False) @@ -515,13 +519,17 @@ class CallbackOutput(Output): raise FinamNoDataError(f"No data available in {self.name}") with ErrorLogger(self.logger): - xdata = tools.prepare(data, self.info) + xdata, conv = tools.prepare(data, self.info, report_conversion=True) if self.last_data is not None and np.may_share_memory( tools.get_magnitude(self.last_data), tools.get_magnitude(xdata) ): raise FinamDataError( "Received data that shares memory with previously received data." ) + if conv is not None: + self.logger.profile( + "converted units from %s to %s (%d entries)", *conv, xdata.size + ) self.last_data = xdata return xdata diff --git a/src/finam/tools/connect_helper.py b/src/finam/tools/connect_helper.py index 743011b8f712a8aa1c34838a6edfef48ce7e9e95..2a21a39084d06e980e37a0e9fa225205be50eac0 100644 --- a/src/finam/tools/connect_helper.py +++ b/src/finam/tools/connect_helper.py @@ -388,9 +388,9 @@ class ConnectHelper(Loggable): try: self.out_infos[name] = self.outputs[name].info any_done = True - self.logger.debug("Successfully pulled output info for %s", name) + self.logger.trace("Successfully pulled output info for %s", name) except FinamNoDataError: - self.logger.debug("Failed to pull output info for %s", name) + self.logger.trace("Failed to pull output info for %s", name) any_done += self._push(start_time) @@ -403,9 +403,9 @@ class ConnectHelper(Loggable): start_time or info.time ) any_done = True - self.logger.debug("Successfully pulled input data for %s", name) + self.logger.trace("Successfully pulled input data for %s", name) except FinamNoDataError: - self.logger.debug("Failed to pull input data for %s", name) + self.logger.trace("Failed to pull input data for %s", name) if ( all(v is not None for v in self.in_infos.values()) @@ -480,9 +480,9 @@ class ConnectHelper(Loggable): try: self.in_infos[name] = self.inputs[name].exchange_info() any_done = True - self.logger.debug("Successfully exchanged input info for %s", name) + self.logger.trace("Successfully exchanged input info for %s", name) except FinamNoDataError: - self.logger.debug("Failed to exchange input info for %s", name) + self.logger.trace("Failed to exchange input info for %s", name) for name, info in list(self._in_info_cache.items()): if self.in_infos[name] is None: @@ -493,9 +493,9 @@ class ConnectHelper(Loggable): ) any_done = True self._in_info_cache.pop(name) - self.logger.debug("Successfully exchanged input info for %s", name) + self.logger.trace("Successfully exchanged input info for %s", name) except FinamNoDataError: - self.logger.debug("Failed to exchange input info for %s", name) + self.logger.trace("Failed to exchange input info for %s", name) return any_done @@ -508,7 +508,7 @@ class ConnectHelper(Loggable): self.infos_pushed[name] = True any_done = True self._out_info_cache.pop(name) - self.logger.debug("Successfully pushed output info for %s", name) + self.logger.trace("Successfully pushed output info for %s", name) for name, data in list(self._out_data_cache.items()): if not self.data_pushed[name] and self.infos_pushed[name]: @@ -531,7 +531,7 @@ class ConnectHelper(Loggable): self.data_pushed[name] = True self._out_data_cache.pop(name) - self.logger.debug("Successfully pushed output data for %s", name) + self.logger.trace("Successfully pushed output data for %s", name) def _check_times(infos): diff --git a/src/finam/tools/log_helper.py b/src/finam/tools/log_helper.py index 9e8a5dd902e236474d04f6b33eacad8807bc8b6d..3aeaa83e7ae8326a2ccb454ef977672c8f2d1e3c 100644 --- a/src/finam/tools/log_helper.py +++ b/src/finam/tools/log_helper.py @@ -172,8 +172,6 @@ def add_logging_level(name, num, method=None): if not method: method = name.lower() - if hasattr(logging, name): - raise AttributeError(f"{name} already defined in logging module") if hasattr(logging, name): raise AttributeError(f"{name} already defined in logging module") if hasattr(logging.getLoggerClass(), name):