Skip to content
Snippets Groups Projects
Commit 042eebaf authored by Sebastian Müller's avatar Sebastian Müller 🐈
Browse files

update bindings for FINAM 0.4

parent a9c46a53
No related branches found
No related tags found
No related merge requests found
......@@ -3,34 +3,30 @@ Simple coupling setup using live view modules.
"""
from datetime import datetime, timedelta
import numpy as np
from finam.adapters import base, time
from finam.core.schedule import Composition
from finam.modules.visual import time_series
from matplotlib import pyplot as plt
from finam_mhm_module import Mhm
def grid_select(grid):
col, row = 3, 5
return grid[col * 9 + row]
plot = time_series.TimeSeriesView(
start=datetime(1990, 1, 1),
step=timedelta(days=1),
inputs=["Runoff"],
intervals=[1],
import finam as fm
import finam_mhm as fm_mhm
import finam_plot as fm_plt
import finam_netcdf as fm_nc
start_date = datetime(1990, 1, 1)
day = timedelta(days=1)
mhm = fm_mhm.MHM(cwd="../../MHM/mhm/test_domain")
runoff_viewer = fm_plt.ImagePlot(vmin=0.0, vmax=650)
# netcdf writing files
writer = fm_nc.NetCdfTimedWriter(
path="qmod.nc",
inputs={"QMOD": fm_nc.Layer(var="QMOD", xyz=("x", "y"))},
time_var="time",
start=start_date,
step=day,
)
mhm = Mhm(cwd="../../MHM/mhm")
composition = Composition([mhm, plot])
composition = fm.Composition([mhm, writer, runoff_viewer])
composition.initialize()
grid_value = mhm.outputs["L1_TOTAL_RUNOFF"] >> base.GridToValue(func=grid_select)
grid_value >> time.LinearInterpolation() >> plot.inputs["Runoff"]
mhm.outputs["L11_QMOD"] >> writer.inputs["QMOD"]
mhm.outputs["L11_QMOD"] >> runoff_viewer.inputs["Grid"]
composition.run(datetime(1992, 1, 1))
plt.show()
composition.run(end_time=datetime(1992, 1, 1))
from .component import MHM
"""
FINAM mHM module.
"""
from datetime import datetime, timedelta
import numpy as np
import finam as fm
import mhm
OUTPUT_META = {
"L0_GRIDDED_LAI": dict(unit="1", long_name="leaf area index"),
"L1_FSEALED": dict(unit="1", long_name="Fraction of sealed area"),
"L1_FNOTSEALED": dict(unit="1", long_name="Fraction of unsealed area"),
"L1_INTER": dict(unit="mm / h", long_name="Interception"),
"L1_SNOWPACK": dict(unit="mm / h", long_name="Snowpack"),
"L1_SEALSTW": dict(
unit="mm / h", long_name="Retention storage of impervious areas"
),
"L1_UNSATSTW": dict(unit="mm / h", long_name="upper soil storage"),
"L1_SATSTW": dict(unit="mm / h", long_name="groundwater storage"),
"L1_NEUTRONS": dict(unit="mm / h", long_name="Ground Albedo Neutrons"),
"L1_PET_CALC": dict(unit="mm / h", long_name="potential evapotranspiration"),
"L1_AETCANOPY": dict(
unit="mm / h", long_name="Real evaporation intensity from canopy"
),
"L1_AETSEALED": dict(
unit="mm / h", long_name="Real evap. from free water surfaces"
),
"L1_TOTAL_RUNOFF": dict(unit="m^3 / h", long_name="Generated runoff"),
"L1_RUNOFFSEAL": dict(
unit="mm / h", long_name="Direct runoff from impervious areas"
),
"L1_FASTRUNOFF": dict(unit="mm / h", long_name="Fast runoff component"),
"L1_SLOWRUNOFF": dict(unit="mm / h", long_name="Slow runoff component"),
"L1_BASEFLOW": dict(unit="mm / h", long_name="Baseflow"),
"L1_PERCOL": dict(unit="mm / h", long_name="Percolation"),
"L1_PREEFFECT": dict(unit="mm / h", long_name="Effective precip. depth"),
"L1_SOILMOIST_VOL_ALL": dict(
unit="1", long_name="average soil moisture over all layers"
),
"L11_QMOD": dict(unit="m^3 / s", long_name="Simulated discharge"),
"L11_QOUT": dict(unit="m^3 / s", long_name="Total outflow from cells"),
}
"""dict: meta information about available outputs in mHM."""
INPUT_META = {
"L0_GRIDDED_LAI": dict(unit="1", long_name="leaf area index"),
}
"""dict: meta information about available inputs in mHM."""
class MHM(fm.TimeComponent):
def __init__(
self,
namelist_mhm="mhm.nml",
namelist_mhm_param="mhm_parameter.nml",
namelist_mhm_output="mhm_outputs.nml",
namelist_mrm_output="mrm_outputs.nml",
cwd=".",
input_names=None,
):
super().__init__()
self.OUTPUT_NAMES = list(OUTPUT_META)
self.INPUT_NAMES = (
[] if input_names is None else [n.upper() for n in input_names]
)
for in_name in self.INPUT_NAMES:
if in_name not in INPUT_META:
raise ValueError(f"mHM: input '{in_name}' is not available.")
self.namelist_mhm = namelist_mhm
self.namelist_mhm_param = namelist_mhm_param
self.namelist_mhm_output = namelist_mhm_output
self.namelist_mrm_output = namelist_mrm_output
self.cwd = cwd # needed for @fm.tools.execute_in_cwd
# mHM always has hourly stepping
self.step = timedelta(hours=1)
@property
def next_time(self):
"""Next pull time."""
return self.time + self.step
def _get(self, var):
value = mhm.get_variable(var)
value.fill_value = np.nan
return value.filled()
@fm.tools.execute_in_cwd
def _initialize(self):
# only show errors
mhm.model.set_verbosity(level=1)
# init
mhm.model.init(
namelist_mhm=self.namelist_mhm,
namelist_mhm_param=self.namelist_mhm_param,
namelist_mhm_output=self.namelist_mhm_output,
namelist_mrm_output=self.namelist_mrm_output,
cwd=".",
)
# disable file output of mHM
mhm.model.disable_output()
mhm.run.prepare()
# only one domain possible
mhm.run.prepare_domain()
# set time
year, month, day, hour = mhm.run.current_time()
self.time = datetime(year=year, month=month, day=max(day, 0), hour=max(hour, 0))
# first time step compensate by negative values in mHM
if day < 0 or hour < 0:
self.time += timedelta(days=min(day, 0), hours=min(hour, 0))
# store Grid specifications
self.gridspec = {}
# get grid info l0 (swap rows/cols to get "ij" indexing)
nrows, ncols, __, xll, yll, cell_size, no_data = mhm.get.l0_domain_info()
self.no_data = no_data
self.gridspec["L0"] = fm.EsriGrid(
ncols=ncols, nrows=nrows, cellsize=cell_size, xllcorner=xll, yllcorner=yll
)
# get grid info l1 (swap rows/cols to get "ij" indexing)
nrows, ncols, __, xll, yll, cell_size, no_data = mhm.get.l1_domain_info()
self.gridspec["L1"] = fm.EsriGrid(
ncols=ncols, nrows=nrows, cellsize=cell_size, xllcorner=xll, yllcorner=yll
)
# get grid info l11 (swap rows/cols to get "ij" indexing)
nrows, ncols, __, xll, yll, cell_size, no_data = mhm.get.l11_domain_info()
self.gridspec["L11"] = fm.EsriGrid(
ncols=ncols, nrows=nrows, cellsize=cell_size, xllcorner=xll, yllcorner=yll
)
print(self.gridspec["L11"].nrows)
print(self.gridspec["L11"].ncols)
print(self.gridspec["L11"].axes_names)
# get grid info l2 (swap rows/cols to get "ij" indexing)
nrows, ncols, __, xll, yll, cell_size, no_data = mhm.get.l2_domain_info()
self.gridspec["L2"] = fm.EsriGrid(
ncols=ncols, nrows=nrows, cellsize=cell_size, xllcorner=xll, yllcorner=yll
)
for var in self.OUTPUT_NAMES:
grid_name = var.split("_")[0]
self.outputs.add(name=var, time=self.time, grid=self.gridspec[grid_name], **OUTPUT_META[var])
for var in self.INPUT_NAMES:
grid_name = var.split("_")[0]
self.inputs.add(name=var, time=self.time, grid=self.gridspec[grid_name], **INPUT_META[var])
self.create_connector()
def _connect(self, start_time):
push_data = {var: self._get(var) for var in self.OUTPUT_NAMES}
self.try_connect(start_time=start_time, push_data=push_data)
@fm.tools.execute_in_cwd
def _update(self):
# Don't run further than mHM can
if mhm.run.finished():
return
mhm.run.do_time_step()
# update time
year, month, day, hour = mhm.run.current_time()
self.time = datetime(year=year, month=month, day=day, hour=hour)
# push outputs
for var in self.OUTPUT_NAMES:
if not self.outputs[var].has_targets:
continue
self.outputs[var].push_data(
data=self._get(var),
time=self.time,
)
@fm.tools.execute_in_cwd
def _finalize(self):
with fm.tools.LogCStdOutStdErr(self.logger):
mhm.run.finalize_domain()
mhm.run.finalize()
mhm.model.finalize()
from .mhm import Mhm
"""
FINAM mHM module.
"""
from datetime import datetime
import mhm_pybind as mp
import numpy as np
from finam.core.interfaces import ComponentStatus
from finam.core.sdk import ATimeComponent, Input, Output
from finam.data.grid import Grid, GridSpec
from finam.tools.cwd_helper import execute_in_cwd
class Mhm(ATimeComponent):
OUTPUT_NAMES = [
"L0_GRIDDED_LAI",
"L1_FSEALED",
"L1_FNOTSEALED",
"L1_INTER",
"L1_SNOWPACK",
"L1_SEALSTW",
"L1_UNSATSTW",
"L1_SATSTW",
"L1_NEUTRONS",
"L1_PET_CALC",
"L1_AETCANOPY",
"L1_AETSEALED",
"L1_TOTAL_RUNOFF",
"L1_RUNOFFSEAL",
"L1_FASTRUNOFF",
"L1_SLOWRUNOFF",
"L1_BASEFLOW",
"L1_PERCOL",
"L1_PREEFFECT",
"L1_SOILMOIST_VOL_ALL",
"L11_QMOD",
"L11_QOUT",
"L11_QTIN",
"L11_QTR",
]
def __init__(
self,
namelist_mhm="mhm.nml",
namelist_mhm_param="mhm_parameter.nml",
namelist_mhm_output="mhm_outputs.nml",
namelist_mrm_output="mrm_outputs.nml",
cwd=".",
input_names=None,
):
self.INPUT_NAMES = [] if input_names is None else list(input_names)
super(Mhm, self).__init__()
self.namelist_mhm = namelist_mhm
self.namelist_mhm_param = namelist_mhm_param
self.namelist_mhm_output = namelist_mhm_output
self.namelist_mrm_output = namelist_mrm_output
self.cwd = cwd # needed for @execute_in_cwd
self._status = ComponentStatus.CREATED
@execute_in_cwd
def initialize(self):
super().initialize()
mp.mhm.init(
namelist_mhm=self.namelist_mhm,
namelist_mhm_param=self.namelist_mhm_param,
namelist_mhm_output=self.namelist_mhm_output,
namelist_mrm_output=self.namelist_mrm_output,
cwd=".",
)
mp.run.prepare()
mp.run.prepare_domain()
# set time
year, month, day, hour = mp.run.current_time()
hour = max(hour, 0) # fix for first time step
self._time = datetime(year=year, month=month, day=day, hour=hour)
self.gridspec = {}
# get grid info l0 (swap rows/cols to get "ij" indexing)
nrows, ncols, __, xll, yll, cell_size, no_data = mp.get.l0_domain_info()
self.no_data = no_data
self.gridspec["L0"] = GridSpec(
ncols=ncols, nrows=nrows, cell_size=cell_size, xll=xll, yll=yll
)
# get grid info l1 (swap rows/cols to get "ij" indexing)
nrows, ncols, __, xll, yll, cell_size, no_data = mp.get.l1_domain_info()
self.gridspec["L1"] = GridSpec(
ncols=ncols, nrows=nrows, cell_size=cell_size, xll=xll, yll=yll
)
# get grid info l11 (swap rows/cols to get "ij" indexing)
nrows, ncols, __, xll, yll, cell_size, no_data = mp.get.l11_domain_info()
self.gridspec["L11"] = GridSpec(
ncols=ncols, nrows=nrows, cell_size=cell_size, xll=xll, yll=yll
)
# get grid info l2 (swap rows/cols to get "ij" indexing)
nrows, ncols, __, xll, yll, cell_size, no_data = mp.get.l2_domain_info()
self.gridspec["L2"] = GridSpec(
ncols=ncols, nrows=nrows, cell_size=cell_size, xll=xll, yll=yll
)
for var in self.OUTPUT_NAMES:
self.outputs[var] = Output()
for var in self.INPUT_NAMES:
self.inputs[var] = Input()
self._status = ComponentStatus.INITIALIZED
def connect(self):
super().connect()
for var in self.OUTPUT_NAMES:
if not self.outputs[var].has_targets:
continue
self.outputs[var].push_data(
data=Grid(
spec=self.gridspec[var.split("_")[0]],
no_data=self.no_data,
# flip upside down to use lower-left corner as origin
data=np.flipud(
mp.get_variable(var, indexing="ij").filled()
).reshape(-1),
),
time=self.time,
)
self._status = ComponentStatus.CONNECTED
def validate(self):
super().validate()
# TODO: add checks if connected outputs are compatible with process selection
self._status = ComponentStatus.VALIDATED
@execute_in_cwd
def update(self):
super().update()
# Don't run further than mHM can
if mp.run.finished():
return
mp.run.do_time_step()
mp.run.write_output() # do we want this here?
# update time
year, month, day, hour = mp.run.current_time()
self._time = datetime(year=year, month=month, day=day, hour=hour)
# push outputs
for var in self.OUTPUT_NAMES:
if not self.outputs[var].has_targets:
continue
self.outputs[var].push_data(
data=Grid(
spec=self.gridspec[var.split("_")[0]],
no_data=self.no_data,
# flip upside down to use lower-left corner as origin
data=np.flipud(
mp.get_variable(var, indexing="ij").filled()
).reshape(-1),
),
time=self.time,
)
self._status = ComponentStatus.UPDATED
@execute_in_cwd
def finalize(self):
super().finalize()
mp.run.finalize_domain()
mp.run.finalize()
mp.mhm.finalize()
self._status = ComponentStatus.FINALIZED
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment