Source code for wecopttool.pto

"""Provide power take-off (PTO) forces and produced energy functions
for common PTO control approaches.

The PTO produced energy can be used as the objective function for the
control optimization.
The PTO force can be included as an additional force in the WEC
dynamics.

Contains:

* The *PTO* class
* Controller functions

"""


from __future__ import annotations


__all__ = [
    "PTO",
    "controller_unstructured",
    "controller_pid",
    "controller_pi",
    "controller_p",
]


from typing import Optional, TypeVar, Callable, Union

import autograd.numpy as np
from autograd.builtins import isinstance, tuple, list, dict
from autograd.numpy import ndarray
from scipy.linalg import block_diag
from scipy.optimize import OptimizeResult
from xarray import DataArray, Dataset
from datetime import datetime
from scipy.optimize import OptimizeResult

from wecopttool.core import complex_to_real, td_to_fd
from wecopttool.core import dofmat_to_vec, vec_to_dofmat
from wecopttool.core import TWEC, TStateFunction, FloatOrArray


# type aliases
TPTO = TypeVar("TPTO", bound="PTO")
TLOSS = Callable[[FloatOrArray, FloatOrArray], FloatOrArray]


[docs] class PTO: """A power take-off (PTO) object to be used in conjunction with a :py:class:`wecopttool.WEC` object. """
[docs] def __init__(self, ndof: int, kinematics: Union[TStateFunction, ndarray], controller: Optional[TStateFunction] = None, impedance: Optional[ndarray] = None, loss: Optional[TLOSS] = None, names: Optional[list[str]] = None, ) -> None: """Create a PTO object. The :py:class:`wecopttool.pto.PTO` class describes the kinematics, control logic, impedance and/or non-linear power loss of a power take-off system. The forces/moments applied by a :py:class:`wecopttool.pto.PTO` object can be applied to a :py:class:`wecopttool.WEC` object through the :py:attr:`wecopttool.WEC.f_add` property. The power produced by a :py:class:`wecopttool.pto.PTO` object can be used for the :python:`obj_fun` of pseudo-spectral optimization problem when calling :py:meth:`wecopttool.WEC.solve`. Parameters ---------- ndof Number of degrees of freedom. kinematics Transforms state from WEC to PTO frame. May be a matrix (for linear kinematics) or function (for nonlinear kinematics). controller Function with signature :python:`def fun(pto, wec, x_wec, x_opt, waves, nsubsteps):` or matrix with shape (PTO DOFs, WEC DOFs) that converts from the WEC DOFs to the PTO DOFs. impedance Matrix representing the PTO impedance. loss Function that maps flow and effort variables to a non-linear power loss. The output is the dissipated power (loss) in Watts. This should be a positive value. names PTO names. """ self._ndof = ndof # names if names is None: names = [f'PTO_{i}' for i in range(ndof)] elif ndof == 1 and isinstance(names, str): names = [names] self._names = names # kinematics if callable(kinematics): def kinematics_fun(wec, x_wec, x_opt, waves, nsubsteps=1): pos_wec = wec.vec_to_dofmat(x_wec) tmat = self._tmat(wec, nsubsteps) pos_wec_td = np.dot(tmat, pos_wec) return kinematics(pos_wec_td) else: def kinematics_fun(wec, x_wec, x_opt, waves, nsubsteps=1): n = wec.nt*nsubsteps return np.repeat(kinematics[:, :, np.newaxis], n, axis=-1) self._kinematics = kinematics_fun # controller if controller is None: controller = controller_unstructured def force(wec, x_wec, x_opt, waves, nsubsteps=1): return controller(self, wec, x_wec, x_opt, waves, nsubsteps) self._force = force # power if impedance is not None: check_1 = impedance.shape[0] == impedance.shape[1] == 2*self.ndof check_2 = len(impedance.shape) == 3 if not (check_1 and check_2): raise TypeError( "Impedance should have size [2*ndof, 2*ndof, nfreq]" ) for i in range(impedance.shape[2]-1): check_3 = ( np.allclose(np.real(impedance[:, :, i+1]), np.real(impedance[:, :, 0])) ) if not check_3: raise ValueError( "Real component of impedance must be constant for " + "all frequencies." ) impedance_abcd = _make_abcd(impedance, ndof) self._transfer_mat = _make_mimo_transfer_mat(impedance_abcd, ndof) else: self._transfer_mat = None self._impedance = impedance self._loss = loss
@property def ndof(self) -> int: """Number of degrees of freedom.""" return self._ndof @property def names(self) -> ndarray: """DOF Names.""" return self._names @property def kinematics(self) -> TStateFunction: """Kinematics function. """ return self._kinematics @property def force(self) -> TStateFunction: """PTO force in PTO coordinates.""" return self._force @property def impedance(self) -> ndarray: """Impedance matrix.""" return self._impedance @property def loss(self) -> TLOSS: """Nonlinear power loss function with outputs in Watts.""" return self._loss @property def transfer_mat(self) -> ndarray: """Transfer matrix.""" return self._transfer_mat def _tmat(self, wec, nsubsteps: Optional[int] = 1): if nsubsteps==1: tmat = wec.time_mat else: tmat = wec.time_mat_nsubsteps(nsubsteps) return tmat def _fkinematics(self, f_wec: ndarray, wec: TWEC, x_wec: ndarray, x_opt: Optional[ndarray] = None, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> ndarray: """Return time-domain values in the PTO frame. Parameters ---------- f_wec Fourier coefficients of some quantity "f" in the WEC frame. wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ time_mat = self._tmat(wec, nsubsteps) f_wec_td = np.dot(time_mat, f_wec) assert f_wec_td.shape == (wec.nt*nsubsteps, wec.ndof) f_wec_td = np.expand_dims(np.transpose(f_wec_td), axis=0) kinematics_mat = self.kinematics(wec, x_wec, x_opt, waves, nsubsteps) return np.transpose(np.sum(kinematics_mat*f_wec_td, axis=1))
[docs] def position(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> ndarray: """Calculate the PTO position time-series. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ pos_wec = wec.vec_to_dofmat(x_wec) return self._fkinematics(pos_wec, wec, x_wec, x_opt, waves, nsubsteps)
[docs] def velocity(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> ndarray: """Calculate the PTO velocity time-series. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ pos_wec = wec.vec_to_dofmat(x_wec) vel_wec = np.dot(wec.derivative_mat, pos_wec) return self._fkinematics(vel_wec, wec, x_wec, x_opt, waves, nsubsteps)
[docs] def acceleration(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> np.ndarray: """Calculate the PTO acceleration time-series. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ pos_wec = wec.vec_to_dofmat(x_wec) acc_wec = np.dot(wec.derivative2_mat, pos_wec) return self._fkinematics(acc_wec, wec, x_wec, x_opt, waves, nsubsteps)
[docs] def force_on_wec(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> ndarray: """Calculate the PTO force on WEC. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ force_td = self.force(wec, x_wec, x_opt, waves, nsubsteps) assert force_td.shape == (wec.nt*nsubsteps, self.ndof) force_td = np.expand_dims(np.transpose(force_td), axis=0) assert force_td.shape == (1, self.ndof, wec.nt*nsubsteps) kinematics_mat = self.kinematics(wec, x_wec, x_opt, waves, nsubsteps) kinematics_mat = np.transpose(kinematics_mat, (1,0,2)) return np.transpose(np.sum(kinematics_mat*force_td, axis=1))
[docs] def mechanical_power(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> np.ndarray: """Calculate the mechanical power time-series in each PTO DOF for a given system state. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ force_td = self.force(wec, x_wec, x_opt, waves, nsubsteps) vel_td = self.velocity(wec, x_wec, x_opt, waves, nsubsteps) return vel_td * force_td
[docs] def mechanical_energy(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> float: """Calculate the mechanical energy in each PTO DOF for a given system state. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ power_td = self.mechanical_power(wec, x_wec, x_opt, waves, nsubsteps) return np.sum(power_td) * wec.dt/nsubsteps
[docs] def mechanical_average_power(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> float: """Calculate average mechanical power in each PTO DOF for a given system state. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ energy = self.mechanical_energy(wec, x_wec, x_opt, waves, nsubsteps) return energy / wec.tf
[docs] def power_variables(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> tuple[ndarray, ndarray]: """Calculate the power variables (flow q and effort e) time-series in each PTO DOF for a given system state. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ # convert q1 (PTO velocity), e1 (PTO force) # to q2 (flow variable), e2 (effort variable) if self.impedance is not None: q1_td = self.velocity(wec, x_wec, x_opt, waves) e1_td = self.force(wec, x_wec, x_opt, waves) q1 = complex_to_real(td_to_fd(q1_td, False)) e1 = complex_to_real(td_to_fd(e1_td, False)) vars_1 = np.hstack([q1, e1]) vars_1_flat = dofmat_to_vec(vars_1) vars_2_flat = np.dot(self.transfer_mat, vars_1_flat) vars_2 = vec_to_dofmat(vars_2_flat, 2*self.ndof) q2 = vars_2[:, :self.ndof] e2 = vars_2[:, self.ndof:] time_mat = self._tmat(wec, nsubsteps) q2_td = np.dot(time_mat, q2) e2_td = np.dot(time_mat, e2) else: q2_td = self.velocity(wec, x_wec, x_opt, waves, nsubsteps) e2_td = self.force(wec, x_wec, x_opt, waves, nsubsteps) return q2_td, e2_td
[docs] def power(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> ndarray: """Calculate the power time-series in each PTO DOF for a given system state. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ q2_td, e2_td = self.power_variables(wec, x_wec, x_opt, waves, nsubsteps) # power power_out = q2_td * e2_td if self.loss is not None: power_out = power_out + self.loss(q2_td, e2_td) return power_out
[docs] def energy(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> float: """Calculate the energy in each PTO DOF for a given system state. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ power_td = self.power(wec, x_wec, x_opt, waves, nsubsteps) return np.sum(power_td) * wec.dt/nsubsteps
[docs] def average_power(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> float: """Calculate the average power in each PTO DOF for a given system state. Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ energy = self.energy(wec, x_wec, x_opt, waves, nsubsteps) return energy / wec.tf
[docs] def transduced_flow(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> float: """Calculate the transduced flow variable time-series in each PTO DOF for a given system state. Equals the PTO velocity if no impedance is defined. Examples for PTO impedance and corresponding flow variables: - OWC: (pneumatic admittance)^-1 : flow = volumetric air flow - Drive-train: rotational impedance : flow = rotational velocity - Generator: winding impedance: flow = electric current - Drive-train and Generator combined: flow = electric current Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ q2_td, _ = self.power_variables(wec, x_wec, x_opt, waves, nsubsteps) return q2_td
[docs] def transduced_effort(self, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> float: """Calculate the transduced flow variable time-series in each PTO DOF for a given system state. Equals the PTO force if no impedance is defined. Examples for PTO impedance and corresponding effort variables: - OWC: (pneumatic admittance)^-1 : effort = air pressure - Drive-train: rotational impedance : effort = torque - Generator: winding impedance: effort = voltage - Drive-train and Generator combined: effort = voltage Parameters ---------- wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ _, e2_td = self.power_variables(wec, x_wec, x_opt, waves, nsubsteps) return e2_td
[docs] def post_process(self, wec: TWEC, res: Union[OptimizeResult, list], waves: Optional[DataArray] = None, nsubsteps: Optional[int] = 1, ) -> tuple[list[Dataset], list[Dataset]]: """Transform the results from optimization solution to a form that the user can work with directly. Examples -------- The :py:meth:`wecopttool.WEC.solve` method only returns the raw results dictionary produced by :py:func:`scipy.optimize.minimize`. >>> res_opt = wec.solve(waves=wave, obj_fun=pto.average_power, nstate_opt=2*nfreq+1) To get the post-processed results for the :py:class:`wecopttool.pto.PTO`, you may call >>> res_pto_fd, res_pto_td = pto.post_process(wec,res_opt[0],wave) For smoother plots, you can set :python:`nsubsteps` to a value greater than 1. >>> res_pto_fd, res_pto_td = pto.post_process(wec,res_opt, nsubsteps=4) >>> res_pto_td[0].power.plot() Parameters ---------- wec :py:class:`wecopttool.WEC` object. res Results produced by :py:meth:`wecopttool.WEC.solve`. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. Returns ------- results_fd list of :py:class:`xarray.Dataset` with frequency domain results. results_td list of :py:class:`xarray.Dataset` with time domain results. """ def _postproc(wec, res, waves, nsubsteps): create_time = f"{datetime.utcnow()}" x_wec, x_opt = wec.decompose_state(res.x) # position pos_td = self.position(wec, x_wec, x_opt, waves, nsubsteps) pos_fd = wec.td_to_fd(pos_td[::nsubsteps]) # velocity vel_td = self.velocity(wec, x_wec, x_opt, waves, nsubsteps) vel_fd = wec.td_to_fd(vel_td[::nsubsteps]) # acceleration acc_td = self.acceleration(wec, x_wec, x_opt, waves, nsubsteps) acc_fd = wec.td_to_fd(acc_td[::nsubsteps]) # force force_td = self.force(wec, x_wec, x_opt, waves, nsubsteps) force_fd = wec.td_to_fd(force_td[::nsubsteps]) # power elec_power_td = self.power(wec, x_wec, x_opt, waves, nsubsteps) elec_power_fd = wec.td_to_fd(elec_power_td[::nsubsteps]) # mechanical power mech_power_td = self.mechanical_power(wec, x_wec, x_opt, waves, nsubsteps) mech_power_fd = wec.td_to_fd(mech_power_td[::nsubsteps]) # stack mechanical and electrical power power_names = ['mech','elec'] power_fd = np.stack((mech_power_fd,elec_power_fd)) power_td = np.stack((mech_power_td,elec_power_td)) pos_attr = {'long_name': 'Position', 'units': 'm or rad'} vel_attr = {'long_name': 'Velocity', 'units': 'm/s or rad/s'} acc_attr = {'long_name': 'Acceleration', 'units': 'm/s^2 or rad/s^2'} force_attr = {'long_name': 'PTO force or torque', 'units': 'N or Nm'} power_attr = {'long_name': 'Power', 'units': 'W'} mech_power_attr = {'long_name': 'Mechanical power', 'units': 'W'} omega_attr = {'long_name': 'Radial frequency', 'units': 'rad/s'} freq_attr = {'long_name': 'Frequency', 'units': 'Hz'} period_attr = {'long_name': 'Period', 'units': 's'} dof_attr = {'long_name': 'PTO degree of freedom'} time_attr = {'long_name': 'Time', 'units': 's'} type_attr = {'long_name': 'Power type'} t_dat = wec.time_nsubsteps(nsubsteps) results_fd = Dataset( data_vars={ 'pos': (['omega','dof'], pos_fd, pos_attr), 'vel': (['omega','dof'], vel_fd, vel_attr), 'acc': (['omega','dof'], acc_fd, acc_attr), 'force': (['omega','dof'], force_fd, force_attr), 'power': (['type','omega','dof'], power_fd, power_attr), }, coords={ 'omega':('omega', wec.omega, omega_attr), 'freq':('omega', wec.frequency, freq_attr), 'period':('omega', wec.period, period_attr), 'dof':('dof', self.names, dof_attr), 'type':('type', power_names, power_attr)}, attrs={"time_created_utc": create_time} ) results_td = Dataset( data_vars={ 'pos': (['time','dof'], pos_td, pos_attr), 'vel': (['time','dof'], vel_td, vel_attr), 'acc': (['time','dof'], acc_td, acc_attr), 'force': (['time','dof'], force_td, force_attr), 'power': (['type','time','dof'], power_td, power_attr), }, coords={ 'time':('time', t_dat, time_attr), 'dof':('dof', self.names, dof_attr), 'type':('type', power_names, power_attr)}, attrs={"time_created_utc": create_time} ) if self.impedance is not None: #transduced flow and effort variables q2_td, e2_td = self.power_variables(wec, x_wec, x_opt, waves, nsubsteps) q2_fd = wec.td_to_fd(q2_td[::nsubsteps]) e2_fd = wec.td_to_fd(e2_td[::nsubsteps]) q2_attr = {'long_name': 'Transduced Flow', 'units': 'A or m^3/s or rad/s or m/s'} e2_attr = {'long_name': 'Transduced Effort', 'units': 'V or N/m^2 or Nm or Ns'} results_td = results_td.assign({ 'trans_flo': (['time','dof'], q2_td, q2_attr), 'trans_eff': (['time','dof'], e2_td, e2_attr), }) results_fd = results_fd.assign({ 'trans_flo': (['omega','dof'], q2_fd, q2_attr), 'trans_eff': (['omega','dof'], e2_fd, e2_attr), }) return results_fd, results_td results_fd = [] results_td = [] for idx, ires in enumerate(res): ifd, itd = _postproc(wec, ires, waves.sel(realization=idx), nsubsteps) results_fd.append(ifd) results_td.append(itd) return results_fd, results_td
# power conversion chain def _make_abcd(impedance: ndarray, ndof: int) -> ndarray: """Transform the impedance matrix into ABCD form from a MIMO transfer function. Parameters ---------- impedance Matrix representing the PTO impedance. Size 2*n_dof. ndof Number of degrees of freedom. """ z_11 = impedance[:ndof, :ndof, :] # Fu z_12 = impedance[:ndof, ndof:, :] # Fi z_21 = impedance[ndof:, :ndof, :] # Vu z_22 = impedance[ndof:, ndof:, :] # Vi z_12_inv = np.linalg.inv(z_12.T).T mmult = lambda a,b: np.einsum('mnr,mnr->mnr', a, b) abcd_11 = -1 * mmult(z_12_inv, z_11) abcd_12 = z_12_inv abcd_21 = z_21 - mmult(z_22, mmult(z_12_inv, z_11)) abcd_22 = mmult(z_22, z_12_inv) row_1 = np.hstack([abcd_11, abcd_12]) row_2 = np.hstack([abcd_21, abcd_22]) return np.vstack([row_1, row_2]) def _make_mimo_transfer_mat( impedance_abcd: ndarray, ndof: int, ) -> np.ndarray: """Create a block matrix of a MIMO transfer function. Parameters ---------- impedance PTO impedance in ABCD form. ndof Number of degrees of freedom. """ def block(re, im): return np.array([[re, -im], [im, re]]) for idof in range(2*ndof): for jdof in range(2*ndof): Zp = impedance_abcd[idof, jdof, :] re = np.real(Zp) im = np.imag(Zp) # Exclude the sine component of the 2-point wave blocks = [block(ire, iim) for (ire, iim) in zip(re[:-1], im[:-1])] # re[0] added for the zero frequency power loss (DC), could be re[n] blocks = [re[0]] + blocks + [re[-1]] if jdof==0: row = block_diag(*blocks) else: row = np.hstack([row, block_diag(*blocks)]) if idof==0: mat = row else: mat = np.vstack([mat, row]) return mat # controllers
[docs] def controller_unstructured( pto: TPTO, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, ) -> ndarray: """Unstructured numerical optimal controller that returns a time history of PTO forces. Parameters ---------- pto :py:class:`wecopttool.pto.PTO` object. wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. """ tmat = pto._tmat(wec, nsubsteps) x_opt = np.reshape(x_opt[:len(tmat[0])*pto.ndof], (-1, pto.ndof), order='F') return np.dot(tmat, x_opt)
[docs] def controller_pid( pto: TPTO, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, proportional: Optional[bool] = True, integral: Optional[bool] = True, derivative: Optional[bool] = True, saturation: Optional[FloatOrArray] = None, ) -> ndarray: """Proportional-integral-derivative (PID) controller that returns a time history of PTO forces. Parameters ---------- pto :py:class:`wecopttool.pto.PTO` object. wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. proportional True to include proportional gain integral True to include integral gain derivative True to include derivative gain saturation Maximum and minimum control value. Can be symmetric ([ndof]) or asymmetric ([ndof, 2]). """ ndof = pto.ndof force_td_tmp = np.zeros([wec.nt*nsubsteps, ndof]) # PID force idx = 0 def update_force_td(response): nonlocal idx, force_td_tmp gain = np.diag(x_opt[idx*ndof:(idx+1)*ndof]) force_td_tmp = force_td_tmp + np.dot(response, gain.T) idx = idx + 1 return if proportional: vel_td = pto.velocity(wec, x_wec, x_opt, waves, nsubsteps) update_force_td(vel_td) if integral: pos_td = pto.position(wec, x_wec, x_opt, waves, nsubsteps) update_force_td(pos_td) if derivative: acc_td = pto.acceleration(wec, x_wec, x_opt, waves, nsubsteps) update_force_td(acc_td) # Saturation if saturation is not None: saturation = np.atleast_2d(np.squeeze(saturation)) assert len(saturation)==ndof if len(saturation.shape) > 2: raise ValueError("`saturation` must have <= 2 dimensions.") if saturation.shape[1] == 1: f_min, f_max = -1*saturation, saturation elif saturation.shape[1] == 2: f_min, f_max = saturation[:,0], saturation[:,1] else: raise ValueError("`saturation` must have 1 or 2 columns.") force_td_list = [] for i in range(ndof): tmp = np.clip(force_td_tmp[:,i], f_min[i], f_max[i]) force_td_list.append(tmp) force_td = np.array(force_td_list).T else: force_td = force_td_tmp return force_td
[docs] def controller_pi( pto: TPTO, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, saturation: Optional[FloatOrArray] = None, ) -> ndarray: """Proportional-integral (PI) controller that returns a time history of PTO forces. Parameters ---------- pto :py:class:`wecopttool.pto.PTO` object. wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. saturation Maximum and minimum control value. Can be symmetric ([ndof]) or asymmetric ([ndof, 2]). """ force_td = controller_pid( pto, wec, x_wec, x_opt, waves, nsubsteps, True, True, False, saturation, ) return force_td
[docs] def controller_p( pto: TPTO, wec: TWEC, x_wec: ndarray, x_opt: ndarray, waves: Optional[Dataset] = None, nsubsteps: Optional[int] = 1, saturation: Optional[FloatOrArray] = None, ) -> ndarray: """Proportional (P) controller that returns a time history of PTO forces. Parameters ---------- pto :py:class:`wecopttool.pto.PTO` object. wec :py:class:`wecopttool.WEC` object. x_wec WEC dynamic state. x_opt Optimization (control) state. waves :py:class:`xarray.Dataset` with the structure and elements shown by :py:mod:`wecopttool.waves`. nsubsteps Number of steps between the default (implied) time steps. A value of :python:`1` corresponds to the default step length. saturation Maximum and minimum control value. Can be symmetric ([ndof]) or asymmetric ([ndof, 2]). """ force_td = controller_pid( pto, wec, x_wec, x_opt, waves, nsubsteps, True, False, False, saturation, ) return force_td
# utilities def nstate_unstructured(nfreq: int, ndof: int) -> int: """ Number of states needed to represent an unstructured controller. Parameters ---------- nfreq Number of frequencies. ndof Number of degrees of freedom. """ return 2*nfreq*ndof def nstate_pid( nterm: int, ndof: int, ) -> int: """ Number of states needed to represent an unstructured controller. Parameters ---------- nterm Number of terms (e.g. 1 for P, 2 for PI, 3 for PID). ndof Number of degrees of freedom. """ return int(nterm*ndof)