Coverage for src / sdynpy / signal_processing / sdynpy_cpsd.py: 19%

198 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-11 16:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Functions for dealing with CPSD Matrices 

4""" 

5""" 

6Copyright 2022 National Technology & Engineering Solutions of Sandia, 

7LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

8Government retains certain rights in this software. 

9 

10This program is free software: you can redistribute it and/or modify 

11it under the terms of the GNU General Public License as published by 

12the Free Software Foundation, either version 3 of the License, or 

13(at your option) any later version. 

14 

15This program is distributed in the hope that it will be useful, 

16but WITHOUT ANY WARRANTY; without even the implied warranty of 

17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18GNU General Public License for more details. 

19 

20You should have received a copy of the GNU General Public License 

21along with this program. If not, see <https://www.gnu.org/licenses/>. 

22""" 

23 

24import numpy as np 

25import scipy.signal as sig 

26import matplotlib.pyplot as plt 

27 

28 

29def cpsd_coherence(cpsd): 

30 num = np.abs(cpsd)**2 

31 den = (cpsd[:, np.newaxis, np.arange(cpsd.shape[1]), np.arange(cpsd.shape[2])] * 

32 cpsd[:, np.arange(cpsd.shape[1]), np.arange(cpsd.shape[2]), np.newaxis]) 

33 den[den == 0.0] = 1 # Set to 1 

34 return np.real(num / 

35 den) 

36 

37 

38def cpsd_phase(cpsd): 

39 return np.angle(cpsd) 

40 

41 

42def cpsd_from_coh_phs(asd, coh, phs): 

43 return np.exp(phs * 1j) * np.sqrt(coh * asd[:, :, np.newaxis] * asd[:, np.newaxis, :]) 

44 

45 

46def cpsd_autospectra(cpsd): 

47 return np.einsum('ijj->ij', cpsd) 

48 

49 

50def trace(cpsd): 

51 return np.einsum('ijj->i', cpsd) 

52 

53 

54def match_coherence_phase(cpsd_original, cpsd_to_match): 

55 coh = cpsd_coherence(cpsd_to_match) 

56 phs = cpsd_phase(cpsd_to_match) 

57 asd = cpsd_autospectra(cpsd_original) 

58 return cpsd_from_coh_phs(asd, coh, phs) 

59 

60 

61def cpsd(signals: np.ndarray, sample_rate: int, 

62 samples_per_frame: int, overlap: float, 

63 window: str, averages_to_keep: int = None, 

64 only_asds: bool = False, reference_signals = None): 

65 """ 

66 Compute cpsd from signals 

67 

68 Parameters 

69 ---------- 

70 signals : np.ndarray 

71 2D numpy array containing the signals, with each row containing a 

72 signal and each column containing a sample of each signal 

73 sample_rate : int 

74 Sample rate of the signal. 

75 samples_per_frame : int 

76 Number of samples per frame in the CPSD 

77 overlap : float 

78 Overlap as a fraction of the frame (e.g. 0.5 not 50). 

79 window : str 

80 Name of a window function in scipy.signal.windows, or a NumPy ndarray 

81 with the window coefficients 

82 averages_to_keep : int, optional 

83 Optional number of averages to use. The default is None. 

84 only_asds : bool, optional 

85 If True, only compute autospectral densities, otherwise compute the 

86 full CPSD matrix 

87 reference_signals : np.ndarray 

88 If specified, these signals will be used for the columns of the CPSD 

89 matrix. If not specified, the signals passed to the `signals` argument 

90 will be used for both the rows and columns of the CPSD matrix. Cannot 

91 be used simultaneously with the `only_asds` argument being set to True. 

92 

93 Returns 

94 ------- 

95 frequency_spacing : float 

96 The frequency spacing of the CPSD matrix 

97 response_spectral_matrix : np.ndarray 

98 A complex array with dimensions number of frequency lines by 

99 number of signals (by number of signals, if only_asds is False) 

100 

