Source code for finam_vtk.tools

"""VTK helper classes and functions"""

from datetime import datetime

import finam as fm
import numpy as np
import pyvista as pv
from cftime import num2pydate
from finam.data.grid_tools import INV_VTK_TYPE_MAP, VTK_CELL_DIM, get_cells_matrix

ASSOCIATION = {
    pv.FieldAssociation.POINT: "point",
    pv.FieldAssociation.CELL: "cell",
    pv.FieldAssociation.NONE: "field",
}
"""dict: Field associations map."""


def _to_pure_datetime(dt):
    return datetime(
        year=dt.year,
        month=dt.month,
        day=dt.day,
        hour=dt.hour,
        minute=dt.minute,
        second=dt.second,
        microsecond=dt.microsecond,
        tzinfo=dt.tzinfo,
        fold=dt.fold,
    )


def get_time_units(reference_date, time_unit):
    """
    Generate CF conform time units.

    Parameters
    ----------
    reference_date : datetime.datetime
        Reference datetime to determine times.
    time_unit : str
        Unit of the timesteps (e.g. "seconds", "hours", "days", ...)

    Returns
    -------
    str
        CF time units.
    """
    return f"{time_unit} since {reference_date.isoformat(sep=' ')}"


def generate_times(time_values, reference_date, time_unit, calendar="standard"):
    """
    Generate datetime list from raw timesteps.

    Parameters
    ----------
    time_values : iterable
        list of timesteps.
    reference_date : datetime.datetime
        Reference datetime to determine times.
    time_unit : str
        Unit of the timesteps (e.g. "seconds", "hours", "days", ...)
    calendar : str, optional
        Describes the calendar used in the time calculations.
        All the values currently defined in the CF metadata convention are supported.
        Valid calendars **'standard', 'gregorian', 'proleptic_gregorian'
        'noleap', '365_day', '360_day', 'julian', 'all_leap', '366_day'**
        by default "standard"

    Returns
    -------
    list of datetime.datetime
        The generated time points.
    """
    units = get_time_units(reference_date, time_unit)
    times = num2pydate(time_values, units=units, calendar=calendar)
    return list(map(_to_pure_datetime, times))


def convert_data(data, masked):
    """
    Convert data to numpy array.

    Parameters
    ----------
    data : arraylike
        The input data.
    masked : bool
        Whether the data should be masked at invalid values.

    Returns
    -------
    numpy.ndarray
        The converted data.
    """
    return np.ma.masked_invalid(data) if masked else np.asarray(data)


def mesh_type(mesh):
    """
    Determine the type of a pyvista mesh object.

    Parameters
    ----------
    mesh : pyvista.DataSet
        Pyvista pointset or grid

    Returns
    -------
    str
        Class name of the vtk data set.
    """
    return mesh.__class__.__name__


def is_unstructured(mesh):
    """
    Whether a pyvista mesh is unstructured.

    Parameters
    ----------
    mesh : pyvista.DataSet
        Pyvista pointset or grid

    Returns
    -------
    bool
        Flag for unstructured meshes.
    """
    return mesh_type(mesh) not in ["RectilinearGrid", "ImageData", "PointSet"]


def prepare_unstructured_mesh(mesh, remove_low_dim_cells=True):
    """
    Prepare an unstructured pyvista mesh.

    Parameters
    ----------
    mesh : pyvista.DataSet
        Pyvista pointset or grid
    remove_low_dim_cells : bool, optional
        Whether to remove lower dimension cells (like vertices or edges),
        by default True

    Returns
    -------
    pyvista.DataSet
        The prepared mesh.
    """
    mesh = mesh.cast_to_unstructured_grid()
    if remove_low_dim_cells:
        unique_cell_types = np.unique(mesh.celltypes)
        cell_dims = VTK_CELL_DIM[unique_cell_types]
        max_dim = max(cell_dims)
        selection = unique_cell_types[cell_dims == max_dim]
        if len(selection) < len(unique_cell_types):
            mesh = mesh.extract_cells(np.nonzero(np.isin(mesh.celltypes, selection))[0])
    return mesh


