Source code for finam_vtk.writer

"""
VTK writer components.
"""

from datetime import datetime, timedelta
from functools import partial
from pathlib import Path

import finam as fm

from .tools import (
    TIME_DELTAS,
    _create_pv_mesh,
    _get_reference_grid,
    _prepare_writer_arrays,
    _set_mesh_data,
    _timestep_path,
    create_data_array_list,
    get_time_unit,
    save_dict_to_json,
    write_pvd_file,
)


[docs] class VTKStaticWriter(fm.Component): """ VTK writer component for static inputs. Parameters ---------- path : pathlike Path to the VTK file to be written. Suffix will be set according to input grid type unless ``legacy=True``. inputs : list of str or DataArray. List of inputs. Input is either defined by name or a :class:`DataArray` instance. legacy : bool, optional Whether to use the legacy vtk file format. By default: False binary : bool, optional Whether to use the binary file format. By default: True """ def __init__(self, path, inputs, legacy=False, binary=True): super().__init__() self.path = Path(path) self.data_arrays = create_data_array_list(inputs) if not self.data_arrays: raise ValueError("VTKStaticWriter: no inputs specified.") self.pv_mesh = None self.is_structured = None self.file_ext = None self.legacy = legacy self.binary = binary self.status = fm.ComponentStatus.CREATED def _initialize(self): for var in self.data_arrays: var.info_kwargs.setdefault("units", None) self.inputs.add( name=var.name, time=None, static=True, grid=var.info_kwargs.get("grid", None), **var.get_meta(), ) self.create_connector(pull_data=[var.name for var in self.data_arrays]) def _connect(self, start_time): self.try_connect(start_time=start_time) if self.status != fm.ComponentStatus.CONNECTED: return ref_grid = _get_reference_grid(self.connector.in_infos, self.data_arrays) self.pv_mesh, self.is_structured, self.file_ext = _create_pv_mesh( ref_grid, self.legacy ) _prepare_writer_arrays( self.data_arrays, self.connector.in_infos, ref_grid, "VTKStaticWriter", ) self._write_data(self.connector.in_data) def _write_data(self, data): _set_mesh_data(self.pv_mesh, self.data_arrays, data, self.is_structured) self.path = self.path.with_suffix(self.file_ext) self.pv_mesh.save(self.path, binary=self.binary) def _validate(self): pass def _update(self): pass def _finalize(self): self.pv_mesh = None
[docs] class PVDTimedWriter(fm.TimeComponent): """ PVD writer component that writes in predefined time intervals. Parameters ---------- path : pathlike Path to the PVD file to be written. Will have a forced .pvd suffix. inputs : list of str or DataArray. List of inputs. Input is either defined by name or a :class:`DataArray` instance. step : datetime.timedelta Time step file_prefix : str or None, optional file name prefix for the separate VTK files. Will get an added number for each time-step. By default: None - the file stem of the PVD file is used. file_digits : int, optional Number of digits for the added number in the file name for each time-step. By default: 4 legacy : bool, optional Whether to use the legacy vtk file format. By default: False binary : bool, optional Whether to use the binary file format. By default: True aux_file : pathlike or None, optional file name for a json file with auxiliary information about reference date and time unit. By default: None - the file stem of the PVD file is used with .json extension. write_aux_file : bool, optional Whether to write the aux file. By default: True. add_time_units_attribute : bool, optional Whether to add the time units as attribute to the collection in the PVD file. By default: True. """ def __init__( self, path, inputs, step, time_unit=None, file_prefix=None, file_digits=4, legacy=False, binary=True, aux_file=None, write_aux_file=True, add_time_units_attribute=True, ): super().__init__() if step is not None and not isinstance(step, timedelta): raise ValueError("Step must be None or of type timedelta") self.path = Path(path).with_suffix(".pvd") self.aux_file = Path(aux_file or self.path.with_suffix(".json")) self.file_path = Path(file_prefix or self.path.with_suffix("")) self.file_name = self.file_path.stem self.file_digits = file_digits self.data_arrays = create_data_array_list(inputs) if not self.data_arrays: raise ValueError("PVDTimedWriter: no inputs specified.") self._step = step self.time_unit = get_time_unit(self._step, time_unit) self.pv_mesh = None self.ref_grid = None self.ref_date = None self.is_structured = None self.file_ext = None self.legacy = legacy self.binary = binary self.step_counter = 0 self.vtk_files = [] self.time_steps = [] self.write_aux_file = write_aux_file self.add_time_units_attribute = add_time_units_attribute self.status = fm.ComponentStatus.CREATED def _next_time(self): return self.time + self._step def _initialize(self): for var in self.data_arrays: var.info_kwargs.setdefault("units", None) self.inputs.add( name=var.name, time=self.time, grid=var.info_kwargs.get("grid", None), **var.get_meta(), ) self.create_connector(pull_data=[var.name for var in self.data_arrays]) def _connect(self, start_time): self.try_connect(start_time=start_time) if self.status != fm.ComponentStatus.CONNECTED: return self.ref_date = start_time self._time = start_time ref_grid = _get_reference_grid(self.connector.in_infos, self.data_arrays) self.pv_mesh, self.is_structured, self.file_ext = _create_pv_mesh( ref_grid, self.legacy ) _prepare_writer_arrays( self.data_arrays, self.connector.in_infos, ref_grid, "PVDTimedWriter", ) self._write_timestep(self.connector.in_data) def _write_timestep(self, data): name = _timestep_path( self.file_path, self.file_name, self.step_counter, self.file_digits, self.file_ext, ) self.vtk_files.append(name) self.time_steps.append( (self.time - self.ref_date) / TIME_DELTAS[self.time_unit] ) _set_mesh_data(self.pv_mesh, self.data_arrays, data, self.is_structured) self.pv_mesh.save(name, binary=self.binary) def _validate(self): pass def _update(self): self._time = self.next_time self.step_counter += 1 data = {i.name: self[i.name].pull_data(self.time) for i in self.data_arrays} self._write_timestep(data) def _finalize(self): # write aux json file if self.write_aux_file: save_dict_to_json( self.aux_file, time_unit=self.time_unit, reference_date=self.ref_date ) # write pvd file if self.add_time_units_attribute: units = f"{self.time_unit} since {self.ref_date.isoformat(sep=' ', timespec='seconds')}" else: units = None write_pvd_file(self.path, self.vtk_files, self.time_steps, units)
[docs] class PVDPushWriter(fm.Component): """ PVD writer component that writes on push to its inputs. Note that all connected data sources must push data for the same timestamps. Parameters ---------- path : pathlike Path to the PVD file to be written. Will have a forced .pvd suffix. inputs : list of str or DataArray. List of inputs. Input is either defined by name or a :class:`DataArray` instance. time_unit : str, optional Unit of the PVD timesteps. Supported values are ``"days"``, ``"hours"``, ``"minutes"``, and ``"seconds"``. By default: ``"seconds"``. file_prefix : str or None, optional File name prefix for the separate VTK files. Will get an added number for each time-step. By default: None - the file stem of the PVD file is used. file_digits : int, optional Number of digits for the added number in the file name for each time-step. By default: 4 legacy : bool, optional Whether to use the legacy vtk file format. By default: False binary : bool, optional Whether to use the binary file format. By default: True aux_file : pathlike or None, optional File name for a json file with auxiliary information about reference date and time unit. By default: None - the file stem of the PVD file is used with .json extension. write_aux_file : bool, optional Whether to write the aux file. By default: True. add_time_units_attribute : bool, optional Whether to add the time units as attribute to the collection in the PVD file. By default: True. """ def __init__( self, path, inputs, time_unit="seconds", file_prefix=None, file_digits=4, legacy=False, binary=True, aux_file=None, write_aux_file=True, add_time_units_attribute=True, ): super().__init__() self.path = Path(path).with_suffix(".pvd") self.aux_file = Path(aux_file or self.path.with_suffix(".json")) self.file_path = Path(file_prefix or self.path.with_suffix("")) self.file_name = self.file_path.stem self.file_digits = file_digits self.data_arrays = create_data_array_list(inputs) if not self.data_arrays: raise ValueError("PVDPushWriter: no inputs specified.") if time_unit not in TIME_DELTAS: raise ValueError(f"PVDPushWriter: time unit '{time_unit}' not supported.") self.time_unit = time_unit self.pv_mesh = None self.ref_date = None self.is_structured = None self.file_ext = None self.legacy = legacy self.binary = binary self.step_counter = 0 self.vtk_files = [] self.time_steps = [] self.write_aux_file = write_aux_file self.add_time_units_attribute = add_time_units_attribute self.last_update = None self.all_inputs = {var.name for var in self.data_arrays} self.pushed_inputs = set() self.status = fm.ComponentStatus.CREATED def _initialize(self): for var in self.data_arrays: var.info_kwargs.setdefault("units", None) self.inputs.add( io=fm.CallbackInput( name=var.name, callback=partial(self._data_changed, var.name), time=None, grid=var.info_kwargs.get("grid", None), **var.get_meta(), ) ) self.create_connector(pull_data=[var.name for var in self.data_arrays]) def _connect(self, start_time): self.try_connect(start_time=start_time) if self.status != fm.ComponentStatus.CONNECTED: return self.ref_date = start_time ref_grid = _get_reference_grid(self.connector.in_infos, self.data_arrays) self.pv_mesh, self.is_structured, self.file_ext = _create_pv_mesh( ref_grid, self.legacy ) _prepare_writer_arrays( self.data_arrays, self.connector.in_infos, ref_grid, "PVDPushWriter", ) self._write_timestep(self.connector.in_data, start_time) self.step_counter += 1 def _write_timestep(self, data, time): name = _timestep_path( self.file_path, self.file_name, self.step_counter, self.file_digits, self.file_ext, ) self.vtk_files.append(name) self.time_steps.append((time - self.ref_date) / TIME_DELTAS[self.time_unit]) _set_mesh_data(self.pv_mesh, self.data_arrays, data, self.is_structured) self.pv_mesh.save(name, binary=self.binary) def _validate(self): pass def _update(self): pass def _finalize(self): if self.write_aux_file and self.ref_date is not None: save_dict_to_json( self.aux_file, time_unit=self.time_unit, reference_date=self.ref_date ) if self.ref_date is not None: if self.add_time_units_attribute: units = ( f"{self.time_unit} since " f"{self.ref_date.isoformat(sep=' ', timespec='seconds')}" ) else: units = None write_pvd_file(self.path, self.vtk_files, self.time_steps, units) self.pv_mesh = None # pylint: disable-next=unused-argument def _data_changed(self, name, caller, time): if self.status in ( fm.ComponentStatus.CONNECTED, fm.ComponentStatus.CONNECTING, fm.ComponentStatus.CONNECTING_IDLE, ): self.last_update = time return if not isinstance(time, datetime): raise ValueError("Time must be of type datetime") if self.status == fm.ComponentStatus.INITIALIZED: self.last_update = time return if time != self.last_update and self.pushed_inputs: raise ValueError("Data not pushed for all inputs") self.last_update = time self.pushed_inputs.add(name) if self.pushed_inputs != self.all_inputs: return data = { var.name: self.inputs[var.name].pull_data(time) for var in self.data_arrays } self._write_timestep(data, time) self.step_counter += 1 self.pushed_inputs.clear() self.update()