Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / signal_generation.py: 68%

316 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Rattlesnake Vibration Control Software 

4Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

5(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

6Government retains certain rights in this software. 

7 

8This program is free software: you can redistribute it and/or modify 

9it under the terms of the GNU General Public License as published by 

10the Free Software Foundation, either version 3 of the License, or 

11(at your option) any later version. 

12 

13This program is distributed in the hope that it will be useful, 

14but WITHOUT ANY WARRANTY; without even the implied warranty of 

15MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16GNU General Public License for more details. 

17 

18You should have received a copy of the GNU General Public License 

19along with this program. If not, see <https://www.gnu.org/licenses/>. 

20""" 

21 

22from abc import abstractmethod, ABC 

23from enum import Enum 

24 

25import numpy as np 

26import scipy.signal as sig 

27from scipy.stats import norm 

28 

29 

30class SignalTypes(Enum): 

31 """Enumeration of valid types of signals we can generate""" 

32 

33 RANDOM = 0 

34 PSEUDORANDOM = 1 

35 BURST_RANDOM = 2 

36 CHIRP = 3 

37 SINE = 4 

38 SQUARE = 5 

39 CPSD = 6 

40 TRANSIENT = 7 

41 CONTINUOUSTRANSIENT = 8 

42 

43 

44DEBUG = False 

45 

46if DEBUG: 

47 from glob import glob 

48 

49 FILE_OUTPUT = "debug_data/signal_generator_{:}.npz" 

50 

51 

52def cola( 

53 signal_samples: int, 

54 end_samples: int, 

55 signals: np.ndarray, 

56 window_name: str, 

57 window_exponent: float = 0.5, 

58): 

59 """ 

60 Constant Overlap and Addition of signals to blend them together 

61 

62 This function creates long signals of individual realizations by windowing, 

63 overlapping, and adding the signals together. 

64 

65 Parameters 

66 ---------- 

67 signal_samples : int 

68 Number of samples in the overlapped region of the signal. 

69 end_samples : int 

70 Number of samples in the rest of the signal. 

71 signals : np.ndarray 

72 3D numpy array where each of the two rows is a signal that will be 

73 windowed, overlapped, and added. 

74 window_name : str 

75 Name of the window function that will be used to window the signals. 

76 window_exponent : float 

77 Exponent on the window function. Set to 0.5 for constant variance in 

78 the signals (Default Value = 0.5) 

79 

80 Returns 

81 ------- 

82 output : np.ndarray 

83 Combined signal that has been windowed, overlapped, and added. 

84 

85 Notes 

86 ----- 

87 Uses the constant overlap and add process described in [1]_ 

88 

89 .. [1] R. Schultz and G. Nelson, "Input signal synthesis for open-loop 

90 multiple-input/multiple-output testing," Proceedings of the International 

91 Modal Analysis Conference, 2019. 

92 

93 """ 

94 total_samples = signal_samples + end_samples 

95 if window_name == "tukey": 

96 window_name = ("tukey", 2 * (end_samples / total_samples)) 

97 window = sig.get_window(window_name, total_samples, fftbins=True) ** window_exponent 

98 # Create the new signal 

99 last_signal, current_signal = signals 

100 output = current_signal[:, :signal_samples] * window[:signal_samples] 

101 if end_samples > 0: 

102 output[:, :end_samples] += np.array(last_signal)[:, -end_samples:] * window[-end_samples:] 

103 return output 

104 

105 

106def cpsd_to_time_history(cpsd_matrix, sample_rate, df, output_oversample=1): 

107 """Generates a time history realization from a CPSD matrix 

108 

109 Parameters 

110 ---------- 

111 cpsd_matrix : np.ndarray : 

112 A 3D complex np.ndarray representing a CPSD matrix where the first 

113 dimension is the frequency line and the second two dimensions are the 

114 rows and columns of the matrix at each frequency line. 

115 sample_rate: float : 

116 The sample rate of the controller in samples per second 

117 df : float : 

118 The frequency spacing of the cpsd matrix 

119 

120 

121 Returns 

122 ------- 

123 output : np.ndarray : 

124 A numpy array containing the generated signals 

125 

126 Notes 

127 ----- 

128 Uses the process described in [1]_ 

129 

130 .. [1] R. Schultz and G. Nelson, "Input signal synthesis for open-loop 

131 multiple-input/multiple-output testing," Proceedings of the International 

132 Modal Analysis Conference, 2019. 

133 

134 """ 

135 # pylint: disable=invalid-name 

136 # svd_start = time() 

137 # Compute SVD broadcasting over all frequency lines 

138 [U, S, Vh] = np.linalg.svd(cpsd_matrix, full_matrices=False) 

139 # svd_end = time() 

140 # print('SVD Time: {:0.4f}'.format(svd_end-svd_start)) 

141 # Reform using the sqrt of the S matrix 

142 Lsvd = U * np.sqrt(S[:, np.newaxis, :]) @ Vh 

143 # Compute Random Process 

144 W = np.sqrt(0.5) * ( 

145 np.random.randn(*cpsd_matrix.shape[:-1], 1) 

146 + 1j * np.random.randn(*cpsd_matrix.shape[:-1], 1) 

147 ) 

148 Xv = 1 / np.sqrt(df) * Lsvd @ W 

149 # Ensure that the signal is real by setting the nyquist and DC component to 0 

150 Xv[[0, -1], :, :] = 0 

151 # Compute the IFFT, using the real version makes it so you don't need negative frequencies 

152 zero_padding = np.zeros( 

153 [(output_oversample - 1) * (Xv.shape[0] - 1)] + list(Xv.shape[1:]), 

154 dtype=Xv.dtype, 

155 ) 

156 # ifft_start = time() 

157 xv = ( 

158 np.fft.irfft(np.concatenate((Xv, zero_padding), axis=0) / np.sqrt(2), axis=0) 

159 * output_oversample 

160 * sample_rate 

161 ) 

162 # ifft_end = time() 

163 # print('FFT Time: {:0.4f}'.format(ifft_end-ifft_start)) 

164 output = xv[:, :, 0].T 

165 return output 

166 

167 

168class SignalGenerator(ABC): 

169 """Abstract base class showing required methods of each signal generator class""" 

170 

171 @abstractmethod 

172 def generate_frame(self): 

173 """This method generates and returns a frame of data from the signal generator""" 

174 

175 # TODO: update the parameters passing approach to take one argument (like a dictionary) 

176 # so calling signature is maintained in child classes. 

177 def update_parameters(self): 

178 """This method accepts various arguments to update the parameters of the signal generator""" 

179 

180 @property 

181 @abstractmethod 

182 def ready_for_next_output(self): 

183 """This method returns True if the signal generator can currently produce a frame of data""" 

184 

185 

186class RandomSignalGenerator(SignalGenerator): 

187 """Signal generator that produces true random signals using a COLA procedure""" 

188 

189 def __init__( 

190 self, 

191 rms, 

192 sample_rate, 

193 num_samples_per_frame, 

194 num_signals, 

195 low_frequency_cutoff, 

196 high_frequency_cutoff, 

197 cola_overlap, 

198 cola_window, 

199 cola_exponent, 

200 output_oversample, 

201 ): 

202 self.rms = rms 

203 self.sample_rate = sample_rate 

204 self.num_samples = num_samples_per_frame 

205 self.num_signals = num_signals 

206 self.low_frequency_cutoff = 0 if low_frequency_cutoff is None else low_frequency_cutoff 

207 self.high_frequency_cutoff = ( 

208 sample_rate / 2 if high_frequency_cutoff is None else high_frequency_cutoff 

209 ) 

210 self.cola_overlap = cola_overlap 

211 self.cola_window = cola_window.lower() 

212 self.cola_exponent = cola_exponent 

213 self.output_oversample = output_oversample 

214 self.cola_queue = np.zeros((2, self.num_signals, self.num_samples * self.output_oversample)) 

215 

216 # Set up the queue the first time 

217 self.generate_frame() 

218 

219 @property 

220 def samples_per_output(self): 

221 """Property returning the samples per output given the COLA overlap""" 

222 return int(self.num_samples * (1 - self.cola_overlap)) 

223 

224 @property 

225 def overlapped_output_samples(self): 

226 """Property returning the number of output samples that are overlapped.""" 

227 return self.num_samples - self.samples_per_output 

228 

229 @property 

230 def ready_for_next_output(self): 

231 """Random signals are always ready to produce next outputs""" 

232 return True 

233 

234 def generate_frame(self): 

235 """Generates a random frame of data and overlaps and adds it to the previous data""" 

236 # Create a signal 

237 signal = self.rms * np.random.randn( 

238 self.num_signals, self.num_samples * self.output_oversample 

239 ) 

240 # Band limit it 

241 fft = np.fft.rfft(signal, axis=-1) 

242 freq = np.fft.rfftfreq(signal.shape[-1], 1 / (self.sample_rate * self.output_oversample)) 

243 invalid_frequencies = (freq < self.low_frequency_cutoff) | ( 

244 freq > self.high_frequency_cutoff 

245 ) 

246 scale_factor = (~invalid_frequencies).sum() / len(invalid_frequencies) 

247 fft[..., invalid_frequencies] = 0 

248 bandlimited_signal = np.fft.irfft(fft) / np.sqrt(scale_factor) 

249 # Roll the queue 

250 self.cola_queue = np.roll(self.cola_queue, -1, axis=0) 

251 self.cola_queue[-1, ...] = bandlimited_signal 

252 output_signal = cola( 

253 self.samples_per_output * self.output_oversample, 

254 self.overlapped_output_samples * self.output_oversample, 

255 self.cola_queue, 

256 self.cola_window, 

257 self.cola_exponent, 

258 ) 

259 return output_signal, False 

260 

261 

262class PseudorandomSignalGenerator(SignalGenerator): 

263 """Signal generator that produces a periodic signal that looks like a random signal""" 

264 

265 def __init__( 

266 self, 

267 rms, 

268 sample_rate, 

269 num_samples_per_frame, 

270 num_signals, 

271 low_frequency_cutoff, 

272 high_frequency_cutoff, 

273 output_oversample, 

274 ): 

275 freq = np.fft.rfftfreq( 

276 num_samples_per_frame * output_oversample, 

277 1 / (sample_rate * output_oversample), 

278 ) 

279 fft = np.zeros( 

280 (num_signals, num_samples_per_frame * output_oversample // 2 + 1), 

281 dtype=complex, 

282 ) 

283 low_frequency_cutoff = 0 if low_frequency_cutoff is None else low_frequency_cutoff 

284 high_frequency_cutoff = ( 

285 sample_rate / 2 if high_frequency_cutoff is None else high_frequency_cutoff 

286 ) 

287 valid_frequencies = (freq >= low_frequency_cutoff) & (freq <= high_frequency_cutoff) 

288 fft[..., valid_frequencies] = np.exp( 

289 1j * 2 * np.pi * np.random.rand(num_signals, valid_frequencies.sum()) 

290 ) 

291 self.signal = np.fft.irfft(fft) 

292 signal_rms = np.sqrt(np.mean(self.signal**2, axis=-1, keepdims=True)) 

293 self.signal *= rms / signal_rms 

294 

295 def generate_frame(self): 

296 """Generates one realization of the periodic pseudorandom signal""" 

297 return self.signal.copy(), False 

298 

299 @property 

300 def ready_for_next_output(self): 

301 """Pseudorandom signals are always ready, as they just repeat the frame""" 

302 return True 

303 

304 

305class BurstRandomSignalGenerator(SignalGenerator): 

306 """Signal generator that produces a burst random excitation signal""" 

307 

308 def __init__( 

309 self, 

310 rms, 

311 sample_rate, 

312 num_samples_per_frame, 

313 num_signals, 

314 low_frequency_cutoff, 

315 high_frequency_cutoff, 

316 on_fraction, 

317 ramp_fraction, 

318 output_oversample, 

319 ): 

320 self.rms = rms 

321 self.sample_rate = sample_rate 

322 self.num_samples = num_samples_per_frame 

323 self.num_signals = num_signals 

324 self.low_frequency_cutoff = 0 if low_frequency_cutoff is None else low_frequency_cutoff 

325 self.high_frequency_cutoff = ( 

326 sample_rate / 2 if high_frequency_cutoff is None else high_frequency_cutoff 

327 ) 

328 self.on_fraction = on_fraction 

329 if ramp_fraction > 0.5: 

330 raise ValueError("ramp_fraction cannot be more that 0.5") 

331 self.ramp_fraction = ramp_fraction 

332 self.output_oversample = output_oversample 

333 

334 self.envelope = np.zeros(self.num_samples * self.output_oversample) 

335 self.envelope[: self.ramp_samples] = np.linspace(0, 1, self.ramp_samples) 

336 self.envelope[self.ramp_samples : self.ramp_samples + self.on_samples] = 1 

337 self.envelope[ 

338 self.ramp_samples + self.on_samples : self.ramp_samples * 2 + self.on_samples 

339 ] = np.linspace(1, 0, self.ramp_samples) 

340 

341 @property 

342 def ramp_samples(self): 

343 """Property computing how many samples are in the ramp-up and ramp-down of the burst""" 

344 return int( 

345 self.num_samples * self.output_oversample * self.on_fraction * self.ramp_fraction 

346 ) 

347 

348 @property 

349 def on_samples(self): 

350 """Property computing how many samples the burst is active for""" 

351 return int( 

352 self.num_samples * self.output_oversample * self.on_fraction - 2 * self.ramp_samples 

353 ) 

354 

355 @property 

356 def ready_for_next_output(self): 

357 """Burst random is always ready for the next output""" 

358 return True 

359 

360 def generate_frame(self): 

361 """Generates one burst cycle of data""" 

362 # Create a signal 

363 signal = self.rms * np.random.randn( 

364 self.num_signals, self.num_samples * self.output_oversample 

365 ) 

366 # Band limit it 

367 fft = np.fft.rfft(signal, axis=-1) 

368 freq = np.fft.rfftfreq(signal.shape[-1], 1 / (self.sample_rate * self.output_oversample)) 

369 invalid_frequencies = (freq < self.low_frequency_cutoff) | ( 

370 freq > self.high_frequency_cutoff 

371 ) 

372 scale_factor = (~invalid_frequencies).sum() / len(invalid_frequencies) 

373 fft[..., invalid_frequencies] = 0 

374 bandlimited_signal = np.fft.irfft(fft) / np.sqrt(scale_factor) 

375 return bandlimited_signal * self.envelope, False 

376 

377 

378class ChirpSignalGenerator(SignalGenerator): 

379 """Signal generator that generates a periodic fast sine sweep from low to high frequency""" 

380 

381 def __init__( 

382 self, 

383 level, 

384 sample_rate, 

385 num_samples_per_frame, 

386 num_signals, 

387 low_frequency_cutoff, 

388 high_frequency_cutoff, 

389 output_oversample, 

390 ): 

391 times = np.arange(num_samples_per_frame * output_oversample) / ( 

392 sample_rate * output_oversample 

393 ) 

394 signal_length = num_samples_per_frame / sample_rate 

395 n_cycles = np.ceil(high_frequency_cutoff * signal_length) 

396 high_frequency_cutoff = n_cycles / signal_length 

397 frequency_slope = (high_frequency_cutoff - low_frequency_cutoff) / signal_length 

398 argument = frequency_slope / 2 * times**2 + low_frequency_cutoff * times 

399 self.signal = np.tile(level * np.sin(2 * np.pi * argument), (num_signals, 1)) 

400 

401 def generate_frame(self): 

402 """Generates a single realization of the sweep""" 

403 return self.signal.copy(), False 

404 

405 @property 

406 def ready_for_next_output(self): 

407 """Chirp signals are always ready for next output, as they just repeat the same signal""" 

408 return True 

409 

410 

411class SineSignalGenerator(SignalGenerator): 

412 """Signal generator that produces stationary sine signals and tracks the instantaneous phase""" 

413 

414 def __init__( 

415 self, 

416 level, 

417 sample_rate, 

418 num_samples_per_frame, 

419 num_signals, 

420 frequency, 

421 phase, 

422 output_oversample, 

423 ): 

424 self.level = np.broadcast_to(level, (num_signals, 1)).copy() 

425 self.sample_rate = sample_rate 

426 self.num_samples = num_samples_per_frame 

427 self.num_signals = num_signals 

428 self.frequency = None if frequency is None else np.array(frequency, dtype=float) 

429 self.phase = None if phase is None else np.array(phase, dtype=float) 

430 self.output_oversample = output_oversample 

431 self.times = np.arange(self.num_samples * self.output_oversample) / ( 

432 self.sample_rate * self.output_oversample 

433 ) 

434 

435 @property 

436 def phase_per_sample(self): 

437 """Property computing the phase change per sample""" 

438 return 2 * np.pi * self.frequency / self.sample_rate 

439 

440 @property 

441 def phase_per_frame(self): 

442 """Property computing the phase change per frame""" 

443 return self.phase_per_sample * self.num_samples 

444 

445 @property 

446 def ready_for_next_output(self): 

447 """Sine signals are ready for output if all parameters are defined""" 

448 return self.frequency is not None and self.phase is not None 

449 

450 def update_parameters(self, frequency, level, phase=None): 

451 """Updates the parameters of the sinusoidal signal 

452 

453 Parameters 

454 ---------- 

455 frequency : np.ndarray 

456 The new frequencies to use for the sine waves 

457 level : np.ndarray 

458 The new amplitudes to use for the sine waves 

459 phase : np.ndarray, optional 

460 The new phases to use for the sine waves. If not specified, it will not be updated 

461 

462 Notes 

463 ----- 

464 All parameters broadcast when creating the sine signals. The time dimension will be 

465 appended to each of these parameters as a new axis. However, they must consistently 

466 broadcast with one another. 

467 """ 

468 self.frequency[...] = frequency 

469 self.level[...] = level 

470 if phase is not None: 

471 self.phase[...] = phase 

472 

473 def generate_frame(self): 

474 """Generates a frame of sine data while tracking the phase change""" 

475 signal = self.level * np.sin( 

476 2 * np.pi * self.frequency[..., np.newaxis] * self.times + self.phase[..., np.newaxis] 

477 ) 

478 self.phase += self.phase_per_frame 

479 return signal, False 

480 

481 

482class SquareSignalGenerator(SignalGenerator): 

483 """Signal generator that produces a square wave and tracks the instantaneous phase""" 

484 

485 def __init__( 

486 self, 

487 level, 

488 sample_rate, 

489 num_samples_per_frame, 

490 num_signals, 

491 frequency, 

492 phase, 

493 on_fraction, 

494 output_oversample, 

495 ): 

496 self.level = np.broadcast_to(level, (num_signals, 1)).copy() 

497 self.sample_rate = sample_rate 

498 self.num_samples = num_samples_per_frame 

499 self.num_signals = num_signals 

500 self.frequency = None if frequency is None else np.array(frequency, dtype=float) 

501 self.phase = None if phase is None else np.array(phase, dtype=float) 

502 self.on_fraction = on_fraction 

503 self.output_oversample = output_oversample 

504 self.times = np.arange(self.num_samples * self.output_oversample) / ( 

505 self.sample_rate * self.output_oversample 

506 ) 

507 

508 @property 

509 def phase_per_sample(self): 

510 """Computes the phase change per sample based on the frequency""" 

511 return 2 * np.pi * self.frequency / self.sample_rate 

512 

513 @property 

514 def phase_per_frame(self): 

515 """Computes the phase change per frame based on frequency and number of samples""" 

516 return self.phase_per_sample * self.num_samples 

517 

518 @property 

519 def ready_for_next_output(self): 

520 """Square waves are ready for output as long as frequency and phase are defined""" 

521 return self.frequency is not None and self.phase is not None 

522 

523 def update_parameters(self, frequency, phase=None): 

524 """Updates the parameters of the square wave signal 

525 

526 Parameters 

527 ---------- 

528 frequency : np.ndarray 

529 The new frequencies to use for the square waves 

530 phase : np.ndarray, optional 

531 The new phases to use for the square waves. If not specified, it will not be updated 

532 

533 Notes 

534 ----- 

535 All parameters broadcast when creating the sine signals. The time dimension will be 

536 appended to each of these parameters as a new axis. However, they must consistently 

537 broadcast with one another. 

538 """ 

539 self.frequency[...] = frequency 

540 if phase is not None: 

541 self.phase[...] = phase 

542 

543 def generate_frame(self): 

544 """Generates a frame of data while tracking the instantaneous phase change""" 

545 signal = self.level * ( 

546 2 

547 * ( 

548 ( 

549 ( 

550 2 * np.pi * self.frequency[..., np.newaxis] * self.times 

551 + self.phase[..., np.newaxis] 

552 ) 

553 % (2 * np.pi) 

554 ) 

555 < 2 * np.pi * self.on_fraction 

556 ).astype(int) 

557 - 1 

558 ) 

559 self.phase += self.phase_per_frame 

560 return signal, False 

561 

562 

563class CPSDSignalGenerator(SignalGenerator): 

564 """Signal generator that generates time histories satisfying a prescribed CPSD matrix""" 

565 

566 def __init__( 

567 self, 

568 sample_rate, 

569 num_samples_per_frame, 

570 num_signals, 

571 cpsd_matrix, 

572 cola_overlap, 

573 cola_window, 

574 cola_exponent, 

575 sigma_clip, 

576 output_oversample, 

577 ): 

578 self.sample_rate = sample_rate 

579 self.num_samples = num_samples_per_frame 

580 self.num_signals = num_signals 

581 if sigma_clip is None: 

582 self.sigma_clip = None 

583 elif isinstance(sigma_clip, np.ndarray): 

584 self.sigma_clip = sigma_clip.squeeze()[:, np.newaxis] # force to n x 1 array 

585 if np.all(self.sigma_clip >= 5.0): 

586 self.sigma_clip = None 

587 elif isinstance(sigma_clip, (int, float)): 

588 self.sigma_clip = sigma_clip 

589 if self.sigma_clip >= 5.0: 

590 self.sigma_clip = None 

591 self.update_parameters(cpsd_matrix) 

592 self.cola_overlap = cola_overlap 

593 self.cola_window = cola_window.lower() 

594 self.cola_exponent = cola_exponent 

595 self.output_oversample = output_oversample 

596 self.cola_queue = np.zeros((2, self.num_signals, self.num_samples * self.output_oversample)) 

597 self.cola_initialized = False 

598 

599 @property 

600 def samples_per_output(self): 

601 """Property returning the samples per output given the COLA overlap""" 

602 return int(self.num_samples * (1 - self.cola_overlap)) 

603 

604 @property 

605 def overlapped_output_samples(self): 

606 """Property returning the number of output samples that are overlapped.""" 

607 return self.num_samples - self.samples_per_output 

608 

609 @property 

610 def frequency_spacing(self): 

611 """Property returning frequency line spacing given the sampling parameters""" 

612 return self.sample_rate / self.num_samples 

613 

614 @property 

615 def ready_for_next_output(self): 

616 """Ready for output as long as the CPSD matrix we are targetting is defined""" 

617 return self.cpsd_matrix is not None 

618 

619 def update_parameters(self, cpsd_matrix): 

620 """Updates the CPSD target 

621 

622 Parameters 

623 ---------- 

624 cpsd_matrix : np.ndarray 

625 A 3D CPSD matrix that will be matched by the time histories being generated 

626 """ 

627 # pylint: disable=invalid-name 

628 self.cpsd_matrix = cpsd_matrix 

629 if self.cpsd_matrix is None: 

630 self.Lsvd = None 

631 return 

632 # Determine rms and rescaling factors for sigma clipping (rescale factors used to 

633 # maintain rms levels when using low clipping thresholds) 

634 self._rms = ( 

635 np.trapz( # TODO: This is deprecated, fix to use Trapezoid 

636 self.cpsd_matrix.diagonal(axis1=1, axis2=2), 

637 np.arange(self.cpsd_matrix.shape[0]) * self.frequency_spacing, 

638 axis=0, 

639 ) 

640 ** 0.5 

641 )[:, np.newaxis] 

642 if self.sigma_clip is None: 

643 self._scale_factor = None 

644 else: 

645 # this is based on a curve fit between clipping threshold and 

646 # rms error: [-1/(x + 0.5)^3 + 1] (less effective at lower clipping threshold) 

647 self._scale_factor = -1 / (self.sigma_clip + 0.5) ** 3 + 1 

648 self._size = (*self.cpsd_matrix.shape[:-1], 1) 

649 # svd_start = time() 

650 # Compute SVD broadcasting over all frequency lines 

651 [U, S, Vh] = np.linalg.svd(cpsd_matrix, full_matrices=False) 

652 # svd_end = time() 

653 # print('SVD Time: {:0.4f}'.format(svd_end-svd_start)) 

654 # Reform using the sqrt of the S matrix 

655 self.Lsvd = U * np.sqrt(S[:, np.newaxis, :]) @ Vh 

656 

657 def rejection_sample(self, size, threshold=None) -> np.ndarray: 

658 """Handles sigma clipping for the randomly generated signals""" 

659 # `size` should be (n_samples x n_channels x 1) 

660 # (this is the size needed to add to the cola queue) 

661 if threshold is None: 

662 return 

663 oversample = np.max(size[0] + np.ceil((1 - norm.cdf(threshold)) * 100 * size[0])).astype( 

664 int 

665 ) 

666 # arr needs to be (n_channels x n_samples) (so that when we mask it, 

667 # it gets flattened in the right order) 

668 arr = np.random.randn(size[1], oversample) 

669 mask = np.abs(arr) <= threshold 

670 # total number of samples rejected for each channel 

671 num_rejected = np.cumsum(np.sum(~mask, axis=1)) 

672 # roll forward by 1 and set first value to zero 

673 num_rejected = np.roll(num_rejected, 1, axis=0) 

674 num_rejected[0] = 0 

675 # starting indices from original arr after masked values are removed 

676 shifted_indices = ( 

677 np.array([oversample * j for j in range(size[1])], dtype=int) - num_rejected 

678 ) 

679 indices = np.concatenate([np.arange(ind, ind + size[0]) for ind in shifted_indices]) 

680 # pull out masked values, reshape in correct order, and swapaxes to match dims of `size` 

681 return arr[mask][indices].reshape((size[1], size[0], size[2])).swapaxes(0, 1) 

682 

683 def generate_frame(self): 

684 """Generates a single frame of data and overlaps it with the previous frame of data""" 

685 if not self.cola_initialized: 

686 self.cola_initialized = True 

687 self.generate_frame() 

688 # Create a signal 

689 if self.sigma_clip is None: 

690 real = np.random.randn(*self._size) 

691 imag = np.random.randn(*self._size) 

692 else: 

693 # Apply sigma clipping via rejection sampling 

694 # (apply correction factor to attempt to preserve rms levels) 

695 real = self.rejection_sample(self._size, self.sigma_clip) / self._scale_factor 

696 imag = self.rejection_sample(self._size, self.sigma_clip) / self._scale_factor 

697 # print('after ', len(real), len(imag)) 

698 # Compute Random Process 

699 W = np.sqrt(0.5) * (real + 1j * imag) # pylint: disable=invalid-name 

700 Xv = 1 / np.sqrt(self.frequency_spacing) * self.Lsvd @ W # pylint: disable=invalid-name 

701 # Ensure that the signal is real by setting the nyquist and DC component to 0 

702 Xv[[0, -1], :, :] = 0 

703 # Compute the IFFT, using the real version makes it so you don't need negative frequencies 

704 zero_padding = np.zeros( 

705 [(self.output_oversample - 1) * (Xv.shape[0] - 1)] + list(Xv.shape[1:]), 

706 dtype=Xv.dtype, 

707 ) 

708 # ifft_start = time() 

709 xv = ( 

710 np.fft.irfft(np.concatenate((Xv, zero_padding), axis=0) / np.sqrt(2), axis=0) 

711 * self.output_oversample 

712 * self.sample_rate 

713 ) 

714 # ifft_end = time() 

715 # print('FFT Time: {:0.4f}'.format(ifft_end-ifft_start)) 

716 signal = xv[:, :, 0].T 

717 # Band limit it 

718 # Roll the queue 

719 self.cola_queue = np.roll(self.cola_queue, -1, axis=0) 

720 self.cola_queue[-1, ...] = signal 

721 output_signal = cola( 

722 self.samples_per_output * self.output_oversample, 

723 self.overlapped_output_samples * self.output_oversample, 

724 self.cola_queue, 

725 self.cola_window, 

726 self.cola_exponent, 

727 ) 

728 

729 # mirror tail ends of the distribution about the sigma clipping threshold 

730 if self.sigma_clip is not None: 

731 mask = np.abs(output_signal) >= (self._rms * self.sigma_clip) 

732 if len(mask) > 0: 

733 twosigma = np.sign(output_signal).real * self._rms.real * self.sigma_clip * 2 

734 output_signal[mask] *= -1 

735 output_signal[mask] += twosigma[mask] 

736 return output_signal, False 

737 

738 

739class ContinuousTransientSignalGenerator(SignalGenerator): 

740 """Signal generator that constantly receives data to later generate""" 

741 

742 def __init__(self, num_samples_per_frame, num_signals, signal, last_signal): 

743 self.num_samples = num_samples_per_frame 

744 self.num_signals = num_signals 

745 self.signal = np.zeros((self.num_signals, 0)) if signal is None else signal 

746 self.no_more_signal_incoming = last_signal 

747 

748 @property 

749 def ready_for_next_output(self): 

750 """Ready to output if there is enough data on the signal buffer to create a frame""" 

751 return self.signal.shape[-1] >= self.num_samples or self.no_more_signal_incoming 

752 

753 def update_parameters(self, signal, last_signal): 

754 """Updates the parameters of the transient signal generator 

755 

756 Parameters 

757 ---------- 

758 signal : np.ndarray 

759 New portions of signals to add to the output signal buffer 

760 last_signal : bool 

761 True if this is the last signal that will be given to the signal generator 

762 """ 

763 self.signal = np.concatenate((self.signal, signal), axis=-1) 

764 self.no_more_signal_incoming = last_signal 

765 

766 def generate_frame(self): 

767 """Generates a frame of data and a flag letting the caller know the signal is done""" 

768 output_signal = self.signal[..., : self.num_samples] 

769 self.signal = self.signal[..., self.num_samples :] 

770 if DEBUG: 

771 num_files = len(glob(FILE_OUTPUT.format("*"))) 

772 np.savez( 

773 FILE_OUTPUT.format(num_files), 

774 output_signal=output_signal, 

775 last_signal=(self.no_more_signal_incoming and self.signal.shape[-1] == 0), 

776 ) 

777 return output_signal, (self.no_more_signal_incoming and self.signal.shape[-1] == 0) 

778 

779 

780class TransientSignalGenerator(SignalGenerator): 

781 """Signal generator to generate a specified signal""" 

782 

783 def __init__(self, signal, repeat): 

784 self.signal = signal 

785 self.repeat = repeat 

786 

787 @property 

788 def ready_for_next_output(self): 

789 """Ready for output if the signal is defined""" 

790 return self.signal is not None 

791 

792 def update_parameters(self, signal, repeat): 

793 """Updates with a new signal and a flag to repeat the signal or not""" 

794 self.signal = signal 

795 self.repeat = repeat 

796 

797 def generate_frame(self): 

798 """Generates the signal. The done flag will be set if the signal is not repeating""" 

799 return self.signal, not self.repeat