def grid_from_pyvista(
    mesh,
    return_mesh=False,
    remove_low_dim_cells=True,
    minimal_spatial_dim=True,
    **kwargs,
):
    """
    Generate finam grid from a pyvista object.

    Parameters
    ----------
    mesh : pyvista.DataSet
        Pyvista pointset or grid
    return_mesh : bool, optional
        Whether to return the mesh again. Useful if remove_low_dim_cells is True,
        or mesh was cast to unstructured, by default False
    remove_low_dim_cells : bool, optional
        Whether to remove lower dimension cells (like vertices or edges),
        by default True
    minimal_spatial_dim : bool, optional
        Force grid to have minimal dimension (e.g. 2D grids are defined as 3D in vtk),
        by default True
    **kwargs
        Keyword arguments forwarded to the finam Grid class used.

    Returns
    -------
    finam.Grid
        The resulting finam grid.

    Raises
    ------
    ValueError
        When the grid has unsupported cell types.
    ValueError
        When the mesh type is unsupported by finam.
    """
    mtype = mesh_type(mesh)
    if is_unstructured(mesh):
        mesh = prepare_unstructured_mesh(mesh, remove_low_dim_cells)
        cell_types = INV_VTK_TYPE_MAP[mesh.celltypes]
        cells = get_cells_matrix(cell_types, mesh.cells)
        if np.any((t_mask := cell_types == -1)):
            t_err = np.unique(mesh.celltypes(t_mask))
            msg = f"finam-VTK: mesh holds unsupported cell types ({t_err})"
            raise ValueError(msg)
        points = np.array(mesh.points, copy=True)
        p_count, p_dim = points.shape
        if minimal_spatial_dim and p_count > 1:
            while p_dim > 1 and np.all(np.isclose(points[:, p_dim - 1], 0)):
                p_dim -= 1
        grid = fm.UnstructuredGrid(
            points=points[:, :p_dim], cells=cells, cell_types=cell_types, **kwargs
        )
    elif mtype == "ImageData":
        grid = fm.UniformGrid(
            dims=mesh.dimensions, spacing=mesh.spacing, origin=mesh.origin, **kwargs
        )
    elif mtype == "RectilinearGrid":
        grid = fm.RectilinearGrid(axes=[mesh.x, mesh.y, mesh.z], **kwargs)
    elif mtype == "PointSet":
        grid = fm.UnstructuredPoints(points=mesh.points, **kwargs)
    else:
        msg = f"finam-VTK: Unknown mesh type: {mtype}"
        raise ValueError(msg)
    if return_mesh:
        return grid, mesh
    return grid


[docs] def read_vtk_grid(path, remove_low_dim_cells=True, minimal_spatial_dim=True, **kwargs): """ Read a finam grid from a VTK file. Parameters ---------- path : pathlike Path to the vtk file. remove_low_dim_cells : bool, optional Whether to remove lower dimension cells (like vertices or edges), by default True minimal_spatial_dim : bool, optional Force grid to have minimal dimension (e.g. 2D grids are defined as 3D in vtk), by default True **kwargs Keyword arguments forwarded to the finam Grid class used. Returns ------- finam.Grid The resulting finam grid. """ return grid_from_pyvista( mesh=pv.read(path), remove_low_dim_cells=remove_low_dim_cells, minimal_spatial_dim=minimal_spatial_dim, **kwargs, )
[docs] class DataArray: """ Specifications for a VTK data array. Parameters ---------- name : str Data array name in the VTK file. association : str, optionl Indicate how data is associated ("point", "cell", or "field"). Can be None to be determined. **info_kwargs Optional keyword arguments to instantiate an Info object (i.e. 'grid' and 'meta') Used to overwrite meta data, to change units or to provide a desired grid specification. """ def __init__(self, name, association=None, **info_kwargs): self.name = name self.association = association self.info_kwargs = info_kwargs
[docs] def get_meta(self): """Get the meta-data dictionary of this data array.""" meta = self.info_kwargs.get("meta", {}) meta.update( { k: v for k, v in self.info_kwargs.items() if k not in ["time", "grid", "meta"] } ) return meta
def __repr__(self): name, association = self.name, self.association return f"DataArray({name=}, {association=}, **{self.info_kwargs})"
def create_data_array_list(data_arrays): """ Create a list of DataArray instances. Parameters ---------- data_arrays : list of str or DataArray List containing DataArray instances or names. Returns ------- list of DataArray List containing only DataArray instances. """ return [ arr if isinstance(arr, DataArray) else DataArray(arr) for arr in data_arrays ] def extract_data_arrays(mesh, data_arrays=None): """ Extract the data_array information from a pyvista mesh. Parameters ---------- mesh : pyvista.DataSet Pyvista mesh. data_arrays : list of DataArray or str, optional List of desired data_arrays given by name or a :class:`DataArray` instance. By default, all data_arrays present in the mesh. Returns ------- data_arrays : list of DataArray Data array information. """ if data_arrays is None: data_arrays = [] for name in mesh.field_data: data_arrays.append(DataArray(name=name, association="field")) for name in mesh.point_data: data_arrays.append(DataArray(name=name, association="point")) for name in mesh.cell_data: data_arrays.append(DataArray(name=name, association="cell")) else: data_arrays = create_data_array_list(data_arrays) for var in data_arrays: association = ASSOCIATION[mesh.get_array_association(var.name)] if var.association is None: var.association = association elif var.association != association: msg = ( f"{var.name}: data is associated with '{association}', " f"but '{var.association}' was given." ) raise ValueError(msg) return data_arrays def needs_masking(data): """ Whether data has non-finite values (infs, nans). Parameters ---------- data : arraylike Data to check Returns ------- bool Finity status. """ return np.any(~(np.isfinite(data)))