"""
VTK writer components.
"""
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
import finam as fm
from .tools import (
TIME_DELTAS,
_create_pv_mesh,
_get_reference_grid,
_prepare_writer_arrays,
_set_mesh_data,
_timestep_path,
create_data_array_list,
get_time_unit,
save_dict_to_json,
write_pvd_file,
)
[docs]
class VTKStaticWriter(fm.Component):
"""
VTK writer component for static inputs.
Parameters
----------
path : pathlike
Path to the VTK file to be written.
Suffix will be set according to input grid type unless ``legacy=True``.
inputs : list of str or DataArray.
List of inputs. Input is either defined by name or a :class:`DataArray` instance.
legacy : bool, optional
Whether to use the legacy vtk file format.
By default: False
binary : bool, optional
Whether to use the binary file format.
By default: True
"""
def __init__(self, path, inputs, legacy=False, binary=True):
super().__init__()
self.path = Path(path)
self.data_arrays = create_data_array_list(inputs)
if not self.data_arrays:
raise ValueError("VTKStaticWriter: no inputs specified.")
self.pv_mesh = None
self.is_structured = None
self.file_ext = None
self.legacy = legacy
self.binary = binary
self.status = fm.ComponentStatus.CREATED
def _initialize(self):
for var in self.data_arrays:
var.info_kwargs.setdefault("units", None)
self.inputs.add(
name=var.name,
time=None,
static=True,
grid=var.info_kwargs.get("grid", None),
**var.get_meta(),
)
self.create_connector(pull_data=[var.name for var in self.data_arrays])
def _connect(self, start_time):
self.try_connect(start_time=start_time)
if self.status != fm.ComponentStatus.CONNECTED:
return
ref_grid = _get_reference_grid(self.connector.in_infos, self.data_arrays)
self.pv_mesh, self.is_structured, self.file_ext = _create_pv_mesh(
ref_grid, self.legacy
)
_prepare_writer_arrays(
self.data_arrays,
self.connector.in_infos,
ref_grid,
"VTKStaticWriter",
)
self._write_data(self.connector.in_data)
def _write_data(self, data):
_set_mesh_data(self.pv_mesh, self.data_arrays, data, self.is_structured)
self.path = self.path.with_suffix(self.file_ext)
self.pv_mesh.save(self.path, binary=self.binary)
def _validate(self):
pass
def _update(self):
pass
def _finalize(self):
self.pv_mesh = None
[docs]
class PVDTimedWriter(fm.TimeComponent):
"""
PVD writer component that writes in predefined time intervals.
Parameters
----------
path : pathlike
Path to the PVD file to be written. Will have a forced .pvd suffix.
inputs : list of str or DataArray.
List of inputs. Input is either defined by name or a :class:`DataArray` instance.
step : datetime.timedelta
Time step
file_prefix : str or None, optional
file name prefix for the separate VTK files.
Will get an added number for each time-step.
By default: None - the file stem of the PVD file is used.
file_digits : int, optional
Number of digits for the added number in the file name for each time-step.
By default: 4
legacy : bool, optional
Whether to use the legacy vtk file format.
By default: False
binary : bool, optional
Whether to use the binary file format.
By default: True
aux_file : pathlike or None, optional
file name for a json file with auxiliary information about
reference date and time unit.
By default: None - the file stem of the PVD file is used with .json extension.
write_aux_file : bool, optional
Whether to write the aux file. By default: True.
add_time_units_attribute : bool, optional
Whether to add the time units as attribute to the collection in the PVD file.
By default: True.
"""
def __init__(
self,
path,
inputs,
step,
time_unit=None,
file_prefix=None,
file_digits=4,
legacy=False,
binary=True,
aux_file=None,
write_aux_file=True,
add_time_units_attribute=True,
):
super().__init__()
if step is not None and not isinstance(step, timedelta):
raise ValueError("Step must be None or of type timedelta")
self.path = Path(path).with_suffix(".pvd")
self.aux_file = Path(aux_file or self.path.with_suffix(".json"))
self.file_path = Path(file_prefix or self.path.with_suffix(""))
self.file_name = self.file_path.stem
self.file_digits = file_digits
self.data_arrays = create_data_array_list(inputs)
if not self.data_arrays:
raise ValueError("PVDTimedWriter: no inputs specified.")
self._step = step
self.time_unit = get_time_unit(self._step, time_unit)
self.pv_mesh = None
self.ref_grid = None
self.ref_date = None
self.is_structured = None
self.file_ext = None
self.legacy = legacy
self.binary = binary
self.step_counter = 0
self.vtk_files = []
self.time_steps = []
self.write_aux_file = write_aux_file
self.add_time_units_attribute = add_time_units_attribute
self.status = fm.ComponentStatus.CREATED
def _next_time(self):
return self.time + self._step
def _initialize(self):
for var in self.data_arrays:
var.info_kwargs.setdefault("units", None)
self.inputs.add(
name=var.name,
time=self.time,
grid=var.info_kwargs.get("grid", None),
**var.get_meta(),
)
self.create_connector(pull_data=[var.name for var in self.data_arrays])
def _connect(self, start_time):
self.try_connect(start_time=start_time)
if self.status != fm.ComponentStatus.CONNECTED:
return
self.ref_date = start_time
self._time = start_time
ref_grid = _get_reference_grid(self.connector.in_infos, self.data_arrays)
self.pv_mesh, self.is_structured, self.file_ext = _create_pv_mesh(
ref_grid, self.legacy
)
_prepare_writer_arrays(
self.data_arrays,
self.connector.in_infos,
ref_grid,
"PVDTimedWriter",
)
self._write_timestep(self.connector.in_data)
def _write_timestep(self, data):
name = _timestep_path(
self.file_path,
self.file_name,
self.step_counter,
self.file_digits,
self.file_ext,
)
self.vtk_files.append(name)
self.time_steps.append(
(self.time - self.ref_date) / TIME_DELTAS[self.time_unit]
)
_set_mesh_data(self.pv_mesh, self.data_arrays, data, self.is_structured)
self.pv_mesh.save(name, binary=self.binary)
def _validate(self):
pass
def _update(self):
self._time = self.next_time
self.step_counter += 1
data = {i.name: self[i.name].pull_data(self.time) for i in self.data_arrays}
self._write_timestep(data)
def _finalize(self):
# write aux json file
if self.write_aux_file:
save_dict_to_json(
self.aux_file, time_unit=self.time_unit, reference_date=self.ref_date
)
# write pvd file
if self.add_time_units_attribute:
units = f"{self.time_unit} since {self.ref_date.isoformat(sep=' ', timespec='seconds')}"
else:
units = None
write_pvd_file(self.path, self.vtk_files, self.time_steps, units)
[docs]
class PVDPushWriter(fm.Component):
"""
PVD writer component that writes on push to its inputs.
Note that all connected data sources must push data for the same timestamps.
Parameters
----------
path : pathlike
Path to the PVD file to be written. Will have a forced .pvd suffix.
inputs : list of str or DataArray.
List of inputs. Input is either defined by name or a :class:`DataArray` instance.
time_unit : str, optional
Unit of the PVD timesteps. Supported values are ``"days"``, ``"hours"``,
``"minutes"``, and ``"seconds"``. By default: ``"seconds"``.
file_prefix : str or None, optional
File name prefix for the separate VTK files.
Will get an added number for each time-step.
By default: None - the file stem of the PVD file is used.
file_digits : int, optional
Number of digits for the added number in the file name for each time-step.
By default: 4
legacy : bool, optional
Whether to use the legacy vtk file format.
By default: False
binary : bool, optional
Whether to use the binary file format.
By default: True
aux_file : pathlike or None, optional
File name for a json file with auxiliary information about
reference date and time unit.
By default: None - the file stem of the PVD file is used with .json extension.
write_aux_file : bool, optional
Whether to write the aux file. By default: True.
add_time_units_attribute : bool, optional
Whether to add the time units as attribute to the collection in the PVD file.
By default: True.
"""
def __init__(
self,
path,
inputs,
time_unit="seconds",
file_prefix=None,
file_digits=4,
legacy=False,
binary=True,
aux_file=None,
write_aux_file=True,
add_time_units_attribute=True,
):
super().__init__()
self.path = Path(path).with_suffix(".pvd")
self.aux_file = Path(aux_file or self.path.with_suffix(".json"))
self.file_path = Path(file_prefix or self.path.with_suffix(""))
self.file_name = self.file_path.stem
self.file_digits = file_digits
self.data_arrays = create_data_array_list(inputs)
if not self.data_arrays:
raise ValueError("PVDPushWriter: no inputs specified.")
if time_unit not in TIME_DELTAS:
raise ValueError(f"PVDPushWriter: time unit '{time_unit}' not supported.")
self.time_unit = time_unit
self.pv_mesh = None
self.ref_date = None
self.is_structured = None
self.file_ext = None
self.legacy = legacy
self.binary = binary
self.step_counter = 0
self.vtk_files = []
self.time_steps = []
self.write_aux_file = write_aux_file
self.add_time_units_attribute = add_time_units_attribute
self.last_update = None
self.all_inputs = {var.name for var in self.data_arrays}
self.pushed_inputs = set()
self.status = fm.ComponentStatus.CREATED
def _initialize(self):
for var in self.data_arrays:
var.info_kwargs.setdefault("units", None)
self.inputs.add(
io=fm.CallbackInput(
name=var.name,
callback=partial(self._data_changed, var.name),
time=None,
grid=var.info_kwargs.get("grid", None),
**var.get_meta(),
)
)
self.create_connector(pull_data=[var.name for var in self.data_arrays])
def _connect(self, start_time):
self.try_connect(start_time=start_time)
if self.status != fm.ComponentStatus.CONNECTED:
return
self.ref_date = start_time
ref_grid = _get_reference_grid(self.connector.in_infos, self.data_arrays)
self.pv_mesh, self.is_structured, self.file_ext = _create_pv_mesh(
ref_grid, self.legacy
)
_prepare_writer_arrays(
self.data_arrays,
self.connector.in_infos,
ref_grid,
"PVDPushWriter",
)
self._write_timestep(self.connector.in_data, start_time)
self.step_counter += 1
def _write_timestep(self, data, time):
name = _timestep_path(
self.file_path,
self.file_name,
self.step_counter,
self.file_digits,
self.file_ext,
)
self.vtk_files.append(name)
self.time_steps.append((time - self.ref_date) / TIME_DELTAS[self.time_unit])
_set_mesh_data(self.pv_mesh, self.data_arrays, data, self.is_structured)
self.pv_mesh.save(name, binary=self.binary)
def _validate(self):
pass
def _update(self):
pass
def _finalize(self):
if self.write_aux_file and self.ref_date is not None:
save_dict_to_json(
self.aux_file, time_unit=self.time_unit, reference_date=self.ref_date
)
if self.ref_date is not None:
if self.add_time_units_attribute:
units = (
f"{self.time_unit} since "
f"{self.ref_date.isoformat(sep=' ', timespec='seconds')}"
)
else:
units = None
write_pvd_file(self.path, self.vtk_files, self.time_steps, units)
self.pv_mesh = None
# pylint: disable-next=unused-argument
def _data_changed(self, name, caller, time):
if self.status in (
fm.ComponentStatus.CONNECTED,
fm.ComponentStatus.CONNECTING,
fm.ComponentStatus.CONNECTING_IDLE,
):
self.last_update = time
return
if not isinstance(time, datetime):
raise ValueError("Time must be of type datetime")
if self.status == fm.ComponentStatus.INITIALIZED:
self.last_update = time
return
if time != self.last_update and self.pushed_inputs:
raise ValueError("Data not pushed for all inputs")
self.last_update = time
self.pushed_inputs.add(name)
if self.pushed_inputs != self.all_inputs:
return
data = {
var.name: self.inputs[var.name].pull_data(time) for var in self.data_arrays
}
self._write_timestep(data, time)
self.step_counter += 1
self.pushed_inputs.clear()
self.update()