"""
VTK reader components.
"""
import finam as fm
import pyvista as pv
from .tools import (
convert_data,
extract_data_arrays,
generate_times,
grid_from_pyvista,
mesh_type,
needs_masking,
prepare_unstructured_mesh,
)
[docs]
class VTKStaticReader(fm.Component):
    """
    Static VTK reader: loads one file and publishes each data array once.

    The file is read during initialization; the arrays are converted and
    pushed (together with their infos) while the component connects.

    Parameters
    ----------
    path : str
        Path to the VTK file to read.
    outputs : list of str or DataArray, optional
        Output arrays to provide. By default all arrays found in the vtk file.
    grid_arguments : dict, optional
        Arguments forwarded to :py:func:`grid_from_pyvista`, by default None
    """

    def __init__(self, path, outputs=None, grid_arguments=None):
        super().__init__()
        self.path = path
        self.grid_arguments = grid_arguments if grid_arguments else {}
        self.data_arrays = outputs
        # filled in by ``_initialize``
        self.mesh = None
        self.mesh_type = None
        self.point_grid = None
        self.cell_grid = None
        # filled in on the first ``_connect`` call
        self._infos = None
        self._data = None
        self.status = fm.ComponentStatus.CREATED

    def _initialize(self):
        """Read the file, derive the grids and register one static output per array.

        Raises
        ------
        ValueError
            If the file content is a :class:`pyvista.MultiBlock`.
        """
        self.mesh = pv.read(self.path)
        if isinstance(self.mesh, pv.MultiBlock):
            raise ValueError(f"VTKStaticReader: found multi block data: {self.path}")

        # the returned mesh replaces the one read from disk, since cells
        # may have been dropped or the mesh may have been cast
        grid, mesh = grid_from_pyvista(
            self.mesh, return_mesh=True, **self.grid_arguments
        )
        self.point_grid = grid
        self.mesh = mesh
        self.point_grid.data_location = fm.Location.POINTS

        self.mesh_type = mesh_type(self.mesh)
        has_cell_grid = self.mesh_type != "PointSet"
        if has_cell_grid:
            # a shallow copy keeps the memory footprint low
            self.cell_grid = self.point_grid.copy()
            self.cell_grid.data_location = fm.Location.CELLS

        self.data_arrays = extract_data_arrays(self.mesh, self.data_arrays)
        for array in self.data_arrays:
            self.outputs.add(name=array.name, static=True)
        self.create_connector()

    def _grid_for(self, association, values):
        """Select the grid specification matching an array association.

        ``"field"`` arrays get a :class:`finam.NoGrid` with the dimension of
        ``values``, ``"point"`` arrays the point grid, anything else the
        cell grid.
        """
        if association == "field":
            return fm.NoGrid(dim=values.ndim)
        if association == "point":
            return self.point_grid
        return self.cell_grid

    def _connect(self, start_time):
        """Push infos and data of all arrays; release them once connected."""
        if self._infos is None:
            # prepare data and infos only on the first connect attempt
            self._data = {}
            self._infos = {}
            for array in self.data_arrays:
                raw = self.mesh[array.name]
                self._data[array.name] = convert_data(raw, needs_masking(raw))
                self._infos[array.name] = fm.Info(
                    time=None,
                    grid=self._grid_for(array.association, self._data[array.name]),
                    meta=array.get_meta(),
                )

        self.try_connect(start_time, push_infos=self._infos, push_data=self._data)

        if self.status == fm.ComponentStatus.CONNECTED:
            # everything is handed over; drop the references
            self._data = None
            self._infos = None
            self.mesh = None

    def _validate(self):
        """Nothing to validate."""

    def _update(self):
        """Nothing to update; all outputs are static."""

    def _finalize(self):
        """Nothing to finalize."""
