Source code for vorlap.interpolation

from scipy import interpolate
from vorlap.structs import AirfoilFFT


import numpy as np


from typing import Dict, List, Tuple, Optional


def _find_cells_and_weights(Re_grid: np.ndarray,
                            AOA_grid: np.ndarray,
                            Re_q: np.ndarray,
                            AOA_q: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Find cells and weights for bilinear interpolation.
    
    For each query (Re_q, AOA_q), clamp to flat-extrapolate and return:
        i,j: lower cell indices
        t,u: local linear coordinates in [0,1] within that cell
        
    Args:
        Re_grid: Reynolds number grid values.
        AOA_grid: Angle of attack grid values.
        Re_q: Query Reynolds number values.
        AOA_q: Query angle of attack values.
        
    Returns:
        Tuple of (i, j, t, u) arrays for interpolation.
    """
    Re = Re_grid
    AoA = AOA_grid

    # clamp for flat extrapolation
    Re_q = np.clip(Re_q, Re[0], Re[-1])
    AOA_q = np.clip(AOA_q, AoA[0], AoA[-1])

    # locate lower cell indices
    i = np.searchsorted(Re, Re_q, side='right') - 1
    j = np.searchsorted(AoA, AOA_q, side='right') - 1
    i = np.clip(i, 0, len(Re) - 2)
    j = np.clip(j, 0, len(AoA) - 2)

    Re0 = Re[i]
    Re1 = Re[i + 1]
    AoA0 = AoA[j]
    AoA1 = AoA[j + 1]

    # local barycentric coords
    with np.errstate(divide='ignore', invalid='ignore'):
        t = np.divide(Re_q - Re0, Re1 - Re0, out=np.zeros_like(Re_q), where=(Re1 != Re0))
        u = np.divide(AOA_q - AoA0, AoA1 - AoA0, out=np.zeros_like(AOA_q), where=(AoA1 != AoA0))

    return i, j, t, u

def _bilinear_apply_tensor(F: np.ndarray,
                           i: np.ndarray,
                           j: np.ndarray,
                           t: np.ndarray,
                           u: np.ndarray,
                           n_freq_depth: Optional[int] = None) -> np.ndarray:
    """
    Apply bilinear interpolation to a tensor at query points.
    
    Args:
        F: Input tensor of shape (NR, NA, K).
        i: Lower cell indices in Reynolds direction.
        j: Lower cell indices in AOA direction.
        t: Local coordinates in Reynolds direction [0,1].
        u: Local coordinates in AOA direction [0,1].
        n_freq_depth: Optional depth limit for frequency dimension.
        
    Returns:
        Interpolated values of shape (nq, K_sel).
    """
    NR, NA, K = F.shape
    if n_freq_depth is None or n_freq_depth > K:
        n_freq_depth = K

    # slice frequencies up to depth
    F = F[:, :, :n_freq_depth]

    # gather 4 corners (nq, K_sel) via advanced indexing
    f00 = F[i,     j,     :]  # lower-left
    f10 = F[i + 1, j,     :]  # right
    f01 = F[i,     j + 1, :]  # up
    f11 = F[i + 1, j + 1, :]  # up-right

    w00 = (1 - t) * (1 - u)
    w10 = t * (1 - u)
    w01 = (1 - t) * u
    w11 = t * u

    # expand weights to (nq, 1) to broadcast along K
    out = (w00[:, None] * f00 +
           w10[:, None] * f10 +
           w01[:, None] * f01 +
           w11[:, None] * f11)
    return out

# interpolation.py (continued)
[docs] def interpolate_fft_spectrum_optimized(afft: AirfoilFFT, Re_val: float, AOA_val: float, fields: List[str], n_freq_depth: Optional[int] = None) -> Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]]: """ Vectorized, fast bilinear interpolation with flat extrapolation for multiple fields at once. Args: afft: AirfoilFFT struct containing FFT results. Re_val: Desired Reynolds number. AOA_val: Desired angle of attack (degrees). fields: List of field strings ('CL', 'CD', 'CM', 'CF') to interpolate. n_freq_depth: Optional number of frequencies to return. Returns: Dictionary mapping field names to (ST, Amp, Pha) tuples with arrays of length n_freq_depth. """ # inputs -> 1D arrays to allow easy batching later (for now 1 query) Re_q = np.atleast_1d(Re_val).astype(float) AOA_q = np.atleast_1d(AOA_val).astype(float) # compute indices/weights once i, j, t, u = _find_cells_and_weights(afft.Re, afft.AOA, Re_q, AOA_q) out: Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]] = {} for field in fields: if field == 'CL': STsrc, Asrc, Psrc = afft.CL_ST, afft.CL_Amp, afft.CL_Pha elif field == 'CD': STsrc, Asrc, Psrc = afft.CD_ST, afft.CD_Amp, afft.CD_Pha elif field == 'CF': STsrc, Asrc, Psrc = afft.CF_ST, afft.CF_Amp, afft.CF_Pha elif field == 'CM': STsrc, Asrc, Psrc = afft.CM_ST, afft.CM_Amp, afft.CM_Pha else: raise ValueError(f"Invalid field symbol: {field}") # Each returns (nq, K_sel). We have nq==1, so take [0] ST = _bilinear_apply_tensor(STsrc, i, j, t, u, n_freq_depth)[0] Amp = _bilinear_apply_tensor(Asrc, i, j, t, u, n_freq_depth)[0] Pha = _bilinear_apply_tensor(Psrc, i, j, t, u, n_freq_depth)[0] out[field] = (ST, Amp, Pha) return out
[docs] def interpolate_fft_spectrum_batch(afft: AirfoilFFT, Re_vals: np.ndarray, AOA_vals: np.ndarray, field: str, n_freq_depth: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Batch interpolation for multiple Re/AOA points simultaneously. Args: afft: AirfoilFFT struct containing FFT results with cached interpolators. Re_vals: Array of Reynolds numbers. AOA_vals: Array of angles of attack (degrees). field: Field string ('CL', 'CD', 'CM', 'CF') to interpolate. n_freq_depth: Optional number of frequencies to return. Returns: Tuple containing: - ST_out: Array of shape [n_points, n_freq] - amp_out: Array of shape [n_points, n_freq] - phase_out: Array of shape [n_points, n_freq] """ # Ensure interpolators are cached afft._cache_interpolators() if n_freq_depth is None: n_freq_depth = afft.CL_ST.shape[2] else: n_freq_depth = min(n_freq_depth, afft.CL_ST.shape[2]) n_points = len(Re_vals) points = np.column_stack([Re_vals, AOA_vals]) ST_out = np.zeros((n_points, n_freq_depth)) amp_out = np.zeros((n_points, n_freq_depth)) phase_out = np.zeros((n_points, n_freq_depth)) # Get the appropriate cached interpolators if field == 'CL': st_interps, amp_interps, pha_interps = afft._cl_st_interps, afft._cl_amp_interps, afft._cl_pha_interps elif field == 'CD': st_interps, amp_interps, pha_interps = afft._cd_st_interps, afft._cd_amp_interps, afft._cd_pha_interps elif field == 'CF': st_interps, amp_interps, pha_interps = afft._cf_st_interps, afft._cf_amp_interps, afft._cf_pha_interps else: raise ValueError(f"Invalid field symbol: {field}") # Batch interpolation for k in range(n_freq_depth): ST_out[:, k] = st_interps[k](points) amp_out[:, k] = amp_interps[k](points) phase_out[:, k] = pha_interps[k](points) return ST_out, amp_out, phase_out
[docs] def interpolate_fft_spectrum(afft: AirfoilFFT, Re_val: float, AOA_val: float, field: str, n_freq_depth: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Interpolate the FFT amplitude and phase spectra using bilinear interpolation over the stored Re × AOA grid. Args: afft: AirfoilFFT struct containing FFT results. Re_val: Desired Reynolds number. AOA_val: Desired angle of attack (degrees). field: String ('CL', 'CD', 'CM', or 'CF') indicating which force coefficient to interpolate. n_freq_depth: Optional number of frequencies to return. If None, returns all frequencies. Returns: Tuple containing: - freqs: Vector of frequency values. - amp_out: Vector of interpolated amplitudes at each frequency. - phase_out: Vector of interpolated phases at each frequency. Note: - The interpolation is performed independently at each frequency index in the spectrum. - Assumes consistent frequency axis across the full 3D data structure. - Returns values suitable for reconstructing time-domain or frequency-domain force estimates. """ if field == 'CL': STs, amps, phases = afft.CL_ST, afft.CL_Amp, afft.CL_Pha elif field == 'CD': STs, amps, phases = afft.CD_ST, afft.CD_Amp, afft.CD_Pha elif field == 'CM': STs, amps, phases = afft.CM_ST, afft.CM_Amp, afft.CM_Pha elif field == 'CF': STs, amps, phases = afft.CF_ST, afft.CF_Amp, afft.CF_Pha else: raise ValueError(f"Invalid field symbol: {field}") if n_freq_depth is None: n_freq_depth = STs.shape[2] # Use all frequencies n_freq_depth = min(n_freq_depth, STs.shape[2]) # Ensure we don't exceed available frequencies ST_out = np.zeros(n_freq_depth) amp_out = np.zeros(n_freq_depth) phase_out = np.zeros(n_freq_depth) for k in range(n_freq_depth): # Create interpolation functions for this frequency st_interp = interpolate.RegularGridInterpolator( (afft.Re, afft.AOA), STs[:, :, k], bounds_error=False, fill_value=None ) amp_interp = interpolate.RegularGridInterpolator( (afft.Re, afft.AOA), amps[:, :, k], bounds_error=False, fill_value=None ) pha_interp = interpolate.RegularGridInterpolator( (afft.Re, afft.AOA), phases[:, :, k], bounds_error=False, fill_value=None ) # Interpolate at the requested point ST_out[k] = st_interp(np.array([Re_val, AOA_val])) amp_out[k] = amp_interp(np.array([Re_val, AOA_val])) phase_out[k] = pha_interp(np.array([Re_val, AOA_val])) return ST_out, amp_out, phase_out
# def interpolate_fft_spectrum_optimized(afft: AirfoilFFT, Re_val: float, AOA_val: float, # fields: List[str], n_freq_depth: Optional[int] = None) -> Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]]: # """ # Optimized version that interpolates multiple fields simultaneously using cached interpolators. # Args: # afft: AirfoilFFT struct containing FFT results with cached interpolators. # Re_val: Desired Reynolds number. # AOA_val: Desired angle of attack (degrees). # fields: List of field strings ('CL', 'CD', 'CM', 'CF') to interpolate. # n_freq_depth: Optional number of frequencies to return. If None, returns all frequencies. # Returns: # Dictionary mapping field names to (ST_out, amp_out, phase_out) tuples. # """ # # Ensure interpolators are cached # afft._cache_interpolators() # if n_freq_depth is None: # n_freq_depth = afft.CL_ST.shape[2] # else: # n_freq_depth = min(n_freq_depth, afft.CL_ST.shape[2]) # # Pre-compute the interpolation point as array to avoid repeated array creation # point = np.array([Re_val, AOA_val]) # results = {} # for field in fields: # ST_out = np.zeros(n_freq_depth) # amp_out = np.zeros(n_freq_depth) # phase_out = np.zeros(n_freq_depth) # # Get the appropriate cached interpolators # if field == 'CL': # st_interps, amp_interps, pha_interps = afft._cl_st_interps, afft._cl_amp_interps, afft._cl_pha_interps # elif field == 'CD': # st_interps, amp_interps, pha_interps = afft._cd_st_interps, afft._cd_amp_interps, afft._cd_pha_interps # elif field == 'CF': # st_interps, amp_interps, pha_interps = afft._cf_st_interps, afft._cf_amp_interps, afft._cf_pha_interps # elif field == 'CM': # # CM not cached in current implementation, fall back to original method # return {field: interpolate_fft_spectrum(afft, Re_val, AOA_val, field, n_freq_depth)} # else: # raise ValueError(f"Invalid field symbol: {field}") # # Vectorized interpolation using cached interpolators # for k in range(n_freq_depth): # ST_out[k] = st_interps[k](point) # amp_out[k] = amp_interps[k](point) # phase_out[k] = pha_interps[k](point) # results[field] = (ST_out, amp_out, phase_out) # return results
[docs] def resample_airfoil(xy: np.ndarray, npoints: int = 200) -> np.ndarray: """ Resample the given airfoil shape using uniform x-spacing. Process: 1. Identify leading (min x) and trailing (max x) edges. 2. Split into upper and lower surfaces. 3. Interpolate both surfaces using `npoints` uniformly spaced x-values. 4. Recombine to produce a smooth resampled airfoil shape. Args: xy: Nx2 matrix of (x, y) airfoil coordinates, not assumed to start at TE or LE. npoints: Number of points used in interpolation (default: 200). Returns: Resampled and recombined airfoil shape. """ x = xy[:, 0] y = xy[:, 1] # Find leading edge as the point with minimum x le_idx = np.argmin(x) x_le = x[le_idx] # Split into upper and lower surfaces upper = xy[:le_idx+1, :] lower = xy[le_idx:, :] # Sort upper surface by descending x upper_sort_idx = np.argsort(upper[:, 0]) upper_sorted = upper[upper_sort_idx, :] # Sort lower surface by ascending x lower_sort_idx = np.argsort(lower[:, 0]) lower_sorted = lower[lower_sort_idx, :] x_upper = upper_sorted[:, 0] y_upper = upper_sorted[:, 1] x_lower = lower_sorted[:, 0] y_lower = lower_sorted[:, 1] # Create common x-grid x_min = max(np.min(x_upper), np.min(x_lower)) x_max = min(np.max(x_upper), np.max(x_lower)) x_resample = np.linspace(x_min, x_max, int(round(npoints/2))) # Interpolate both surfaces itp_upper = interpolate.interp1d(x_upper, y_upper, bounds_error=False, fill_value='extrapolate') itp_lower = interpolate.interp1d(x_lower, y_lower, bounds_error=False, fill_value='extrapolate') y_upper_resampled = itp_upper(x_resample) y_lower_resampled = itp_lower(x_resample) # Recombine: upper reversed to preserve typical TE-to-LE-to-TE convention x_combined = np.concatenate([np.flip(x_resample), x_resample]) y_combined = np.concatenate([np.flip(y_upper_resampled), y_lower_resampled]) return np.column_stack([x_combined, y_combined])