Source code for wecopttool.core

"""Core functionality for solving the pseudo-spectral problem for WEC.

Contains:

* The *WEC* class
* Functions for basic functionality

.. note:: All contents of this module are imported into *WecOpTool* and
          can be called directly as :python:`wecopttool.<function>`
          instead of :python:`wecopttool.core.<function>`.
"""


from __future__ import annotations


__all__ = [
    "WEC",
    "ncomponents",
    "frequency",
    "time",
    "time_mat",
    "derivative_mat",
    "derivative2_mat",
    "mimo_transfer_mat",
    "vec_to_dofmat",
    "dofmat_to_vec",
    "real_to_complex",
    "complex_to_real",
    "fd_to_td",
    "td_to_fd",
    "read_netcdf",
    "write_netcdf",
    "check_radiation_damping",
    "check_impedance",
    "force_from_rao_transfer_function",
    "force_from_impedance",
    "force_from_waves",
    "inertia",
    "standard_forces",
    "run_bem",
    "change_bem_convention",
    "add_linear_friction",
    "wave_excitation",
    "hydrodynamic_impedance",
    "atleast_2d",
    "degrees_to_radians",
    "subset_close",
    "scale_dofs",
    "decompose_state",
    "frequency_parameters",
    "time_results",
    "set_fb_centers",
]


import logging
from typing import Iterable, Callable, Any, Optional, Mapping, TypeVar, Union
from pathlib import Path
import warnings
from datetime import datetime

from numpy.typing import ArrayLike
import autograd.numpy as np
from autograd.numpy import ndarray
from autograd.builtins import isinstance, tuple, list, dict
from autograd import grad, jacobian
import xarray as xr
from xarray import DataArray, Dataset
import capytaine as cpy
from scipy.optimize import minimize, OptimizeResult, Bounds
from scipy.linalg import block_diag, dft


# logger
_log = logging.getLogger(__name__)

# autograd warnings
filter_msg = "Casting complex values to real discards the imaginary part"
warnings.filterwarnings("ignore", message=filter_msg)

# default values
_default_parameters = {'rho': 1025.0, 'g': 9.81, 'depth': np.infty}
_default_min_damping = 1e-6

# type aliases
TWEC = TypeVar("TWEC", bound="WEC")
TStateFunction = Callable[
    [TWEC, ndarray, ndarray, Dataset], ndarray]
TForceDict = dict[str, TStateFunction]
TIForceDict = Mapping[str, TStateFunction]
FloatOrArray = Union[float, ArrayLike]