101 """ 

102 

103 samples_per_acquire = int(samples_per_frame * (1 - overlap)) 

104 if isinstance(window[0],str): # This will pass if it is a string or a container with a string as the first value 

105 window_array = sig.windows.get_window(window, samples_per_frame, fftbins=True) 

106 else: 

107 window_array = window 

108 window_correction = 1 / np.mean(window_array**2) 

109 frequency_spacing = sample_rate / samples_per_frame 

110 time_starts = np.arange(0, signals.shape[-1] - samples_per_frame + 1, samples_per_acquire) 

111 time_indices_for_averages = time_starts[:, np.newaxis] + np.arange(samples_per_frame) 

112 if averages_to_keep is not None: 

113 time_indices_for_averages = time_indices_for_averages[:averages_to_keep] 

114 response_fft_array = signals[:, time_indices_for_averages] 

115 response_fft = np.fft.rfft(response_fft_array[:, :, :] * window_array, axis=-1) 

116 if only_asds: 

117 if reference_signals is not None: 

118 raise ValueError('Cannot specify `only_asds` equal to True and simultaneously provide reference signals') 

119 response_spectral_matrix = np.einsum( 

120 'iaf,iaf->fi', response_fft, np.conj(response_fft)) / response_fft.shape[1] 

121 else: 

122 if reference_signals is None: 

123 reference_fft = response_fft 

124 else: 

125 reference_fft_array = reference_signals[:,time_indices_for_averages] 

126 reference_fft = np.fft.rfft(reference_fft_array[:, :, :] * window_array, axis=-1) 

127 response_spectral_matrix = np.einsum( 

128 'iaf,jaf->fij', response_fft, np.conj(reference_fft)) / response_fft.shape[1] 

129 # Normalize 

130 response_spectral_matrix *= (frequency_spacing * window_correction / 

131 sample_rate**2) 

132 response_spectral_matrix[1:-1] *= 2 

133 return frequency_spacing, response_spectral_matrix 

134 

135def dB_pow(x): return 10 * np.log10(x) 

136 

137 

138def db2scale(dB): 

139 """ Converts a decibel value to a scale factor 

140 

141 Parameters 

142 ---------- 

143 dB : float : 

144 Value in decibels 

145 

146 

147 Returns 

148 ------- 

149 scale : float : 

150 Value in linear 

151 

152 """ 

153 return 10**(dB/20) 

154 

155 

156def rms(x, axis=None): 

157 return np.sqrt(np.mean(x**2, axis=axis)) 

158 

159 

160def rms_csd(csd, df): 

161 """Computes RMS of a CPSD matrix 

162 

163 Parameters 

164 ---------- 

165 csd : np.ndarray : 

166 3D complex Numpy array where the first dimension is the frequency line 

167 and the second two dimensions are the rows and columns of the CPSD 

168 matrix. 

169 df : float : 

170 Frequency spacing of the CPSD matrix 

171 

172 Returns 

173 ------- 

174 rms : numpy scalar or numpy.ndarray 

175 The root-mean-square value of signals in the CPSD matrix 

176 

177 """ 

178 return np.sqrt(np.einsum('ijj->j', csd).real * df) 

179 

180 

181def plot_cpsd_error(frequencies, spec, channel_names=None, figure_kwargs={}, linewidth=1, plot_kwargs={}, **kwargs): 

182 spec_asd = np.real(np.einsum('ijj->ji', spec)) 

183 data_asd = {legend: np.real(np.einsum('ijj->ji', data)) for legend, data in kwargs.items()} 

184 num_channels = spec_asd.shape[0] 

185 if channel_names is None: 

186 channel_names = ['Channel {:}'.format(i) for i in range(num_channels)] 

187 ncols = int(np.floor(np.sqrt(num_channels))) 

188 nrows = int(np.ceil(num_channels / ncols)) 

189 if len(kwargs) > 1: 

190 total_rows = nrows + 2 

191 elif len(kwargs) == 1: 

192 total_rows = nrows + 1 

193 else: 

194 total_rows = nrows 

195 fig = plt.figure(**figure_kwargs) 

196 grid_spec = plt.GridSpec(total_rows, ncols, figure=fig) 

197 for i in range(num_channels): 

198 this_row = i // ncols 

199 this_col = i % ncols 

200 if i == 0: 

201 ax = fig.add_subplot(grid_spec[this_row, this_col]) 

202 original_ax = ax 

203 else: 

204 ax = fig.add_subplot(grid_spec[this_row, this_col], sharex=original_ax, 

205 sharey=original_ax) 

206 ax.plot(frequencies, spec_asd[i], linewidth=linewidth * 2, color='k', **plot_kwargs) 

207 for legend, data in data_asd.items(): 

208 ax.plot(frequencies, data[i], linewidth=linewidth) 

209 ax.set_ylabel(channel_names[i]) 

210 if i == 0: 

211 ax.set_yscale('log') 

212 if this_row == nrows - 1: 

213 ax.set_xlabel('Frequency (Hz)') 

214 else: 

215 plt.setp(ax.get_xticklabels(), visible=False) 

216 if this_col != 0: 

217 plt.setp(ax.get_yticklabels(), visible=False) 

218 return_data = None 

219 if len(kwargs) > 0: 

220 spec_sum_asd = np.sum(spec_asd, axis=0) 

221 data_sum_asd = {legend: np.sum(data, axis=0) for legend, data in data_asd.items()} 

222 db_error = {legend: rms(dB_pow(data) - dB_pow(spec_asd), axis=0) 

223 for legend, data in data_asd.items()} 

224 plot_width = ncols // 2 

225 ax = fig.add_subplot(grid_spec[nrows, 0:plot_width]) 

226 ax.plot(frequencies, spec_sum_asd, linewidth=2 * linewidth, color='k') 

227 for legend, data in data_sum_asd.items(): 

228 ax.plot(frequencies, data, linewidth=linewidth) 

229 ax.set_yscale('log') 

230 ax.set_ylabel('Sum ASDs') 

231 ax = fig.add_subplot(grid_spec[nrows, -plot_width:]) 

232 for legend, data in db_error.items(): 

233 ax.plot(frequencies, data, linewidth=linewidth) 

234 ax.set_ylabel('dB Error') 

235 if len(kwargs) > 1: 

236 prop_cycle = plt.rcParams['axes.prop_cycle'] 

237 colors = prop_cycle.by_key()['color'] * 10 

238 db_error_sum_asd = {legend: rms(dB_pow(sum_asd) - dB_pow(spec_sum_asd)) 

239 for legend, sum_asd in data_sum_asd.items()} 

240 db_error_rms = {legend: rms(data) for legend, data in db_error.items()} 

241 return_data = (db_error_sum_asd, db_error_rms) 

242 ax = fig.add_subplot(grid_spec[nrows + 1, 0:plot_width]) 

243 for i, (legend, data) in enumerate(db_error_sum_asd.items()): 

244 ax.bar(i, data, color=colors[i]) 

245 ax.text(i, 0, '{:.2f}'.format(data), 

246 horizontalalignment='center', verticalalignment='bottom') 

247 ax.set_xticks(np.arange(i + 1)) 

248 ax.set_xticklabels([legend.replace('_', ' ') 

249 for legend in db_error_sum_asd], rotation=20, horizontalalignment='right') 

250 ax.set_ylabel('Sum RMS dB Error') 

251 ax = fig.add_subplot(grid_spec[nrows + 1, -plot_width:]) 

252 for i, (legend, data) in enumerate(db_error_rms.items()): 

253 ax.bar(i, data, color=colors[i]) 

254 ax.text(i, 0, '{:.2f}'.format(data), 

255 horizontalalignment='center', verticalalignment='bottom') 

256 ax.set_xticks(np.arange(i + 1)) 

257 ax.set_xticklabels([legend.replace('_', ' ') 

258 for legend in db_error_rms], rotation=20, horizontalalignment='right') 

259 ax.set_ylabel('RMS dB Error') 

260 fig.tight_layout() 

261 return return_data 

262 

263 

264def plot_asds(cpsd, freq=None, ax=None, subplots_kwargs={'sharex': True, 'sharey': True}, plot_kwargs={}): 

265 num_freq, num_channels, num_channels = cpsd.shape 

266 ncols = int(np.floor(np.sqrt(num_channels))) 

267 nrows = int(np.ceil(num_channels / ncols)) 

268 if ax is None: 

269 f, ax = plt.subplots(nrows, ncols, **subplots_kwargs) 

270 diag = np.einsum('jii->ij', cpsd).real 

271 for asd, a in zip(diag, ax.flatten()): 

272 if freq is None: 

273 a.plot(asd, **plot_kwargs) 

274 else: 

275 a.plot(freq, asd, **plot_kwargs) 

276 a.set_yscale('log') 

277 return ax 

278 

279 

280def cpsd_to_time_history(cpsd_matrix, sample_rate, df, output_oversample=1): 

281 """Generates a time history realization from a CPSD matrix 

282 

283 Parameters 

284 ---------- 

285 cpsd_matrix : np.ndarray : 

286 A 3D complex np.ndarray representing a CPSD matrix where the first 

287 dimension is the frequency line and the second two dimensions are the 

288 rows and columns of the matrix at each frequency line. 

289 sample_rate: float : 

290 The sample rate of the controller in samples per second 

291 df : float : 

292 The frequency spacing of the cpsd matrix 

293 

294 

295 Returns 

296 ------- 

297 output : np.ndarray : 

298 A numpy array containing the generated signals 

299 

300 Notes 

301 ----- 

302 Uses the process described in [1]_ 

303 

304 .. [1] R. Schultz and G. Nelson, "Input signal synthesis for open-loop 

305 multiple-input/multiple-output testing," Proceedings of the International 

306 Modal Analysis Conference, 2019. 

307 

308 """ 

309 # Compute SVD broadcasting over all frequency lines 

310 [U, S, Vh] = np.linalg.svd(cpsd_matrix, full_matrices=False) 

311 # Reform using the sqrt of the S matrix 

312 Lsvd = U * np.sqrt(S[:, np.newaxis, :]) @ Vh 

313 # Compute Random Process 

314 W = np.sqrt(0.5) * (np.random.randn(* 

315 cpsd_matrix.shape[:-1], 1) + 1j * np.random.randn(*cpsd_matrix.shape[:-1], 1)) 

316 Xv = 1 / np.sqrt(df) * Lsvd @ W 

317 # Ensure that the signal is real by setting the nyquist and DC component to 0 

318 Xv[[0, -1], :, :] = 0 

319 # Compute the IFFT, using the real version makes it so you don't need negative frequencies 

320 zero_padding = np.zeros([((output_oversample - 1) * (Xv.shape[0] - 1)) 

321 ] + list(Xv.shape[1:]), dtype=Xv.dtype) 

322 xv = np.fft.irfft(np.concatenate((Xv, zero_padding), axis=0) / 

323 np.sqrt(2), axis=0) * output_oversample * sample_rate 

324 output = xv[:, :, 0].T 

325 return output 

326 

327 

328def shaped_psd(frequency_spacing, bandwidth, num_channels=1, target_rms=None, 

329 min_frequency=0.0, max_frequency=None, breakpoint_frequencies=None, 

330 breakpoint_levels=None, breakpoint_interpolation='linear', 

331 squeeze=True): 

332 num_lines = int(bandwidth / frequency_spacing) + 1 

333 all_freqs = np.arange(num_lines) * frequency_spacing 

334 

335 if breakpoint_frequencies is None or breakpoint_levels is None: 

336 cpsd = np.ones(all_freqs.shape) 

337 else: 

338 if breakpoint_interpolation in ['log', 'logarithmic']: 

339 cpsd = np.interp(np.log(all_freqs), np.log(breakpoint_frequencies), 

340 breakpoint_levels, left=0, right=0) 

341 elif breakpoint_interpolation in ['lin', 'linear']: 

342 cpsd = np.interp(all_freqs, breakpoint_frequencies, breakpoint_levels) 

343 else: 

344 raise ValueError('Invalid Interpolation, should be "lin" or "log"') 

345 

346 # Truncate to the minimum frequency 

347 cpsd[all_freqs < min_frequency] = 0 

348 if max_frequency is not None: 

349 cpsd[all_freqs > max_frequency] = 0 

350 

351 if target_rms is not None: 

352 cpsd_rms = np.sqrt(np.sum(cpsd) * frequency_spacing) 

353 cpsd *= (target_rms / cpsd_rms)**2 

354 

355 full_cpsd = np.zeros(cpsd.shape + (num_channels, num_channels), dtype='complex128') 

356 

357 full_cpsd[:, np.arange(num_channels), np.arange(num_channels)] = cpsd[:, np.newaxis] 

358 

359 if squeeze: 

360 full_cpsd = full_cpsd.squeeze() 

361 

362 return full_cpsd 

363 

364def nth_octave_freqs(freq,oct_order=1): 

365 """ 

366 Get N-th octave band frequencies 

367 

368 Parameters 

369 ---------- 

370 freq : ndarray 

371 array of frequency values, either including all freqs or only min and max 

372 oct_order : int, optional 

373 octave type, 1/octave order. 3 represents 1/3 octave bands. default is 1 

374 

375 Returns 

376 ------- 

377 nominal_band_centers : ndarray 

378 rounded band center frequencies using ANSI standers. Used to report 

379 results, not to compute band limits 

380 band_lb : ndarray 

381 lower band frequencies in Hz 

382 band_ub : ndarray 

383 upper band frequencies in Hz 

384 band_centers : ndarray 

385 exact computed octave center band frequencies 

386  

387 Notes 

388 ------- 

389 Uses equations in ANSI S1.11-2014 "Electroacoustics – Octave-band and  

390 Fractionaloctave-band Filters – Part 1: Specifications" 

391  

392 Uses the min and max freq lines in the provided freq to determine the upper 

393 and lower bands 

394  

395 Uses 1000 Hz as the reference frequency (as per the ANSI standard) 

396  

397 Computes the 1/Nth band center frequencies, using different equations 

398 depending on the octave order. Orders with odd numbers (1, 1/3, etc.)  

399 use ANSI eqn 2. Order with even numbers (1/6, 1/12, etc.) use eqn 3.  

400 This causes any sub-bands to fit entirely within the full octave band 

401  

