Skip to content
Snippets Groups Projects
Commit 408a1864 authored by Martin Lange's avatar Martin Lange
Browse files

fix plint warnings, re-structure transfer code

parent 731a23bb
No related branches found
No related tags found
1 merge request!154Auto-transfer infos
"""Iterative connection helpers."""
import copy
import logging
from finam.interfaces import ComponentStatus, Loggable
......@@ -13,22 +14,30 @@ class MissingInfoError(Exception):
class InfoSource:
"""Base class for info transfer rules from inputs or outputs"""
def __init__(self, name, *fields):
self.name = name
self.fields = fields
self.fields = list(*fields) or []
class FromInput(InfoSource):
"""Info transfer rule from an input"""
def __init__(self, name, *fields):
super().__init__(name, fields)
class FromOutput(InfoSource):
"""Info transfer rule from an output"""
def __init__(self, name, *fields):
super().__init__(name, fields)
class FromValue:
"""Info transfer rule from a given value"""
def __init__(self, field, value):
self.field = field
self.value = value
......@@ -110,16 +119,19 @@ class ConnectHelper(Loggable):
in_info = self.in_infos[rule.name]
if in_info is None:
raise MissingInfoError()
for field in rule.fields:
info.__setattr__(field, in_info.__getattr__(field))
_transfer_fields(in_info, info, rule.fields)
elif isinstance(rule, FromOutput):
out_info = self.out_infos[rule.name]
if out_info is None:
raise MissingInfoError()
for field in rule.fields:
info.__setattr__(field, out_info.__getattr__(field))
_transfer_fields(out_info, info, rule.fields)
elif isinstance(rule, FromValue):
info.__setattr__(field, rule.value)
if rule.field == "time":
info.time = rule.value
elif rule.field == "grid":
info.grid = rule.value
else:
info.meta[rule.field] = rule.value
return info
......@@ -145,7 +157,7 @@ class ConnectHelper(Loggable):
raise ValueError(
f"No input named '{rule.name}' to use in info transfer rule."
)
if isinstance(rule, FromOutput) and rule.name not in self._inputs:
if isinstance(rule, FromOutput) and rule.name not in self._outputs:
raise ValueError(
f"No output named '{rule.name}' to use in info transfer rule."
)
......@@ -240,21 +252,9 @@ class ConnectHelper(Loggable):
push_infos = {k: v for k, v in push_infos.items() if self.out_infos[k] is None}
push_data = {k: v for k, v in push_data.items() if not self.data_pushed[k]}
for name, rules in self._in_info_rules.items():
if self.in_infos[name] is None and name not in self._in_info_cache:
try:
info = self._apply_rules(rules)
exchange_infos[name] = info
except MissingInfoError:
pass
for name, rules in self._out_info_rules.items():
if self.out_infos[name] is None and name not in self._out_info_cache:
try:
info = self._apply_rules(rules)
push_infos[name] = info
except MissingInfoError:
pass
# Try to generate infos from transfer rules
exchange_infos.update(self._apply_in_info_rules())
push_infos.update(self._apply_out_info_rules())
if self._cache:
self._in_info_cache.update(exchange_infos)
......@@ -305,6 +305,28 @@ class ConnectHelper(Loggable):
return ComponentStatus.CONNECTING_IDLE
def _apply_in_info_rules(self):
exchange_infos = {}
for name, rules in self._in_info_rules.items():
if self.in_infos[name] is None and name not in self._in_info_cache:
try:
info = self._apply_rules(rules)
exchange_infos[name] = info
except MissingInfoError:
pass
return exchange_infos
def _apply_out_info_rules(self):
push_infos = {}
for name, rules in self._out_info_rules.items():
if not self.infos_pushed[name] and name not in self._out_info_cache:
try:
info = self._apply_rules(rules)
push_infos[name] = info
except MissingInfoError:
pass
return push_infos
def _check_names(self, exchange_infos, push_infos, push_data):
for name in exchange_infos:
if name not in self._inputs:
......@@ -371,3 +393,18 @@ class ConnectHelper(Loggable):
self.logger.debug("Failed to push output data for %s", name)
return any_done
def _transfer_fields(source_info, target_info, fields):
if len(fields) == 0:
target_info.time = source_info.time
target_info.grid = source_info.grid
target_info.meta = copy.copy(source_info.meta)
else:
for field in fields:
if field == "time":
target_info.time = source_info.time
elif field == "grid":
target_info.grid = source_info.grid
else:
target_info.meta[field] = source_info.meta[field]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment