Skip to content
Snippets Groups Projects
Commit 0512e45c authored by Martin Lange's avatar Martin Lange
Browse files

use in-memory limit in time adapters

parent 90caf27a
No related branches found
No related tags found
1 merge request!238Data memory-mapping
"""
Adapters that deal with time, like temporal interpolation and integration.
"""
import os
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
......@@ -241,7 +242,7 @@ class TimeCachingAdapter(Adapter, NoBranchAdapter, ABC):
check_time(self.logger, time)
data = dtools.strip_time(self.pull_data(time, self), self._input_info.grid)
self.data.append((time, data))
self.data.append((time, self._pack(data)))
def _get_data(self, time, _target):
"""Get the output's data-set for the given time.
......@@ -267,7 +268,11 @@ class TimeCachingAdapter(Adapter, NoBranchAdapter, ABC):
def _clear_cached_data(self, time):
while len(self.data) > 1 and self.data[1][0] <= time:
self.data.pop(0)
d = self.data.pop(0)
if isinstance(d[1], str):
os.remove(d[1])
else:
self._total_mem -= d[1].nbytes
@abstractmethod
def _interpolate(self, time):
......@@ -289,13 +294,13 @@ class NextTime(TimeCachingAdapter):
def _interpolate(self, time):
if len(self.data) == 1:
return self.data[0][1]
return self._unpack(self.data[0][1])
for t, data in self.data:
if time > t:
continue
return data
return self._unpack(data)
raise FinamTimeError(
f"Time interpolation failed. This should not happen and is probably a bug. "
......@@ -318,16 +323,16 @@ class PreviousTime(TimeCachingAdapter):
def _interpolate(self, time):
if len(self.data) == 1:
return self.data[0][1]
return self._unpack(self.data[0][1])
for i, (t, data) in enumerate(self.data):
if time > t:
continue
if time == t:
return data
return self._unpack(data)
_, data_prev = self.data[i - 1]
return data_prev
return self._unpack(data_prev)
raise FinamTimeError(
f"Time interpolation failed. This should not happen and is probably a bug. "
......@@ -353,10 +358,10 @@ class StackTime(TimeCachingAdapter):
for t, data in self.data:
if time > t:
extract.append((t, data))
extract.append((t, self._unpack(data)))
continue
extract.append((t, data))
extract.append((t, self._unpack(data)))
break
arr = np.stack([d[1] for d in extract])
......@@ -396,13 +401,13 @@ class LinearTime(TimeCachingAdapter):
if time > t:
continue
if time == t:
return data
return self._unpack(data)
t_prev, data_prev = self.data[i - 1]
dt = (time - t_prev) / (t - t_prev)
result = interpolate(data_prev, data, dt)
result = interpolate(self._unpack(data_prev), self._unpack(data), dt)
return result
......@@ -459,7 +464,7 @@ class StepTime(TimeCachingAdapter):
if time > t:
continue
if time == t:
return data
return self._unpack(data)
t_prev, data_prev = self.data[i - 1]
......@@ -467,7 +472,7 @@ class StepTime(TimeCachingAdapter):
result = interpolate_step(data_prev, data, dt, self.step)
return result
return self._unpack(result)
raise FinamTimeError(
f"Time interpolation failed. This should not happen and is probably a bug. "
......
......@@ -27,7 +27,7 @@ class TimeIntegrationAdapter(TimeCachingAdapter, ABC):
check_time(self.logger, time)
data = tools.strip_time(self.pull_data(time, self), self._input_info.grid)
self.data.append((time, data))
self.data.append((time, self._pack(data)))
if self._prev_time is None:
self._prev_time = time
......@@ -111,18 +111,21 @@ class AvgOverTime(TimeIntegrationAdapter):
def _interpolate(self, time):
if len(self.data) == 1:
return self.data[0][1]
return self._unpack(self.data[0][1])
if time <= self.data[0][0]:
return self.data[0][1]
return self._unpack(self.data[0][1])
sum_value = None
t_old, v_old = self.data[0]
v_old = self._unpack(v_old)
for i in range(len(self.data) - 1):
t_old, v_old = self.data[i]
t_new, v_new = self.data[i + 1]
v_new = self._unpack(v_new)
if self._prev_time >= t_new:
t_old, v_old = t_new, v_new
continue
if time <= t_old:
break
......@@ -147,6 +150,8 @@ class AvgOverTime(TimeIntegrationAdapter):
sum_value = value if sum_value is None else sum_value + value
t_old, v_old = t_new, v_new
dt = time - self._prev_time
if dt.total_seconds() > 0:
sum_value /= dt.total_seconds() * tools.UNITS.Unit("s")
......@@ -239,20 +244,22 @@ class SumOverTime(TimeIntegrationAdapter):
if len(self.data) == 1 or time <= self.data[0][0]:
if self._per_time:
return (
self.data[0][1]
* self._initial_interval.total_seconds()
* tools.UNITS.Unit("s")
self._unpack(self.data[0][1])
* (self._initial_interval.total_seconds() * tools.UNITS.Unit("s"))
).to_reduced_units()
return self.data[0][1]
return self._unpack(self.data[0][1])
sum_value = None
t_old, v_old = self.data[0]
v_old = self._unpack(v_old)
for i in range(len(self.data) - 1):
t_old, v_old = self.data[i]
t_new, v_new = self.data[i + 1]
v_new = self._unpack(v_new)
if self._prev_time >= t_new:
t_old, v_old = t_new, v_new
continue
if time <= t_old:
break
......@@ -278,6 +285,8 @@ class SumOverTime(TimeIntegrationAdapter):
sum_value = value if sum_value is None else sum_value + value
t_old, v_old = t_new, v_new
if self._per_time:
return sum_value.to_reduced_units()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment