Skip to content
Snippets Groups Projects

Auto-transfer infos

Merged Martin Lange requested to merge auto-transfer-info into main
1 file
+ 43
3
Compare changes
  • Side-by-side
  • Inline
@@ -3,10 +3,15 @@ import logging
from finam.interfaces import ComponentStatus, Loggable
from ..data.tools import Info
from ..errors import FinamNoDataError
from ..tools.log_helper import ErrorLogger
class MissingInfoError(Exception):
"""Internal error type for handling missing infos for transfer rules"""
class InfoSource:
def __init__(self, name, *fields):
self.name = name
@@ -71,9 +76,6 @@ class ConnectHelper(Loggable):
self._outputs = outputs
self._cache = cache
self._in_info_rules = in_info_rules or {}
self._out_info_rules = in_info_rules or {}
with ErrorLogger(self.logger):
for name in pull_data or []:
if name not in self._inputs:
@@ -93,12 +95,34 @@ class ConnectHelper(Loggable):
name: False for name, out in self.outputs.items() if out.needs_push
}
self._in_info_rules = in_info_rules or {}
self._out_info_rules = out_info_rules or {}
self._check_info_rules()
self._in_info_cache = {}
self._out_info_cache = {}
self._out_data_cache = {}
def _apply_rules(self, rules):
info = Info(time=None, grid=None)
for rule in rules:
if isinstance(rule, FromInput):
in_info = self.in_infos[rule.name]
if in_info is None:
raise MissingInfoError()
for field in rule.fields:
info.__setattr__(field, in_info.__getattr__(field))
elif isinstance(rule, FromOutput):
out_info = self.out_infos[rule.name]
if out_info is None:
raise MissingInfoError()
for field in rule.fields:
info.__setattr__(field, out_info.__getattr__(field))
elif isinstance(rule, FromValue):
info.__setattr__(field, rule.value)
return info
def _check_info_rules(self):
for name, rules in self._in_info_rules.items():
if name not in self._inputs:
@@ -216,6 +240,22 @@ class ConnectHelper(Loggable):
push_infos = {k: v for k, v in push_infos.items() if self.out_infos[k] is None}
push_data = {k: v for k, v in push_data.items() if not self.data_pushed[k]}
for name, rules in self._in_info_rules.items():
if self.in_infos[name] is None and name not in self._in_info_cache:
try:
info = self._apply_rules(rules)
exchange_infos[name] = info
except MissingInfoError:
pass
for name, rules in self._out_info_rules.items():
if self.out_infos[name] is None and name not in self._out_info_cache:
try:
info = self._apply_rules(rules)
push_infos[name] = info
except MissingInfoError:
pass
if self._cache:
self._in_info_cache.update(exchange_infos)
self._out_info_cache.update(push_infos)
Loading