"""VTK helper classes and functions"""
from datetime import datetime
import finam as fm
import numpy as np
import pyvista as pv
from cftime import num2pydate
from finam.data.grid_tools import INV_VTK_TYPE_MAP, VTK_CELL_DIM, get_cells_matrix
ASSOCIATION = {
pv.FieldAssociation.POINT: "point",
pv.FieldAssociation.CELL: "cell",
pv.FieldAssociation.NONE: "field",
}
"""dict: Field associations map."""
def _to_pure_datetime(dt):
return datetime(
year=dt.year,
month=dt.month,
day=dt.day,
hour=dt.hour,
minute=dt.minute,
second=dt.second,
microsecond=dt.microsecond,
tzinfo=dt.tzinfo,
fold=dt.fold,
)
def get_time_units(reference_date, time_unit):
"""
Generate CF conform time units.
Parameters
----------
reference_date : datetime.datetime
Reference datetime to determine times.
time_unit : str
Unit of the timesteps (e.g. "seconds", "hours", "days", ...)
Returns
-------
str
CF time units.
"""
return f"{time_unit} since {reference_date.isoformat(sep=' ')}"
def generate_times(time_values, reference_date, time_unit, calendar="standard"):
"""
Generate datetime list from raw timesteps.
Parameters
----------
time_values : iterable
list of timesteps.
reference_date : datetime.datetime
Reference datetime to determine times.
time_unit : str
Unit of the timesteps (e.g. "seconds", "hours", "days", ...)
calendar : str, optional
Describes the calendar used in the time calculations.
All the values currently defined in the CF metadata convention are supported.
Valid calendars **'standard', 'gregorian', 'proleptic_gregorian'
'noleap', '365_day', '360_day', 'julian', 'all_leap', '366_day'**
by default "standard"
Returns
-------
list of datetime.datetime
The generated time points.
"""
units = get_time_units(reference_date, time_unit)
times = num2pydate(time_values, units=units, calendar=calendar)
return list(map(_to_pure_datetime, times))
def convert_data(data, masked):
"""
Convert data to numpy array.
Parameters
----------
data : arraylike
The input data.
masked : bool
Whether the data should be masked at invalid values.
Returns
-------
numpy.ndarray
The converted data.
"""
return np.ma.masked_invalid(data) if masked else np.asarray(data)
def mesh_type(mesh):
"""
Determine the type of a pyvista mesh object.
Parameters
----------
mesh : pyvista.DataSet
Pyvista pointset or grid
Returns
-------
str
Class name of the vtk data set.
"""
return mesh.__class__.__name__
def is_unstructured(mesh):
"""
Whether a pyvista mesh is unstructured.
Parameters
----------
mesh : pyvista.DataSet
Pyvista pointset or grid
Returns
-------
bool
Flag for unstructured meshes.
"""
return mesh_type(mesh) not in ["RectilinearGrid", "ImageData", "PointSet"]
def prepare_unstructured_mesh(mesh, remove_low_dim_cells=True):
"""
Prepare an unstructured pyvista mesh.
Parameters
----------
mesh : pyvista.DataSet
Pyvista pointset or grid
remove_low_dim_cells : bool, optional
Whether to remove lower dimension cells (like vertices or edges),
by default True
Returns
-------
pyvista.DataSet
The prepared mesh.
"""
mesh = mesh.cast_to_unstructured_grid()
if remove_low_dim_cells:
unique_cell_types = np.unique(mesh.celltypes)
cell_dims = VTK_CELL_DIM[unique_cell_types]
max_dim = max(cell_dims)
selection = unique_cell_types[cell_dims == max_dim]
if len(selection) < len(unique_cell_types):
mesh = mesh.extract_cells(np.nonzero(np.isin(mesh.celltypes, selection))[0])
return mesh
def grid_from_pyvista(
mesh,
return_mesh=False,
remove_low_dim_cells=True,
minimal_spatial_dim=True,
**kwargs,
):
"""
Generate finam grid from a pyvista object.
Parameters
----------
mesh : pyvista.DataSet
Pyvista pointset or grid
return_mesh : bool, optional
Whether to return the mesh again. Useful if remove_low_dim_cells is True,
or mesh was cast to unstructured, by default False
remove_low_dim_cells : bool, optional
Whether to remove lower dimension cells (like vertices or edges),
by default True
minimal_spatial_dim : bool, optional
Force grid to have minimal dimension (e.g. 2D grids are defined as 3D in vtk),
by default True
**kwargs
Keyword arguments forwarded to the finam Grid class used.
Returns
-------
finam.Grid
The resulting finam grid.
Raises
------
ValueError
When the grid has unsupported cell types.
ValueError
When the mesh type is unsupported by finam.
"""
mtype = mesh_type(mesh)
if is_unstructured(mesh):
mesh = prepare_unstructured_mesh(mesh, remove_low_dim_cells)
cell_types = INV_VTK_TYPE_MAP[mesh.celltypes]
cells = get_cells_matrix(cell_types, mesh.cells)
if np.any((t_mask := cell_types == -1)):
t_err = np.unique(mesh.celltypes(t_mask))
msg = f"finam-VTK: mesh holds unsupported cell types ({t_err})"
raise ValueError(msg)
points = np.array(mesh.points, copy=True)
p_count, p_dim = points.shape
if minimal_spatial_dim and p_count > 1:
while p_dim > 1 and np.all(np.isclose(points[:, p_dim - 1], 0)):
p_dim -= 1
grid = fm.UnstructuredGrid(
points=points[:, :p_dim], cells=cells, cell_types=cell_types, **kwargs
)
elif mtype == "ImageData":
grid = fm.UniformGrid(
dims=mesh.dimensions, spacing=mesh.spacing, origin=mesh.origin, **kwargs
)
elif mtype == "RectilinearGrid":
grid = fm.RectilinearGrid(axes=[mesh.x, mesh.y, mesh.z], **kwargs)
elif mtype == "PointSet":
grid = fm.UnstructuredPoints(points=mesh.points, **kwargs)
else:
msg = f"finam-VTK: Unknown mesh type: {mtype}"
raise ValueError(msg)
if return_mesh:
return grid, mesh
return grid
[docs]
def read_vtk_grid(path, remove_low_dim_cells=True, minimal_spatial_dim=True, **kwargs):
"""
Read a finam grid from a VTK file.
Parameters
----------
path : pathlike
Path to the vtk file.
remove_low_dim_cells : bool, optional
Whether to remove lower dimension cells (like vertices or edges),
by default True
minimal_spatial_dim : bool, optional
Force grid to have minimal dimension (e.g. 2D grids are defined as 3D in vtk),
by default True
**kwargs
Keyword arguments forwarded to the finam Grid class used.
Returns
-------
finam.Grid
The resulting finam grid.
"""
return grid_from_pyvista(
mesh=pv.read(path),
remove_low_dim_cells=remove_low_dim_cells,
minimal_spatial_dim=minimal_spatial_dim,
**kwargs,
)
[docs]
class DataArray:
"""
Specifications for a VTK data array.
Parameters
----------
name : str
Data array name in the VTK file.
association : str, optionl
Indicate how data is associated ("point", "cell", or "field").
Can be None to be determined.
**info_kwargs
Optional keyword arguments to instantiate an Info object (i.e. 'grid' and 'meta')
Used to overwrite meta data, to change units or to provide a desired grid specification.
"""
def __init__(self, name, association=None, **info_kwargs):
self.name = name
self.association = association
self.info_kwargs = info_kwargs
def __repr__(self):
name, association = self.name, self.association
return f"DataArray({name=}, {association=}, **{self.info_kwargs})"
def create_data_array_list(data_arrays):
"""
Create a list of DataArray instances.
Parameters
----------
data_arrays : list of str or DataArray
List containing DataArray instances or names.
Returns
-------
list of DataArray
List containing only DataArray instances.
"""
return [
arr if isinstance(arr, DataArray) else DataArray(arr) for arr in data_arrays
]
def extract_data_arrays(mesh, data_arrays=None):
"""
Extract the data_array information from a pyvista mesh.
Parameters
----------
mesh : pyvista.DataSet
Pyvista mesh.
data_arrays : list of DataArray or str, optional
List of desired data_arrays given by name or a :class:`DataArray` instance.
By default, all data_arrays present in the mesh.
Returns
-------
data_arrays : list of DataArray
Data array information.
"""
if data_arrays is None:
data_arrays = []
for name in mesh.field_data:
data_arrays.append(DataArray(name=name, association="field"))
for name in mesh.point_data:
data_arrays.append(DataArray(name=name, association="point"))
for name in mesh.cell_data:
data_arrays.append(DataArray(name=name, association="cell"))
else:
data_arrays = create_data_array_list(data_arrays)
for var in data_arrays:
association = ASSOCIATION[mesh.get_array_association(var.name)]
if var.association is None:
var.association = association
elif var.association != association:
msg = (
f"{var.name}: data is associated with '{association}', "
f"but '{var.association}' was given."
)
raise ValueError(msg)
return data_arrays
def needs_masking(data):
"""
Whether data has non-finite values (infs, nans).
Parameters
----------
data : arraylike
Data to check
Returns
-------
bool
Finity status.
"""
return np.any(~(np.isfinite(data)))