Source code for vorlap.computations

"""
Utility functions for the VorLap package.
"""

import numpy as np
import math
from typing import List, Dict, Tuple, Optional, Union, Any

from .structs import AirfoilFFT, Component, VIV_Params


#@profile
[docs] def compute_thrust_torque_spectrum_optimized(components: List[Component], affts: Dict[str, AirfoilFFT], viv_params: VIV_Params, natfreqs: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Optimized version of compute_thrust_torque_spectrum using cached interpolators and vectorized operations. Same interface and outputs as the original function, but with significant performance improvements. Args: components: List of structural components including geometry, orientation, and segment-wise parameters. affts: Dictionary mapping airfoil IDs to their FFT-derived lift/drag/moment spectra. viv_params: Encapsulates all analysis parameters, including inflow, rotation axis, fluid properties, and plotting settings. natfreqs: Natural frequencies to compare against. Returns: Tuple containing: - percdiff_matrix: Matrix of percent differences between shedding frequencies and natural frequencies. - percdiff_info: Matrix of strings with information about the worst percent differences. - total_global_force_vector: Total global force vector for each inflow speed and azimuth. - total_global_moment_vector: Total global moment vector for each inflow speed and azimuth. - global_force_vector_nodes: Global force vector for each node over time. """ from .interpolation import interpolate_fft_spectrum_optimized # Pre-cache interpolators for all airfoils for afft in affts.values(): afft._cache_interpolators() inflow_speeds = viv_params.inflow_speeds azimuths = viv_params.azimuths rotation_axis = viv_params.rotation_axis fluid_density = viv_params.fluid_density fluid_dynamicviscosity = viv_params.fluid_dynamicviscosity n_harmonic = viv_params.n_harmonic amplitude_coeff_cutoff = viv_params.amplitude_coeff_cutoff n_freq_depth = viv_params.n_freq_depth n_inflow = len(inflow_speeds) n_az = len(azimuths) total_global_force_vector = np.zeros((n_inflow, n_az, 3)) total_global_moment_vector = np.zeros((n_inflow, n_az, 3)) percdiff_matrix = np.ones((n_inflow, n_az)) * 1000 percdiff_info = np.empty((n_inflow, n_az), dtype=object) total_nodes = sum([comp.shape_xyz.shape[0] for comp in components]) global_force_vector_nodes = np.zeros((len(viv_params.output_time), 3, total_nodes)) for i_inflow in range(n_inflow): Vinf = np.array(viv_params.inflow_vec) / np.linalg.norm(viv_params.inflow_vec) * inflow_speeds[i_inflow] for j_azi in range(n_az): # Negative since rotating the inflow in the negative direction is the same as rotating the structure in the positive Vin_rotated = rotate_vector(Vinf, rotation_axis, -azimuths[j_azi]) inode = 0 for comp in components: N_pts = comp.shape_xyz.shape[0] for ipt in range(N_pts): inode += 1 global_pos = comp.shape_xyz_global[ipt] chord = comp.chord[ipt] afid = comp.airfoil_ids[ipt] afft = affts.get(afid, affts["default"]) # get the id, or use the default chord_vector = comp.chord_vector[ipt, :] normal_vector = comp.normal_vector[ipt, :] V_chord = np.dot(Vin_rotated, chord_vector / np.linalg.norm(chord_vector)) V_normal = np.dot(Vin_rotated, normal_vector / np.linalg.norm(normal_vector)) aoa_rad = math.atan2(V_normal, V_chord) # Using atan2 for correct quadrant aoa_deg = math.degrees(aoa_rad) V_eff = math.sqrt(V_normal**2 + V_chord**2) # Fundamental assumption that spanwise flow doesn't impact lift and drag Re = fluid_density * V_eff * chord / fluid_dynamicviscosity if ipt == 0: local_length = 0.0 #TODO: is there a better way to do this without going full on nodes and elements? Maybe just tell people this is how it is calculated # print("AOA: ",aoa_deg, "Azi: ",azimuths[j_azi], "Vinf: ",Vinf, "Veff: ",V_eff) else: local_length = np.linalg.norm(comp.shape_xyz[ipt, :] - comp.shape_xyz[ipt-1, :]) q = 0.5 * fluid_density * V_eff**2 * chord * local_length # Optimized interpolation: get all three fields at once results = interpolate_fft_spectrum_optimized(afft, Re, aoa_deg, ['CL', 'CD', 'CF'], n_freq_depth=n_freq_depth) ST_cl, amps_cl, phases_cl = results['CL'] ST_cd, amps_cd, phases_cd = results['CD'] ST_cf, amps_cf, phases_cf = results['CF'] Lifts = amps_cl[0] * q Drags = amps_cd[0] * q # print("CL: ",amps_cl[0], "q: ", q, "Lifts: ", Lifts) # print("CD: ",amps_cd[0], "q: ", q, "Drags: ", Drags) # Calculate the global force vector chord_vector_rotated = rotate_vector(chord_vector, rotation_axis, azimuths[j_azi]*0) # no need to double rotate, keep at 0. local_yaw = math.degrees(math.atan2(chord_vector_rotated[1], chord_vector_rotated[0])) normal_vector_rotated = rotate_vector(normal_vector, rotation_axis, azimuths[j_azi]) local_roll = math.degrees(math.atan2(normal_vector_rotated[2], normal_vector_rotated[1])) local_force_vector = np.array([Drags, Lifts, 0.0]) force_vector_rolled = rotate_vector(local_force_vector, np.array([1.0, 0, 0]), local_roll) global_force_vector = rotate_vector(force_vector_rolled, np.array([0, 0, 1.0]), local_yaw) total_global_force_vector[i_inflow, j_azi, :] += global_force_vector total_global_moment_vector[i_inflow, j_azi, :] += global_force_vector * global_pos # print("total_global_force_vector: ", total_global_force_vector[i_inflow, j_azi, :]) STlength = chord * abs(math.sin(math.radians(aoa_deg))) frequencies_cf = ST_cf * (V_eff / STlength) # Record the worst case overlap, and where it happened for lstrouhaul in range(min(n_freq_depth, len(frequencies_cf))): if amps_cf[lstrouhaul] > amplitude_coeff_cutoff: for jnatfreq in range(natfreqs.shape[0]): for kharmonic in range(1, n_harmonic + 1): percdiff = (frequencies_cf[lstrouhaul] - natfreqs[jnatfreq] * kharmonic) / (natfreqs[jnatfreq] * kharmonic) * 100 if percdiff_matrix[i_inflow, j_azi] > abs(percdiff): percdiff_matrix[i_inflow, j_azi] = abs(percdiff) percdiff_info[i_inflow, j_azi] = f"{percdiff} percdiff Occurs for NatFreq: {natfreqs[jnatfreq]} at Harmonic: {kharmonic} with Shedding frequency: {frequencies_cf[lstrouhaul]} (Strouhaul depth {lstrouhaul}) AmplitudeCoeff: {amps_cf[lstrouhaul]} in Comp: {comp.id} at pt#: {ipt+1}" # Output data for just the requested point if viv_params.output_azimuth_vinf[0] == azimuths[j_azi] and viv_params.output_azimuth_vinf[1] == inflow_speeds[i_inflow]: # Recreate the time signal for the sampled ST information cl_signal = reconstruct_signal(ST_cl * (V_eff / STlength), amps_cl, phases_cl, viv_params.output_time) cd_signal = reconstruct_signal(ST_cd * (V_eff / STlength), amps_cd, phases_cd, viv_params.output_time) # Create force vectors for each time point local_force_vectors = [] for cd, cl in zip(cd_signal, cl_signal): local_force_vectors.append(np.array([cd * q, cl * q, 0.0])) # Rotate force vectors force_vector_rolled_list = [rotate_vector(local_force, np.array([1.0, 0, 0]), local_roll) for local_force in local_force_vectors] global_force_vector_list = [rotate_vector(force_rolled, np.array([0, 0, 1.0]), local_yaw) for force_rolled in force_vector_rolled_list] # Store in the output array global_force_vector_nodes[:, :, inode-1] = np.array(global_force_vector_list) return percdiff_matrix, percdiff_info, total_global_force_vector, total_global_moment_vector, global_force_vector_nodes
#@profile
[docs] def compute_thrust_torque_spectrum(components: List[Component], affts: Dict[str, AirfoilFFT], viv_params: VIV_Params, natfreqs: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Computes the mean thrust and torque, as well as their frequency-domain spectra, over a range of inflow speeds and azimuthal orientations. Args: components: List of structural components including geometry, orientation, and segment-wise parameters. affts: Dictionary mapping airfoil IDs to their FFT-derived lift/drag/moment spectra. viv_params: Encapsulates all analysis parameters, including inflow, rotation axis, fluid properties, and plotting settings. natfreqs: Natural frequencies to compare against. Returns: percdiff_matrix: Matrix of percent differences between shedding frequencies and natural frequencies. percdiff_info: Matrix of strings with information about the worst percent differences. total_global_force_vector: Total global force vector for each inflow speed and azimuth. total_global_moment_vector: Total global moment vector for each inflow speed and azimuth. global_force_vector_nodes: Global force vector for each node over time. Notes: - Airfoil FFT data is interpolated by Reynolds number and angle of attack. - The segment's local AOA determines how CL and CD spectra are rotated into global inflow and torque components. """ inflow_speeds = viv_params.inflow_speeds azimuths = viv_params.azimuths rotation_axis = viv_params.rotation_axis fluid_density = viv_params.fluid_density fluid_dynamicviscosity = viv_params.fluid_dynamicviscosity n_harmonic = viv_params.n_harmonic amplitude_coeff_cutoff = viv_params.amplitude_coeff_cutoff n_freq_depth = viv_params.n_freq_depth n_inflow = len(inflow_speeds) n_az = len(azimuths) total_global_force_vector = np.zeros((n_inflow, n_az, 3)) total_global_moment_vector = np.zeros((n_inflow, n_az, 3)) percdiff_matrix = np.ones((n_inflow, n_az)) * 1000 percdiff_info = np.empty((n_inflow, n_az), dtype=object) total_nodes = sum([comp.shape_xyz.shape[0] for comp in components]) global_force_vector_nodes = np.zeros((len(viv_params.output_time), 3, total_nodes)) for i_inflow in range(n_inflow): Vinf = np.array(viv_params.inflow_vec) / np.linalg.norm(viv_params.inflow_vec) * inflow_speeds[i_inflow] for j_azi in range(n_az): # Negative since rotating the inflow in the negative direction is the same as rotating the structure in the positive Vin_rotated = rotate_vector(Vinf, rotation_axis, -azimuths[j_azi]) inode = 0 for comp in components: N_pts = comp.shape_xyz.shape[0] for ipt in range(N_pts): inode += 1 global_pos = comp.shape_xyz_global[ipt] chord = comp.chord[ipt] afid = comp.airfoil_ids[ipt] afft = affts.get(afid, affts["default"]) # get the id, or use the default chord_vector = comp.chord_vector[ipt, :] normal_vector = comp.normal_vector[ipt, :] V_chord = np.dot(Vin_rotated, chord_vector / np.linalg.norm(chord_vector)) V_normal = np.dot(Vin_rotated, normal_vector / np.linalg.norm(normal_vector)) aoa_rad = math.atan2(V_normal, V_chord) # Using atan2 for correct quadrant aoa_deg = math.degrees(aoa_rad) V_eff = math.sqrt(V_normal**2 + V_chord**2) # Fundamental assumption that spanwise flow doesn't impact lift and drag Re = fluid_density * V_eff * chord / fluid_dynamicviscosity q = 0.5 * fluid_density * V_eff**2 * chord # Interpolate FFT spectrum ST_cl, amps_cl, phases_cl = interpolate_fft_spectrum(afft, Re, aoa_deg, 'CL', n_freq_depth=n_freq_depth) ST_cd, amps_cd, phases_cd = interpolate_fft_spectrum(afft, Re, aoa_deg, 'CD', n_freq_depth=n_freq_depth) ST_cf, amps_cf, phases_cf = interpolate_fft_spectrum(afft, Re, aoa_deg, 'CF', n_freq_depth=n_freq_depth) Lifts = amps_cl[0] * q Drags = amps_cd[0] * q # Calculate the global force vector chord_vector_rotated = rotate_vector(chord_vector, rotation_axis, azimuths[j_azi]) local_yaw = math.degrees(math.atan2(chord_vector_rotated[1], chord_vector_rotated[0])) normal_vector_rotated = rotate_vector(normal_vector, rotation_axis, azimuths[j_azi]) local_roll = math.degrees(math.atan2(normal_vector_rotated[2], normal_vector_rotated[1])) local_force_vector = np.array([Drags, Lifts, 0.0]) force_vector_rolled = rotate_vector(local_force_vector, np.array([1.0, 0, 0]), local_roll) global_force_vector = rotate_vector(force_vector_rolled, np.array([0, 0, 1.0]), local_yaw) total_global_force_vector[i_inflow, j_azi, :] += global_force_vector total_global_moment_vector[i_inflow, j_azi, :] += global_force_vector * global_pos STlength = chord * abs(math.sin(math.radians(aoa_deg))) frequencies_cf = ST_cf * (V_eff / STlength) # Record the worst case overlap, and where it happened for lstrouhaul in range(min(n_freq_depth, len(frequencies_cf))): if amps_cf[lstrouhaul] > amplitude_coeff_cutoff: for jnatfreq in range(natfreqs.shape[0]): for kharmonic in range(1, n_harmonic + 1): percdiff = (frequencies_cf[lstrouhaul] - natfreqs[jnatfreq] * kharmonic) / (natfreqs[jnatfreq] * kharmonic) * 100 if percdiff_matrix[i_inflow, j_azi] > abs(percdiff): percdiff_matrix[i_inflow, j_azi] = abs(percdiff) percdiff_info[i_inflow, j_azi] = f"{percdiff} percdiff Occurs for NatFreq: {natfreqs[jnatfreq]} at Harmonic: {kharmonic} with Shedding frequency: {frequencies_cf[lstrouhaul]} (Strouhaul depth {lstrouhaul}) AmplitudeCoeff: {amps_cf[lstrouhaul]} in Comp: {comp.id} at pt#: {ipt+1}" # Output data for just the requested point if viv_params.output_azimuth_vinf[0] == azimuths[j_azi] and viv_params.output_azimuth_vinf[1] == inflow_speeds[i_inflow]: # Recreate the time signal for the sampled ST information cl_signal = reconstruct_signal(ST_cl * (V_eff / STlength), amps_cl, phases_cl, viv_params.output_time) cd_signal = reconstruct_signal(ST_cd * (V_eff / STlength), amps_cd, phases_cd, viv_params.output_time) # Create force vectors for each time point local_force_vectors = [] for cd, cl in zip(cd_signal, cl_signal): local_force_vectors.append(np.array([cd * q, cl * q, 0.0])) # Rotate force vectors force_vector_rolled_list = [rotate_vector(local_force, np.array([1.0, 0, 0]), local_roll) for local_force in local_force_vectors] global_force_vector_list = [rotate_vector(force_rolled, np.array([0, 0, 1.0]), local_yaw) for force_rolled in force_vector_rolled_list] # Store in the output array global_force_vector_nodes[:, :, inode-1] = np.array(global_force_vector_list) return percdiff_matrix, percdiff_info, total_global_force_vector, total_global_moment_vector, global_force_vector_nodes
# """ # reconstruct_signal(freqs::Vector{Float64}, amps::Vector{Float64}, phases::Vector{Float64}, t::Vector{Float64}) # Reconstructs a time-domain signal from frequency, amplitude (peak), and phase (radians). # - Assumes the DC term is included as `freqs[i]==0` with `amps[i]` equal to the mean value. # - For f>0, `amps[i]` is the peak amplitude (not RMS). # # Arguments # - `freqs`: Frequency vector (Hz) # - `amps`: Amplitudes corresponding to each frequency # - `phases`: Phase offsets (radians) corresponding to each frequency # - `t`: Time vector (s) # # Returns # - `signal`: Time-domain signal as a vector of Float64 # """ # function reconstruct_signal(freqs::Vector{Float64}, amps::Vector{Float64}, phases::Vector{Float64}, tvec::Vector{Float64}) # @assert length(freqs) == length(amps) == length(phases) "freqs/amps/phases must be same length" # @assert length(tvec) ≥ 2 "tvec must have at least 2 samples" # dt = tvec[2] - tvec[1] # fs = 1/dt # fnyq = fs/2 # if maximum(freqs) > fnyq + eps(fnyq) # @warn "Max frequency $(maximum(freqs)) exceeds Nyquist $(fnyq) Hz implied by tvec; reconstruction will alias." # end # signal = zeros(Float64, length(tvec)) # # DC (mean) # for (A, f) in zip(amps, freqs) # if iszero(f) # signal .+= A # end # end # # Oscillatory terms (cosine with FFT phases; amps = peak) # ω = 2π .* freqs # for i in eachindex(freqs) # f = freqs[i] # if f > 0.0 # A = amps[i] # φ = phases[i] # signal += A .* cos.(ω[i] .* tvec .+ φ) # end # end # return signal # end
[docs] def reconstruct_signal(freqs: np.ndarray, amps: np.ndarray, phases: np.ndarray, tvec: np.ndarray) -> np.ndarray: """ Reconstruct a time-domain signal from frequency, peak amplitude, and phase (radians). Assumptions / conventions: - DC term(s): entries where freqs == 0 carry the mean value in `amps`; all such entries are summed. - For f > 0, `amps` are **peak** amplitudes (not RMS) and phases follow a cosine convention: cos(ωt + φ). - Negative frequencies, if present, are ignored (assumed redundant w.r.t. positive freqs + phases). Args: freqs: Array of frequencies [Hz]. amps: Array of peak amplitudes corresponding to each frequency. phases: Array of phase offsets [rad] corresponding to each frequency. tvec: Time vector [s] (must have at least 2 samples). Returns: Reconstructed time-domain signal (float64), shape = (len(tvec),). """ freqs = np.asarray(freqs, dtype=np.float64) amps = np.asarray(amps, dtype=np.float64) phases = np.asarray(phases, dtype=np.float64) tvec = np.asarray(tvec, dtype=np.float64) if not (len(freqs) == len(amps) == len(phases)): raise ValueError("freqs/amps/phases must be the same length") if tvec.size < 2: raise ValueError("tvec must have at least 2 samples") dt = tvec[1] - tvec[0] fs = 1.0 / dt fnyq = 0.5 * fs # Nyquist check with epsilon tolerance eps = np.finfo(np.float64).eps fmax = float(np.max(freqs)) if freqs.size else 0.0 if fmax > (fnyq + eps * max(1.0, fnyq)): print(f"Warning: Max frequency {fmax:.6g} Hz exceeds Nyquist {fnyq:.6g} Hz implied by tvec; reconstruction may alias.") signal = np.zeros(tvec.shape[0], dtype=np.float64) # DC term(s): sum all freqs == 0 # Use a tolerance for zero comparison to be robust to tiny numerical noise. zero_mask = np.isclose(freqs, 0.0, rtol=0.0, atol=eps) if np.any(zero_mask): signal += np.sum(amps[zero_mask]) # Oscillatory terms: f > 0 using cosine convention pos_mask = freqs > 0.0 if np.any(pos_mask): fpos = freqs[pos_mask] Apos = amps[pos_mask] Ppos = phases[pos_mask] # ω = 2π f omega = 2.0 * np.pi * fpos # Efficient broadcasting: (n_pos, 1) * (1, n_t) + (n_pos, 1) # then sum over rows to get (n_t,) # But to keep memory modest, loop over components (usually sparse spectral lines). for w, A, phi in zip(omega, Apos, Ppos): signal += A * np.cos(w * tvec + phi) return signal
#@profile
[docs] def rotate_vector(vec: np.ndarray, axis: np.ndarray, angle_deg: float) -> np.ndarray: """ Rotate a 3D vector around a given axis using Rodrigues' rotation formula. Args: vec: Vector to rotate (length-3, any direction). axis: Rotation axis (length-3, not required to be normalized). angle_deg: Rotation angle in degrees (positive is right-hand rule about the axis). Returns: Rotated 3D vector (length-3). """ θ = math.radians(angle_deg) k = axis / np.sqrt(axis[0]**2 + axis[1]**2 + axis[2]**2) # Normalize axis v = vec return v * math.cos(θ) + np.cross(k, v) * math.sin(θ) + k * np.dot(k, v) * (1 - math.cos(θ))
[docs] def rotationMatrix(euler: np.ndarray) -> np.ndarray: """ Compute a 3×3 rotation matrix from Euler angles using the ZYX convention (yaw–pitch–roll). The angles are provided in **degrees** and applied in **Z–Y–X** order (extrinsic frame), meaning: 1. Rotate about global Z axis (yaw) 2. Then about the global Y axis (pitch) 3. Then about the global X axis (roll) Args: euler: Euler angles `[roll, pitch, yaw]` in **degrees**. Returns: A 3×3 rotation matrix for transforming a local vector into the global frame. Example: >>> euler = np.array([30.0, 15.0, 60.0]) # roll, pitch, yaw in degrees >>> R = rotationMatrix(euler) >>> v_local = np.array([1.0, 0.0, 0.0]) >>> v_global = R @ v_local """ cz, sz = math.cos(math.radians(euler[2])), math.sin(math.radians(euler[2])) cy, sy = math.cos(math.radians(euler[1])), math.sin(math.radians(euler[1])) cx, sx = math.cos(math.radians(euler[0])), math.sin(math.radians(euler[0])) Rz = np.array([ [cz, -sz, 0.0], [sz, cz, 0.0], [0.0, 0.0, 1.0] ]) Ry = np.array([ [cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy] ]) Rx = np.array([ [1.0, 0.0, 0.0], [0.0, cx, -sx], [0.0, sx, cx] ]) R_global = Rz @ Ry @ Rx return R_global
# Import from fileio to avoid circular imports # Import at runtime to avoid circular imports
[docs] def interpolate_fft_spectrum(afft, Re_val, AOA_val, field, n_freq_depth=None): """ Wrapper function to avoid circular imports. The actual implementation is in interpolation.py. Args: afft: AirfoilFFT struct containing FFT results. Re_val: Desired Reynolds number. AOA_val: Desired angle of attack (degrees). field: String ('CL', 'CD', 'CM', or 'CF') indicating which force coefficient to interpolate. n_freq_depth: Optional number of frequencies to return. Returns: Tuple of (freqs, amp_out, phase_out) arrays. """ from .interpolation import interpolate_fft_spectrum as _interpolate_fft_spectrum return _interpolate_fft_spectrum(afft, Re_val, AOA_val, field, n_freq_depth)