Coverage for / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / signal_generation.py: 68%
316 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
1# -*- coding: utf-8 -*-
2"""
3Rattlesnake Vibration Control Software
4Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
5(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
6Government retains certain rights in this software.
8This program is free software: you can redistribute it and/or modify
9it under the terms of the GNU General Public License as published by
10the Free Software Foundation, either version 3 of the License, or
11(at your option) any later version.
13This program is distributed in the hope that it will be useful,
14but WITHOUT ANY WARRANTY; without even the implied warranty of
15MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16GNU General Public License for more details.
18You should have received a copy of the GNU General Public License
19along with this program. If not, see <https://www.gnu.org/licenses/>.
20"""
22from abc import abstractmethod, ABC
23from enum import Enum
25import numpy as np
26import scipy.signal as sig
27from scipy.stats import norm
30class SignalTypes(Enum):
31 """Enumeration of valid types of signals we can generate"""
33 RANDOM = 0
34 PSEUDORANDOM = 1
35 BURST_RANDOM = 2
36 CHIRP = 3
37 SINE = 4
38 SQUARE = 5
39 CPSD = 6
40 TRANSIENT = 7
41 CONTINUOUSTRANSIENT = 8
44DEBUG = False
46if DEBUG:
47 from glob import glob
49 FILE_OUTPUT = "debug_data/signal_generator_{:}.npz"
52def cola(
53 signal_samples: int,
54 end_samples: int,
55 signals: np.ndarray,
56 window_name: str,
57 window_exponent: float = 0.5,
58):
59 """
60 Constant Overlap and Addition of signals to blend them together
62 This function creates long signals of individual realizations by windowing,
63 overlapping, and adding the signals together.
65 Parameters
66 ----------
67 signal_samples : int
68 Number of samples in the overlapped region of the signal.
69 end_samples : int
70 Number of samples in the rest of the signal.
71 signals : np.ndarray
72 3D numpy array where each of the two rows is a signal that will be
73 windowed, overlapped, and added.
74 window_name : str
75 Name of the window function that will be used to window the signals.
76 window_exponent : float
77 Exponent on the window function. Set to 0.5 for constant variance in
78 the signals (Default Value = 0.5)
80 Returns
81 -------
82 output : np.ndarray
83 Combined signal that has been windowed, overlapped, and added.
85 Notes
86 -----
87 Uses the constant overlap and add process described in [1]_
89 .. [1] R. Schultz and G. Nelson, "Input signal synthesis for open-loop
90 multiple-input/multiple-output testing," Proceedings of the International
91 Modal Analysis Conference, 2019.
93 """
94 total_samples = signal_samples + end_samples
95 if window_name == "tukey":
96 window_name = ("tukey", 2 * (end_samples / total_samples))
97 window = sig.get_window(window_name, total_samples, fftbins=True) ** window_exponent
98 # Create the new signal
99 last_signal, current_signal = signals
100 output = current_signal[:, :signal_samples] * window[:signal_samples]
101 if end_samples > 0:
102 output[:, :end_samples] += np.array(last_signal)[:, -end_samples:] * window[-end_samples:]
103 return output
106def cpsd_to_time_history(cpsd_matrix, sample_rate, df, output_oversample=1):
107 """Generates a time history realization from a CPSD matrix
109 Parameters
110 ----------
111 cpsd_matrix : np.ndarray :
112 A 3D complex np.ndarray representing a CPSD matrix where the first
113 dimension is the frequency line and the second two dimensions are the
114 rows and columns of the matrix at each frequency line.
115 sample_rate: float :
116 The sample rate of the controller in samples per second
117 df : float :
118 The frequency spacing of the cpsd matrix
121 Returns
122 -------
123 output : np.ndarray :
124 A numpy array containing the generated signals
126 Notes
127 -----
128 Uses the process described in [1]_
130 .. [1] R. Schultz and G. Nelson, "Input signal synthesis for open-loop
131 multiple-input/multiple-output testing," Proceedings of the International
132 Modal Analysis Conference, 2019.
134 """
135 # pylint: disable=invalid-name
136 # svd_start = time()
137 # Compute SVD broadcasting over all frequency lines
138 [U, S, Vh] = np.linalg.svd(cpsd_matrix, full_matrices=False)
139 # svd_end = time()
140 # print('SVD Time: {:0.4f}'.format(svd_end-svd_start))
141 # Reform using the sqrt of the S matrix
142 Lsvd = U * np.sqrt(S[:, np.newaxis, :]) @ Vh
143 # Compute Random Process
144 W = np.sqrt(0.5) * (
145 np.random.randn(*cpsd_matrix.shape[:-1], 1)
146 + 1j * np.random.randn(*cpsd_matrix.shape[:-1], 1)
147 )
148 Xv = 1 / np.sqrt(df) * Lsvd @ W
149 # Ensure that the signal is real by setting the nyquist and DC component to 0
150 Xv[[0, -1], :, :] = 0
151 # Compute the IFFT, using the real version makes it so you don't need negative frequencies
152 zero_padding = np.zeros(
153 [(output_oversample - 1) * (Xv.shape[0] - 1)] + list(Xv.shape[1:]),
154 dtype=Xv.dtype,
155 )
156 # ifft_start = time()
157 xv = (
158 np.fft.irfft(np.concatenate((Xv, zero_padding), axis=0) / np.sqrt(2), axis=0)
159 * output_oversample
160 * sample_rate
161 )
162 # ifft_end = time()
163 # print('FFT Time: {:0.4f}'.format(ifft_end-ifft_start))
164 output = xv[:, :, 0].T
165 return output
168class SignalGenerator(ABC):
169 """Abstract base class showing required methods of each signal generator class"""
171 @abstractmethod
172 def generate_frame(self):
173 """This method generates and returns a frame of data from the signal generator"""
175 # TODO: update the parameters passing approach to take one argument (like a dictionary)
176 # so calling signature is maintained in child classes.
177 def update_parameters(self):
178 """This method accepts various arguments to update the parameters of the signal generator"""
180 @property
181 @abstractmethod
182 def ready_for_next_output(self):
183 """This method returns True if the signal generator can currently produce a frame of data"""
186class RandomSignalGenerator(SignalGenerator):
187 """Signal generator that produces true random signals using a COLA procedure"""
189 def __init__(
190 self,
191 rms,
192 sample_rate,
193 num_samples_per_frame,
194 num_signals,
195 low_frequency_cutoff,
196 high_frequency_cutoff,
197 cola_overlap,
198 cola_window,
199 cola_exponent,
200 output_oversample,
201 ):
202 self.rms = rms
203 self.sample_rate = sample_rate
204 self.num_samples = num_samples_per_frame
205 self.num_signals = num_signals
206 self.low_frequency_cutoff = 0 if low_frequency_cutoff is None else low_frequency_cutoff
207 self.high_frequency_cutoff = (
208 sample_rate / 2 if high_frequency_cutoff is None else high_frequency_cutoff
209 )
210 self.cola_overlap = cola_overlap
211 self.cola_window = cola_window.lower()
212 self.cola_exponent = cola_exponent
213 self.output_oversample = output_oversample
214 self.cola_queue = np.zeros((2, self.num_signals, self.num_samples * self.output_oversample))
216 # Set up the queue the first time
217 self.generate_frame()
219 @property
220 def samples_per_output(self):
221 """Property returning the samples per output given the COLA overlap"""
222 return int(self.num_samples * (1 - self.cola_overlap))
224 @property
225 def overlapped_output_samples(self):
226 """Property returning the number of output samples that are overlapped."""
227 return self.num_samples - self.samples_per_output
229 @property
230 def ready_for_next_output(self):
231 """Random signals are always ready to produce next outputs"""
232 return True
234 def generate_frame(self):
235 """Generates a random frame of data and overlaps and adds it to the previous data"""
236 # Create a signal
237 signal = self.rms * np.random.randn(
238 self.num_signals, self.num_samples * self.output_oversample
239 )
240 # Band limit it
241 fft = np.fft.rfft(signal, axis=-1)
242 freq = np.fft.rfftfreq(signal.shape[-1], 1 / (self.sample_rate * self.output_oversample))
243 invalid_frequencies = (freq < self.low_frequency_cutoff) | (
244 freq > self.high_frequency_cutoff
245 )
246 scale_factor = (~invalid_frequencies).sum() / len(invalid_frequencies)
247 fft[..., invalid_frequencies] = 0
248 bandlimited_signal = np.fft.irfft(fft) / np.sqrt(scale_factor)
249 # Roll the queue
250 self.cola_queue = np.roll(self.cola_queue, -1, axis=0)
251 self.cola_queue[-1, ...] = bandlimited_signal
252 output_signal = cola(
253 self.samples_per_output * self.output_oversample,
254 self.overlapped_output_samples * self.output_oversample,
255 self.cola_queue,
256 self.cola_window,
257 self.cola_exponent,
258 )
259 return output_signal, False
262class PseudorandomSignalGenerator(SignalGenerator):
263 """Signal generator that produces a periodic signal that looks like a random signal"""
265 def __init__(
266 self,
267 rms,
268 sample_rate,
269 num_samples_per_frame,
270 num_signals,
271 low_frequency_cutoff,
272 high_frequency_cutoff,
273 output_oversample,
274 ):
275 freq = np.fft.rfftfreq(
276 num_samples_per_frame * output_oversample,
277 1 / (sample_rate * output_oversample),
278 )
279 fft = np.zeros(
280 (num_signals, num_samples_per_frame * output_oversample // 2 + 1),
281 dtype=complex,
282 )
283 low_frequency_cutoff = 0 if low_frequency_cutoff is None else low_frequency_cutoff
284 high_frequency_cutoff = (
285 sample_rate / 2 if high_frequency_cutoff is None else high_frequency_cutoff
286 )
287 valid_frequencies = (freq >= low_frequency_cutoff) & (freq <= high_frequency_cutoff)
288 fft[..., valid_frequencies] = np.exp(
289 1j * 2 * np.pi * np.random.rand(num_signals, valid_frequencies.sum())
290 )
291 self.signal = np.fft.irfft(fft)
292 signal_rms = np.sqrt(np.mean(self.signal**2, axis=-1, keepdims=True))
293 self.signal *= rms / signal_rms
295 def generate_frame(self):
296 """Generates one realization of the periodic pseudorandom signal"""
297 return self.signal.copy(), False
299 @property
300 def ready_for_next_output(self):
301 """Pseudorandom signals are always ready, as they just repeat the frame"""
302 return True
305class BurstRandomSignalGenerator(SignalGenerator):
306 """Signal generator that produces a burst random excitation signal"""
308 def __init__(
309 self,
310 rms,
311 sample_rate,
312 num_samples_per_frame,
313 num_signals,
314 low_frequency_cutoff,
315 high_frequency_cutoff,
316 on_fraction,
317 ramp_fraction,
318 output_oversample,
319 ):
320 self.rms = rms
321 self.sample_rate = sample_rate
322 self.num_samples = num_samples_per_frame
323 self.num_signals = num_signals
324 self.low_frequency_cutoff = 0 if low_frequency_cutoff is None else low_frequency_cutoff
325 self.high_frequency_cutoff = (
326 sample_rate / 2 if high_frequency_cutoff is None else high_frequency_cutoff
327 )
328 self.on_fraction = on_fraction
329 if ramp_fraction > 0.5:
330 raise ValueError("ramp_fraction cannot be more that 0.5")
331 self.ramp_fraction = ramp_fraction
332 self.output_oversample = output_oversample
334 self.envelope = np.zeros(self.num_samples * self.output_oversample)
335 self.envelope[: self.ramp_samples] = np.linspace(0, 1, self.ramp_samples)
336 self.envelope[self.ramp_samples : self.ramp_samples + self.on_samples] = 1
337 self.envelope[
338 self.ramp_samples + self.on_samples : self.ramp_samples * 2 + self.on_samples
339 ] = np.linspace(1, 0, self.ramp_samples)
341 @property
342 def ramp_samples(self):
343 """Property computing how many samples are in the ramp-up and ramp-down of the burst"""
344 return int(
345 self.num_samples * self.output_oversample * self.on_fraction * self.ramp_fraction
346 )
348 @property
349 def on_samples(self):
350 """Property computing how many samples the burst is active for"""
351 return int(
352 self.num_samples * self.output_oversample * self.on_fraction - 2 * self.ramp_samples
353 )
355 @property
356 def ready_for_next_output(self):
357 """Burst random is always ready for the next output"""
358 return True
360 def generate_frame(self):
361 """Generates one burst cycle of data"""
362 # Create a signal
363 signal = self.rms * np.random.randn(
364 self.num_signals, self.num_samples * self.output_oversample
365 )
366 # Band limit it
367 fft = np.fft.rfft(signal, axis=-1)
368 freq = np.fft.rfftfreq(signal.shape[-1], 1 / (self.sample_rate * self.output_oversample))
369 invalid_frequencies = (freq < self.low_frequency_cutoff) | (
370 freq > self.high_frequency_cutoff
371 )
372 scale_factor = (~invalid_frequencies).sum() / len(invalid_frequencies)
373 fft[..., invalid_frequencies] = 0
374 bandlimited_signal = np.fft.irfft(fft) / np.sqrt(scale_factor)
375 return bandlimited_signal * self.envelope, False
378class ChirpSignalGenerator(SignalGenerator):
379 """Signal generator that generates a periodic fast sine sweep from low to high frequency"""
381 def __init__(
382 self,
383 level,
384 sample_rate,
385 num_samples_per_frame,
386 num_signals,
387 low_frequency_cutoff,
388 high_frequency_cutoff,
389 output_oversample,
390 ):
391 times = np.arange(num_samples_per_frame * output_oversample) / (
392 sample_rate * output_oversample
393 )
394 signal_length = num_samples_per_frame / sample_rate
395 n_cycles = np.ceil(high_frequency_cutoff * signal_length)
396 high_frequency_cutoff = n_cycles / signal_length
397 frequency_slope = (high_frequency_cutoff - low_frequency_cutoff) / signal_length
398 argument = frequency_slope / 2 * times**2 + low_frequency_cutoff * times
399 self.signal = np.tile(level * np.sin(2 * np.pi * argument), (num_signals, 1))
401 def generate_frame(self):
402 """Generates a single realization of the sweep"""
403 return self.signal.copy(), False
405 @property
406 def ready_for_next_output(self):
407 """Chirp signals are always ready for next output, as they just repeat the same signal"""
408 return True
411class SineSignalGenerator(SignalGenerator):
412 """Signal generator that produces stationary sine signals and tracks the instantaneous phase"""
414 def __init__(
415 self,
416 level,
417 sample_rate,
418 num_samples_per_frame,
419 num_signals,
420 frequency,
421 phase,
422 output_oversample,
423 ):
424 self.level = np.broadcast_to(level, (num_signals, 1)).copy()
425 self.sample_rate = sample_rate
426 self.num_samples = num_samples_per_frame
427 self.num_signals = num_signals
428 self.frequency = None if frequency is None else np.array(frequency, dtype=float)
429 self.phase = None if phase is None else np.array(phase, dtype=float)
430 self.output_oversample = output_oversample
431 self.times = np.arange(self.num_samples * self.output_oversample) / (
432 self.sample_rate * self.output_oversample
433 )
435 @property
436 def phase_per_sample(self):
437 """Property computing the phase change per sample"""
438 return 2 * np.pi * self.frequency / self.sample_rate
440 @property
441 def phase_per_frame(self):
442 """Property computing the phase change per frame"""
443 return self.phase_per_sample * self.num_samples
445 @property
446 def ready_for_next_output(self):
447 """Sine signals are ready for output if all parameters are defined"""
448 return self.frequency is not None and self.phase is not None
450 def update_parameters(self, frequency, level, phase=None):
451 """Updates the parameters of the sinusoidal signal
453 Parameters
454 ----------
455 frequency : np.ndarray
456 The new frequencies to use for the sine waves
457 level : np.ndarray
458 The new amplitudes to use for the sine waves
459 phase : np.ndarray, optional
460 The new phases to use for the sine waves. If not specified, it will not be updated
462 Notes
463 -----
464 All parameters broadcast when creating the sine signals. The time dimension will be
465 appended to each of these parameters as a new axis. However, they must consistently
466 broadcast with one another.
467 """
468 self.frequency[...] = frequency
469 self.level[...] = level
470 if phase is not None:
471 self.phase[...] = phase
473 def generate_frame(self):
474 """Generates a frame of sine data while tracking the phase change"""
475 signal = self.level * np.sin(
476 2 * np.pi * self.frequency[..., np.newaxis] * self.times + self.phase[..., np.newaxis]
477 )
478 self.phase += self.phase_per_frame
479 return signal, False
482class SquareSignalGenerator(SignalGenerator):
483 """Signal generator that produces a square wave and tracks the instantaneous phase"""
485 def __init__(
486 self,
487 level,
488 sample_rate,
489 num_samples_per_frame,
490 num_signals,
491 frequency,
492 phase,
493 on_fraction,
494 output_oversample,
495 ):
496 self.level = np.broadcast_to(level, (num_signals, 1)).copy()
497 self.sample_rate = sample_rate
498 self.num_samples = num_samples_per_frame
499 self.num_signals = num_signals
500 self.frequency = None if frequency is None else np.array(frequency, dtype=float)
501 self.phase = None if phase is None else np.array(phase, dtype=float)
502 self.on_fraction = on_fraction
503 self.output_oversample = output_oversample
504 self.times = np.arange(self.num_samples * self.output_oversample) / (
505 self.sample_rate * self.output_oversample
506 )
508 @property
509 def phase_per_sample(self):
510 """Computes the phase change per sample based on the frequency"""
511 return 2 * np.pi * self.frequency / self.sample_rate
513 @property
514 def phase_per_frame(self):
515 """Computes the phase change per frame based on frequency and number of samples"""
516 return self.phase_per_sample * self.num_samples
518 @property
519 def ready_for_next_output(self):
520 """Square waves are ready for output as long as frequency and phase are defined"""
521 return self.frequency is not None and self.phase is not None
523 def update_parameters(self, frequency, phase=None):
524 """Updates the parameters of the square wave signal
526 Parameters
527 ----------
528 frequency : np.ndarray
529 The new frequencies to use for the square waves
530 phase : np.ndarray, optional
531 The new phases to use for the square waves. If not specified, it will not be updated
533 Notes
534 -----
535 All parameters broadcast when creating the sine signals. The time dimension will be
536 appended to each of these parameters as a new axis. However, they must consistently
537 broadcast with one another.
538 """
539 self.frequency[...] = frequency
540 if phase is not None:
541 self.phase[...] = phase
543 def generate_frame(self):
544 """Generates a frame of data while tracking the instantaneous phase change"""
545 signal = self.level * (
546 2
547 * (
548 (
549 (
550 2 * np.pi * self.frequency[..., np.newaxis] * self.times
551 + self.phase[..., np.newaxis]
552 )
553 % (2 * np.pi)
554 )
555 < 2 * np.pi * self.on_fraction
556 ).astype(int)
557 - 1
558 )
559 self.phase += self.phase_per_frame
560 return signal, False
563class CPSDSignalGenerator(SignalGenerator):
564 """Signal generator that generates time histories satisfying a prescribed CPSD matrix"""
566 def __init__(
567 self,
568 sample_rate,
569 num_samples_per_frame,
570 num_signals,
571 cpsd_matrix,
572 cola_overlap,
573 cola_window,
574 cola_exponent,
575 sigma_clip,
576 output_oversample,
577 ):
578 self.sample_rate = sample_rate
579 self.num_samples = num_samples_per_frame
580 self.num_signals = num_signals
581 if sigma_clip is None:
582 self.sigma_clip = None
583 elif isinstance(sigma_clip, np.ndarray):
584 self.sigma_clip = sigma_clip.squeeze()[:, np.newaxis] # force to n x 1 array
585 if np.all(self.sigma_clip >= 5.0):
586 self.sigma_clip = None
587 elif isinstance(sigma_clip, (int, float)):
588 self.sigma_clip = sigma_clip
589 if self.sigma_clip >= 5.0:
590 self.sigma_clip = None
591 self.update_parameters(cpsd_matrix)
592 self.cola_overlap = cola_overlap
593 self.cola_window = cola_window.lower()
594 self.cola_exponent = cola_exponent
595 self.output_oversample = output_oversample
596 self.cola_queue = np.zeros((2, self.num_signals, self.num_samples * self.output_oversample))
597 self.cola_initialized = False
599 @property
600 def samples_per_output(self):
601 """Property returning the samples per output given the COLA overlap"""
602 return int(self.num_samples * (1 - self.cola_overlap))
604 @property
605 def overlapped_output_samples(self):
606 """Property returning the number of output samples that are overlapped."""
607 return self.num_samples - self.samples_per_output
609 @property
610 def frequency_spacing(self):
611 """Property returning frequency line spacing given the sampling parameters"""
612 return self.sample_rate / self.num_samples
614 @property
615 def ready_for_next_output(self):
616 """Ready for output as long as the CPSD matrix we are targetting is defined"""
617 return self.cpsd_matrix is not None
619 def update_parameters(self, cpsd_matrix):
620 """Updates the CPSD target
622 Parameters
623 ----------
624 cpsd_matrix : np.ndarray
625 A 3D CPSD matrix that will be matched by the time histories being generated
626 """
627 # pylint: disable=invalid-name
628 self.cpsd_matrix = cpsd_matrix
629 if self.cpsd_matrix is None:
630 self.Lsvd = None
631 return
632 # Determine rms and rescaling factors for sigma clipping (rescale factors used to
633 # maintain rms levels when using low clipping thresholds)
634 self._rms = (
635 np.trapz( # TODO: This is deprecated, fix to use Trapezoid
636 self.cpsd_matrix.diagonal(axis1=1, axis2=2),
637 np.arange(self.cpsd_matrix.shape[0]) * self.frequency_spacing,
638 axis=0,
639 )
640 ** 0.5
641 )[:, np.newaxis]
642 if self.sigma_clip is None:
643 self._scale_factor = None
644 else:
645 # this is based on a curve fit between clipping threshold and
646 # rms error: [-1/(x + 0.5)^3 + 1] (less effective at lower clipping threshold)
647 self._scale_factor = -1 / (self.sigma_clip + 0.5) ** 3 + 1
648 self._size = (*self.cpsd_matrix.shape[:-1], 1)
649 # svd_start = time()
650 # Compute SVD broadcasting over all frequency lines
651 [U, S, Vh] = np.linalg.svd(cpsd_matrix, full_matrices=False)
652 # svd_end = time()
653 # print('SVD Time: {:0.4f}'.format(svd_end-svd_start))
654 # Reform using the sqrt of the S matrix
655 self.Lsvd = U * np.sqrt(S[:, np.newaxis, :]) @ Vh
657 def rejection_sample(self, size, threshold=None) -> np.ndarray:
658 """Handles sigma clipping for the randomly generated signals"""
659 # `size` should be (n_samples x n_channels x 1)
660 # (this is the size needed to add to the cola queue)
661 if threshold is None:
662 return
663 oversample = np.max(size[0] + np.ceil((1 - norm.cdf(threshold)) * 100 * size[0])).astype(
664 int
665 )
666 # arr needs to be (n_channels x n_samples) (so that when we mask it,
667 # it gets flattened in the right order)
668 arr = np.random.randn(size[1], oversample)
669 mask = np.abs(arr) <= threshold
670 # total number of samples rejected for each channel
671 num_rejected = np.cumsum(np.sum(~mask, axis=1))
672 # roll forward by 1 and set first value to zero
673 num_rejected = np.roll(num_rejected, 1, axis=0)
674 num_rejected[0] = 0
675 # starting indices from original arr after masked values are removed
676 shifted_indices = (
677 np.array([oversample * j for j in range(size[1])], dtype=int) - num_rejected
678 )
679 indices = np.concatenate([np.arange(ind, ind + size[0]) for ind in shifted_indices])
680 # pull out masked values, reshape in correct order, and swapaxes to match dims of `size`
681 return arr[mask][indices].reshape((size[1], size[0], size[2])).swapaxes(0, 1)
683 def generate_frame(self):
684 """Generates a single frame of data and overlaps it with the previous frame of data"""
685 if not self.cola_initialized:
686 self.cola_initialized = True
687 self.generate_frame()
688 # Create a signal
689 if self.sigma_clip is None:
690 real = np.random.randn(*self._size)
691 imag = np.random.randn(*self._size)
692 else:
693 # Apply sigma clipping via rejection sampling
694 # (apply correction factor to attempt to preserve rms levels)
695 real = self.rejection_sample(self._size, self.sigma_clip) / self._scale_factor
696 imag = self.rejection_sample(self._size, self.sigma_clip) / self._scale_factor
697 # print('after ', len(real), len(imag))
698 # Compute Random Process
699 W = np.sqrt(0.5) * (real + 1j * imag) # pylint: disable=invalid-name
700 Xv = 1 / np.sqrt(self.frequency_spacing) * self.Lsvd @ W # pylint: disable=invalid-name
701 # Ensure that the signal is real by setting the nyquist and DC component to 0
702 Xv[[0, -1], :, :] = 0
703 # Compute the IFFT, using the real version makes it so you don't need negative frequencies
704 zero_padding = np.zeros(
705 [(self.output_oversample - 1) * (Xv.shape[0] - 1)] + list(Xv.shape[1:]),
706 dtype=Xv.dtype,
707 )
708 # ifft_start = time()
709 xv = (
710 np.fft.irfft(np.concatenate((Xv, zero_padding), axis=0) / np.sqrt(2), axis=0)
711 * self.output_oversample
712 * self.sample_rate
713 )
714 # ifft_end = time()
715 # print('FFT Time: {:0.4f}'.format(ifft_end-ifft_start))
716 signal = xv[:, :, 0].T
717 # Band limit it
718 # Roll the queue
719 self.cola_queue = np.roll(self.cola_queue, -1, axis=0)
720 self.cola_queue[-1, ...] = signal
721 output_signal = cola(
722 self.samples_per_output * self.output_oversample,
723 self.overlapped_output_samples * self.output_oversample,
724 self.cola_queue,
725 self.cola_window,
726 self.cola_exponent,
727 )
729 # mirror tail ends of the distribution about the sigma clipping threshold
730 if self.sigma_clip is not None:
731 mask = np.abs(output_signal) >= (self._rms * self.sigma_clip)
732 if len(mask) > 0:
733 twosigma = np.sign(output_signal).real * self._rms.real * self.sigma_clip * 2
734 output_signal[mask] *= -1
735 output_signal[mask] += twosigma[mask]
736 return output_signal, False
739class ContinuousTransientSignalGenerator(SignalGenerator):
740 """Signal generator that constantly receives data to later generate"""
742 def __init__(self, num_samples_per_frame, num_signals, signal, last_signal):
743 self.num_samples = num_samples_per_frame
744 self.num_signals = num_signals
745 self.signal = np.zeros((self.num_signals, 0)) if signal is None else signal
746 self.no_more_signal_incoming = last_signal
748 @property
749 def ready_for_next_output(self):
750 """Ready to output if there is enough data on the signal buffer to create a frame"""
751 return self.signal.shape[-1] >= self.num_samples or self.no_more_signal_incoming
753 def update_parameters(self, signal, last_signal):
754 """Updates the parameters of the transient signal generator
756 Parameters
757 ----------
758 signal : np.ndarray
759 New portions of signals to add to the output signal buffer
760 last_signal : bool
761 True if this is the last signal that will be given to the signal generator
762 """
763 self.signal = np.concatenate((self.signal, signal), axis=-1)
764 self.no_more_signal_incoming = last_signal
766 def generate_frame(self):
767 """Generates a frame of data and a flag letting the caller know the signal is done"""
768 output_signal = self.signal[..., : self.num_samples]
769 self.signal = self.signal[..., self.num_samples :]
770 if DEBUG:
771 num_files = len(glob(FILE_OUTPUT.format("*")))
772 np.savez(
773 FILE_OUTPUT.format(num_files),
774 output_signal=output_signal,
775 last_signal=(self.no_more_signal_incoming and self.signal.shape[-1] == 0),
776 )
777 return output_signal, (self.no_more_signal_incoming and self.signal.shape[-1] == 0)
780class TransientSignalGenerator(SignalGenerator):
781 """Signal generator to generate a specified signal"""
783 def __init__(self, signal, repeat):
784 self.signal = signal
785 self.repeat = repeat
787 @property
788 def ready_for_next_output(self):
789 """Ready for output if the signal is defined"""
790 return self.signal is not None
792 def update_parameters(self, signal, repeat):
793 """Updates with a new signal and a flag to repeat the signal or not"""
794 self.signal = signal
795 self.repeat = repeat
797 def generate_frame(self):
798 """Generates the signal. The done flag will be set if the signal is not repeating"""
799 return self.signal, not self.repeat