Skip to content
Snippets Groups Projects

Handle different start and end times of components

Merged Martin Lange requested to merge start-times into main
1 file
+ 25
9
Compare changes
  • Side-by-side
  • Inline
@@ -7,8 +7,8 @@ import xarray as xr
from finam.interfaces import ComponentStatus, Loggable
from ..data.tools import Info, strip_time
from ..errors import FinamNoDataError
from ..data.tools import Info, assign_time
from ..errors import FinamNoDataError, FinamTimeError
from ..tools.log_helper import ErrorLogger
@@ -415,6 +415,9 @@ class ConnectHelper(Loggable):
and all(v for v in self.infos_pushed.values())
and all(v for v in self.data_pushed.values())
):
with ErrorLogger(self.logger):
_check_times(self.out_infos)
return ComponentStatus.CONNECTED
if any_done:
@@ -520,16 +523,20 @@ class ConnectHelper(Loggable):
return any_done
def _push_data(self, name, data, time, info_time):
if info_time != time:
out = self.outputs[name]
if out.is_static:
out.push_data(data, None)
elif info_time != time:
if isinstance(data, xr.DataArray):
data_plain = strip_time(data)
self.outputs[name].push_data(data_plain, time)
self.outputs[name].push_data(copy.copy(data_plain), info_time)
data_1 = assign_time(data, time)
out.push_data(data_1, time)
data_2 = assign_time(data, info_time)
out.push_data(copy.copy(data_2), info_time)
else:
self.outputs[name].push_data(data, time)
self.outputs[name].push_data(copy.copy(data), info_time)
out.push_data(data, time)
out.push_data(copy.copy(data), info_time)
else:
self.outputs[name].push_data(data, info_time)
out.push_data(data, info_time)
self.data_pushed[name] = True
self._out_data_cache.pop(name)
@@ -537,6 +544,15 @@ class ConnectHelper(Loggable):
return True
def _check_times(infos):
t = None
for _, info in infos.items():
if t is None:
t = info.time
elif t != info.time:
raise FinamTimeError("Input infos have different starting times.")
def _transfer_fields(source_info, target_info, fields):
if len(fields) == 0:
target_info.time = source_info.time
Loading