402 Note: even-numbered sub-bands will not share center frequencies with  

403 the full octave band, but will share the upper and lower limits. 

404 ANSI S1.11 Annex A gives instructions for rounding to make the nominal 

405 band center freqs. If left-most digit is 1-4, round to 3 significant 

406 digits. If left-most digit is 5-9, round to 2 significant digits. For 

407 example, 41.567 rounds to 41.6. 8785.2 rounds to 8800.  

408 """ 

409 

410 # Process inputs 

411 if not isinstance(freq,np.ndarray): 

412 freq = np.array([freq]).flatten() 

413 

414 # Setup 

415 oct_ratio = 10**0.3 

416 f_ref = 1000 

417 bands = np.arange(-200,201) 

418 

419 # Compute exact center freqs and edges 

420 # equation is different if using odd octOrder (1, 3) vs. even octOrder 

421 # (6, 12). Note: bandNumbers = x and octOrder = b in the ANSI eqns 

422 pow = [(2*bands+1)/(2*oct_order), bands/oct_order] 

423 band_centers = f_ref * oct_ratio**pow[oct_order%2] 

424 band_lb = ( band_centers * oct_ratio**(-1/(2*oct_order)) ).round(12) # round slightly 

425 band_ub = ( band_centers * oct_ratio**(1/(2*oct_order)) ).round(12) # round slightly 

426 

427 # ONLY KEEP BANDS IN THE FREQ RANGE OF INTEREST:  

428 # lower limit of lowest band is above min freq 

429 # upper limit of highest band is below max freq 

430 # rationale: want to compute oct-ave using all the data, so include  

431 # the band the includes the minimum frequency and the band that includes 

432 # the maximum frequency. 

433 # Note that this will result in an under-estimate of the first and last 

434 # band levels since some of the band is empty. 

435 inds = (max([freq.min(),10]) < band_ub) * (freq.max() > band_lb) 

436 band_centers,band_lb,band_ub = [band_centers[inds],band_lb[inds],band_ub[inds]] 

437 

438 

439 # GET THE NOMINAL BAND CENTER FREQUENCIES BY ROUNDING 

440 # These are just "nice" numbers to make it easier to identify the bands  

441 # edges are still calculated from the exact center freqs 

442 # method used here is based on: 

443 # https://www.mathworks.com/matlabcentral/fileexchange/26212-round-with-significant-digits 

444 # freqBandCenters = [ 41.567, 8785.2 ] should give: [ 41.6, 8800] 

445 pow_10 = 10** np.floor(np.log10(band_centers)) 

446 left_digit =np.floor(band_centers/pow_10) 

447 sig_digits = 3*(left_digit<=4) + 2*(left_digit>4) 

448 tmp = 10 ** np.floor(np.abs(np.log10(band_centers)) - sig_digits + 1) 

449 nominal_band_centers = np.round(band_centers/tmp)*tmp 

450 

451 # Replace calculated nominal band centers with ANSI nominal band centers if 

452 # doing full or 1/3rd octaves: 

453 # ANSI only gives freqs from 25 to 20000, so don't correct outside that range 

454 # find nearest ANSI nominal center, then verify it is between the band 

455 # limits. If the ANSI nominal is between the band limits, replace the  

456 # calculated nominal with the ANSI value. 

457 if oct_order == 1: 

458 ansi_nominal = np.array([31.5 , 63 , 125 , 250 , 500 , 1000 , 2000 , 4000 , 8000 , 16000]) 

459 elif oct_order == 3: 

460 ansi_nominal = np.array([25 , 31.5 , 40 , 50 , 63 , 80 , 100 , 125 , 160 , 200 , 

461 250 , 315 , 400 , 500 , 630 , 800 , 1000 , 1250 , 1600 , 2000 , 

462 2500 , 3150 , 4000 , 5000 , 8000 , 10000 , 12500 , 16000 , 20000]) 

463 

464 if oct_order==1 or oct_order==3: 

465 dists = abs(ansi_nominal - band_centers[:,np.newaxis]) # dist from center freq to nominal 

466 contained = (ansi_nominal < band_ub[:,np.newaxis]) * (ansi_nominal > band_lb[:,np.newaxis]) # bands contain nominals 

467 contained_dists = contained*dists + 1e20*(~contained) # product is infty if not contained 

468 

469 assign_inds = contained_dists.argmin(1) # nominal inds to assign  

470 assign_bools = contained_dists.min(1) < 1e20 # center freqs inds to assign 

471 

472 nominal_band_centers[assign_bools] = ansi_nominal[assign_inds[assign_bools]] 

473 

474 return nominal_band_centers, band_lb, band_ub, band_centers