[docs] class WEC: """A wave energy converter (WEC) object for performing simulations using the pseudo-spectral solution method. To create the WEC use one of the initialization methods: * :py:meth:`wecopttool.WEC.__init__` * :py:meth:`wecopttool.WEC.from_bem` * :py:meth:`wecopttool.WEC.from_floating_body` * :py:meth:`wecopttool.WEC.from_impedance`. .. note:: Direct initialization of a :py:class:`wecopttool.WEC` object as :python:`WEC(f1, nfreq, forces, ...)` using :py:meth:`wecopttool.WEC.__init__` is discouraged. Instead use one of the other initialization methods listed in the *See Also* section. To solve the pseudo-spectral problem use :py:meth:`wecopttool.WEC.solve`. """
[docs] def __init__( self, f1: float, nfreq: int, forces: TIForceDict, constraints: Optional[Iterable[Mapping]] = None, inertia_matrix: Optional[ndarray] = None, ndof: Optional[int] = None, inertia_in_forces: Optional[bool] = False, dof_names: Optional[Iterable[str]] = None, ) -> None: """Create a WEC object directly from its inertia matrix and list of forces. The :py:class:`wecopttool.WEC` class describes a WEC's equation of motion as :math:`ma=Σf` where the :python:`inertia_matrix` matrix specifies the inertia :math:`m`, and the :python:`forces` dictionary specifies the different forces to be summed. The forces can be linear or nonlinear. If :python:`inertia_in_forces is True` the equation of motion is :math:`Σf=0`, which is included to allow for initialization using an intrinsic impedance through the :python:`WEC.from_impedance` initialization function. .. note:: Direct initialization of a :py:class:`wecopttool.WEC` object as :python:`WEC(f1, nfreq, forces, ...)` is discouraged. Instead use one of the other initialization methods listed in the *See Also* section. Parameters ---------- f1 Fundamental frequency :python:`f1` [:math:`Hz`]. nfreq Number of frequencies (not including zero frequency), i.e., :python:`freqs = [0, f1, 2*f1, ..., nfreq*f1]`. forces Dictionary with entries :python:`{'force_name': fun}`, where :python:`fun` has a signature :python:`def fun(wec, x_wec, x_opt, waves):`, and returns forces in the time-domain of size :python:`(2*nfreq+1, ndof)`. constraints List of constraints, see documentation for :py:func:`scipy.optimize.minimize` for description and options of constraints dictionaries. If :python:`None`: empty list :python:`[]`. inertia_matrix Inertia matrix of size :python:`(ndof, ndof)`. Not used if :python:`inertia_in_forces` is :python:`True`. ndof Number of degrees of freedom. Must be specified if :python:`inertia_in_forces is True`, else not used. inertia_in_forces Set to True if inertial "forces" are included in the :python:`forces` argument. This scenario is rare. If using an intrinsic impedance, consider initializing with :py:meth:`wecoptool.core.WEC.from_impedance` instead. dof_names Names of the different degrees of freedom (e.g. :python:`'Heave'`). If :python:`None` the names :python:`['DOF_0', ..., 'DOF_N']` are used. Raises ------ ValueError If :python:`inertia_in_forces is True` but :python:`ndof` is not specified. ValueError If :python:`inertia_in_forces is False` but :python:`inertia_matrix` is not specified. ValueError If :python:`inertia_matrix` does not have the correct size (:python:`(ndof, ndof)`). ValueError If :python:`dof_names` does not have the correct size (:python:`ndof`). See Also -------- from_bem: Initialize a :py:class:`wecopttool.WEC` object from BEM results. from_floating_body: Initialize a :py:class:`wecopttool.WEC` object from a :py:class:`capytaine.bodies.bodies.FloatingBody` object. from_impedance: Initialize a :py:class:`wecopttool.WEC` object from an intrinsic impedance array and excitation coefficients. """ self._freq = frequency(f1, nfreq) self._time = time(f1, nfreq) self._time_mat = time_mat(f1, nfreq) self._derivative_mat = derivative_mat(f1, nfreq) self._derivative2_mat = derivative2_mat(f1, nfreq) self._forces = forces constraints = list(constraints) if (constraints is not None) else [] self._constraints = constraints # inertia options self._inertia_in_forces = inertia_in_forces def _missing(var_name, condition): msg = (f"'{var_name}' must be provided if 'inertia_in_forces' is" + f"'{condition}'.") return msg def _ignored(var_name, condition): msg = (f"'{var_name}' is not used when 'inertia_in_forces' is " + f"'{condition}' and should not be provided") return msg if inertia_in_forces: condition = "True" if inertia_matrix is not None: _log.warning(_ignored("inertia_matrix", condition)) inertia_matrix = None if ndof is None: raise ValueError(_missing("ndof", condition)) elif not inertia_in_forces: condition = "False" if inertia_matrix is None: raise ValueError(_missing("inertia_matrix", condition)) inertia_matrix = np.atleast_2d(np.squeeze(inertia_matrix)) if ndof is not None: _log.warning(_ignored("ndof", condition)) if ndof != inertia_matrix.shape[0]: _log.warning( "Provided value of 'ndof' does not match size of " + "'inertia_matrix'. Setting " + f"'ndof={inertia_matrix.shape[0]}'.") ndof = inertia_matrix.shape[0] if inertia_matrix.shape != (ndof, ndof): raise ValueError( "'inertia_matrix' must be a square matrix of size equal " + "to the number of degrees of freedom.") self._inertia_matrix = inertia_matrix self._ndof = ndof if inertia_in_forces: _inertia = None else: _inertia = inertia(f1, nfreq, inertia_matrix) self._inertia = _inertia # names if dof_names is None: dof_names = [f'DOF_{i}' for i in range(ndof)] elif len(dof_names) != ndof: raise ValueError("'dof_names' must have length 'ndof'.") self._dof_names = list(dof_names)
def __str__(self) -> str: str = (f'{self.__class__.__name__}: ' + f'DOFs ({self.ndof})={self.dof_names}, ' + f'f=[0, {self.f1}, ..., {self.nfreq}({self.f1})] Hz.') return str def __repr__(self) -> str: type_ = type(self) module = type_.__module__ qualname = type_.__qualname__ repr_org = f"<{module}.{qualname} object at {hex(id(self))}>" return repr_org + " :: " + self.__str__() # other initialization methods
[docs] @staticmethod def from_bem( bem_data: Union[Dataset, Union[str, Path]], friction: Optional[ndarray] = None, f_add: Optional[TIForceDict] = None, constraints: Optional[Iterable[Mapping]] = None, min_damping: Optional[float] = _default_min_damping, uniform_shift: Optional[bool] = False, dof_names: Optional[Iterable[str]] = None, ) -> TWEC: """Create a WEC object from linear hydrodynamic coefficients obtained using the boundary element method (BEM) code Capytaine. The :python:`bem_data` can be a dataset or the name of a *NetCDF* file containing the dataset. The returned :py:class:`wecopttool.WEC` object contains the inertia and the default linear forces: radiation, diffraction, and Froude-Krylov. Additional forces can be specified through :python:`f_add`. Note that because Capytaine uses a different sign convention, the direct results from capytaine must be modified using :py:func:`wecopttool.change_bem_convention` before calling this initialization function. Instead, the recommended approach is to use :py:func:`wecopttool.run_bem`, rather than running Capytaine directly, which outputs the results in the correct convention. The results can be saved using :py:func:`wecopttool.write_netcdf`. :py:func:`wecopttool.run_bem` also computes the inertia and hydrostatic stiffness which should be included in bem_data. Parameters ---------- bem_data Linear hydrodynamic coefficients obtained using the boundary element method (BEM) code Capytaine, with sign convention corrected. Also includes inertia and hydrostatic stiffness. friction Linear friction, in addition to radiation damping, of size :python:`(ndof, ndof)`. :python:`None` if included in :python:`bem_data` or to set to zero. f_add Dictionary with entries :python:`{'force_name': fun}`, where :python:`fun` has a signature :python:`def fun(wec, x_wec, x_opt, waves):`, and returns forces in the time-domain of size :python:`(2*nfreq+1, ndof)`. constraints List of constraints, see documentation for :py:func:`scipy.optimize.minimize` for description and options of constraints dictionaries. If :python:`None`: empty list :python:`[]`. min_damping Minimum damping level to ensure a stable system. See :py:func:`wecopttool.check_radiation_damping` for more details. uniform_shift Boolean determining whether damping corrections shifts the damping values uniformly for all frequencies or only for frequencies below :python:`min_damping`. See :py:func:`wecopttool.check_radiation_damping` for more details. dof_names Names of the different degrees of freedom (e.g. :python:`'Heave'`). If :python:`None` the names :python:`['DOF_0', ..., 'DOF_N']` are used. See Also -------- run_bem, add_linear_friction, change_bem_convention, write_netcdf, check_radiation_damping """ if isinstance(bem_data, (str, Path)): bem_data = read_netcdf(bem_data) # add friction hydro_data = add_linear_friction(bem_data, friction) inertia_matrix = hydro_data['inertia_matrix'].values # frequency array f1, nfreq = frequency_parameters( hydro_data.omega.values/(2*np.pi), False) # check real part of damping diagonal > 0 if min_damping is not None: hydro_data = check_radiation_damping( hydro_data, min_damping, uniform_shift) # forces in the dynamics equations linear_force_functions = standard_forces(hydro_data) f_add = f_add if (f_add is not None) else {} forces = linear_force_functions | f_add # constraints constraints = constraints if (constraints is not None) else [] return WEC(f1, nfreq, forces, constraints, inertia_matrix, dof_names=dof_names)
[docs] @staticmethod def from_floating_body( fb: cpy.FloatingBody, f1: float, nfreq: int, friction: Optional[ndarray] = None, f_add: Optional[TIForceDict] = None, constraints: Optional[Iterable[Mapping]] = None, min_damping: Optional[float] = _default_min_damping, wave_directions: Optional[ArrayLike] = np.array([0.0,]), rho: Optional[float] = _default_parameters['rho'], g: Optional[float] = _default_parameters['g'], depth: Optional[float] = _default_parameters['depth'], ) -> TWEC: """Create a WEC object from a Capytaine :python:`FloatingBody` (:py:class:capytaine.bodies.bodies.FloatingBody). A :py:class:capytaine.bodies.bodies.FloatingBody object contains information on the mesh and degrees of freedom. This initialization method calls :py:func:`wecopttool.run_bem` followed by :py:meth:`wecopttool.WEC.from_bem`. This will run Capytaine to obtain the linear hydrodynamic coefficients, which can take from a few minutes to several hours. Instead, if the hydrodynamic coefficients can be reused, it is recommended to run Capytaine first and save the results using :py:func:`wecopttool.run_bem` and :py:func:`wecopttool.write_netcdf`, and then initialize the :py:class:`wecopttool.WEC` object using :py:meth:`wecopttool.WEC.from_bem`. This initialization method should be reserved for the cases where the hydrodynamic coefficients constantly change and are not reused, as for example for geometry optimization. Parameters ---------- fb Capytaine FloatingBody. f1 Fundamental frequency :python:`f1` [:math:`Hz`]. nfreq Number of frequencies (not including zero frequency), i.e., :python:`freqs = [0, f1, 2*f1, ..., nfreq*f1]`. friction Linear friction, in addition to radiation damping, of size :python:`(ndof, ndof)`. :python:`None` to set to zero. f_add Dictionary with entries :python:`{'force_name': fun}`, where :python:`fun` has a signature :python:`def fun(wec, x_wec, x_opt, waves):`, and returns forces in the time-domain of size :python:`(2*nfreq, ndof)`. constraints List of constraints, see documentation for :py:func:`scipy.optimize.minimize` for description and options of constraints dictionaries. If :python:`None`: empty list :python:`[]`. min_damping Minimum damping level to ensure a stable system. See :py:func:`wecopttool.check_radiation_damping` for more details. wave_directions List of wave directions [degrees] to evaluate BEM at. rho Water density in :math:`kg/m^3`. g Gravitational acceleration in :math:`m/s^2`. depth Water depth in :math:`m`. Returns ------- WEC An instance of the :py:class:`wecopttool.WEC` class. See Also -------- run_bem, write_netcdf, WEC.from_bem """ # RUN BEM _log.info(f"Running Capytaine (BEM): {nfreq+1} frequencies x " + f"{len(wave_directions)} wave directions.") freq = frequency(f1, nfreq)[1:] bem_data = run_bem( fb, freq, wave_directions, rho=rho, g=g, depth=depth) wec = WEC.from_bem( bem_data, friction, f_add, constraints, min_damping=min_damping) return wec
[docs] @staticmethod def from_impedance( freqs: ArrayLike, impedance: DataArray, exc_coeff: DataArray, hydrostatic_stiffness: ndarray, f_add: Optional[TIForceDict] = None, constraints: Optional[Iterable[Mapping]] = None, min_damping: Optional[float] = _default_min_damping, uniform_shift: Optional[bool] = False, ) -> TWEC: """Create a WEC object from the intrinsic impedance and excitation coefficients. The intrinsic (mechanical) impedance :math:`Z(ω)` linearly relates excitation forces :math:`F(ω)` to WEC velocity :math:`U(ω)` as :math:`ZU=F`. Using linear hydrodynamic coefficients, e.g. from a BEM code like Capytaine, the impedance is given as :math:`Z(ω) = (m+A(ω))*iω + B(ω) + B_f + K/(iω)`. The impedance can also be obtained experimentally. Note that the impedance is not defined at :math:`ω=0`. Parameters ---------- freqs Frequency vector [:math:`Hz`] not including the zero frequency, :python:`freqs = [f1, 2*f1, ..., nfreq*f1]`. impedance Complex impedance of size :python:`(nfreq, ndof, ndof)`. exc_coeff Complex excitation transfer function of size :python:`(ndof, nfreq)`. hydrostatic_stiffness Linear hydrostatic restoring coefficient of size :python:`(ndof, ndof)`. f_add Dictionary with entries :python:`{'force_name': fun}`, where :python:`fun` has a signature :python:`def fun(wec, x_wec, x_opt, waves):`, and returns forces in the time-domain of size :python:`(2*nfreq, ndof)`. constraints List of constraints, see documentation for :py:func:`scipy.optimize.minimize` for description and options of constraints dictionaries. If :python:`None`: empty list :python:`[]`. min_damping Minimum damping level to ensure a stable system. See :py:func:`wecopttool.check_impedance` for more details. uniform_shift Boolean determining whether damping corrections shifts the damping values uniformly for all frequencies or only for frequencies below :python:`min_damping`. See :py:func:`wecopttool.check_radiation_damping` for more details. Raises ------ ValueError If :python:`impedance` does not have the correct size: :python:`(ndof, ndof, nfreq)`. """ f1, nfreq = frequency_parameters(freqs, False) # impedance matrix shape shape = impedance.shape ndim = impedance.ndim if (ndim!=3) or (shape[1]!=shape[2]) or (shape[0]!=nfreq): raise ValueError( "'impedance' must have shape '(nfreq, ndof, ndof)'.") impedance = check_impedance(impedance, min_damping) # impedance force omega = freqs * 2*np.pi omega0 = np.expand_dims(omega, [1,2]) transfer_func = impedance * (1j*omega0) transfer_func0 = np.expand_dims(hydrostatic_stiffness, 2) transfer_func = np.concatenate([transfer_func0, transfer_func], axis=0) transfer_func = -1 * transfer_func # RHS of equation: ma = Σf force_impedance = force_from_rao_transfer_function(transfer_func) # excitation force force_excitation = force_from_waves(exc_coeff) # all forces f_add = {} if (f_add is None) else f_add forces = { 'intrinsic_impedance': force_impedance, 'excitation': force_excitation } forces = forces | f_add # wec wec = WEC(f1, nfreq, forces, constraints, inertia_in_forces=True, ndof=shape[1]) return wec
[docs] def residual(self, x_wec: ndarray, x_opt: ndarray, waves: Dataset, ) -> float: """ Return the residual of the dynamic equation (r = m⋅a-Σf). Parameters ---------- x_wec WEC state vector. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. """ if not self.inertia_in_forces: ri = self.inertia(self, x_wec, x_opt, waves) else: ri = np.zeros([self.ncomponents, self.ndof]) # forces, -Σf for f in self.forces.values(): ri = ri - f(self, x_wec, x_opt, waves) return self.dofmat_to_vec(ri)
# solve
[docs] def solve(self, waves: Dataset, obj_fun: TStateFunction, nstate_opt: int, x_wec_0: Optional[ndarray] = None, x_opt_0: Optional[ndarray] = None, scale_x_wec: Optional[list] = None, scale_x_opt: Optional[FloatOrArray] = 1.0, scale_obj: Optional[float] = 1.0, optim_options: Optional[Mapping[str, Any]] = {}, use_grad: Optional[bool] = True, maximize: Optional[bool] = False, bounds_wec: Optional[Bounds] = None, bounds_opt: Optional[Bounds] = None, callback: Optional[TStateFunction] = None, ) -> list[OptimizeResult]: """Simulate WEC dynamics using a pseudo-spectral solution method and returns the raw results dictionary produced by :py:func:`scipy.optimize.minimize`. Parameters ---------- waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. obj_fun Objective function to minimize for pseudo-spectral solution, must have signature :python:`fun(wec, x_wec, x_opt, waves)` and return a scalar. nstate_opt Length of the optimization (controls) state vector. x_wec_0 Initial guess for the WEC dynamics state. If :python:`None` it is randomly initiated. x_opt_0 Initial guess for the optimization (control) state. If :python:`None` it is randomly initiated. scale_x_wec Factor(s) to scale each DOF in :python:`x_wec` by, to improve convergence. A single float or an array of size :python:`ndof`. scale_x_opt Factor(s) to scale :python:`x_opt` by, to improve convergence. A single float or an array of size :python:`nstate_opt`. scale_obj Factor to scale :python:`obj_fun` by, to improve convergence. optim_options Optimization options passed to the optimizer. See :py:func:`scipy.optimize.minimize`. use_grad If :python:`True`, optimization will utilize `autograd <https://github.com/HIPS/autograd>`_ for gradients. maximize Whether to maximize the objective function. The default is to minimize the objective function. bounds_wec Bounds on the WEC components of the decision variable. See :py:func:`scipy.optimize.minimize`. bounds_opt Bounds on the optimization (control) components of the decision variable. See :py:func:`scipy.optimize.minimize`. callback Function called after each iteration, must have signature :python:`fun(wec, x_wec, x_opt, waves)`. The default provides status reports at each iteration via logging at the INFO level. Raises ------ ValueError If :python:`scale_x_opt` is a scalar and :python:`nstate_opt` is not provided. Exception If the optimizer fails for any reason other than maximum number of states, i.e. for exit modes other than 0 or 9. See :py:mod:`scipy.optimize` for exit mode details. Examples -------- The :py:meth:`wecopttool.WEC.solve` method only returns the raw results dictionary produced by :py:func:`scipy.optimize.minimize`. >>> res_opt = wec.solve(waves=wave, obj_fun=pto.average_power, nstate_opt=2*nfreq+1) To get the post-processed results for the :py:class:`wecopttool.WEC` and :py:class:`wecopttool.pto.PTO` for a single realization, you may call >>> realization = 0 # realization index >>> res_wec_fd, res_wec_td = wec.post_process(wec,res_opt,wave,nsubsteps) >>> res_pto_fd, res_pto_td = pto.post_process(wec,res_opt,wave,nsubsteps) See Also -------- wecopttool.waves, wecopttool.core.wec.post_process, wecopttool.core.pto.post_process, """ results = [] # x_wec scaling vector if scale_x_wec is None: scale_x_wec = [1.0] * self.ndof elif isinstance(scale_x_wec, float) or isinstance(scale_x_wec, int): scale_x_wec = [scale_x_wec] * self.ndof scale_x_wec = scale_dofs(scale_x_wec, self.ncomponents) # x_opt scaling vector if isinstance(scale_x_opt, float) or isinstance(scale_x_opt, int): if nstate_opt is None: raise ValueError("If 'scale_x_opt' is a scalar, " + "'nstate_opt' must be provided") scale_x_opt = scale_dofs([scale_x_opt], nstate_opt) # composite scaling vector scale = np.concatenate([scale_x_wec, scale_x_opt]) # decision variable initial guess if x_wec_0 is None: x_wec_0 = np.random.randn(self.nstate_wec) if x_opt_0 is None: x_opt_0 = np.random.randn(nstate_opt) x0 = np.concatenate([x_wec_0, x_opt_0])*scale # bounds if (bounds_wec is None) and (bounds_opt is None): bounds = None else: bounds_in = [bounds_wec, bounds_opt] for idx, bii in enumerate(bounds_in): if isinstance(bii, tuple): bounds_in[idx] = Bounds(lb=[xibs[0] for xibs in bii], ub=[xibs[1] for xibs in bii]) inf_wec = np.ones(self.nstate_wec)*np.inf inf_opt = np.ones(nstate_opt)*np.inf bounds_dflt = [Bounds(lb=-inf_wec, ub=inf_wec), Bounds(lb=-inf_opt, ub=inf_opt)] bounds_list = [] for bi, bd in zip(bounds_in, bounds_dflt): if bi is not None: bo = bi else: bo = bd bounds_list.append(bo) bounds = Bounds(lb=np.hstack([le.lb for le in bounds_list])*scale, ub=np.hstack([le.ub for le in bounds_list])*scale) for realization, wave in waves.groupby('realization'): _log.info("Solving pseudo-spectral control problem " + f"for realization number {realization}.") try: wave = wave.squeeze(dim='realization') except KeyError: pass # objective function sign = -1.0 if maximize else 1.0 def obj_fun_scaled(x): x_wec, x_opt = self.decompose_state(x/scale) return obj_fun(self, x_wec, x_opt, wave)*scale_obj*sign # constraints constraints = self.constraints.copy() for i, icons in enumerate(self.constraints): icons_new = {"type": icons["type"]} def make_new_fun(icons): def new_fun(x): x_wec, x_opt = self.decompose_state(x/scale) return icons["fun"](self, x_wec, x_opt, wave) return new_fun icons_new["fun"] = make_new_fun(icons) if use_grad: icons_new['jac'] = jacobian(icons_new['fun']) constraints[i] = icons_new # system dynamics through equality constraint, ma - Σf = 0 def scaled_resid_fun(x): x_s = x/scale x_wec, x_opt = self.decompose_state(x_s) return self.residual(x_wec, x_opt, wave) eq_cons = {'type': 'eq', 'fun': scaled_resid_fun} if use_grad: eq_cons['jac'] = jacobian(scaled_resid_fun) constraints.append(eq_cons) # callback if callback is None: def callback_scipy(x): x_wec, x_opt = self.decompose_state(x) max_x_opt = np.nan if np.size(x_opt)==0 else np.max(np.abs(x_opt)) _log.info("Scaled [max(x_wec), max(x_opt), obj_fun(x)]: " + f"[{np.max(np.abs(x_wec)):.2e}, " + f"{max_x_opt:.2e}, " + f"{obj_fun_scaled(x):.2e}]") else: def callback_scipy(x): x_s = x/scale x_wec, x_opt = self.decompose_state(x_s) return callback(self, x_wec, x_opt, wave) # optimization problem optim_options['disp'] = optim_options.get('disp', True) problem = {'fun': obj_fun_scaled, 'x0': x0, 'method': 'SLSQP', 'constraints': constraints, 'options': optim_options, 'bounds': bounds, 'callback': callback_scipy, } if use_grad: problem['jac'] = grad(obj_fun_scaled) # minimize optim_res = minimize(**problem) msg = f'{optim_res.message} (Exit mode {optim_res.status})' if optim_res.status == 0: _log.info(msg) elif optim_res.status == 9: _log.warning(msg) else: raise Exception(msg) # unscale optim_res.x = optim_res.x / scale optim_res.fun = optim_res.fun / scale_obj optim_res.jac = optim_res.jac / scale_obj * scale results.append(optim_res) return results
[docs] def post_process(self, wec: TWEC, res: Union[OptimizeResult, Iterable], waves: Dataset, nsubsteps: Optional[int] = 1, ) -> tuple[list[Dataset], list[Dataset]]: """Post-process the results from :py:meth:`wecopttool.WEC.solve`. Parameters ---------- wec :py:class:`wecopttool.WEC` object. res Results produced by :py:meth:`wecopttool.WEC.solve`. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. Returns ------- results_fd Dynamic responses in the frequency-domain. results_td Dynamic responses in the time-domain. Examples -------- The :py:meth:`wecopttool.WEC.solve` method only returns the raw results dictionary produced by :py:func:`scipy.optimize.minimize`. >>> res_opt = wec.solve(waves=wave, obj_fun=pto.average_power, nstate_opt=2*nfreq+1) To get the post-processed results we may call >>> res_wec_fd, res_wec_td = wec.post_process(wec, res_opt,wave) Note that :py:meth:`wecopttool.WEC.solve` method produces a list of results objects (one for each phase realization). """ assert self == wec , ("The same wec object should be used to call " + "post-process and be passed as an input.") def _postproc(res, waves, nsubsteps): create_time = f"{datetime.utcnow()}" omega_vals = np.concatenate([[0], waves.omega.values]) freq_vals = np.concatenate([[0], waves.freq.values]) period_vals = np.concatenate([[np.inf], 1/waves.freq.values]) pos_attr = {'long_name': 'Position', 'units': 'm or rad'} vel_attr = {'long_name': 'Velocity', 'units': 'm/s or rad/s'} acc_attr = {'long_name': 'Acceleration', 'units': 'm/s^2 or rad/s^2'} omega_attr = {'long_name': 'Radial frequency', 'units': 'rad/s'} freq_attr = {'long_name': 'Frequency', 'units': 'Hz'} period_attr = {'long_name': 'Period', 'units': 's'} time_attr = {'long_name': 'Time', 'units': 's'} dof_attr = {'long_name': 'Degree of freedom'} force_attr = {'long_name': 'Force or moment', 'units': 'N or Nm'} wave_elev_attr = {'long_name': 'Wave elevation', 'units': 'm'} x_wec, x_opt = self.decompose_state(res.x) omega_coord = ("omega", omega_vals, omega_attr) freq_coord = ("omega", freq_vals, freq_attr) period_coord = ("omega", period_vals, period_attr) dof_coord = ("influenced_dof", self.dof_names, dof_attr) # frequency domain force_da_list = [] for name, force in self.forces.items(): force_td_tmp = force(self, x_wec, x_opt, waves) force_fd = self.td_to_fd(force_td_tmp) force_da = DataArray(data=force_fd, dims=["omega", "influenced_dof"], coords={ 'omega': omega_coord, 'freq': freq_coord, 'period': period_coord, 'influenced_dof': dof_coord}, attrs=force_attr ).expand_dims({'type': [name]}) force_da_list.append(force_da) fd_forces = xr.concat(force_da_list, dim='type') fd_forces.type.attrs['long_name'] = 'Type' fd_forces.name = 'force' fd_forces.attrs['long_name'] = 'Force' pos = self.vec_to_dofmat(x_wec) pos_fd = real_to_complex(pos) vel = self.derivative_mat @ pos vel_fd = real_to_complex(vel) acc = self.derivative2_mat @ pos acc_fd = real_to_complex(acc) fd_state = Dataset( data_vars={ 'pos': (['omega', 'influenced_dof'], pos_fd, pos_attr), 'vel': (['omega', 'influenced_dof'], vel_fd, vel_attr), 'acc': (['omega', 'influenced_dof'], acc_fd, acc_attr)}, coords={ 'omega': omega_coord, 'freq': freq_coord, 'period': period_coord, 'influenced_dof': dof_coord}, attrs={"time_created_utc": create_time} ) results_fd = xr.merge([fd_state, fd_forces, waves]) results_fd = results_fd.transpose('omega', 'influenced_dof', 'type', 'wave_direction') results_fd = results_fd.fillna(0) # time domain t_dat = self.time_nsubsteps(nsubsteps) time = DataArray( data=t_dat, name='time', dims='time', coords=[t_dat]) results_td = results_fd.map(lambda x: time_results(x, time)) results_td['pos'].attrs = pos_attr results_td['vel'].attrs = vel_attr results_td['acc'].attrs = acc_attr results_td['wave_elev'].attrs = wave_elev_attr results_td['force'].attrs = force_attr results_td['time'].attrs = time_attr results_td.attrs['time_created_utc'] = create_time return results_fd, results_td results_fd = [] results_td = [] for idx, ires in enumerate(res): ifd, itd = _postproc(ires, waves.sel(realization=idx), nsubsteps) results_fd.append(ifd) results_td.append(itd) return results_fd, results_td
# properties @property def forces(self) -> TForceDict: """Dictionary of forces.""" return self._forces @forces.setter def forces(self, val): self._forces = dict(val) @property def constraints(self) -> list[dict]: """List of constraints.""" return self._constraints @constraints.setter def constraints(self, val): self._constraints = list(val) @property def inertia_in_forces(self) -> bool: """Whether inertial "forces" are included in the :python:`forces` dictionary. """ return self._inertia_in_forces @property def inertia_matrix(self) -> ndarray: """Inertia (mass) matrix. :python:`None` if :python:`inertia_in_forces is True`. """ return self._inertia_matrix @property def inertia(self) -> TStateFunction: """Function representing the inertial term :math:`ma` in the WEC's dynamics equation. """ return self._inertia @property def dof_names(self) -> list[str]: """Names of the different degrees of freedom.""" return self._dof_names @property def ndof(self) -> int: """Number of degrees of freedom.""" return self._ndof @property def frequency(self) -> ndarray: """Frequency vector [:math:`Hz`].""" return self._freq @property def f1(self) -> float: """Fundamental frequency :python:`f1` [:math:`Hz`].""" return self._freq[1] @property def nfreq(self) -> int: """Number of frequencies, not including the zero-frequency.""" return len(self._freq)-1 @property def omega(self) -> ndarray: """Radial frequency vector [rad/s].""" return self._freq * (2*np.pi) @property def period(self) -> ndarray: """Period vector [s].""" return np.concatenate([[np.Infinity], 1/self._freq[1:]]) @property def w1(self) -> float: """Fundamental radial frequency [rad/s].""" return self.omega[1] @property def time(self) -> ndarray: """Time vector [s], size '(2*nfreq, ndof)', not containing the end time 'tf'.""" return self._time @property def time_mat(self) -> ndarray: """Matrix to create time-series from Fourier coefficients. For some array of Fourier coefficients :python:`x` (excluding the sine component of the highest frequency), size :python:`(2*nfreq, ndof)`, the time series is obtained via :python:`time_mat @ x`, also size :python:`(2*nfreq, ndof)`. """ return self._time_mat @property def derivative_mat(self) -> ndarray: """Matrix to create Fourier coefficients of the derivative of some quantity. For some array of Fourier coefficients :python:`x` (excluding the sine component of the highest frequency), size :python:`(2*nfreq, ndof)`, the Fourier coefficients of the derivative of :python:`x` are obtained via :python:`derivative_mat @ x`. """ return self._derivative_mat @property def derivative2_mat(self) -> ndarray: """Matrix to create Fourier coefficients of the second derivative of some quantity. For some array of Fourier coefficients :python:`x` (excluding the sine component of the highest frequency), size :python:`(2*nfreq, ndof)`, the Fourier coefficients of the second derivative of :python:`x` are obtained via :python:`derivative2_mat @ x`. """ return self._derivative2_mat @property def dt(self) -> float: """Time spacing [s].""" return self._time[1] @property def tf(self) -> float: """Final time (repeat period) [s]. Not included in :python:`time` vector. """ return 1/self.f1 @property def nt(self) -> int: """Number of timesteps.""" return self.ncomponents @property def ncomponents(self) -> int: """Number of Fourier components (:python:`2*nfreq`) for each degree of freedom. Note that the sine component of the highest frequency (the 2-point wave) is excluded as this will always evaluate to zero. """ return ncomponents(self.nfreq) @property def nstate_wec(self) -> int: """Length of the WEC dynamics state vector consisting of the Fourier coefficient of the position of each degree of freedom. """ return self.ndof * self.ncomponents # other methods
[docs] def decompose_state(self, state: ndarray ) -> tuple[ndarray, ndarray]: """Split the state vector into the WEC dynamics state and the optimization (control) state. Calls :py:func:`wecopttool.decompose_state` with the appropriate inputs for the WEC object. Examples -------- >>> x_wec, x_opt = wec.decompose_state(x) Parameters ---------- state Combined WEC and optimization states. Returns ------- state_wec WEC state vector. state_opt Optimization (control) state. See Also -------- decompose_state """ return decompose_state(state, self.ndof, self.nfreq)
[docs] def time_nsubsteps(self, nsubsteps: int) -> ndarray: """Create a time vector with finer discretization. Calls :py:func:`wecopttool.time` with the appropriate inputs for the WEC object. Parameters ---------- nsubsteps Number of substeps between implied/default time steps. See Also -------- time, WEC.time """ return time(self.f1, self.nfreq, nsubsteps)
[docs] def time_mat_nsubsteps(self, nsubsteps: int) -> ndarray: """Create a time matrix similar to :py:meth:`wecopttool.WEC.time_mat` but with finer time-domain discretization. Calls :py:func:`wecopttool.time_mat` with the appropriate inputs for the WEC object. Parameters ---------- nsubsteps Number of substeps between implied/default time steps. See Also -------- time_mat, WEC.time_mat, WEC.time_nsubsteps """ return time_mat(self.f1, self.nfreq, nsubsteps)
[docs] def vec_to_dofmat(self, vec: ndarray) -> ndarray: """Convert a vector to a matrix with one column per degree of freedom. Opposite of :py:meth:`wecopttool.WEC.dofmat_to_vec`. Calls :py:func:`wecopttool.vec_to_dofmat` with the appropriate inputs for the WEC object. Examples -------- >>> x_wec, x_opt = wec.decompose_state(x) >>> x_wec_mat = wec.vec_to_dofmat(x_wec) Parameters ---------- vec One-dimensional vector. See Also -------- vec_to_dofmat, WEC.dofmat_to_vec """ return vec_to_dofmat(vec, self.ndof)
[docs] def dofmat_to_vec(self, mat: ndarray) -> ndarray: """Flatten a matrix to a vector. Opposite of :py:meth:`wecopttool.WEC.vec_to_dofmat`. Calls :py:func:`wecopttool.dofmat_to_vec` with the appropriate inputs for the WEC object. Parameters ---------- mat Matrix with one column per degree of freedom. See Also -------- dofmat_to_vec, WEC.vec_to_dofmat """ return dofmat_to_vec(mat)
[docs] def fd_to_td(self, fd: ndarray) -> ndarray: """Convert a frequency-domain array to time-domain. Opposite of :py:meth:`wecopttool.WEC.td_to_fd`. Calls :py:func:`wecopttool.fd_to_td` with the appropriate inputs for the WEC object. Parameters ---------- fd Frequency-domain complex array with shape :python:`(WEC.nfreq+1, N)` for any :python:`N`. See Also -------- fd_to_td, WEC.td_to_fd """ return fd_to_td(fd, self.f1, self.nfreq, True)
[docs] def td_to_fd( self, td: ndarray, fft: Optional[bool] = True, ) -> ndarray: """Convert a time-domain array to frequency-domain. Opposite of :py:meth:`wecopttool.WEC.fd_to_td`. Calls :py:func:`wecopttool.fd_to_td` with the appropriate inputs for the WEC object. Parameters ---------- td Time-domain real array with shape :python:`(2*WEC.nfreq, N)` for any :python:`N`. fft Whether to use the real FFT. See Also -------- td_to_fd, WEC.fd_to_td """ return td_to_fd(td, fft, True)
[docs] def ncomponents( nfreq : int, zero_freq: Optional[bool] = True, ) -> int: """Number of Fourier components (:python:`2*nfreq`) for each DOF. The sine component of the highest frequency (the 2-point wave) is excluded as it will always evaluate to zero. If :python:`zero_freq = False` (not default), the mean (DC) component :python:`X0` is excluded, and the number of components is reduced by 1. Parameters ---------- nfreq Number of frequencies. zero_freq Whether to include the zero-frequency. """ ncomp = 2*nfreq - 1 if zero_freq: ncomp = ncomp + 1 return ncomp
[docs] def frequency( f1: float, nfreq: int, zero_freq: Optional[bool] = True, ) -> ndarray: """Construct equally spaced frequency array. The array includes :python:`0` and has length of :python:`nfreq+1`. :python:`f1` is fundamental frequency (1st harmonic). Returns the frequency array, e.g., :python:`freqs = [0, f1, 2*f1, ..., nfreq*f1]`. If :python:`zero_freq = False` (not default), the mean (DC) component :python:`0` is excluded, and the vector length is reduced by 1. Parameters ---------- f1 Fundamental frequency :python:`f1` [:math:`Hz`]. nfreq Number of frequencies. zero_freq Whether to include the zero-frequency. """ freq = np.arange(0, nfreq+1)*f1 freq = freq[1:] if not zero_freq else freq return freq
[docs] def time( f1: float, nfreq: int, nsubsteps: Optional[int] = 1, ) -> ndarray: """Assemble the time vector with :python:`nsubsteps` subdivisions. Returns the 1D time vector, in seconds, starting at time :python:`0`, and not containing the end time :python:`tf=1/f1`. The time vector has length :python:`(2*nfreq)*nsubsteps`. The timestep length is :python:`dt = dt_default * 1/nsubsteps`, where :python:`dt_default=tf/(2*nfreq)`. Parameters ---------- f1 Fundamental frequency :python:`f1` [:math:`Hz`]. nfreq Number of frequencies. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ if nsubsteps < 1: raise ValueError("'nsubsteps' must be 1 or greater") nsteps = nsubsteps * ncomponents(nfreq) return np.linspace(0, 1/f1, nsteps, endpoint=False)
[docs] def time_mat( f1: float, nfreq: int, nsubsteps: Optional[int] = 1, zero_freq: Optional[bool] = True, ) -> ndarray: """Assemble the time matrix that converts the state to a time-series. For a state :math:`x` consisting of the mean (DC) component followed by the real and imaginary components of the Fourier coefficients (excluding the imaginary component of the 2-point wave) as :math:`x=[X0, Re(X1), Im(X1), ..., Re(Xn)]`, the response vector in the time-domain (:math:`x(t)`) is given as :math:`Mx`, where :math:`M` is the time matrix. The time matrix has size :python:`(nfreq*2, nfreq*2)`. If :python:`zero_freq = False` (not default), the mean (DC) component :python:`X0` is excluded, and the matrix/vector length is reduced by 1. Parameters --------- f1 Fundamental frequency :python:`f1` [:math:`Hz`]. nfreq Number of frequencies. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. zero_freq Whether the first frequency should be zero. """ t = time(f1, nfreq, nsubsteps) omega = frequency(f1, nfreq) * 2*np.pi wt = np.outer(t, omega[1:]) ncomp = ncomponents(nfreq) time_mat = np.empty((nsubsteps*ncomp, ncomp)) time_mat[:, 0] = 1.0 time_mat[:, 1::2] = np.cos(wt) time_mat[:, 2::2] = -np.sin(wt[:, :-1]) # remove 2pt wave sine component if not zero_freq: time_mat = time_mat[:, 1:] return time_mat
[docs] def derivative_mat( f1: float, nfreq: int, zero_freq: Optional[bool] = True, ) -> ndarray: """Assemble the derivative matrix that converts the state vector of a response to the state vector of its derivative. For a state :math:`x` consisting of the mean (DC) component followed by the real and imaginary components of the Fourier coefficients (excluding the imaginary component of the 2-point wave) as :math:`x=[X0, Re(X1), Im(X1), ..., Re(Xn)]`, the state of its derivative is given as :math:`Dx`, where :math:`D` is the derivative matrix. The time matrix has size :python:`(nfreq*2, nfreq*2)`. If :python:`zero_freq = False` (not default), the mean (DC) component :python:`X0` is excluded, and the matrix/vector length is reduced by 1. Parameters --------- f1 Fundamental frequency :python:`f1` [:math:`Hz`]. nfreq Number of frequencies. zero_freq Whether the first frequency should be zero. """ def block(n): return np.array([[0, -1], [1, 0]]) * n*f1 * 2*np.pi blocks = [block(n+1) for n in range(nfreq)] if zero_freq: blocks = [0.0] + blocks deriv_mat = block_diag(*blocks) return deriv_mat[:-1, :-1] # remove 2pt wave sine component
[docs] def derivative2_mat( f1: float, nfreq: int, zero_freq: Optional[bool] = True, ) -> ndarray: """Assemble the second derivative matrix that converts the state vector of a response to the state vector of its second derivative. For a state :math:`x` consisting of the mean (DC) component followed by the real and imaginary components of the Fourier coefficients (excluding the imaginary component of the 2-point wave) as :math:`x=[X0, Re(X1), Im(X1), ..., Re(Xn)]`, the state of its second derivative is given as :math:`(DD)x`, where :math:`DD` is the second derivative matrix. The time matrix has size :python:`(nfreq*2, nfreq*2)`. If :python:`zero_freq = False` (not default), the mean (DC) component :python:`X0` is excluded, and the matrix/vector length is reduced by 1. Parameters --------- f1 Fundamental frequency :python:`f1` [:math:`Hz`]. nfreq Number of frequencies. zero_freq Whether the first frequency should be zero. """ vals = [((n+1)*f1 * 2*np.pi)**2 for n in range(nfreq)] diagonal = np.repeat(-np.ones(nfreq) * vals, 2)[:-1] # remove 2pt wave sine if zero_freq: diagonal = np.concatenate(([0.0], diagonal)) return np.diag(diagonal)
[docs] def mimo_transfer_mat( transfer_mat: DataArray, zero_freq: Optional[bool] = True, ) -> ndarray: """Create a block matrix of the MIMO transfer function. The input is a complex transfer matrix that relates the complex Fourier representation of two variables. For example, it can be an impedance matrix or an RAO transfer matrix. The input complex impedance matrix has shape :python`(nfreq, ndof, ndof)`. Returns the 2D real matrix that transform the state representation of the input variable variable to the state representation of the output variable. Here, a state representation :python:`x` consists of the mean (DC) component followed by the real and imaginary components of the Fourier coefficients (excluding the imaginary component of the 2-point wave) as :python:`x=[X0, Re(X1), Im(X1), ..., Re(Xn)]`. If :python:`zero_freq = False` (not default), the mean (DC) component :python:`X0` is excluded, and the matrix/vector length is reduced by 1. Parameters ---------- transfer_mat Complex transfer matrix. zero_freq Whether the first frequency should be zero. """ ndof = transfer_mat.shape[1] assert transfer_mat.shape[2] == ndof elem = [[None]*ndof for _ in range(ndof)] def block(re, im): return np.array([[re, -im], [im, re]]) for idof in range(ndof): for jdof in range(ndof): if zero_freq: Zp0 = transfer_mat[0, idof, jdof] assert np.all(np.isreal(Zp0)) Zp0 = np.real(Zp0) Zp = transfer_mat[1:, idof, jdof] else: Zp0 = [0.0] Zp = transfer_mat[:, idof, jdof] re = np.real(Zp) im = np.imag(Zp) blocks = [block(ire, iim) for (ire, iim) in zip(re[:-1], im[:-1])] blocks = [Zp0] + blocks + [re[-1]] elem[idof][jdof] = block_diag(*blocks) return np.block(elem)
[docs] def vec_to_dofmat(vec: ArrayLike, ndof: int) -> ndarray: """Convert a vector back to a matrix with one column per DOF. Returns a matrix with :python:`ndof` columns. The number of rows is inferred from the size of the input vector. Opposite of :py:func:`wecopttool.dofmat_to_vec`. Parameters ---------- vec 1D array consisting of concatenated arrays of several DOFs, as :python:`vec = [vec_1, vec_2, ..., vec_ndof]`. ndof Number of degrees of freedom. See Also -------- dofmat_to_vec, """ return np.reshape(vec, (-1, ndof), order='F')
[docs] def dofmat_to_vec(mat: ArrayLike) -> ndarray: """Flatten a matrix that has one column per DOF. Returns a 1D vector. Opposite of :py:func:`wecopttool.vec_to_dofmat`. Parameters ---------- mat Matrix to be flattened. See Also -------- vec_to_dofmat, """ return np.reshape(mat, -1, order='F')
[docs] def real_to_complex( fd: ArrayLike, zero_freq: Optional[bool] = True, ) -> ndarray: """Convert from two real amplitudes to one complex amplitude per frequency. The input is a real 2D array with each column containing the real and imaginary components of the Fourier coefficients for some response, excluding the imaginary component of the highest frequency (2-point wave). The column length is :python:`2*nfreq`. The entries of a column representing a response :python:`x` are :python:`x=[X0, Re(X1), Im(X1), ..., Re(Xn)]`. Returns a complex 2D array with each column containing the complex Fourier coefficients. Columns are length :python:`nfreq+1`, and the first row corresponds to the real-valued zero-frequency (mean, DC) components. The entries of a column representing a response :python:`x` are :python:`x=[X0, X1, ..., Xn]`. If :python:`zero_freq = False`, the mean (DC) component :python:`X0` is excluded, and the column length is reduced by 1. Parameters ---------- fd Array containing the real and imaginary components of the Fourier coefficients. zero_freq Whether the mean (DC) component is included. See Also -------- complex_to_real, """ fd= atleast_2d(fd) if zero_freq: assert fd.shape[0]%2==0 mean = fd[0:1, :] fd = fd[1:, :] fdc = np.append(fd[0:-1:2, :] + 1j*fd[1::2, :], [fd[-1, :]], axis=0) if zero_freq: fdc = np.concatenate((mean, fdc), axis=0) return fdc
[docs] def complex_to_real( fd: ArrayLike, zero_freq: Optional[bool] = True, ) -> ndarray: """Convert from one complex amplitude to two real amplitudes per frequency. The input is a complex 2D array with each column containing the Fourier coefficients for some response. Columns are length :python:`nfreq+1`, and the first row corresponds to the real-valued zero-frequency (mean, DC) components. The entries of a column representing a response :python:`x` are :python:`x=[X0, X1, ..., Xn]`. Returns a real 2D array with each column containing the real and imaginary components of the Fourier coefficients. The imaginary component of the highest frequency (the 2-point wave) is excluded, as it will always evaluate to zero. The column length is :python:`2*nfreq`. The entries of a column representing a response :python:`x` are :python:`x=[X0, Re(X1), Im(X1), ..., Re(Xn)]`. If :python:`zero_freq = False` (not default), the mean (DC) component :python:`X0` is excluded, and the vector length is reduced by 1. Parameters ---------- fd Array containing the complex Fourier coefficients. zero_freq Whether the mean (DC) component is included. See Also -------- real_to_complex, """ fd = atleast_2d(fd) nfreq = fd.shape[0] - 1 if zero_freq else fd.shape[0] ndof = fd.shape[1] if zero_freq: assert np.all(np.isreal(fd[0, :])) a = np.real(fd[0:1, :]) b = np.real(fd[1:-1, :]) c = np.imag(fd[1:-1, :]) d = np.real(fd[-1:, :]) else: b = np.real(fd[:-1, :]) c = np.imag(fd[:-1, :]) d = np.real(fd[-1:, :]) out = np.concatenate([np.transpose(b), np.transpose(c)]) out = np.reshape(np.reshape(out, [-1], order='F'), [-1, ndof]) if zero_freq: out = np.concatenate([a, out, d]) assert out.shape == (2*nfreq, ndof) else: out = np.concatenate([out, d]) assert out.shape == (2*nfreq-1, ndof) return out
[docs] def fd_to_td( fd: ArrayLike, f1: Optional[float] = None, nfreq: Optional[int] = None, zero_freq: Optional[bool] = True, ) -> ndarray: """Convert a complex array of Fourier coefficients to a real array of time-domain responses. The input is a complex 2D array with each column containing the Fourier coefficients for some response. Columns are length :python:`nfreq+1`, and the first row corresponds to the real-valued zero-frequency (mean, DC) components. The entries of a column representing a response :python:`x` are :python:`x=[X0, X1, ..., Xn]`. Returns a real array with same number of columns and :python:`2*nfreq` rows, containing the time-domain response at times :python:`wecopttool.time(f1, nfreq, nsubsteps=1)`. The imaginary component of the highest frequency (the 2-point wave) is excluded, as it will always evaluate to zero. If both :python:`f1` and :python:`nfreq` are provided, it uses the time matrix :python:`wecopttool.time_mat(f1, nfreq, nsubsteps=1)`, else it uses the inverse real FFT (:py:func:`numpy.fft.irfft`). If :python:`zero_freq = False` (not default), the mean (DC) component :python:`X0` is excluded, and the matrix/vector length is reduced by 1. Opposite of :py:meth:`wecopttool.td_to_fd`. Parameters ---------- fd Array containing the complex Fourier coefficients. f1 Fundamental frequency :python:`f1` [:math:`Hz`]. nfreq Number of frequencies. zero_freq Whether the mean (DC) component is included. Raises ------ ValueError If only one of :python:`f1` or :python:`nfreq` is provided. Must provide both or neither. See Also -------- td_to_fd, time, time_mat """ fd = atleast_2d(fd) if zero_freq: msg = "The first row must be real when `zero_freq=True`." assert np.allclose(np.imag(fd[0, :]), 0), msg if (f1 is not None) and (nfreq is not None): tmat = time_mat(f1, nfreq, zero_freq=zero_freq) td = tmat @ complex_to_real(fd, zero_freq) elif (f1 is None) and (nfreq is None): n = 2*(fd.shape[0]-1) fd = np.concatenate((fd[:1, :], fd[1:-1, :]/2, fd[-1:, :])) td = np.fft.irfft(fd, n=n, axis=0, norm='forward') else: raise ValueError( "Provide either both 'f1' and 'nfreq' or neither.") return td
[docs] def td_to_fd( td: ArrayLike, fft: Optional[bool] = True, zero_freq: Optional[bool] = True, ) -> ndarray: """Convert a real array of time-domain responses to a complex array of Fourier coefficients. If :python:`zero_freq = False` (not default), the mean (DC) component :python:`X0` is excluded, and the matrix/vector length is reduced by 1. Opposite of :py:func:`wecopttool.fd_to_td` Parameters ---------- td Real array of time-domains responses. fft Whether to use the real FFT. zero_freq Whether the mean (DC) component is returned. See Also -------- fd_to_td """ td= atleast_2d(td) n = td.shape[0] if fft: fd = np.fft.rfft(td, n=n, axis=0, norm='forward') else: fd = np.dot(dft(n, 'n')[:n//2+1, :], td) fd = np.concatenate((fd[:1, :], fd[1:-1, :]*2, fd[-1:, :])) if not zero_freq: fd = fd[1:, :] return fd
[docs] def read_netcdf(fpath: Union[str, Path]) -> Dataset: """Read a *NetCDF* file with possibly complex entries as a :py:class:`xarray.Dataset`. Can handle complex entries in the *NetCDF* by using :py:func:`capytaine.io.xarray.merge_complex_values`. Parameters ---------- fpath Path to the *NetCDF* file. See Also -------- write_netcdf, """ with xr.open_dataset(fpath) as ds: ds.load() return cpy.io.xarray.merge_complex_values(ds)
[docs] def write_netcdf(fpath: Union[str, Path], data: Dataset) -> None: """Save an :py:class:`xarray.Dataset` with possibly complex entries as a *NetCDF* file. Can handle complex entries in the *NetCDF* by using :py:func:`capytaine.io.xarray.separate_complex_values` Parameters ---------- fpath Name of file to save. data Dataset to save. See Also -------- read_netcdf, """ cpy.io.xarray.separate_complex_values(data).to_netcdf(fpath)
[docs] def check_radiation_damping( hydro_data: Dataset, min_damping: Optional[float] = 1e-6, uniform_shift: Optional[bool] = False, ) -> Dataset: """Ensure that the linear hydrodynamics (friction + radiation damping) have positive damping. Shifts the :python:`friction` or :python:`radiation_damping` up if necessary. Returns the (possibly) updated Dataset with :python:`damping` :math:`>=` :python:`min_damping`. Parameters ---------- hydro_data Linear hydrodynamic data. min_damping Minimum threshold for damping. Default is 1e-6. uniform_shift Boolean that determines whether the damping correction for each degree of freedom is frequency dependent or not. If :python:`True`, the damping correction is applied to :python:`friction` and shifts the damping for all frequencies. If :python:`False`, the damping correction is applied to :python:`radiation_damping` and only shifts the damping for frequencies with negative damping values. Default is :python:`False`. """ hydro_data_new = hydro_data.copy(deep=True) radiation = hydro_data_new['radiation_damping'] friction = hydro_data_new['friction'] ndof = len(hydro_data_new.influenced_dof) assert ndof == len(hydro_data.radiating_dof) for idof in range(ndof): iradiation = radiation.isel(radiating_dof=idof, influenced_dof=idof) ifriction = friction.isel(radiating_dof=idof, influenced_dof=idof) if uniform_shift: dmin = (iradiation+ifriction).min() if dmin <= 0.0 + min_damping: dof = hydro_data_new.influenced_dof.values[idof] delta = min_damping-dmin _log.warning( f'Linear damping for DOF "{dof}" has negative or close ' + 'to zero terms. Shifting up radiation damping by ' + f'{delta.values} N/(m/s).') hydro_data_new['radiation_damping'][:, idof, idof] = (iradiation + delta) else: new_damping = iradiation.where( iradiation+ifriction>min_damping, other=min_damping) dof = hydro_data_new.influenced_dof.values[idof] if (new_damping==min_damping).any(): _log.warning( f'Linear damping for DOF "{dof}" has negative or close to ' + 'zero terms. Shifting up damping terms ' + f'{np.where(new_damping==min_damping)[0]} to a minimum of ' + f'{min_damping} N/(m/s)') hydro_data_new['radiation_damping'][:, idof, idof] = new_damping return hydro_data_new
[docs] def check_impedance( Zi: DataArray, min_damping: Optional[float] = 1e-6, uniform_shift: Optional[bool] = False, ) -> DataArray: """Ensure that the real part of the impedance (resistive) is positive. Adds to real part of the impedance. Returns the (possibly) updated impedance with :math:`Re(Zi)>=` :python:`min_damping`. Parameters ---------- Zi Linear hydrodynamic impedance. min_damping Minimum threshold for damping. Default is 1e-6. """ Zi_diag = np.diagonal(Zi,axis1=1,axis2=2) Zi_shifted = Zi.copy() for dof in range(Zi_diag.shape[1]): if uniform_shift: dmin = np.min(np.real(Zi_diag[:, dof])) if dmin < min_damping: delta = min_damping - dmin Zi_shifted[:, dof, dof] = Zi_diag[:, dof] \ + np.abs(delta) _log.warning( f'Real part of impedance for {dof} has negative or close to ' + f'zero terms. Shifting up by {delta:.2f}') else: points = np.where(np.real(Zi_diag[:, dof])<min_damping) Zi_dof_real = Zi_diag[:,dof].real.copy() Zi_dof_imag = Zi_diag[:,dof].imag.copy() Zi_dof_real[Zi_dof_real < min_damping] = min_damping Zi_shifted[:, dof, dof] = Zi_dof_real + Zi_dof_imag*1j if (Zi_dof_real==min_damping).any(): _log.warning( f'Real part of impedance for {dof} has negative or close to ' + f'zero terms. Shifting up elements ' f'{np.where(Zi_dof_real==min_damping)[0]} to a minimum of ' + f' {min_damping} N/(m/s)') return Zi_shifted
[docs] def force_from_rao_transfer_function( rao_transfer_mat: DataArray, zero_freq: Optional[bool] = True, ) -> TStateFunction: """Create a force function from its position transfer matrix. This is the position equivalent to the velocity-based :py:func:`wecopttool.force_from_impedance`. If :python:`zero_freq = False` (not default), the mean (DC) component of the transfer matrix (first row) is excluded. Parameters ---------- rao_transfer_mat Complex position transfer matrix. zero_freq Whether the first frequency should be zero. Default is :python:`True`. See Also -------- force_from_impedance, """ def force(wec, x_wec, x_opt, waves): transfer_mat = mimo_transfer_mat(rao_transfer_mat, zero_freq) force_fd = wec.vec_to_dofmat(np.dot(transfer_mat, x_wec)) return np.dot(wec.time_mat, force_fd) return force
[docs] def force_from_impedance( omega: ArrayLike, impedance: DataArray, ) -> TStateFunction: """Create a force function from its impedance. Parameters ---------- omega Radial frequency vector. impedance Complex impedance matrix. See Also -------- force_from_rao_transfer_function, """ return force_from_rao_transfer_function(impedance*(1j*omega), False)
[docs] def force_from_waves(force_coeff: DataArray, ) -> TStateFunction: """Create a force function from waves excitation coefficients. Parameters ---------- force_coeff Complex excitation coefficients indexed by frequency and direction angle. """ def force(wec, x_wec, x_opt, waves): force_fd = complex_to_real(wave_excitation(force_coeff, waves), False) return np.dot(wec.time_mat[:, 1:], force_fd) return force
[docs] def inertia( f1: float, nfreq: int, inertia_matrix: ArrayLike, ) -> TStateFunction: """Create the inertia "force" from the inertia matrix. Parameters ---------- f1 Fundamental frequency :python:`f1` [:math:`Hz`]. nfreq Number of frequencies. inertia_matrix Inertia matrix. """ omega = np.expand_dims(frequency(f1, nfreq, False)*2*np.pi, [1,2]) inertia_matrix = np.expand_dims(inertia_matrix, 0) rao_transfer_function = -1*omega**2*inertia_matrix + 0j inertia_fun = force_from_rao_transfer_function( rao_transfer_function, False) return inertia_fun
[docs] def standard_forces(hydro_data: Dataset) -> TForceDict: """Create functions for linear hydrodynamic forces. Returns a dictionary with the standard linear forces: radiation, hydrostatic, friction, Froude—Krylov, and diffraction. The functions are type :python:`StateFunction` (see Type Aliases in API Documentation). Parameters ---------- hydro_data Linear hydrodynamic data. """ # intrinsic impedance w = hydro_data['omega'] A = hydro_data['added_mass'] B = hydro_data['radiation_damping'] K = hydro_data['hydrostatic_stiffness'] Bf = hydro_data['friction'] rao_transfer_functions = dict() rao_transfer_functions['radiation'] = (1j*w*B + -1*w**2*A, False) rao_transfer_functions['friction'] = (1j*w*Bf, False) # include zero_freq in hydrostatics hs = ((K + 0j).expand_dims({"omega": B.omega}, 0)) tmp = hs.isel(omega=0).copy(deep=True) tmp['omega'] = tmp['omega'] * 0 hs = xr.concat([tmp, hs], dim='omega') #, data_vars='minimal') rao_transfer_functions['hydrostatics'] = (hs, True) linear_force_functions = dict() for name, (value, zero_freq) in rao_transfer_functions.items(): value = value.transpose("omega", "radiating_dof", "influenced_dof") value = -1*value # RHS of equation: ma = Σf linear_force_functions[name] = ( force_from_rao_transfer_function(value, zero_freq)) # wave excitation excitation_coefficients = { 'Froude_Krylov': hydro_data['Froude_Krylov_force'], 'diffraction': hydro_data['diffraction_force'] } for name, value in excitation_coefficients.items(): linear_force_functions[name] = force_from_waves(value) return linear_force_functions
[docs] def run_bem( fb: cpy.FloatingBody, freq: Iterable[float] = [np.infty], wave_dirs: Iterable[float] = [0], rho: float = _default_parameters['rho'], g: float = _default_parameters['g'], depth: float = _default_parameters['depth'], write_info: Optional[Mapping[str, bool]] = None, njobs: int = 1, ) -> Dataset: """Run Capytaine for a range of frequencies and wave directions. This simplifies running *Capytaine* and ensures the output are in the correct convention (see :py:func:`wecopttool.change_bem_convention`). It creates the *test matrix*, calls :py:meth:`capytaine.bodies.bodies.FloatingBody.keep_immersed_part`, calls :py:meth:`capytaine.bem.solver.BEMSolver.fill_dataset`, and changes the sign convention using :py:func:`wecopttool.change_bem_convention`. Parameters ---------- fb The WEC as a Capytaine floating body (mesh + DOFs). freq List of frequencies [:math:`Hz`] to evaluate BEM at. wave_dirs List of wave directions [degrees] to evaluate BEM at. rho Water density in :math:`kg/m^3`. g Gravitational acceleration in :math:`m/s^2`. depth Water depth in :math:`m`. write_info Which additional information to write. Options are: :python:`['hydrostatics', 'mesh', 'wavelength', 'wavenumber']`. See :py:func:`capytaine.io.xarray.assemble_dataset` for more details. njobs Number of jobs to run in parallel. See :py:meth:`capytaine.bem.solver.BEMSolver.fill_dataset` See Also -------- change_bem_convention, """ if wave_dirs is not None: wave_dirs = np.atleast_1d(degrees_to_radians(wave_dirs)) solver = cpy.BEMSolver() test_matrix = Dataset(coords={ 'rho': [rho], 'water_depth': [depth], 'omega': [ifreq*2*np.pi for ifreq in freq], 'wave_direction': wave_dirs, 'radiating_dof': list(fb.dofs.keys()), 'g': [g], }) if wave_dirs is None: # radiation only problem, no diffraction or excitation test_matrix = test_matrix.drop_vars('wave_direction') if write_info is None: write_info = {'hydrostatics': True, 'mesh': False, 'wavelength': False, 'wavenumber': False, } wec_im = fb.copy(name=f"{fb.name}_immersed").keep_immersed_part() wec_im = set_fb_centers(wec_im, rho=rho) if not hasattr(wec_im, 'inertia_matrix'): _log.warning('FloatingBody has no inertia_matrix field. ' + 'If the FloatingBody mass is defined, it will be ' + 'used for calculating the inertia matrix here. ' + 'Otherwise, the neutral buoyancy assumption will ' + 'be used to auto-populate.') wec_im.inertia_matrix = wec_im.compute_rigid_body_inertia(rho=rho) if not hasattr(wec_im, 'hydrostatic_stiffness'): _log.warning('FloatingBody has no hydrostatic_stiffness field. ' + 'Capytaine will auto-populate the hydrostatic ' + 'stiffness based on the provided mesh.') wec_im.hydrostatic_stiffness = wec_im.compute_hydrostatic_stiffness(rho=rho, g=g) bem_data = solver.fill_dataset( test_matrix, wec_im, n_jobs=njobs, **write_info) return change_bem_convention(bem_data)
[docs] def change_bem_convention(bem_data: Dataset) -> Dataset: """Change the convention from :math:`-iωt` to :math:`+iωt`. Change the linear hydrodynamic coefficients from the Capytaine convention (:math:`x(t)=Xe^{-iωt}`), where :math:`X` is the frequency-domain response, to the more standard convention used in WecOptTool (:math:`x(t)=Xe^{+iωt}`). NOTE: This might change in Capytaine in the future. Parameters ---------- bem_data Linear hydrodynamic coefficients for the WEC. """ bem_data['Froude_Krylov_force'] = np.conjugate( bem_data['Froude_Krylov_force']) bem_data['diffraction_force'] = np.conjugate(bem_data['diffraction_force']) return bem_data
[docs] def add_linear_friction( bem_data: Dataset, friction: Optional[ArrayLike] = None ) -> Dataset: """Add linear friction to BEM data. Returns the Dataset with the additional information added. Parameters ---------- bem_data Linear hydrodynamic coefficients obtained using the boundary element method (BEM) code Capytaine, with sign convention corrected. Also includes inertia and hydrostatic stiffness. friction Linear friction, in addition to radiation damping, of size :python:`(ndof, ndof)`. :python:`None` if included in :python:`bem_data` or to set to zero. """ dims = ['radiating_dof', 'influenced_dof'] hydro_data = bem_data.copy(deep=True) if friction is not None: if 'friction' in hydro_data.variables.keys(): if not np.allclose(data, hydro_data.variables[name]): raise ValueError( f'Variable "friction" is already in BEM data ' + f'with different values.') else: _log.warning( f'Variable "{name}" is already in BEM data ' + 'with same value.') else: data = atleast_2d(friction) hydro_data['friction'] = (dims, friction) elif friction is None: ndof = len(hydro_data["influenced_dof"]) hydro_data['friction'] = (dims, np.zeros([ndof, ndof])) return hydro_data
[docs] def wave_excitation(exc_coeff: DataArray, waves: Dataset) -> ndarray: """Calculate the complex, frequency-domain, excitation force due to waves. The resulting force is indexed only by frequency and not direction angle. The input :python:`waves` frequencies must be same as :python:`exc_coeff`, but the directions can be a subset. Parameters ---------- exc_coeff Complex excitation coefficients indexed by frequency and direction angle. waves Complex frequency-domain wave elevation. Raises ------ ValueError If the frequency vectors of :python:`exc_coeff` and :python:`waves` are different. ValueError If any of the directions in :python:`waves` is not in :python:`exc_coeff`. """ omega_w = waves['omega'].values omega_e = exc_coeff['omega'].values dir_w = waves['wave_direction'].values dir_e = exc_coeff['wave_direction'].values exc_coeff = exc_coeff.values wave_elev_fd = np.expand_dims(waves.values, -1) if not np.allclose(omega_w, omega_e): raise ValueError(f"Wave and excitation frequencies do not match. WW: {omega_w}, EE: {omega_e}") subset, sub_ind = subset_close(dir_w, dir_e) if not subset: raise ValueError( "Some wave directions are not in excitation coefficients " + f"\n Wave direction(s): {(np.rad2deg(dir_w))} (deg)" + f"\n BEM direction(s): {np.rad2deg(dir_e)} (deg).") return np.sum(wave_elev_fd*exc_coeff[:, sub_ind, :], axis=1)
[docs] def hydrodynamic_impedance(hydro_data: Dataset) -> Dataset: """Calculate hydrodynamic intrinsic impedance. Parameters ---------- hydro_data Dataset with linear hydrodynamic coefficients produced by :py:func:`wecopttool.add_linear_friction`. """ hydro_impedance = (hydro_data['inertia_matrix'] \ + hydro_data['added_mass'])*1j*hydro_data['omega'] \ + hydro_data['radiation_damping'] + hydro_data['friction'] \ + hydro_data['hydrostatic_stiffness']/1j/hydro_data['omega'] return hydro_impedance.transpose('omega', 'radiating_dof', 'influenced_dof')
[docs] def atleast_2d(array: ArrayLike) -> ndarray: """Ensure an array is at least 2D, otherwise add trailing dimensions to make it 2D. This differs from :py:func:`numpy.atleast_2d` in that the additional dimensions are appended at the end rather than at the begining. This might be an option in :py:func:`numpy.atleast_2d` in the future, see `NumPy #12336 <https://github.com/numpy/numpy/issues/12336>`_. Parameters ---------- array Input array. """ array = np.atleast_1d(array) return np.expand_dims(array, -1) if len(array.shape)==1 else array
[docs] def degrees_to_radians( degrees: FloatOrArray, sort: Optional[bool] = True, ) -> Union[float, ndarray]: """Convert a 1D array of angles in degrees to radians in the range :math:`[-π, π)` and optionally sort them. Parameters ---------- degrees 1D array of angles in degrees. sort Whether to sort the angles from smallest to largest in :math:`[-π, π)`. """ radians = np.asarray(np.remainder(np.deg2rad(degrees), 2*np.pi)) radians[radians > np.pi] -= 2*np.pi if radians.size > 1 and sort: radians = np.sort(radians) return radians
[docs] def subset_close( set_a: FloatOrArray, set_b: FloatOrArray, rtol: float = 1.e-5, atol: float = 1.e-8, equal_nan: bool = False, ) -> tuple[bool, list]: """Check if the first set :python:`set_a` is contained, to some tolerance, in the second set :python:`set_b`. Parameters ---------- set_a First array which is tested for being subset. set_b Second array which is tested for containing :python:`set_a`. rtol The relative tolerance parameter. Passed to :py:func:`numpy.isclose`. atol The absolute tolerance parameter. Passed to :py:func:`numpy.isclose`. equal_nan Whether to compare NaNs as equal. Passed to :py:func:`numpy.isclose`. Returns ------- subset Whether the first array is a subset of the second array. ind List with integer indices where the first array's elements are located inside the second array. Only contains values if :python:`subset==True`. Raises ------ ValueError If either of the two arrays contains repeated elements. """ set_a = np.atleast_1d(set_a) set_b = np.atleast_1d(set_b) if len(np.unique(set_a.round(decimals = 6))) != len(set_a): raise ValueError("Elements in set_a not unique") if len(np.unique(set_b.round(decimals = 6))) != len(set_b): raise ValueError("Elements in set_b not unique") ind = [] for el in set_a: a_in_b = np.isclose(set_b, el, rtol=rtol, atol=atol, equal_nan=equal_nan) if np.sum(a_in_b) == 1: ind.append(np.flatnonzero(a_in_b)[0]) if np.sum(a_in_b) > 1: _log.warning('Multiple matching elements in subset, ' + 'selecting closest match.') ind.append(np.argmin(np.abs(a_in_b - el))) subset = len(set_a) == len(ind) ind = ind if subset else [] return subset, ind
[docs] def scale_dofs(scale_list: Iterable[float], ncomponents: int) -> ndarray: """Create a scaling vector based on a different scale for each DOF. Returns a 1D array of length :python:`NDOF*ncomponents` where the number of DOFs (:python:`NDOF`) is the length of :python:`scale_list`. The first :python:`ncomponents` entries have the value of the first scale :python:`scale_list[0]`, the next :python:`ncomponents` entries have the value of the second scale :python:`scale_list[1]`, and so on. Parameters ---------- scale_list Scale for each DOF. ncomponents Number of elements in the state vector for each DOF. """ ndof = len(scale_list) scale = [] for dof in range(ndof): scale += [scale_list[dof]] * ncomponents return np.array(scale)
[docs] def decompose_state( state: ndarray, ndof: int, nfreq: int, ) -> tuple[ndarray, ndarray]: """Split the state vector into the WEC dynamics state and the optimization (control) state. The WEC dynamics state consists of the Fourier coefficients of the position of each degree of freedom. The optimization state depends on the chosen control states for the problem. Parameters ---------- state Combined WEC and optimization states. ndof Number of degrees of freedom for the WEC dynamics. nfreq Number of frequencies. Returns ------- state_wec WEC state vector. state_opt Optimization (control) state. """ nstate_wec = ndof * ncomponents(nfreq) return state[:nstate_wec], state[nstate_wec:]
[docs] def frequency_parameters( freqs: ArrayLike, zero_freq: bool = True, ) -> tuple[float, int]: """Return the fundamental frequency and the number of frequencies in a frequency array. This function can be used as a check for inputs to other functions since it raises an error if the frequency vector does not have the correct format :python:`freqs = [0, f1, 2*f1, ..., nfreq*f1]` (or :python:`freqs = [f1, 2*f1, ..., nfreq*f1]` if :python:`zero_freq = False`). Parameters ---------- freqs The frequency array, starting at zero and having equal spacing. zero_freq Whether the first frequency should be zero. Returns ------- f1 Fundamental frequency :python:`f1` [:math:`Hz`] nfreq Number of frequencies (not including zero frequency), i.e., :python:`freqs = [0, f1, 2*f1, ..., nfreq*f1]`. Raises ------ ValueError If the frequency vector is not evenly spaced. ValueError If the zero-frequency was expected but not included or not expected but included. """ if np.isclose(freqs[0], 0.0): if zero_freq: freqs0 = freqs[:] else: raise ValueError('Zero frequency was included.') else: if zero_freq: raise ValueError( 'Frequency array must start with the zero frequency.') else: freqs0 = np.concatenate([[0.0,], freqs]) f1 = freqs0[1] nfreq = len(freqs0) - 1 f_check = np.arange(0, f1*(nfreq+0.5), f1) if not np.allclose(f_check, freqs0): raise ValueError("Frequency array 'omega' must be evenly spaced by" + "the fundamental frequency " + "(i.e., 'omega = [0, f1, 2*f1, ..., nfreq*f1]')") return f1, nfreq
[docs] def time_results(fd: DataArray, time: DataArray) -> ndarray: """Create a :py:class:`xarray.DataArray` of time-domain results from :py:class:`xarray.DataArray` of frequency-domain results. Parameters ---------- fd Frequency domain response. time Time array. """ out = np.zeros((*fd.isel(omega=0).shape, len(time))) for w, mag in zip(fd.omega, fd): out = out + \ np.real(mag)*np.cos(w*time) - np.imag(mag)*np.sin(w*time) return out
[docs] def set_fb_centers( fb: FloatingBody, rho: float = _default_parameters["rho"], ) -> FloatingBody: """Sets default properties if not provided by the user: - `center_of_mass` is set to the geometric centroid - `rotation_center` is set to the center of mass """ valid_properties = ['center_of_mass', 'rotation_center'] for property in valid_properties: if not hasattr(fb, property): setattr(fb, property, None) if getattr(fb, property) is None: if property == 'center_of_mass': def_val = fb.center_of_buoyancy log_str = ( "Using the geometric centroid as the center of gravity (COG).") elif property == 'rotation_center': def_val = fb.center_of_mass log_str = ( "Using the center of gravity (COG) as the rotation center " + "for hydrostatics.") setattr(fb, property, def_val) _log.warning(log_str) elif getattr(fb, property) is not None: _log.warning( f'{property} already defined as {getattr(fb, property)}.') return fb