Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • FINAM/finam
  • afid.nur-kholis/finam
2 results
Show changes
Commits on Source (22)
......@@ -189,6 +189,10 @@ This is done in :meth:`.TimeComponent._connect`.
After this connection phase, models can validate their state in :meth:`.TimeComponent._validate`. We do nothing there.
.. note::
It is not strictly required to implement `_validate` but it is highly encouraged to do so.
.. code-block:: Python
# imports...
......@@ -315,6 +319,10 @@ In method :meth:`.TimeComponent._finalize`, the component can do any cleanup req
We do nothing special here.
.. note::
It is not strictly required to implement `_finalize` but it is highly encouraged to do so.
.. code-block:: Python
# imports...
......
......@@ -84,7 +84,7 @@ This method must be called at the end of :meth:`.Component._initialize`, after a
If the component has no dependencies in this phase, if can be simply called without arguments.
For components with dependencies, they can be specified like this:
For components with data dependencies, they can be specified like this:
.. code-block:: Python
......@@ -94,7 +94,10 @@ For components with dependencies, they can be specified like this:
Where strings are the names of inputs that data pull is required for.
For more on filling incomplete metadata, see section `Metadata from source or target`_.
For how to retrieve metadata fom connected components, see section `Metadata from source or target`_.
For details on how to automatically transfer metadata between coupling slots,
see the API docs for :meth:`.Component.create_connector`.
Method :meth:`try_connect() <.Component.try_connect()>`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
......
......@@ -4,7 +4,7 @@ from pathlib import Path
import numpy as np
from pyevtk.hl import imageToVTK
from ..tools import get_enum_value
from ..tools.enum_helper import get_enum_value
from .grid_tools import (
CellType,
Grid,
......
......@@ -17,7 +17,7 @@ import pint
# isort: on
from ..errors import FinamDataError, FinamMetaDataError
from .grid_spec import NoGrid
from . import grid_spec
from .grid_tools import Grid, GridBase
# set default format to cf-convention for pint.dequantify
......@@ -58,7 +58,7 @@ def _gen_dims(ndim, info, time=None):
"""
# create correct dims (time always first)
dims = ["time"] if time else []
if isinstance(info.grid, NoGrid):
if isinstance(info.grid, grid_spec.NoGrid):
# xarray has dim_0, dim_1 ... as default names
dims += [f"dim_{i}" for i in range(ndim)]
else:
......@@ -124,7 +124,7 @@ def to_xarray(data, name, info, time=None, no_time_check=False):
# reshape arrays
data = data.reshape(info.grid.data_shape, order=info.grid.order)
elif isinstance(info.grid, NoGrid):
elif isinstance(info.grid, grid_spec.NoGrid):
if len(data.shape) != info.grid.dim:
raise FinamDataError(
f"to_xarray: number of dimensions in data doesn't match expected number. "
......@@ -451,7 +451,7 @@ def _check_shape(xdata, grid, with_time):
f"check: given data has wrong shape. "
f"Got {in_shape}, expected {grid.data_shape}"
)
if isinstance(grid, NoGrid) and len(in_shape) != grid.dim:
if isinstance(grid, grid_spec.NoGrid) and len(in_shape) != grid.dim:
raise FinamDataError(
f"check: given data has wrong number of dimensions. "
f"Got {len(in_shape)}, expected {grid.dim}"
......
......@@ -93,7 +93,7 @@ class CallbackComponent(TimeComponent):
def _connect(self):
push_data = {}
if not self._data_generated:
if all((data is not None for name, data in self.connector.in_data.items())):
if self.connector.all_data_pulled:
push_data = self._callback(self.connector.in_data, self.time)
self._data_generated = True
......
......@@ -3,6 +3,7 @@
from ..data.tools import strip_data
from ..errors import FinamMetaDataError
from ..sdk import TimeComponent
from ..tools.connect_helper import FromInput, FromOutput
from ..tools.log_helper import ErrorLogger
......@@ -75,9 +76,6 @@ class TimeTrigger(TimeComponent):
self._ini_in_info = in_info
self._ini_out_info = out_info
self._in_info = None
self._out_info = None
self._start = start
if self._start is not None:
self.time = self._start
......@@ -106,53 +104,33 @@ class TimeTrigger(TimeComponent):
if self._ini_out_info is not None:
self._ini_out_info.time = self._start
in_info_rules = {}
out_info_rules = {}
if self._start is None:
if self._start_from_input:
self.inputs.add(name="In", info=self._ini_in_info)
self.outputs.add(name="Out")
out_info_rules["Out"] = [FromInput("In")]
else:
self.inputs.add(name="In")
self.outputs.add(name="Out", info=self._ini_out_info)
in_info_rules["In"] = [FromOutput("Out")]
else:
self.inputs.add(name="In", info=self._ini_in_info)
self.outputs.add(name="Out", info=self._ini_out_info)
self.create_connector(pull_data=["In"])
if self._ini_out_info is None:
out_info_rules["Out"] = [FromInput("In")]
if self._ini_in_info is None:
in_info_rules["In"] = [FromOutput("Out")]
self.create_connector(
pull_data=["In"],
in_info_rules=in_info_rules,
out_info_rules=out_info_rules,
)
def _connect(self):
in_infos = {}
out_infos = {}
if self._ini_out_info is None or (
self._start is None and self._start_from_input
):
in_info = self.connector.in_infos["In"]
if in_info is not None:
self._in_info = in_info
if self._start is None:
self.time = in_info.time
if self._ini_out_info is None:
self._out_info = in_info
else:
self._ini_out_info.time = in_info.time
self._out_info = self._ini_out_info
out_infos["Out"] = self._out_info
if self._ini_in_info is None or (
self._start is None and not self._start_from_input
):
out_info = self.connector.out_infos["Out"]
if out_info is not None:
self._out_info = out_info
if self._start is None:
self.time = out_info.time
if self._ini_in_info is None:
self._in_info = out_info
else:
self._ini_in_info.time = out_info.time
self._in_info = self._ini_in_info
in_infos["In"] = self._in_info
out_data = {}
if (
not self.connector.data_pushed["Out"]
......@@ -160,9 +138,11 @@ class TimeTrigger(TimeComponent):
):
out_data["Out"] = self.connector.in_data["In"]
self.try_connect(
exchange_infos=in_infos, push_infos=out_infos, push_data=out_data
)
self.try_connect(push_data=out_data)
in_info = self.connector.in_infos["In"]
if in_info is not None:
self.time = in_info.time
def _validate(self):
pass
......
......@@ -80,7 +80,7 @@ class WeightedSum(Component):
# just to check for all inputs equal
_push_infos = self._check_infos()
if all((data is not None for _name, data in self.connector.in_data.items())):
if self.connector.all_data_pulled:
self._in_data = self.connector.in_data
def _check_infos(self):
......
......@@ -116,11 +116,9 @@ class Component(IComponent, Loggable, ABC):
def _validate(self):
"""Validate the correctness of the component's settings and coupling.
Components must overwrite this method.
Components should overwrite this method.
"""
raise NotImplementedError(
f"Method `_validate` must be implemented by all components, but implementation is missing in {self.name}."
)
self.logger.debug("Method `_validate` not implemented by user.")
@final
def update(self):
......@@ -163,11 +161,9 @@ class Component(IComponent, Loggable, ABC):
def _finalize(self):
"""Finalize and clean up the component.
Components must overwrite this method.
Components should overwrite this method.
"""
raise NotImplementedError(
f"Method `_finalize` must be implemented by all components, but implementation is missing in {self.name}."
)
self.logger.debug("Method `_finalize` not implemented by user.")
@property
def inputs(self):
......@@ -209,20 +205,114 @@ class Component(IComponent, Loggable, ABC):
@property
def connector(self):
"""The component's ConnectHelper"""
return self._connector
"""The component's :class:`.tools.ConnectHelper`.
def create_connector(self, pull_data=None, cache=True):
See also :meth:`.create_connector` and :meth:`.try_connect`.
"""
Create the component's ConnectHelper
return self._connector
def create_connector(
self, pull_data=None, in_info_rules=None, out_info_rules=None, cache=True
):
"""Initialize the component's :class:`.tools.ConnectHelper`.
See also :meth:`.try_connect`, :attr:`.connector` and :class:`.ConnectHelper` for details.
Parameters
----------
pull_data : arraylike
Names of the inputs that are to be pulled.
in_info_rules : dict
Info transfer rules for inputs.See the examples for details.
See also :class:`.tools.FromInput`, :class:`.tools.FromOutput` and :class:`.tools.FromValue`.
out_info_rules : dict
Info transfer rules for outputs. See the examples for details.
See also :class:`.tools.FromInput`, :class:`.tools.FromOutput` and :class:`.tools.FromValue`.
cache : bool
Whether data and :class:`.Info` objects passed via :meth:`try_connect() <.Component.try_connect>`
are cached for later calls. Default ``True``.
Examples
--------
The following examples show the usage of this method in :meth:`._initialize`.
.. testsetup:: *
import finam as fm
import datetime as dt
self = fm.modules.CallbackComponent(
inputs={},
outputs={},
callback=lambda inp, _t: {},
start=dt.datetime(2000, 1, 1),
step=dt.timedelta(days=1),
)
Simple usage if no input data or any metadata from connected components is required:
.. testcode:: create-connector-simple
self.inputs.add(name="In", time=self.time, grid=fm.NoGrid())
self.outputs.add(name="Out", time=self.time, grid=fm.NoGrid())
self.create_connector()
To pull specific inputs, use ``pull_data`` like this:
.. testcode:: create-connector-pull
self.inputs.add(name="In1", time=self.time, grid=fm.NoGrid())
self.inputs.add(name="In2", time=self.time, grid=fm.NoGrid())
self.create_connector(pull_data=["In1", "In2"])
With the ``in_info_rules`` and ``out_info_rules``, metadata can be transferred between coupling slots.
Here, the metadata for an output is taken from an input:
.. testcode:: create-connector-in-to-out
self.inputs.add(name="In", time=self.time, grid=None, units=None)
self.outputs.add(name="Out")
self.create_connector(
out_info_rules={
"Out": [
fm.tools.FromInput("In")
]
}
)
The :class:`.Info` object for output ``Out`` will be created and pushed automatically in :meth:`.try_connect`
as soon as the metadata for ``In`` becomes available.
Here, the metadata of an output is composed from the metadata of two inputs and a user-defined value:
.. testcode:: create-connector-in-to-out-multi
self.inputs.add(name="In1", time=self.time, grid=None, units=None)
self.inputs.add(name="In2", time=self.time, grid=None, units=None)
self.outputs.add(name="Out")
self.create_connector(
out_info_rules={
"Out": [
fm.tools.FromInput("In1", "time", "grid"),
fm.tools.FromInput("In2", "units"),
fm.tools.FromValue("source", "FINAM"),
]
}
)
The :class:`.Info` object for output ``Out`` would be automatically composed in :meth:`.try_connect`
as soon as the infos of both inputs become available.
``time`` and ``grid`` would be taken from ``In1``, ``units`` from ``In2``,
and ``source`` would be set to ``"finam"``.
Rules are evaluated in the given order. Later rules can overwrite attributes set by earlier rules.
"""
self.logger.debug("create connector")
self._connector = ConnectHelper(
......@@ -230,6 +320,8 @@ class Component(IComponent, Loggable, ABC):
self.inputs,
self.outputs,
pull_data=pull_data,
in_info_rules=in_info_rules,
out_info_rules=out_info_rules,
cache=cache,
)
self.inputs.frozen = True
......@@ -247,17 +339,17 @@ class Component(IComponent, Loggable, ABC):
Sets the component's :attr:`.status` according to success of exchange.
See :class:`.ConnectHelper` for more details.
See also :meth:`.create_connector`, :attr:`.connector` and :class:`.ConnectHelper` for details.
Parameters
----------
time : datetime.datatime
time for data pulls
exchange_infos : dict
exchange_infos : dict of [str, Info]
currently or newly available input data infos by input name
push_infos : dict
push_infos : dict of [str, Info]
currently or newly available output data infos by output name
push_data : dict
push_data : dict of [str, array-like]
currently or newly available output data by output name
"""
self.logger.debug("try connect")
......@@ -421,6 +513,11 @@ class IOList(collections.abc.Mapping):
raise ValueError(f"IO.add: {self.name} '{io.name}' already exists.")
self._dict[io.name] = io
@property
def names(self):
"""list: all IO names in this list."""
return list(self)
def set_logger(self, module):
"""
Set the logger in the items of the IOList.
......
......@@ -37,9 +37,11 @@ Connect helper
:toctree: generated
ConnectHelper
FromInput
FromOutput
FromValue
"""
from . import connect_helper
from .connect_helper import ConnectHelper
from .connect_helper import ConnectHelper, FromInput, FromOutput, FromValue
from .cwd_helper import execute_in_cwd, set_directory
from .enum_helper import get_enum_value
from .log_helper import (
......@@ -50,8 +52,7 @@ from .log_helper import (
is_loggable,
)
__all__ = ["connect_helper"]
__all__ += ["execute_in_cwd", "set_directory"]
__all__ = ["execute_in_cwd", "set_directory"]
__all__ += ["get_enum_value"]
__all__ += [
"ErrorLogger",
......@@ -60,3 +61,4 @@ __all__ += [
"LogStdOutStdErr",
"LogCStdOutStdErr",
]
__all__ += ["ConnectHelper", "FromInput", "FromOutput", "FromValue"]
"""Iterative connection helpers."""
import copy
import logging
from abc import ABC
from finam.interfaces import ComponentStatus, Loggable
from ..data.tools import Info
from ..errors import FinamNoDataError
from ..tools.log_helper import ErrorLogger
class MissingInfoError(Exception):
"""Internal error type for handling missing infos for transfer rules"""
class InfoSource(ABC):
"""Base class for info transfer rules from inputs or outputs"""
def __init__(self, name, *fields):
self.name = name
self.fields = list(*fields) or []
class FromInput(InfoSource):
"""Info transfer rule from an input.
See :meth:`.Component.create_connector` for usage details.
Parameters
----------
name : str
Name of the input to take info from
*fields : str, optional
Info fields to take from the input.
Takes all fields if this is empty.
"""
def __init__(self, name, *fields):
super().__init__(name, fields)
class FromOutput(InfoSource):
"""Info transfer rule from an output.
See :meth:`.Component.create_connector` for usage details.
Parameters
----------
name : str
Name of the output to take info from
*fields : str, optional
Info fields to take from the output.
Takes all fields if this is empty.
"""
def __init__(self, name, *fields):
super().__init__(name, fields)
class FromValue:
"""
Info transfer rule from a given value
Parameters
----------
field : str
Field to set.
value : any
Value to set.
"""
def __init__(self, field, value):
self.field = field
self.value = value
class ConnectHelper(Loggable):
"""Helper for iterative connect.
Warning:
This class is not intended for direct use!
Use :meth:`.Components.create_connector` and :meth:`.Components.try_connect` instead.
Use :meth:`.Component.create_connector` and :meth:`.Component.try_connect` instead.
Parameters
----------
......@@ -22,6 +90,10 @@ class ConnectHelper(Loggable):
All inputs of the component.
outputs : dict
All outputs of the component.
in_info_rules : dict
Info transfer rules for inputs.
out_info_rules : dict
Info transfer rules for outputs.
pull_data : arraylike
Names of the inputs that are to be pulled.
cache : bool
......@@ -35,6 +107,8 @@ class ConnectHelper(Loggable):
inputs,
outputs,
pull_data=None,
in_info_rules=None,
out_info_rules=None,
cache=True,
):
......@@ -46,7 +120,7 @@ class ConnectHelper(Loggable):
with ErrorLogger(self.logger):
for name in pull_data or []:
if name not in self._inputs:
raise ValueError(
raise KeyError(
f"No input named '{name}' available to get info for."
)
......@@ -62,10 +136,105 @@ class ConnectHelper(Loggable):
name: False for name, out in self.outputs.items() if out.needs_push
}
self._in_info_rules = in_info_rules or {}
self._out_info_rules = out_info_rules or {}
with ErrorLogger(self.logger):
self._check_info_rules()
self._in_info_cache = {}
self._out_info_cache = {}
self._out_data_cache = {}
def add_in_info_rule(self, in_name, rule):
"""
Add an input info rule.
Parameters
----------
in_name : str
Name of the input to add an info rule to.
rule : FromOutput or FromInput or FromValue
Rule to add.
"""
if in_name in self._in_info_rules:
self._in_info_rules[in_name].append(rule)
else:
self._in_info_rules[in_name] = [rule]
with ErrorLogger(self.logger):
self._check_info_rules()
def add_out_info_rule(self, out_name, rule):
"""
Add an output info rule.
Parameters
----------
out_name : str
Name of the output to add an info rule to.
rule : FromInput or FromOutput or FromValue
Rule to add.
"""
if out_name in self._out_info_rules:
self._out_info_rules[out_name].append(rule)
else:
self._out_info_rules[out_name] = [rule]
with ErrorLogger(self.logger):
self._check_info_rules()
def _apply_rules(self, rules):
info = Info(time=None, grid=None)
for rule in rules:
if isinstance(rule, FromInput):
in_info = self.in_infos[rule.name]
if in_info is None:
raise MissingInfoError()
_transfer_fields(in_info, info, rule.fields)
elif isinstance(rule, FromOutput):
out_info = self.out_infos[rule.name]
if out_info is None:
raise MissingInfoError()
_transfer_fields(out_info, info, rule.fields)
elif isinstance(rule, FromValue):
if rule.field == "time":
info.time = rule.value
elif rule.field == "grid":
info.grid = rule.value
else:
info.meta[rule.field] = rule.value
return info
def _check_info_rules(self):
for name, rules in self._in_info_rules.items():
if name not in self._inputs:
raise KeyError(f"No input named '{name}' to apply info transfer rule.")
for rule in rules:
self._check_rule(rule)
for name, rules in self._out_info_rules.items():
if name not in self._outputs:
raise KeyError(f"No output named '{name}' to apply info transfer rule.")
for rule in rules:
self._check_rule(rule)
def _check_rule(self, rule):
if isinstance(rule, FromInput):
if rule.name not in self._inputs:
raise KeyError(
f"No input named '{rule.name}' to use in info transfer rule."
)
elif isinstance(rule, FromOutput):
if rule.name not in self._outputs:
raise KeyError(
f"No output named '{rule.name}' to use in info transfer rule."
)
elif not isinstance(rule, FromValue):
raise TypeError(
f"Rules must be one of the types FromInput, FromOutput or FromValue. "
f"Got '{rule.__class__.__name__}'."
)
@property
def logger(self):
"""Logger for this component."""
......@@ -108,6 +277,11 @@ class ConnectHelper(Loggable):
"""dict: The pulled input data so far. May contain None values."""
return self._pulled_data
@property
def all_data_pulled(self):
"""bool: True if all expected data is pulled."""
return all(data is not None for data in self.in_data.values())
@property
def infos_pushed(self):
"""dict: If an info was pushed for outputs so far."""
......@@ -149,6 +323,7 @@ class ConnectHelper(Loggable):
with ErrorLogger(self.logger):
self._check_names(exchange_infos, push_infos, push_data)
self._check_in_rules(exchange_infos, push_infos)
exchange_infos = {
k: v for k, v in exchange_infos.items() if self.in_infos[k] is None
......@@ -156,6 +331,11 @@ class ConnectHelper(Loggable):
push_infos = {k: v for k, v in push_infos.items() if self.out_infos[k] is None}
push_data = {k: v for k, v in push_data.items() if not self.data_pushed[k]}
# Try to generate infos from transfer rules
with ErrorLogger(self.logger):
exchange_infos.update(self._apply_in_info_rules())
push_infos.update(self._apply_out_info_rules())
if self._cache:
self._in_info_cache.update(exchange_infos)
self._out_info_cache.update(push_infos)
......@@ -205,18 +385,54 @@ class ConnectHelper(Loggable):
return ComponentStatus.CONNECTING_IDLE
def _apply_in_info_rules(self):
exchange_infos = {}
for name, rules in self._in_info_rules.items():
if self.in_infos[name] is None and name not in self._in_info_cache:
try:
info = self._apply_rules(rules)
exchange_infos[name] = info
except MissingInfoError:
pass
return exchange_infos
def _apply_out_info_rules(self):
push_infos = {}
for name, rules in self._out_info_rules.items():
if not self.infos_pushed[name] and name not in self._out_info_cache:
try:
info = self._apply_rules(rules)
push_infos[name] = info
except MissingInfoError:
pass
return push_infos
def _check_names(self, exchange_infos, push_infos, push_data):
for name in exchange_infos:
if name not in self._inputs:
raise ValueError(
raise KeyError(
f"No input named '{name}' available to exchange info for."
)
for name in push_infos:
if name not in self._outputs:
raise ValueError(f"No output named '{name}' available to push info.")
raise KeyError(f"No output named '{name}' available to push info.")
for name in push_data:
if name not in self._outputs:
raise ValueError(f"No output named '{name}' available to push data.")
raise KeyError(f"No output named '{name}' available to push data.")
def _check_in_rules(self, exchange_infos, push_infos):
for name in exchange_infos:
if name in self._in_info_rules:
raise ValueError(
f"There are info transfer rules given for input `{name}`. "
f"Can't provide the info directly."
)
for name in push_infos:
if name in self._out_info_rules:
raise ValueError(
f"There are info transfer rules given for output `{name}`. "
f"Can't provide the info directly."
)
def _exchange_in_infos(self):
any_done = False
......@@ -271,3 +487,18 @@ class ConnectHelper(Loggable):
self.logger.debug("Failed to push output data for %s", name)
return any_done
def _transfer_fields(source_info, target_info, fields):
if len(fields) == 0:
target_info.time = source_info.time
target_info.grid = source_info.grid
target_info.meta = copy.copy(source_info.meta)
else:
for field in fields:
if field == "time":
target_info.time = source_info.time
elif field == "grid":
target_info.grid = source_info.grid
else:
target_info.meta[field] = source_info.meta[field]
"""IOList for Components."""
"""Enum conversion helper."""
from enum import Enum
......
"""
Unit tests for the sdk implementations.
"""
import logging
import unittest
from datetime import datetime
......@@ -466,20 +467,32 @@ class TestNotImplemented(unittest.TestCase):
with self.assertRaises(NotImplementedError):
comp._connect()
with self.assertRaises(NotImplementedError):
# check that the debug log for not implementing _validate is there
with self.assertLogs(level=logging.DEBUG) as captured:
comp.validate()
with self.assertRaises(NotImplementedError):
self.assertEqual(len(captured.records), 2)
self.assertEqual(captured.records[0].levelno, logging.DEBUG)
self.assertEqual(captured.records[1].levelno, logging.DEBUG)
with self.assertLogs(level=logging.DEBUG) as captured:
comp._validate()
self.assertEqual(len(captured.records), 1)
self.assertEqual(captured.records[0].levelno, logging.DEBUG)
with self.assertRaises(NotImplementedError):
comp.update()
with self.assertRaises(NotImplementedError):
comp._update()
with self.assertRaises(NotImplementedError):
# check that the debug log for not implementing _finalize is there
with self.assertLogs(level=logging.DEBUG) as captured:
comp.finalize()
with self.assertRaises(NotImplementedError):
self.assertEqual(len(captured.records), 2)
self.assertEqual(captured.records[0].levelno, logging.DEBUG)
self.assertEqual(captured.records[1].levelno, logging.DEBUG)
with self.assertLogs(level=logging.DEBUG) as captured:
comp._finalize()
self.assertEqual(len(captured.records), 1)
self.assertEqual(captured.records[0].levelno, logging.DEBUG)
def test_adapter_not_implemented(self):
adapter = NotImplAdapter()
......
import logging
import unittest
from datetime import datetime
from finam import ComponentStatus, Info, Input, NoGrid, Output
import numpy as np
from finam import ComponentStatus, Info, Input, NoGrid, Output, UniformGrid
from finam.sdk.component import IOList
from finam.tools.connect_helper import ConnectHelper
from finam.tools.connect_helper import ConnectHelper, FromInput, FromOutput, FromValue
class TestConnectHelper(unittest.TestCase):
......@@ -118,16 +119,16 @@ class TestConnectHelper(unittest.TestCase):
outputs.add(name="Out1")
outputs.add(name="Out2")
with self.assertRaises(ValueError):
with self.assertRaises(KeyError):
_connector = ConnectHelper("TestLogger", inputs, outputs, pull_data=["In3"])
connector = ConnectHelper("TestLogger", inputs, outputs)
with self.assertRaises(ValueError):
with self.assertRaises(KeyError):
connector.connect(time=None, exchange_infos={"In3": Info(time, NoGrid())})
with self.assertRaises(ValueError):
with self.assertRaises(KeyError):
connector.connect(time=None, push_infos={"Out3": Info(time, NoGrid())})
with self.assertRaises(ValueError):
with self.assertRaises(KeyError):
connector.connect(time=None, push_data={"Out3": 0.0})
def test_connect_caching(self):
......@@ -184,3 +185,202 @@ class TestConnectHelper(unittest.TestCase):
status = connector.connect(time)
self.assertEqual(status, ComponentStatus.CONNECTED)
def test_connect_transfer_from_input(self):
time = datetime(2020, 10, 6)
inputs = IOList(None, "INPUT")
inputs.add(name="In1", time=None, grid=None)
inputs.add(name="In2", time=None, grid=None, units=None)
outputs = IOList(None, "OUTPUT")
outputs.add(name="Out1")
sources = [Output("so1"), Output("so1")]
sink = Input("si1")
sources[0] >> inputs["In1"]
sources[1] >> inputs["In2"]
outputs["Out1"] >> sink
inputs["In1"].ping()
inputs["In2"].ping()
sink.ping()
connector: ConnectHelper = ConnectHelper(
"TestLogger",
inputs,
outputs,
pull_data=list(inputs.keys()),
out_info_rules={
"Out1": [
FromInput("In1", "time", "grid"),
FromInput("In2", "units"),
FromValue("test_prop", 1),
FromValue("time", time),
]
},
cache=True,
)
connector.connect(
time=None,
push_data={"Out1": np.zeros((9, 9))},
)
sources[0].push_info(Info(time=time, grid=UniformGrid((10, 10))))
sources[1].push_info(Info(time=time, grid=NoGrid(), units="m"))
connector.connect(time=None)
self.assertEqual(
connector.in_infos,
{
"In1": Info(time=time, grid=UniformGrid((10, 10))),
"In2": Info(time=time, grid=NoGrid(), units="m"),
},
)
self.assertEqual(connector.out_infos, {"Out1": None})
self.assertEqual(connector.infos_pushed, {"Out1": False})
connector.connect(time=None)
self.assertEqual(connector.infos_pushed, {"Out1": True})
self.assertEqual(connector.out_infos, {"Out1": None})
sink.exchange_info(Info(time=time, grid=None, units=None))
connector.connect(time=None)
self.assertEqual(
connector.out_infos,
{
"Out1": Info(
time=time, grid=UniformGrid((10, 10)), units="m", test_prop=1
)
},
)
def test_connect_transfer_from_output(self):
time = datetime(2020, 10, 6)
inputs = IOList(None, "INPUT")
inputs.add(name="In1")
inputs.add(name="In2")
outputs = IOList(None, "OUTPUT")
outputs.add(name="Out1", time=None, grid=None, units=None)
sources = [Output("so1"), Output("so1")]
sink = Input("si1")
sources[0] >> inputs["In1"]
sources[1] >> inputs["In2"]
outputs["Out1"] >> sink
inputs["In1"].ping()
inputs["In2"].ping()
sink.ping()
connector: ConnectHelper = ConnectHelper(
"TestLogger",
inputs,
outputs,
pull_data=inputs.names,
cache=True,
)
connector.add_in_info_rule("In1", FromOutput("Out1", "time", "units"))
connector.add_in_info_rule("In2", FromOutput("Out1", "time", "units"))
connector.add_in_info_rule("In2", FromValue("grid", NoGrid()))
connector.add_in_info_rule("In2", FromValue("time", time))
connector.connect(
time=None,
push_data={"Out1": np.zeros((9, 9))},
)
sink.exchange_info(Info(time=time, grid=UniformGrid((10, 10)), units="m"))
connector.connect(time=None)
sources[0].push_info(Info(time=time, grid=UniformGrid((10, 10))))
sources[1].push_info(Info(time=time, grid=NoGrid(), units="m"))
connector.connect(time=None)
self.assertEqual(
connector.in_infos,
{
"In1": Info(time=time, grid=UniformGrid((10, 10))),
"In2": Info(time=time, grid=NoGrid(), units="m"),
},
)
self.assertEqual(
connector.out_infos,
{"Out1": Info(time=time, grid=UniformGrid((10, 10)), units="m")},
)
def test_connect_constructor_fail(self):
inputs = IOList(None, "INPUT")
inputs.add(name="In1")
outputs = IOList(None, "OUTPUT")
outputs.add(name="Out1")
with self.assertRaises(KeyError):
_connector: ConnectHelper = ConnectHelper(
"TestLogger",
inputs,
outputs,
in_info_rules={"InX": []},
)
with self.assertRaises(KeyError):
_connector: ConnectHelper = ConnectHelper(
"TestLogger",
inputs,
outputs,
out_info_rules={"OutX": []},
)
with self.assertRaises(KeyError):
_connector: ConnectHelper = ConnectHelper(
"TestLogger",
inputs,
outputs,
in_info_rules={"In1": [FromInput("InX", "grid")]},
)
with self.assertRaises(KeyError):
_connector: ConnectHelper = ConnectHelper(
"TestLogger",
inputs,
outputs,
in_info_rules={"In1": [FromOutput("OutX", "grid")]},
)
with self.assertRaises(TypeError):
_connector: ConnectHelper = ConnectHelper(
"TestLogger",
inputs,
outputs,
in_info_rules={"In1": [0]},
)
def test_connect_rules_fail(self):
inputs = IOList(None, "INPUT")
inputs.add(name="In1")
outputs = IOList(None, "OUTPUT")
outputs.add(name="Out1")
connector: ConnectHelper = ConnectHelper(
"TestLogger",
inputs,
outputs,
)
connector.add_in_info_rule("In1", FromOutput("Out1"))
connector.add_out_info_rule("Out1", FromInput("In1"))
with self.assertRaises(ValueError):
connector.connect(
time=None, exchange_infos={"In1": Info(time=None, grid=NoGrid())}
)
with self.assertRaises(ValueError):
connector.connect(
time=None, push_infos={"Out1": Info(time=None, grid=NoGrid())}
)