From 0512e45cc4d365a9591cf2af444cc7945718b834 Mon Sep 17 00:00:00 2001
From: Martin Lange <martin.lange@ufz.de>
Date: Wed, 7 Dec 2022 19:06:56 +0100
Subject: [PATCH] use in-memory limit in time adapters

---
 src/finam/adapters/time.py             | 31 +++++++++++++++-----------
 src/finam/adapters/time_integration.py | 27 ++++++++++++++--------
 2 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/src/finam/adapters/time.py b/src/finam/adapters/time.py
index 16387d22..cd9f5e17 100644
--- a/src/finam/adapters/time.py
+++ b/src/finam/adapters/time.py
@@ -1,6 +1,7 @@
 """
 Adapters that deal with time, like temporal interpolation and integration.
 """
+import os
 from abc import ABC, abstractmethod
 from datetime import datetime, timedelta
 
@@ -241,7 +242,7 @@ class TimeCachingAdapter(Adapter, NoBranchAdapter, ABC):
         check_time(self.logger, time)
 
         data = dtools.strip_time(self.pull_data(time, self), self._input_info.grid)
-        self.data.append((time, data))
+        self.data.append((time, self._pack(data)))
 
     def _get_data(self, time, _target):
         """Get the output's data-set for the given time.
@@ -267,7 +268,11 @@ class TimeCachingAdapter(Adapter, NoBranchAdapter, ABC):
 
     def _clear_cached_data(self, time):
         while len(self.data) > 1 and self.data[1][0] <= time:
-            self.data.pop(0)
+            d = self.data.pop(0)
+            if isinstance(d[1], str):
+                os.remove(d[1])
+            else:
+                self._total_mem -= d[1].nbytes
 
     @abstractmethod
     def _interpolate(self, time):
@@ -289,13 +294,13 @@ class NextTime(TimeCachingAdapter):
 
     def _interpolate(self, time):
         if len(self.data) == 1:
-            return self.data[0][1]
+            return self._unpack(self.data[0][1])
 
         for t, data in self.data:
             if time > t:
                 continue
 
-            return data
+            return self._unpack(data)
 
         raise FinamTimeError(
             f"Time interpolation failed. This should not happen and is probably a bug. "
@@ -318,16 +323,16 @@ class PreviousTime(TimeCachingAdapter):
 
     def _interpolate(self, time):
         if len(self.data) == 1:
-            return self.data[0][1]
+            return self._unpack(self.data[0][1])
 
         for i, (t, data) in enumerate(self.data):
             if time > t:
                 continue
             if time == t:
-                return data
+                return self._unpack(data)
 
             _, data_prev = self.data[i - 1]
-            return data_prev
+            return self._unpack(data_prev)
 
         raise FinamTimeError(
             f"Time interpolation failed. This should not happen and is probably a bug. "
@@ -353,10 +358,10 @@ class StackTime(TimeCachingAdapter):
 
         for t, data in self.data:
             if time > t:
-                extract.append((t, data))
+                extract.append((t, self._unpack(data)))
                 continue
 
-            extract.append((t, data))
+            extract.append((t, self._unpack(data)))
             break
 
         arr = np.stack([d[1] for d in extract])
@@ -396,13 +401,13 @@ class LinearTime(TimeCachingAdapter):
             if time > t:
                 continue
             if time == t:
-                return data
+                return self._unpack(data)
 
             t_prev, data_prev = self.data[i - 1]
 
             dt = (time - t_prev) / (t - t_prev)
 
-            result = interpolate(data_prev, data, dt)
+            result = interpolate(self._unpack(data_prev), self._unpack(data), dt)
 
             return result
 
@@ -459,7 +464,7 @@ class StepTime(TimeCachingAdapter):
             if time > t:
                 continue
             if time == t:
-                return data
+                return self._unpack(data)
 
             t_prev, data_prev = self.data[i - 1]
 
@@ -467,7 +472,7 @@ class StepTime(TimeCachingAdapter):
 
             result = interpolate_step(data_prev, data, dt, self.step)
 
-            return result
+            return self._unpack(result)
 
         raise FinamTimeError(
             f"Time interpolation failed. This should not happen and is probably a bug. "
diff --git a/src/finam/adapters/time_integration.py b/src/finam/adapters/time_integration.py
index 45e4111d..5d18cd59 100644
--- a/src/finam/adapters/time_integration.py
+++ b/src/finam/adapters/time_integration.py
@@ -27,7 +27,7 @@ class TimeIntegrationAdapter(TimeCachingAdapter, ABC):
         check_time(self.logger, time)
 
         data = tools.strip_time(self.pull_data(time, self), self._input_info.grid)
-        self.data.append((time, data))
+        self.data.append((time, self._pack(data)))
 
         if self._prev_time is None:
             self._prev_time = time
@@ -111,18 +111,21 @@ class AvgOverTime(TimeIntegrationAdapter):
 
     def _interpolate(self, time):
         if len(self.data) == 1:
-            return self.data[0][1]
+            return self._unpack(self.data[0][1])
 
         if time <= self.data[0][0]:
-            return self.data[0][1]
+            return self._unpack(self.data[0][1])
 
         sum_value = None
 
+        t_old, v_old = self.data[0]
+        v_old = self._unpack(v_old)
         for i in range(len(self.data) - 1):
-            t_old, v_old = self.data[i]
             t_new, v_new = self.data[i + 1]
+            v_new = self._unpack(v_new)
 
             if self._prev_time >= t_new:
+                t_old, v_old = t_new, v_new
                 continue
             if time <= t_old:
                 break
@@ -147,6 +150,8 @@ class AvgOverTime(TimeIntegrationAdapter):
 
             sum_value = value if sum_value is None else sum_value + value
 
+            t_old, v_old = t_new, v_new
+
         dt = time - self._prev_time
         if dt.total_seconds() > 0:
             sum_value /= dt.total_seconds() * tools.UNITS.Unit("s")
@@ -239,20 +244,22 @@ class SumOverTime(TimeIntegrationAdapter):
         if len(self.data) == 1 or time <= self.data[0][0]:
             if self._per_time:
                 return (
-                    self.data[0][1]
-                    * self._initial_interval.total_seconds()
-                    * tools.UNITS.Unit("s")
+                    self._unpack(self.data[0][1])
+                    * (self._initial_interval.total_seconds() * tools.UNITS.Unit("s"))
                 ).to_reduced_units()
 
-            return self.data[0][1]
+            return self._unpack(self.data[0][1])
 
         sum_value = None
 
+        t_old, v_old = self.data[0]
+        v_old = self._unpack(v_old)
         for i in range(len(self.data) - 1):
-            t_old, v_old = self.data[i]
             t_new, v_new = self.data[i + 1]
+            v_new = self._unpack(v_new)
 
             if self._prev_time >= t_new:
+                t_old, v_old = t_new, v_new
                 continue
             if time <= t_old:
                 break
@@ -278,6 +285,8 @@ class SumOverTime(TimeIntegrationAdapter):
 
             sum_value = value if sum_value is None else sum_value + value
 
+            t_old, v_old = t_new, v_new
+
         if self._per_time:
             return sum_value.to_reduced_units()
 
-- 
GitLab