-
Sebastian Müller authored c499b3fe
test_masked.py 7.20 KiB
"""
Unit tests for masked data.
"""
import unittest
from datetime import datetime, timedelta
import numpy as np
import finam as fm
def gen_masked(seed, shape):
    """
    Generate seeded random data whose corner entries are masked.

    Parameters
    ----------
    seed
        Seed forwarded to ``np.random.default_rng``.
    shape
        Shape of the generated array; the corners are addressed
        with two indices (first/last along the first two axes).

    Returns
    -------
    numpy.ma.MaskedArray
        Random values with the corners masked and NaN as fill value.
    """
    values = np.random.default_rng(seed).random(size=shape)
    corner_mask = np.zeros(shape, dtype=bool)
    # mask corners: first/last index on axis 0 crossed with first/last on axis 1
    for row in (0, -1):
        for col in (0, -1):
            corner_mask[row, col] = True
    return np.ma.masked_array(values, mask=corner_mask, fill_value=np.nan)
class TestMasked(unittest.TestCase):
    """Tests for masked data in FINAM adapters, data tools and ``Info``."""

    def test_rescale_masked(self):
        """
        Push masked data through a ``Scale`` adapter.

        Asserts that a masked corner cell is still masked at the sink and
        that an unmasked cell arrives with twice the source value.
        """
        time = datetime(2000, 1, 1)

        in_info = fm.Info(
            time=time,
            grid=fm.UniformGrid(
                dims=(5, 4),
                spacing=(1.0, 1.0, 1.0),
                data_location=fm.Location.CELLS,
            ),
            units="m",
            missing_value=np.nan,
        )

        # the generator seeds the random data with the ordinal of the time
        source = fm.modules.generators.CallbackGenerator(
            callbacks={
                "Output": (
                    lambda t: gen_masked(t.toordinal(), in_info.grid.data_shape),
                    in_info,
                )
            },
            start=time,
            step=timedelta(days=1),
        )

        sink = fm.modules.debug.DebugConsumer(
            {"Input": fm.Info(None, grid=None, units=None)},
            start=time,
            step=timedelta(days=1),
        )

        composition = fm.Composition([source, sink])

        source.outputs["Output"] >> fm.adapters.Scale(scale=2.0) >> sink.inputs["Input"]

        composition.connect()

        self.assertTrue(sink.data["Input"][0][0, 0].mask)
        self.assertAlmostEqual(
            sink.data["Input"][0][1, 1].magnitude,
            2 * source.outputs["Output"].data[0][1][0][1, 1].magnitude,
        )

    def test_mask_tools(self):
        """
        Exercise the mask helpers in ``fm.data`` and ``fm.data.tools``.

        Covers domain-coverage checks, filling, conversion to masked arrays,
        compression round trips, sub-mask checks and mask equality.
        """
        data = gen_masked(1234, (4, 3))
        mask = data.mask
        xdata = fm.data.quantify(data)

        # a domain without any masked value is not covered by masked data
        sub_domain = np.full_like(data, fill_value=False, dtype=bool)
        self.assertFalse(fm.data.check_data_covers_domain(data, sub_domain))
        self.assertFalse(fm.data.check_data_covers_domain(xdata, sub_domain))

        # Work on a copy: ``data.mask`` shares memory with the mask of the
        # array, so editing it in place would also alter ``data`` and ``mask``.
        sub_domain = data.mask.copy()
        self.assertTrue(fm.data.check_data_covers_domain(data, sub_domain))
        self.assertTrue(fm.data.check_data_covers_domain(xdata, sub_domain))

        # a domain masking one more value than the data is still covered
        sub_domain[0, 1] = True
        self.assertTrue(fm.data.check_data_covers_domain(data, sub_domain))
        self.assertTrue(fm.data.check_data_covers_domain(xdata, sub_domain))

        # One context per call: the first raise leaves the ``with`` block,
        # so a second call in the same block would never be executed.
        with self.assertRaises(ValueError):
            fm.data.check_data_covers_domain(data, [False, True])
        with self.assertRaises(ValueError):
            fm.data.check_data_covers_domain(xdata, [False, True])

        fdata = fm.data.filled(data)
        fxdata = fm.data.filled(xdata)

        self.assertTrue(fm.data.is_masked_array(data))
        self.assertTrue(fm.data.is_masked_array(xdata))
        self.assertTrue(fm.data.has_masked_values(data))
        self.assertTrue(fm.data.has_masked_values(xdata))

        self.assertFalse(fm.data.is_masked_array(fdata))
        self.assertFalse(fm.data.is_masked_array(fxdata))
        self.assertFalse(fm.data.has_masked_values(fdata))
        self.assertFalse(fm.data.has_masked_values(fxdata))

        np.testing.assert_allclose(data[mask], fdata[mask])
        np.testing.assert_allclose(xdata[mask].magnitude, fxdata[mask].magnitude)

        mfdata = fm.data.to_masked(fdata, mask=False)
        mfxdata = fm.data.to_masked(fxdata, mask=False)

        self.assertTrue(fm.data.is_masked_array(mfdata))
        self.assertTrue(fm.data.is_masked_array(mfxdata))
        self.assertFalse(fm.data.has_masked_values(mfdata))
        self.assertFalse(fm.data.has_masked_values(mfxdata))

        # compression round trip in C and Fortran order
        cdata_c = fm.data.to_compressed(data, order="C")
        cdata_f = fm.data.to_compressed(data, order="F")
        ucdata_c = fm.data.from_compressed(
            cdata_c, shape=data.shape, order="C", mask=mask
        )
        ucdata_f = fm.data.from_compressed(
            cdata_f, shape=data.shape, order="F", mask=mask
        )

        self.assertEqual(cdata_c.size, np.sum(~mask))
        self.assertEqual(cdata_f.size, np.sum(~mask))
        self.assertNotEqual(cdata_c[1], cdata_f[1])
        np.testing.assert_allclose(data[mask], ucdata_c[mask])
        np.testing.assert_allclose(data[mask], ucdata_f[mask])

        # more specific routines
        grid1 = fm.RectilinearGrid([(1.0, 2.0, 3.0, 4.0)])
        grid2 = fm.RectilinearGrid([(1.0, 2.0, 3.0)])
        grid3 = fm.RectilinearGrid([(1.0, 2.0, 3.0), (1.0, 2.0, 3.0)])

        mask1 = np.array((1, 0, 0), dtype=bool)
        mask2 = np.array((0, 0, 1), dtype=bool)
        mask3 = np.array((1, 0, 1), dtype=bool)
        mask4 = np.array((1, 1, 1), dtype=bool)
        mask5 = np.array((0, 0, 0), dtype=bool)
        mask6 = np.array((1, 0), dtype=bool)
        mask7 = np.array(((0, 1), (0, 1)), dtype=bool)

        data = np.ma.masked_array((10.0, 20.0, 30.0), mask1, fill_value=np.nan)

        # submask check
        self.assertFalse(fm.data.tools.is_sub_mask(mask1, mask2))
        self.assertFalse(fm.data.tools.is_sub_mask(mask1, np.ma.nomask))
        self.assertTrue(fm.data.tools.is_sub_mask(mask1, mask3))
        self.assertTrue(fm.data.tools.is_sub_mask(mask2, mask3))
        self.assertTrue(fm.data.tools.is_sub_mask(np.ma.nomask, mask1))
        self.assertTrue(fm.data.tools.is_sub_mask(np.ma.nomask, mask2))
        self.assertTrue(fm.data.tools.is_sub_mask(np.ma.nomask, mask3))
        self.assertTrue(fm.data.tools.is_sub_mask(np.ma.nomask, mask4))
        self.assertTrue(fm.data.tools.is_sub_mask(np.ma.nomask, mask5))
        self.assertTrue(fm.data.tools.is_sub_mask(np.ma.nomask, np.ma.nomask))
        self.assertTrue(fm.data.tools.is_sub_mask(mask5, np.ma.nomask))
        self.assertFalse(fm.data.tools.is_sub_mask(mask1, mask6))
        self.assertFalse(fm.data.tools.is_sub_mask(mask1, mask7))

        # equal mask
        self.assertTrue(fm.data.tools.masks_equal(None, None))
        self.assertTrue(fm.data.tools.masks_equal(fm.Mask.NONE, fm.Mask.NONE))
        self.assertTrue(fm.data.tools.masks_equal(fm.Mask.FLEX, fm.Mask.FLEX))
        self.assertFalse(fm.data.tools.masks_equal(fm.Mask.FLEX, fm.Mask.NONE))
        self.assertTrue(fm.data.tools.masks_equal(np.ma.nomask, np.ma.nomask))
        self.assertTrue(fm.data.tools.masks_equal(np.ma.nomask, mask5))
        self.assertTrue(fm.data.tools.masks_equal(mask5, np.ma.nomask))
        self.assertFalse(fm.data.tools.masks_equal(mask1, mask6, grid1, grid2))
        self.assertFalse(fm.data.tools.masks_equal(mask1, mask7, grid1, grid3))
        self.assertFalse(fm.data.tools.masks_equal(mask1, mask2, grid1, grid1))

        # cover domain
        self.assertTrue(fm.data.tools.check_data_covers_domain(data, mask3))
        self.assertFalse(fm.data.tools.check_data_covers_domain(data, mask2))
        self.assertFalse(fm.data.tools.check_data_covers_domain(data, np.ma.nomask))

        np.testing.assert_array_almost_equal(data, fm.data.tools.to_masked(data))
        np.testing.assert_array_almost_equal((1, 2, 3), fm.data.tools.filled((1, 2, 3)))

    def test_info_mask(self):
        """
        Expect ``FinamMetaDataError`` for an ``Info`` built from this grid and mask.

        NOTE(review): presumably raised because the three-entry mask does not
        match the data shape of the grid -- confirm against ``fm.Info``.
        """
        grid = fm.RectilinearGrid([(1.0, 2.0, 3.0)])
        mask = np.array((1, 0, 0), dtype=bool)
        with self.assertRaises(fm.FinamMetaDataError):
            fm.Info(grid=grid, mask=mask)
if __name__ == "__main__":
    # run the tests of this module when the file is executed as a script
    unittest.main()