Coverage for src / sdynpy / signal_processing / sdynpy_cpsd.py: 19%
198 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 16:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 16:22 +0000
1# -*- coding: utf-8 -*-
2"""
3Functions for dealing with CPSD Matrices
4"""
5"""
6Copyright 2022 National Technology & Engineering Solutions of Sandia,
7LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
8Government retains certain rights in this software.
10This program is free software: you can redistribute it and/or modify
11it under the terms of the GNU General Public License as published by
12the Free Software Foundation, either version 3 of the License, or
13(at your option) any later version.
15This program is distributed in the hope that it will be useful,
16but WITHOUT ANY WARRANTY; without even the implied warranty of
17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18GNU General Public License for more details.
20You should have received a copy of the GNU General Public License
21along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""
24import numpy as np
25import scipy.signal as sig
26import matplotlib.pyplot as plt
29def cpsd_coherence(cpsd):
30 num = np.abs(cpsd)**2
31 den = (cpsd[:, np.newaxis, np.arange(cpsd.shape[1]), np.arange(cpsd.shape[2])] *
32 cpsd[:, np.arange(cpsd.shape[1]), np.arange(cpsd.shape[2]), np.newaxis])
33 den[den == 0.0] = 1 # Set to 1
34 return np.real(num /
35 den)
38def cpsd_phase(cpsd):
39 return np.angle(cpsd)
42def cpsd_from_coh_phs(asd, coh, phs):
43 return np.exp(phs * 1j) * np.sqrt(coh * asd[:, :, np.newaxis] * asd[:, np.newaxis, :])
46def cpsd_autospectra(cpsd):
47 return np.einsum('ijj->ij', cpsd)
50def trace(cpsd):
51 return np.einsum('ijj->i', cpsd)
54def match_coherence_phase(cpsd_original, cpsd_to_match):
55 coh = cpsd_coherence(cpsd_to_match)
56 phs = cpsd_phase(cpsd_to_match)
57 asd = cpsd_autospectra(cpsd_original)
58 return cpsd_from_coh_phs(asd, coh, phs)
61def cpsd(signals: np.ndarray, sample_rate: int,
62 samples_per_frame: int, overlap: float,
63 window: str, averages_to_keep: int = None,
64 only_asds: bool = False, reference_signals = None):
65 """
66 Compute cpsd from signals
68 Parameters
69 ----------
70 signals : np.ndarray
71 2D numpy array containing the signals, with each row containing a
72 signal and each column containing a sample of each signal
73 sample_rate : int
74 Sample rate of the signal.
75 samples_per_frame : int
76 Number of samples per frame in the CPSD
77 overlap : float
78 Overlap as a fraction of the frame (e.g. 0.5 not 50).
79 window : str
80 Name of a window function in scipy.signal.windows, or a NumPy ndarray
81 with the window coefficients
82 averages_to_keep : int, optional
83 Optional number of averages to use. The default is None.
84 only_asds : bool, optional
85 If True, only compute autospectral densities, otherwise compute the
86 full CPSD matrix
87 reference_signals : np.ndarray
88 If specified, these signals will be used for the columns of the CPSD
89 matrix. If not specified, the signals passed to the `signals` argument
90 will be used for both the rows and columns of the CPSD matrix. Cannot
91 be used simultaneously with the `only_asds` argument being set to True.
93 Returns
94 -------
95 frequency_spacing : float
96 The frequency spacing of the CPSD matrix
97 response_spectral_matrix : np.ndarray
98 A complex array with dimensions number of frequency lines by
99 number of signals (by number of signals, if only_asds is False)
101 """
103 samples_per_acquire = int(samples_per_frame * (1 - overlap))
104 if isinstance(window[0],str): # This will pass if it is a string or a container with a string as the first value
105 window_array = sig.windows.get_window(window, samples_per_frame, fftbins=True)
106 else:
107 window_array = window
108 window_correction = 1 / np.mean(window_array**2)
109 frequency_spacing = sample_rate / samples_per_frame
110 time_starts = np.arange(0, signals.shape[-1] - samples_per_frame + 1, samples_per_acquire)
111 time_indices_for_averages = time_starts[:, np.newaxis] + np.arange(samples_per_frame)
112 if averages_to_keep is not None:
113 time_indices_for_averages = time_indices_for_averages[:averages_to_keep]
114 response_fft_array = signals[:, time_indices_for_averages]
115 response_fft = np.fft.rfft(response_fft_array[:, :, :] * window_array, axis=-1)
116 if only_asds:
117 if reference_signals is not None:
118 raise ValueError('Cannot specify `only_asds` equal to True and simultaneously provide reference signals')
119 response_spectral_matrix = np.einsum(
120 'iaf,iaf->fi', response_fft, np.conj(response_fft)) / response_fft.shape[1]
121 else:
122 if reference_signals is None:
123 reference_fft = response_fft
124 else:
125 reference_fft_array = reference_signals[:,time_indices_for_averages]
126 reference_fft = np.fft.rfft(reference_fft_array[:, :, :] * window_array, axis=-1)
127 response_spectral_matrix = np.einsum(
128 'iaf,jaf->fij', response_fft, np.conj(reference_fft)) / response_fft.shape[1]
129 # Normalize
130 response_spectral_matrix *= (frequency_spacing * window_correction /
131 sample_rate**2)
132 response_spectral_matrix[1:-1] *= 2
133 return frequency_spacing, response_spectral_matrix
135def dB_pow(x): return 10 * np.log10(x)
138def db2scale(dB):
139 """ Converts a decibel value to a scale factor
141 Parameters
142 ----------
143 dB : float :
144 Value in decibels
147 Returns
148 -------
149 scale : float :
150 Value in linear
152 """
153 return 10**(dB/20)
156def rms(x, axis=None):
157 return np.sqrt(np.mean(x**2, axis=axis))
160def rms_csd(csd, df):
161 """Computes RMS of a CPSD matrix
163 Parameters
164 ----------
165 csd : np.ndarray :
166 3D complex Numpy array where the first dimension is the frequency line
167 and the second two dimensions are the rows and columns of the CPSD
168 matrix.
169 df : float :
170 Frequency spacing of the CPSD matrix
172 Returns
173 -------
174 rms : numpy scalar or numpy.ndarray
175 The root-mean-square value of signals in the CPSD matrix
177 """
178 return np.sqrt(np.einsum('ijj->j', csd).real * df)
181def plot_cpsd_error(frequencies, spec, channel_names=None, figure_kwargs={}, linewidth=1, plot_kwargs={}, **kwargs):
182 spec_asd = np.real(np.einsum('ijj->ji', spec))
183 data_asd = {legend: np.real(np.einsum('ijj->ji', data)) for legend, data in kwargs.items()}
184 num_channels = spec_asd.shape[0]
185 if channel_names is None:
186 channel_names = ['Channel {:}'.format(i) for i in range(num_channels)]
187 ncols = int(np.floor(np.sqrt(num_channels)))
188 nrows = int(np.ceil(num_channels / ncols))
189 if len(kwargs) > 1:
190 total_rows = nrows + 2
191 elif len(kwargs) == 1:
192 total_rows = nrows + 1
193 else:
194 total_rows = nrows
195 fig = plt.figure(**figure_kwargs)
196 grid_spec = plt.GridSpec(total_rows, ncols, figure=fig)
197 for i in range(num_channels):
198 this_row = i // ncols
199 this_col = i % ncols
200 if i == 0:
201 ax = fig.add_subplot(grid_spec[this_row, this_col])
202 original_ax = ax
203 else:
204 ax = fig.add_subplot(grid_spec[this_row, this_col], sharex=original_ax,
205 sharey=original_ax)
206 ax.plot(frequencies, spec_asd[i], linewidth=linewidth * 2, color='k', **plot_kwargs)
207 for legend, data in data_asd.items():
208 ax.plot(frequencies, data[i], linewidth=linewidth)
209 ax.set_ylabel(channel_names[i])
210 if i == 0:
211 ax.set_yscale('log')
212 if this_row == nrows - 1:
213 ax.set_xlabel('Frequency (Hz)')
214 else:
215 plt.setp(ax.get_xticklabels(), visible=False)
216 if this_col != 0:
217 plt.setp(ax.get_yticklabels(), visible=False)
218 return_data = None
219 if len(kwargs) > 0:
220 spec_sum_asd = np.sum(spec_asd, axis=0)
221 data_sum_asd = {legend: np.sum(data, axis=0) for legend, data in data_asd.items()}
222 db_error = {legend: rms(dB_pow(data) - dB_pow(spec_asd), axis=0)
223 for legend, data in data_asd.items()}
224 plot_width = ncols // 2
225 ax = fig.add_subplot(grid_spec[nrows, 0:plot_width])
226 ax.plot(frequencies, spec_sum_asd, linewidth=2 * linewidth, color='k')
227 for legend, data in data_sum_asd.items():
228 ax.plot(frequencies, data, linewidth=linewidth)
229 ax.set_yscale('log')
230 ax.set_ylabel('Sum ASDs')
231 ax = fig.add_subplot(grid_spec[nrows, -plot_width:])
232 for legend, data in db_error.items():
233 ax.plot(frequencies, data, linewidth=linewidth)
234 ax.set_ylabel('dB Error')
235 if len(kwargs) > 1:
236 prop_cycle = plt.rcParams['axes.prop_cycle']
237 colors = prop_cycle.by_key()['color'] * 10
238 db_error_sum_asd = {legend: rms(dB_pow(sum_asd) - dB_pow(spec_sum_asd))
239 for legend, sum_asd in data_sum_asd.items()}
240 db_error_rms = {legend: rms(data) for legend, data in db_error.items()}
241 return_data = (db_error_sum_asd, db_error_rms)
242 ax = fig.add_subplot(grid_spec[nrows + 1, 0:plot_width])
243 for i, (legend, data) in enumerate(db_error_sum_asd.items()):
244 ax.bar(i, data, color=colors[i])
245 ax.text(i, 0, '{:.2f}'.format(data),
246 horizontalalignment='center', verticalalignment='bottom')
247 ax.set_xticks(np.arange(i + 1))
248 ax.set_xticklabels([legend.replace('_', ' ')
249 for legend in db_error_sum_asd], rotation=20, horizontalalignment='right')
250 ax.set_ylabel('Sum RMS dB Error')
251 ax = fig.add_subplot(grid_spec[nrows + 1, -plot_width:])
252 for i, (legend, data) in enumerate(db_error_rms.items()):
253 ax.bar(i, data, color=colors[i])
254 ax.text(i, 0, '{:.2f}'.format(data),
255 horizontalalignment='center', verticalalignment='bottom')
256 ax.set_xticks(np.arange(i + 1))
257 ax.set_xticklabels([legend.replace('_', ' ')
258 for legend in db_error_rms], rotation=20, horizontalalignment='right')
259 ax.set_ylabel('RMS dB Error')
260 fig.tight_layout()
261 return return_data
264def plot_asds(cpsd, freq=None, ax=None, subplots_kwargs={'sharex': True, 'sharey': True}, plot_kwargs={}):
265 num_freq, num_channels, num_channels = cpsd.shape
266 ncols = int(np.floor(np.sqrt(num_channels)))
267 nrows = int(np.ceil(num_channels / ncols))
268 if ax is None:
269 f, ax = plt.subplots(nrows, ncols, **subplots_kwargs)
270 diag = np.einsum('jii->ij', cpsd).real
271 for asd, a in zip(diag, ax.flatten()):
272 if freq is None:
273 a.plot(asd, **plot_kwargs)
274 else:
275 a.plot(freq, asd, **plot_kwargs)
276 a.set_yscale('log')
277 return ax
280def cpsd_to_time_history(cpsd_matrix, sample_rate, df, output_oversample=1):
281 """Generates a time history realization from a CPSD matrix
283 Parameters
284 ----------
285 cpsd_matrix : np.ndarray :
286 A 3D complex np.ndarray representing a CPSD matrix where the first
287 dimension is the frequency line and the second two dimensions are the
288 rows and columns of the matrix at each frequency line.
289 sample_rate: float :
290 The sample rate of the controller in samples per second
291 df : float :
292 The frequency spacing of the cpsd matrix
295 Returns
296 -------
297 output : np.ndarray :
298 A numpy array containing the generated signals
300 Notes
301 -----
302 Uses the process described in [1]_
304 .. [1] R. Schultz and G. Nelson, "Input signal synthesis for open-loop
305 multiple-input/multiple-output testing," Proceedings of the International
306 Modal Analysis Conference, 2019.
308 """
309 # Compute SVD broadcasting over all frequency lines
310 [U, S, Vh] = np.linalg.svd(cpsd_matrix, full_matrices=False)
311 # Reform using the sqrt of the S matrix
312 Lsvd = U * np.sqrt(S[:, np.newaxis, :]) @ Vh
313 # Compute Random Process
314 W = np.sqrt(0.5) * (np.random.randn(*
315 cpsd_matrix.shape[:-1], 1) + 1j * np.random.randn(*cpsd_matrix.shape[:-1], 1))
316 Xv = 1 / np.sqrt(df) * Lsvd @ W
317 # Ensure that the signal is real by setting the nyquist and DC component to 0
318 Xv[[0, -1], :, :] = 0
319 # Compute the IFFT, using the real version makes it so you don't need negative frequencies
320 zero_padding = np.zeros([((output_oversample - 1) * (Xv.shape[0] - 1))
321 ] + list(Xv.shape[1:]), dtype=Xv.dtype)
322 xv = np.fft.irfft(np.concatenate((Xv, zero_padding), axis=0) /
323 np.sqrt(2), axis=0) * output_oversample * sample_rate
324 output = xv[:, :, 0].T
325 return output
328def shaped_psd(frequency_spacing, bandwidth, num_channels=1, target_rms=None,
329 min_frequency=0.0, max_frequency=None, breakpoint_frequencies=None,
330 breakpoint_levels=None, breakpoint_interpolation='linear',
331 squeeze=True):
332 num_lines = int(bandwidth / frequency_spacing) + 1
333 all_freqs = np.arange(num_lines) * frequency_spacing
335 if breakpoint_frequencies is None or breakpoint_levels is None:
336 cpsd = np.ones(all_freqs.shape)
337 else:
338 if breakpoint_interpolation in ['log', 'logarithmic']:
339 cpsd = np.interp(np.log(all_freqs), np.log(breakpoint_frequencies),
340 breakpoint_levels, left=0, right=0)
341 elif breakpoint_interpolation in ['lin', 'linear']:
342 cpsd = np.interp(all_freqs, breakpoint_frequencies, breakpoint_levels)
343 else:
344 raise ValueError('Invalid Interpolation, should be "lin" or "log"')
346 # Truncate to the minimum frequency
347 cpsd[all_freqs < min_frequency] = 0
348 if max_frequency is not None:
349 cpsd[all_freqs > max_frequency] = 0
351 if target_rms is not None:
352 cpsd_rms = np.sqrt(np.sum(cpsd) * frequency_spacing)
353 cpsd *= (target_rms / cpsd_rms)**2
355 full_cpsd = np.zeros(cpsd.shape + (num_channels, num_channels), dtype='complex128')
357 full_cpsd[:, np.arange(num_channels), np.arange(num_channels)] = cpsd[:, np.newaxis]
359 if squeeze:
360 full_cpsd = full_cpsd.squeeze()
362 return full_cpsd
364def nth_octave_freqs(freq,oct_order=1):
365 """
366 Get N-th octave band frequencies
368 Parameters
369 ----------
370 freq : ndarray
371 array of frequency values, either including all freqs or only min and max
372 oct_order : int, optional
373 octave type, 1/octave order. 3 represents 1/3 octave bands. default is 1
375 Returns
376 -------
377 nominal_band_centers : ndarray
378 rounded band center frequencies using ANSI standers. Used to report
379 results, not to compute band limits
380 band_lb : ndarray
381 lower band frequencies in Hz
382 band_ub : ndarray
383 upper band frequencies in Hz
384 band_centers : ndarray
385 exact computed octave center band frequencies
387 Notes
388 -------
389 Uses equations in ANSI S1.11-2014 "Electroacoustics – Octave-band and
390 Fractionaloctave-band Filters – Part 1: Specifications"
392 Uses the min and max freq lines in the provided freq to determine the upper
393 and lower bands
395 Uses 1000 Hz as the reference frequency (as per the ANSI standard)
397 Computes the 1/Nth band center frequencies, using different equations
398 depending on the octave order. Orders with odd numbers (1, 1/3, etc.)
399 use ANSI eqn 2. Order with even numbers (1/6, 1/12, etc.) use eqn 3.
400 This causes any sub-bands to fit entirely within the full octave band
402 Note: even-numbered sub-bands will not share center frequencies with
403 the full octave band, but will share the upper and lower limits.
404 ANSI S1.11 Annex A gives instructions for rounding to make the nominal
405 band center freqs. If left-most digit is 1-4, round to 3 significant
406 digits. If left-most digit is 5-9, round to 2 significant digits. For
407 example, 41.567 rounds to 41.6. 8785.2 rounds to 8800.
408 """
410 # Process inputs
411 if not isinstance(freq,np.ndarray):
412 freq = np.array([freq]).flatten()
414 # Setup
415 oct_ratio = 10**0.3
416 f_ref = 1000
417 bands = np.arange(-200,201)
419 # Compute exact center freqs and edges
420 # equation is different if using odd octOrder (1, 3) vs. even octOrder
421 # (6, 12). Note: bandNumbers = x and octOrder = b in the ANSI eqns
422 pow = [(2*bands+1)/(2*oct_order), bands/oct_order]
423 band_centers = f_ref * oct_ratio**pow[oct_order%2]
424 band_lb = ( band_centers * oct_ratio**(-1/(2*oct_order)) ).round(12) # round slightly
425 band_ub = ( band_centers * oct_ratio**(1/(2*oct_order)) ).round(12) # round slightly
427 # ONLY KEEP BANDS IN THE FREQ RANGE OF INTEREST:
428 # lower limit of lowest band is above min freq
429 # upper limit of highest band is below max freq
430 # rationale: want to compute oct-ave using all the data, so include
431 # the band the includes the minimum frequency and the band that includes
432 # the maximum frequency.
433 # Note that this will result in an under-estimate of the first and last
434 # band levels since some of the band is empty.
435 inds = (max([freq.min(),10]) < band_ub) * (freq.max() > band_lb)
436 band_centers,band_lb,band_ub = [band_centers[inds],band_lb[inds],band_ub[inds]]
439 # GET THE NOMINAL BAND CENTER FREQUENCIES BY ROUNDING
440 # These are just "nice" numbers to make it easier to identify the bands
441 # edges are still calculated from the exact center freqs
442 # method used here is based on:
443 # https://www.mathworks.com/matlabcentral/fileexchange/26212-round-with-significant-digits
444 # freqBandCenters = [ 41.567, 8785.2 ] should give: [ 41.6, 8800]
445 pow_10 = 10** np.floor(np.log10(band_centers))
446 left_digit =np.floor(band_centers/pow_10)
447 sig_digits = 3*(left_digit<=4) + 2*(left_digit>4)
448 tmp = 10 ** np.floor(np.abs(np.log10(band_centers)) - sig_digits + 1)
449 nominal_band_centers = np.round(band_centers/tmp)*tmp
451 # Replace calculated nominal band centers with ANSI nominal band centers if
452 # doing full or 1/3rd octaves:
453 # ANSI only gives freqs from 25 to 20000, so don't correct outside that range
454 # find nearest ANSI nominal center, then verify it is between the band
455 # limits. If the ANSI nominal is between the band limits, replace the
456 # calculated nominal with the ANSI value.
457 if oct_order == 1:
458 ansi_nominal = np.array([31.5 , 63 , 125 , 250 , 500 , 1000 , 2000 , 4000 , 8000 , 16000])
459 elif oct_order == 3:
460 ansi_nominal = np.array([25 , 31.5 , 40 , 50 , 63 , 80 , 100 , 125 , 160 , 200 ,
461 250 , 315 , 400 , 500 , 630 , 800 , 1000 , 1250 , 1600 , 2000 ,
462 2500 , 3150 , 4000 , 5000 , 8000 , 10000 , 12500 , 16000 , 20000])
464 if oct_order==1 or oct_order==3:
465 dists = abs(ansi_nominal - band_centers[:,np.newaxis]) # dist from center freq to nominal
466 contained = (ansi_nominal < band_ub[:,np.newaxis]) * (ansi_nominal > band_lb[:,np.newaxis]) # bands contain nominals
467 contained_dists = contained*dists + 1e20*(~contained) # product is infty if not contained
469 assign_inds = contained_dists.argmin(1) # nominal inds to assign
470 assign_bools = contained_dists.min(1) < 1e20 # center freqs inds to assign
472 nominal_band_centers[assign_bools] = ansi_nominal[assign_inds[assign_bools]]
474 return nominal_band_centers, band_lb, band_ub, band_centers