Skip to content
Snippets Groups Projects

Outputs cache, clear and interpolate data

Merged Martin Lange requested to merge outputs-know-targets into main
4 files
+ 32
8
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 29
5
@@ -6,7 +6,7 @@ from datetime import datetime
from ..data import tools
from ..data.tools import Info
from ..errors import FinamMetaDataError, FinamNoDataError
from ..errors import FinamMetaDataError, FinamNoDataError, FinamTimeError
from ..interfaces import IInput, IOutput, Loggable
from ..tools.log_helper import ErrorLogger
@@ -16,7 +16,7 @@ class Output(IOutput, Loggable):
def __init__(self, name=None, info=None, **info_kwargs):
self.targets = []
self.data = None
self.data = []
self._output_info = None
self.base_logger_name = None
if name is None:
@@ -120,7 +120,7 @@ class Output(IOutput, Loggable):
raise FinamNoDataError("Can't push data before output info was exchanged.")
with ErrorLogger(self.logger):
self.data = tools.to_xarray(data, self.name, self.info, time)
self.data.append((time, tools.to_xarray(data, self.name, self.info, time)))
self.notify_targets(time)
def push_info(self, info):
@@ -176,10 +176,34 @@ class Output(IOutput, Loggable):
raise FinamNoDataError(f"No data info available in {self.name}")
if self._out_infos_exchanged < len(self._connected_inputs):
raise FinamNoDataError(f"Data info was not yet exchanged in {self.name}")
if self.data is None:
if len(self.data) == 0:
raise FinamNoDataError(f"No data available in {self.name}")
return self.data
return self._interpolate(time)
def _interpolate(self, time):
if time < self.data[0][0] or time > self.data[-1][0]:
raise FinamTimeError(
f"Requested time {time} out of range [{self.data[0][0]}, {self.data[-1][0]}]"
)
for i, (t, data) in enumerate(self.data):
if time > t:
continue
if time == t:
return data
t_prev, data_prev = self.data[i - 1]
diff = t - t_prev
t_half = t_prev + diff
if time < t_half:
return data_prev
return data
raise FinamTimeError(
f"Time interpolation failed. This should not happen and is probably a bug. "
f"Time is {time}."
)
def get_info(self, info):
"""Exchange and get the output's data info.
Loading