[docs]
class PVDReader(fm.TimeComponent):
    """
    VTK reader component that steps along a date/time coordinate dimension of a dataset.

    The first time point is read during initialization and pushed while
    connecting. Every update advances to the next time point of the PVD
    file and pushes the corresponding arrays. Once all time points are
    consumed, a further update marks the component as finished.

    Parameters
    ----------
    path : str
        Path to the PVD file to read.
    reference_date : datetime.datetime
        Reference datetime to determine times.
    time_unit : string
        Unit of the timesteps (e.g. "seconds", "hours", "days", ...)
    calendar : str, optional
        Describes the calendar used in the time calculations.
        All the values currently defined in the CF metadata convention are supported.
        Valid calendars **'standard', 'gregorian', 'proleptic_gregorian'
        'noleap', '365_day', '360_day', 'julian', 'all_leap', '366_day'**
        by default "standard"
    outputs : list of str or DataArray, optional
        Output arrays to provide. By default all arrays found in the vtk file(s).
    grid_arguments : dict, optional
        Arguments forwarded to :py:func:`grid_from_pyvista`, by default None
    """

    def __init__(
        self,
        path,
        reference_date,
        time_unit,
        calendar="standard",
        outputs=None,
        grid_arguments=None,
    ):
        super().__init__()
        self.path = path
        self.reference_date = reference_date
        self.time_unit = time_unit
        self.calendar = calendar
        self.data_arrays = outputs
        self.grid_arguments = grid_arguments or {}
        # remembered separately because it is passed again to
        # ``prepare_unstructured_mesh`` on every update
        self.remove_low_dim_cells = self.grid_arguments.get(
            "remove_low_dim_cells", True
        )
        self.reader = None
        self.mesh = None
        self.mesh_type = None
        self.is_unstructured = None
        self.point_grid = None
        self.cell_grid = None
        # per-array masking flag, determined once from the initial data
        self.needs_masking = {}
        self._init_data = {}
        self.output_infos = {}
        self.data_pushed = False
        self.times = None
        # index of the currently active time point
        self.step = 0
        self._status = fm.ComponentStatus.CREATED

    @property
    def next_time(self):
        """Time of the next time point, or None if there is none left."""
        return (
            self.times[self.step + 1]
            if self.times is not None and len(self.times) > self.step + 1
            else None
        )

    def _initialize(self):
        """Open the PVD file, read the first time point and register the outputs.

        Raises
        ------
        ValueError
            If the first time point consists of more than one block.
        """
        self.reader = pv.PVDReader(self.path)
        self.times = generate_times(
            self.reader.time_values, self.reference_date, self.time_unit, self.calendar
        )
        self._time = self.times[0]
        # first mesh with initial values
        self.reader.set_active_time_point(0)
        self.mesh = self.reader.read()
        if self.mesh.n_blocks > 1:
            msg = f"PVDReader: only one file per time-step supported: {self.path}"
            raise ValueError(msg)
        self.mesh = self.mesh[0]  # single block
        # update mesh as well in case cells are dropped or mesh was cast
        self.point_grid, self.mesh = grid_from_pyvista(
            self.mesh, return_mesh=True, **self.grid_arguments
        )
        self.point_grid.data_location = fm.Location.POINTS
        self.mesh_type = mesh_type(self.mesh)
        self.is_unstructured = self.mesh_type == "UnstructuredGrid"
        if self.mesh_type != "PointSet":
            # shallow copy to be memory efficient
            self.cell_grid = self.point_grid.copy()
            self.cell_grid.data_location = fm.Location.CELLS
        self.data_arrays = extract_data_arrays(self.mesh, self.data_arrays)
        for var in self.data_arrays:
            self.outputs.add(name=var.name)
        self._process_initial_data()
        self.create_connector()

    def _connect(self, start_time):
        """Connect the outputs; initial data and infos are handed over on the first call only."""
        if self.data_pushed:
            self.try_connect(start_time)
        else:
            self.data_pushed = True
            self.try_connect(
                start_time,
                push_data=self._init_data,
                push_infos=self.output_infos,
            )
        if self.status == fm.ComponentStatus.CONNECTED:
            # initial data is no longer needed once connected
            del self._init_data

    def _process_initial_data(self):
        """Convert the arrays of the first time point and build the output infos."""
        for var in self.data_arrays:
            data = self.mesh[var.name]
            # store masking info once
            self.needs_masking[var.name] = needs_masking(data)
            self._init_data[var.name] = convert_data(data, self.needs_masking[var.name])
            if var.association == "field":
                grid = fm.NoGrid(dim=self._init_data[var.name].ndim)
            elif var.association == "point":
                grid = self.point_grid
            else:
                grid = self.cell_grid
            meta = var.get_meta()
            self.output_infos[var.name] = fm.Info(time=self._time, grid=grid, meta=meta)

    def _validate(self):
        """Nothing to validate."""

    def _update(self):
        """Advance to the next time point and push its arrays.

        If no further time point is available, the component is marked as
        finished and its current time and step are left untouched.

        Raises
        ------
        ValueError
            If the time point consists of more than one block.
        """
        upcoming = self.next_time
        if upcoming is None:
            # All time points are consumed. Without this guard the time
            # would be set to None and the reader would be asked for a
            # time point beyond the end of the series.
            self.status = fm.ComponentStatus.FINISHED
            return
        self._time = upcoming
        self.step += 1
        self.reader.set_active_time_point(self.step)
        self.mesh = self.reader.read()
        if self.mesh.n_blocks > 1:
            msg = f"PVDReader: only one file per time-step supported: {self.path}"
            raise ValueError(msg)
        # single block
        if self.is_unstructured:
            self.mesh = prepare_unstructured_mesh(
                self.mesh[0], self.remove_low_dim_cells
            )
        else:
            self.mesh = self.mesh[0]
        for var in self.data_arrays:
            self._outputs[var.name].push_data(
                convert_data(self.mesh[var.name], self.needs_masking[var.name]),
                self._time,
            )

    def _finalize(self):
        """Drop the references to the reader and the last mesh."""
        self.reader = None
        self.mesh = None