Skip to content
Snippets Groups Projects
Commit 3310508b authored by Sebastian Müller's avatar Sebastian Müller 🐈
Browse files

data.tools: move mask related routine to submodule

parent 0a337b63
No related branches found
No related tags found
1 merge request: !286 "Add mask to Info object"
......@@ -2,25 +2,27 @@
from .core import (
Info,
Mask,
assert_type,
check,
check_data_covers_domain,
filled,
from_compressed,
full,
full_like,
has_masked_values,
has_time_axis,
prepare,
strip_time,
to_datetime,
)
from .mask import (
Mask,
check_data_covers_domain,
from_compressed,
has_masked_values,
is_masked_array,
is_sub_mask,
mask_specified,
masks_compatible,
masks_equal,
prepare,
strip_time,
to_compressed,
to_datetime,
to_masked,
)
from .units import (
......
......@@ -2,7 +2,6 @@
import copy
import datetime
from enum import Enum
import numpy as np
import pandas as pd
......@@ -10,6 +9,14 @@ import pandas as pd
from ...errors import FinamDataError, FinamMetaDataError
from .. import grid_spec
from ..grid_base import Grid, GridBase
from .mask import (
MASK_INDICATORS,
Mask,
is_masked_array,
mask_specified,
masks_compatible,
masks_equal,
)
from .units import (
UNITS,
check_quantified,
......@@ -17,20 +24,8 @@ from .units import (
equivalent_units,
get_units,
is_quantified,
quantify,
)
# Metadata keys looked up (the first one, then the second as fallback)
# to find the fill value of masked data.
_MASK_INDICATORS = ["_FillValue", "missing_value"]
class Mask(Enum):
    """
    Mask settings for Info.

    These members are the "unspecified" mask choices; anything else used as
    a mask is treated as an explicitly specified mask.
    """

    FLEX = 0
    """Data can be masked or unmasked."""

    NONE = 1
    """Data is expected to be unmasked and given as plain numpy arrays."""
def prepare(data, info, time_entries=1, force_copy=False, report_conversion=False):
"""
......@@ -350,42 +345,6 @@ def _check_shape(shape, grid):
)
def is_masked_array(data):
    """
    Tell whether the given data is a numpy masked array.

    Quantified data is judged by its magnitude.

    Parameters
    ----------
    data : Any
        The given data array.

    Returns
    -------
    bool
        Whether the data (or the magnitude of quantified data) is a MaskedArray.
    """
    candidate = data.magnitude if is_quantified(data) else data
    return np.ma.isMaskedArray(candidate)
def has_masked_values(data):
    """
    Report whether any value of the data is masked.

    Parameters
    ----------
    data : Any
        The given data array.

    Returns
    -------
    bool
        Whether the data is a MaskedArray and has any masked values.
    """
    masked_somewhere = np.ma.is_masked(data)
    return masked_somewhere
def filled(data, fill_value=None):
"""
Return input as an array with masked data replaced by a fill value.
......@@ -421,156 +380,6 @@ def filled(data, fill_value=None):
return data.filled(fill_value)
def to_masked(data, **kwargs):
    """
    Convert the data into a masked array, keeping units if present.

    Parameters
    ----------
    data : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    **kwargs
        keyword arguments forwarded to :any:`numpy.ma.array`

    Returns
    -------
    pint.Quantity or numpy.ma.MaskedArray
        New object with the same shape and type but as a masked array.
        Units will be taken from the input if present.
        Data that is already masked is returned unchanged when no
        keyword arguments are given.
    """
    # nothing to do for already masked data without extra options
    if not kwargs and is_masked_array(data):
        return data
    if not is_quantified(data):
        return np.ma.array(data, **kwargs)
    # mask the bare magnitude and re-attach the original units
    masked_magnitude = np.ma.array(data.magnitude, **kwargs)
    return UNITS.Quantity(masked_magnitude, data.units)
def to_compressed(xdata, order="C", mask=None):
    """
    Return all the non-masked data as a 1-D array respecting the given array order.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    order : str
        order argument for :any:`numpy.ravel`
    mask : :any:`Mask` value or valid boolean mask for :any:`MaskedArray`, optional
        mask to use when data is not masked already

    Returns
    -------
    :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        New flat object holding only the unmasked data, with the same type as the input.
        Units will be taken from the input if present.

    See also
    --------
    :func:`numpy.ma.compressed`:
        Numpy routine doing the same but only for C-order.
    """
    already_masked = is_masked_array(xdata)
    use_given_mask = mask is not None and mask_specified(mask)
    if not (already_masked or use_given_mask):
        # no mask information at all: plain flattening
        return np.reshape(xdata, -1, order=order)

    if already_masked:
        # the mask carried by the data takes precedence over the argument
        flat = np.ravel(xdata.data, order)
        mask = xdata.mask
    else:
        flat = np.ravel(xdata, order)

    if mask is not np.ma.nomask:
        keep = np.logical_not(np.ravel(mask, order))
        flat = flat.compress(keep)

    if is_quantified(xdata):
        return quantify(flat, xdata.units)
    return flat
def from_compressed(xdata, shape, order="C", mask=None, **kwargs):
    """
    Fill a (masked) array following a given mask or shape with the provided data.

    Without a specified boolean mask the data is simply reshaped; a masked
    array is then only created if kwargs are given or ``mask`` is
    :any:`numpy.ma.nomask`. With a boolean mask, the data is written to the
    unmasked positions of a new array. Filling is performed in the given
    array order.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    shape : int or tuple of int
        shape argument for :any:`numpy.reshape`
    order : str
        order argument for :any:`numpy.reshape`
    mask : :any:`Mask` value or valid boolean mask for :any:`MaskedArray`
        mask to use
    **kwargs
        keyword arguments forwarded to :any:`numpy.ma.array`

    Returns
    -------
    :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        New object with the desired shape and same type as input.
        Units will be taken from the input if present.

    Raises
    ------
    FinamDataError
        If kwargs are given together with ``mask=Mask.NONE``.

    See also
    --------
    to_compressed:
        Inverse operation.
    :any:`numpy.ma.array`:
        Routine consuming kwargs to create a masked array.
    :any:`numpy.reshape`:
        Equivalent routine if no mask is provided.

    Notes
    -----
    If both `mask` and `shape` are given, they need to match in size.
    """
    if mask is None or mask is np.ma.nomask or not mask_specified(mask):
        if kwargs and mask is Mask.NONE:
            raise FinamDataError(
                "from_compressed: Can't create masked array with mask=Mask.NONE"
            )
        reshaped = np.reshape(xdata, shape, order=order)
        if kwargs or mask is np.ma.nomask:
            return to_masked(reshaped, **kwargs)
        return reshaped

    # flat buffer of full size; only the unmasked positions get filled below
    # pylint: disable-next=unexpected-keyword-arg
    buffer = np.empty_like(xdata, shape=np.prod(shape))
    if is_quantified(xdata):
        buffer = quantify(buffer, xdata.units)
    unmasked = np.logical_not(np.ravel(mask, order=order))
    buffer[unmasked] = xdata
    return to_masked(np.reshape(buffer, shape, order=order), mask=mask, **kwargs)
def check_data_covers_domain(data, mask=None):
    """
    Check if the given data covers a domain defined by a mask on the same grid.

    Data without masked values always covers the domain. Otherwise a single
    mask value decides by its truth value, and an array mask must be True
    at every position where the data is masked.

    Parameters
    ----------
    data : Any
        The given data array for a single time-step.
    mask : None or bool or array of bool, optional
        Mask defining the target domain on the same grid as the data,
        by default None

    Returns
    -------
    bool
        Whether the data covers the desired domain.

    Raises
    ------
    ValueError
        When mask is given and mask and data don't share the same shape.
    """
    single_value = _is_single_mask_value(mask)
    if not single_value and np.shape(mask) != np.shape(data):
        raise ValueError("check_data_covers_domain: mask and data shape differ.")

    if not has_masked_values(data):
        return True

    if single_value:
        return bool(mask)

    # convert numpy.bool_ to a plain bool, matching the other return paths
    return bool(np.all(mask[data.mask]))
def _is_single_mask_value(mask):
return mask is None or mask is np.ma.nomask or mask is False or mask is True
def assert_type(cls, slot, obj, types):
"""Type assertion."""
for t in types:
......@@ -583,128 +392,6 @@ def assert_type(cls, slot, obj, types):
)
def masks_compatible(this, incoming, incoming_donwstream):
    """
    Check if an incoming mask is compatible with a given mask.

    Parameters
    ----------
    this : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        mask specification to check against
    incoming : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        incoming mask to check for compatibility
    incoming_donwstream : bool
        Whether the incoming mask is from downstream data

    Returns
    -------
    bool
        mask compatibility
    """
    if incoming_donwstream:
        upstream, downstream = this, incoming
    else:
        upstream, downstream = incoming, this

    # an upstream of None is never compatible
    if upstream is None:
        return False

    downstream_given = mask_specified(downstream)
    upstream_given = mask_specified(upstream)

    # two explicit masks have to be equal
    if downstream_given and upstream_given:
        return masks_equal(downstream, upstream)

    # an explicit downstream mask requires an explicit upstream mask
    if downstream_given:
        return False

    # downstream is a Mask setting: FLEX accepts anything, NONE only NONE
    if upstream_given:
        return downstream == Mask.FLEX
    return downstream == Mask.FLEX or upstream == Mask.NONE
def masks_equal(this, other):
    """
    Check two masks for equality.

    Parameters
    ----------
    this : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        first mask
    other : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        second mask

    Returns
    -------
    bool
        mask equality
    """
    if this is None and other is None:
        return True

    # two Mask settings are compared directly
    if not mask_specified(this) and not mask_specified(other):
        return this == other

    # need a valid mask at this point
    if not np.ma.is_mask(this) or not np.ma.is_mask(other):
        return False

    # "nomask" equals any mask that masks nothing
    if this is np.ma.nomask:
        if other is np.ma.nomask:
            return True
        return not np.any(other)
    if other is np.ma.nomask:
        return not np.any(this)

    # equal shapes imply equal dimensionality, so one check is enough
    if np.shape(this) != np.shape(other):
        return False

    # convert numpy.bool_ to a plain bool, matching the other return paths
    return bool(np.all(this == other))
def is_sub_mask(mask, submask):
    """
    Check for a sub-mask.

    A sub-mask is True at every position where the original mask is True.

    Parameters
    ----------
    mask : arraylike
        The original mask.
    submask : arraylike
        The potential submask.

    Returns
    -------
    bool
        Whether 'submask' is a sub-mask of 'mask'.
        False if either argument is not a valid numpy mask or if the
        shapes differ.
    """
    if not np.ma.is_mask(mask) or not np.ma.is_mask(submask):
        return False

    # nothing is masked in the original, so any valid mask is a sub-mask
    if mask is np.ma.nomask:
        return True

    # "nomask" is only a sub-mask of a mask that masks nothing
    if submask is np.ma.nomask:
        return not np.any(mask)

    # equal shapes imply equal dimensionality, so one check is enough
    if np.shape(mask) != np.shape(submask):
        return False

    # convert numpy.bool_ to a plain bool, matching the other return paths
    return bool(np.all(submask[mask]))
def mask_specified(mask):
    """
    Determine whether given mask selection indicates a masked array.

    Parameters
    ----------
    mask : :any:`Mask` value or valid boolean mask for :any:`MaskedArray`
        mask to check

    Returns
    -------
    bool
        False if mask is Mask.FLEX or Mask.NONE, True otherwise
    """
    # identity check on purpose: "==" on an array would compare element-wise
    for setting in Mask:
        if mask is setting:
            return False
    return True
def _format_mask(mask):
if mask_specified(mask) and mask is not np.ma.nomask:
return "<ndarray>"
......@@ -769,7 +456,7 @@ class Info:
def fill_value(self):
"""Fill value for masked data."""
return self.meta.get(
_MASK_INDICATORS[0], self.meta.get(_MASK_INDICATORS[1], None)
MASK_INDICATORS[0], self.meta.get(MASK_INDICATORS[1], None)
)
def copy(self):
......
"""Data tools for FINAM."""
from enum import Enum
import numpy as np
from ...errors import FinamDataError
from .units import UNITS, is_quantified, quantify
# Metadata keys looked up (the first one, then the second as fallback)
# to find the fill value of masked data.
MASK_INDICATORS = ["_FillValue", "missing_value"]
class Mask(Enum):
    """
    Mask settings for Info.

    These members are the "unspecified" mask choices; anything else used as
    a mask is treated as an explicitly specified mask.
    """

    FLEX = 0
    """Data can be masked or unmasked."""

    NONE = 1
    """Data is expected to be unmasked and given as plain numpy arrays."""
def is_masked_array(data):
    """
    Tell whether the given data is a numpy masked array.

    Quantified data is judged by its magnitude.

    Parameters
    ----------
    data : Any
        The given data array.

    Returns
    -------
    bool
        Whether the data (or the magnitude of quantified data) is a MaskedArray.
    """
    candidate = data.magnitude if is_quantified(data) else data
    return np.ma.isMaskedArray(candidate)
def has_masked_values(data):
    """
    Report whether any value of the data is masked.

    Parameters
    ----------
    data : Any
        The given data array.

    Returns
    -------
    bool
        Whether the data is a MaskedArray and has any masked values.
    """
    masked_somewhere = np.ma.is_masked(data)
    return masked_somewhere
def to_masked(data, **kwargs):
    """
    Convert the data into a masked array, keeping units if present.

    Parameters
    ----------
    data : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    **kwargs
        keyword arguments forwarded to :any:`numpy.ma.array`

    Returns
    -------
    pint.Quantity or numpy.ma.MaskedArray
        New object with the same shape and type but as a masked array.
        Units will be taken from the input if present.
        Data that is already masked is returned unchanged when no
        keyword arguments are given.
    """
    # nothing to do for already masked data without extra options
    if not kwargs and is_masked_array(data):
        return data
    if not is_quantified(data):
        return np.ma.array(data, **kwargs)
    # mask the bare magnitude and re-attach the original units
    masked_magnitude = np.ma.array(data.magnitude, **kwargs)
    return UNITS.Quantity(masked_magnitude, data.units)
def to_compressed(xdata, order="C", mask=None):
    """
    Return all the non-masked data as a 1-D array respecting the given array order.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    order : str
        order argument for :any:`numpy.ravel`
    mask : :any:`Mask` value or valid boolean mask for :any:`MaskedArray`, optional
        mask to use when data is not masked already

    Returns
    -------
    :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        New flat object holding only the unmasked data, with the same type as the input.
        Units will be taken from the input if present.

    See also
    --------
    :func:`numpy.ma.compressed`:
        Numpy routine doing the same but only for C-order.
    """
    already_masked = is_masked_array(xdata)
    use_given_mask = mask is not None and mask_specified(mask)
    if not (already_masked or use_given_mask):
        # no mask information at all: plain flattening
        return np.reshape(xdata, -1, order=order)

    if already_masked:
        # the mask carried by the data takes precedence over the argument
        flat = np.ravel(xdata.data, order)
        mask = xdata.mask
    else:
        flat = np.ravel(xdata, order)

    if mask is not np.ma.nomask:
        keep = np.logical_not(np.ravel(mask, order))
        flat = flat.compress(keep)

    if is_quantified(xdata):
        return quantify(flat, xdata.units)
    return flat
def from_compressed(xdata, shape, order="C", mask=None, **kwargs):
    """
    Fill a (masked) array following a given mask or shape with the provided data.

    Without a specified boolean mask the data is simply reshaped; a masked
    array is then only created if kwargs are given or ``mask`` is
    :any:`numpy.ma.nomask`. With a boolean mask, the data is written to the
    unmasked positions of a new array. Filling is performed in the given
    array order.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    shape : int or tuple of int
        shape argument for :any:`numpy.reshape`
    order : str
        order argument for :any:`numpy.reshape`
    mask : :any:`Mask` value or valid boolean mask for :any:`MaskedArray`
        mask to use
    **kwargs
        keyword arguments forwarded to :any:`numpy.ma.array`

    Returns
    -------
    :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        New object with the desired shape and same type as input.
        Units will be taken from the input if present.

    Raises
    ------
    FinamDataError
        If kwargs are given together with ``mask=Mask.NONE``.

    See also
    --------
    to_compressed:
        Inverse operation.
    :any:`numpy.ma.array`:
        Routine consuming kwargs to create a masked array.
    :any:`numpy.reshape`:
        Equivalent routine if no mask is provided.

    Notes
    -----
    If both `mask` and `shape` are given, they need to match in size.
    """
    if mask is None or mask is np.ma.nomask or not mask_specified(mask):
        if kwargs and mask is Mask.NONE:
            raise FinamDataError(
                "from_compressed: Can't create masked array with mask=Mask.NONE"
            )
        reshaped = np.reshape(xdata, shape, order=order)
        if kwargs or mask is np.ma.nomask:
            return to_masked(reshaped, **kwargs)
        return reshaped

    # flat buffer of full size; only the unmasked positions get filled below
    # pylint: disable-next=unexpected-keyword-arg
    buffer = np.empty_like(xdata, shape=np.prod(shape))
    if is_quantified(xdata):
        buffer = quantify(buffer, xdata.units)
    unmasked = np.logical_not(np.ravel(mask, order=order))
    buffer[unmasked] = xdata
    return to_masked(np.reshape(buffer, shape, order=order), mask=mask, **kwargs)
def check_data_covers_domain(data, mask=None):
    """
    Check if the given data covers a domain defined by a mask on the same grid.

    Data without masked values always covers the domain. Otherwise a single
    mask value decides by its truth value, and an array mask must be True
    at every position where the data is masked.

    Parameters
    ----------
    data : Any
        The given data array for a single time-step.
    mask : None or bool or array of bool, optional
        Mask defining the target domain on the same grid as the data,
        by default None

    Returns
    -------
    bool
        Whether the data covers the desired domain.

    Raises
    ------
    ValueError
        When mask is given and mask and data don't share the same shape.
    """
    single_value = _is_single_mask_value(mask)
    if not single_value and np.shape(mask) != np.shape(data):
        raise ValueError("check_data_covers_domain: mask and data shape differ.")

    if not has_masked_values(data):
        return True

    if single_value:
        return bool(mask)

    # convert numpy.bool_ to a plain bool, matching the other return paths
    return bool(np.all(mask[data.mask]))
def _is_single_mask_value(mask):
return mask is None or mask is np.ma.nomask or mask is False or mask is True
def masks_compatible(this, incoming, incoming_donwstream):
    """
    Check if an incoming mask is compatible with a given mask.

    Parameters
    ----------
    this : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        mask specification to check against
    incoming : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        incoming mask to check for compatibility
    incoming_donwstream : bool
        Whether the incoming mask is from downstream data

    Returns
    -------
    bool
        mask compatibility
    """
    if incoming_donwstream:
        upstream, downstream = this, incoming
    else:
        upstream, downstream = incoming, this

    # an upstream of None is never compatible
    if upstream is None:
        return False

    downstream_given = mask_specified(downstream)
    upstream_given = mask_specified(upstream)

    # two explicit masks have to be equal
    if downstream_given and upstream_given:
        return masks_equal(downstream, upstream)

    # an explicit downstream mask requires an explicit upstream mask
    if downstream_given:
        return False

    # downstream is a Mask setting: FLEX accepts anything, NONE only NONE
    if upstream_given:
        return downstream == Mask.FLEX
    return downstream == Mask.FLEX or upstream == Mask.NONE
def masks_equal(this, other):
    """
    Check two masks for equality.

    Parameters
    ----------
    this : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        first mask
    other : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        second mask

    Returns
    -------
    bool
        mask equality
    """
    if this is None and other is None:
        return True

    # two Mask settings are compared directly
    if not mask_specified(this) and not mask_specified(other):
        return this == other

    # need a valid mask at this point
    if not np.ma.is_mask(this) or not np.ma.is_mask(other):
        return False

    # "nomask" equals any mask that masks nothing
    if this is np.ma.nomask:
        if other is np.ma.nomask:
            return True
        return not np.any(other)
    if other is np.ma.nomask:
        return not np.any(this)

    # equal shapes imply equal dimensionality, so one check is enough
    if np.shape(this) != np.shape(other):
        return False

    # convert numpy.bool_ to a plain bool, matching the other return paths
    return bool(np.all(this == other))
def is_sub_mask(mask, submask):
    """
    Check for a sub-mask.

    A sub-mask is True at every position where the original mask is True.

    Parameters
    ----------
    mask : arraylike
        The original mask.
    submask : arraylike
        The potential submask.

    Returns
    -------
    bool
        Whether 'submask' is a sub-mask of 'mask'.
        False if either argument is not a valid numpy mask or if the
        shapes differ.
    """
    if not np.ma.is_mask(mask) or not np.ma.is_mask(submask):
        return False

    # nothing is masked in the original, so any valid mask is a sub-mask
    if mask is np.ma.nomask:
        return True

    # "nomask" is only a sub-mask of a mask that masks nothing
    if submask is np.ma.nomask:
        return not np.any(mask)

    # equal shapes imply equal dimensionality, so one check is enough
    if np.shape(mask) != np.shape(submask):
        return False

    # convert numpy.bool_ to a plain bool, matching the other return paths
    return bool(np.all(submask[mask]))
def mask_specified(mask):
    """
    Determine whether given mask selection indicates a masked array.

    Parameters
    ----------
    mask : :any:`Mask` value or valid boolean mask for :any:`MaskedArray`
        mask to check

    Returns
    -------
    bool
        False if mask is Mask.FLEX or Mask.NONE, True otherwise
    """
    # identity check on purpose: "==" on an array would compare element-wise
    for setting in Mask:
        if mask is setting:
            return False
    return True
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment