Source code for finam_vtk.reader

"""
VTK reader components.
"""

import finam as fm
import pyvista as pv

from .tools import (
    convert_data,
    extract_data_arrays,
    generate_times,
    grid_from_pyvista,
    mesh_type,
    needs_masking,
    prepare_unstructured_mesh,
)


[docs] class VTKStaticReader(fm.Component): """ VTK reader component that reads a single 2D data array per output at startup. Parameters ---------- path : str Path to the VTK file to read. outputs : list of str or DataArray, optional Output arrays to provide. By default all arrays found in the vtk file. grid_arguments : dict, optional Arguments forwarded to :py:func:`grid_from_pyvista`, by default None """ def __init__(self, path, outputs=None, grid_arguments=None): super().__init__() self.path = path self.grid_arguments = grid_arguments or {} self.data_arrays = outputs self.mesh = None self.mesh_type = None self.point_grid = None self.cell_grid = None self._infos = None self._data = None self.status = fm.ComponentStatus.CREATED def _initialize(self): self.mesh = pv.read(self.path) if isinstance(self.mesh, pv.MultiBlock): msg = f"VTKStaticReader: found multi block data: {self.path}" raise ValueError(msg) # update mesh as well in case cells are dropped or mesh was cast self.point_grid, self.mesh = grid_from_pyvista( self.mesh, return_mesh=True, **self.grid_arguments ) self.point_grid.data_location = fm.Location.POINTS self.mesh_type = mesh_type(self.mesh) if self.mesh_type != "PointSet": # shallow copy to be memory efficient self.cell_grid = self.point_grid.copy() self.cell_grid.data_location = fm.Location.CELLS self.data_arrays = extract_data_arrays(self.mesh, self.data_arrays) for var in self.data_arrays: self.outputs.add(name=var.name, static=True) self.create_connector() def _connect(self, start_time): if self._infos is None: self._data = {} self._infos = {} for var in self.data_arrays: data = self.mesh[var.name] self._data[var.name] = convert_data(data, needs_masking(data)) if var.association == "field": grid = fm.NoGrid(dim=self._data[var.name].ndim) elif var.association == "point": grid = self.point_grid else: grid = self.cell_grid meta = var.get_meta() self._infos[var.name] = fm.Info(time=None, grid=grid, meta=meta) self.try_connect(start_time, push_infos=self._infos, push_data=self._data) if self.status == fm.ComponentStatus.CONNECTED: # cleanup self._data = None self._infos = None self.mesh = None def _validate(self): pass def _update(self): pass def _finalize(self): pass
[docs] class PVDReader(fm.TimeComponent): """ VTK reader component that steps along a date/time coordinate dimension of a dataset. Parameters ---------- path : str Path to the PVD file to read. reference_date : datetime.datetime Reference datetime to determine times. time_unit : string Unit of the timesteps (e.g. "seconds", "hours", "days", ...) calendar : str, optional Describes the calendar used in the time calculations. All the values currently defined in the CF metadata convention are supported. Valid calendars **'standard', 'gregorian', 'proleptic_gregorian' 'noleap', '365_day', '360_day', 'julian', 'all_leap', '366_day'** by default "standard" outputs : list of str or DataArray, optional Output arrays to provide. By default all arrays found in the vtk file(s). grid_arguments : dict, optional Arguments forwarded to :py:func:`grid_from_pyvista`, by default None """ def __init__( self, path, reference_date, time_unit, calendar="standard", outputs=None, grid_arguments=None, ): super().__init__() self.path = path self.reference_date = reference_date self.time_unit = time_unit self.calendar = calendar self.data_arrays = outputs self.grid_arguments = grid_arguments or {} self.remove_low_dim_cells = self.grid_arguments.get( "remove_low_dim_cells", True ) self.reader = None self.mesh = None self.mesh_type = None self.is_unstructured = None self.point_grid = None self.cell_grid = None self.needs_masking = {} self._init_data = {} self.output_infos = {} self.data_pushed = False self.times = None self.step = 0 self._status = fm.ComponentStatus.CREATED @property def next_time(self): return ( self.times[self.step + 1] if self.times is not None and len(self.times) > self.step + 1 else None ) def _initialize(self): self.reader = pv.PVDReader(self.path) self.times = generate_times( self.reader.time_values, self.reference_date, self.time_unit, self.calendar ) self._time = self.times[0] # first mesh with initial values self.reader.set_active_time_point(0) self.mesh = self.reader.read() if self.mesh.n_blocks > 1: msg = f"PVDReader: only one file per time-step supported: {self.path}" raise ValueError(msg) self.mesh = self.mesh[0] # single block # update mesh as well in case cells are dropped or mesh was cast self.point_grid, self.mesh = grid_from_pyvista( self.mesh, return_mesh=True, **self.grid_arguments ) self.point_grid.data_location = fm.Location.POINTS self.mesh_type = mesh_type(self.mesh) self.is_unstructured = self.mesh_type == "UnstructuredGrid" if self.mesh_type != "PointSet": # shallow copy to be memory efficient self.cell_grid = self.point_grid.copy() self.cell_grid.data_location = fm.Location.CELLS self.data_arrays = extract_data_arrays(self.mesh, self.data_arrays) for var in self.data_arrays: self.outputs.add(name=var.name) self._process_initial_data() self.create_connector() def _connect(self, start_time): if self.data_pushed: self.try_connect(start_time) else: self.data_pushed = True self.try_connect( start_time, push_data=self._init_data, push_infos=self.output_infos, ) if self.status == fm.ComponentStatus.CONNECTED: del self._init_data def _process_initial_data(self): for var in self.data_arrays: data = self.mesh[var.name] # store masking info once self.needs_masking[var.name] = needs_masking(data) self._init_data[var.name] = convert_data(data, self.needs_masking[var.name]) if var.association == "field": grid = fm.NoGrid(dim=self._init_data[var.name].ndim) elif var.association == "point": grid = self.point_grid else: grid = self.cell_grid meta = var.get_meta() self.output_infos[var.name] = fm.Info(time=self._time, grid=grid, meta=meta) def _validate(self): pass def _update(self): self._time = self.next_time self.step += 1 self.reader.set_active_time_point(self.step) self.mesh = self.reader.read() if self.mesh.n_blocks > 1: msg = f"PVDReader: only one file per time-step supported: {self.path}" raise ValueError(msg) # single block if self.is_unstructured: self.mesh = prepare_unstructured_mesh( self.mesh[0], self.remove_low_dim_cells ) else: self.mesh = self.mesh[0] for var in self.data_arrays: self._outputs[var.name].push_data( convert_data(self.mesh[var.name], self.needs_masking[var.name]), self._time, ) def _finalize(self): self.reader = None self.mesh = None