Coverage for src / sdynpy / signal_processing / sdynpy_harmonic.py: 5%
321 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 16:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 16:22 +0000
1# -*- coding: utf-8 -*-
2"""
3Functions for dealing with sinusoidal data
4"""
5"""
6Copyright 2022 National Technology & Engineering Solutions of Sandia,
7LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
8Government retains certain rights in this software.
10This program is free software: you can redistribute it and/or modify
11it under the terms of the GNU General Public License as published by
12the Free Software Foundation, either version 3 of the License, or
13(at your option) any later version.
15This program is distributed in the hope that it will be useful,
16but WITHOUT ANY WARRANTY; without even the implied warranty of
17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18GNU General Public License for more details.
20You should have received a copy of the GNU General Public License
21along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""
24import numpy as np
25import numpy.linalg as la
26import matplotlib.pyplot as plt
27from scipy.signal import lfilter, lfiltic, butter, windows
28from scipy.optimize import minimize
29from scipy.sparse import linalg
30from scipy import sparse
31from .sdynpy_buffer import CircularBufferWithOverlap
34def harmonic_mag_phase(ts, xs, frequency, nharmonics=1, return_residual = False):
35 A = np.zeros((ts.size, nharmonics * 2 + 1))
36 for i in range(nharmonics):
37 A[:, i] = np.sin(2 * np.pi * frequency * (i + 1) * ts)
38 A[:, nharmonics + i] = np.cos(2 * np.pi * frequency * (i + 1) * ts)
39 A[:, -1] = np.ones(ts.size)
40 coefs = la.lstsq(A, xs[:, np.newaxis])[0]
41 As = np.array(coefs[:nharmonics, :])
42 Bs = np.array(coefs[nharmonics:nharmonics * 2, :])
43 phases = np.arctan2(Bs, As)[:, 0]
44 magnitudes = np.sqrt(As**2 + Bs**2)[:, 0]
45 if return_residual:
46 return magnitudes, phases, (A@coefs - xs[:,np.newaxis])
47 else:
48 return magnitudes, phases
50def harmonic_mag_phase_fit(ts,xs, frequency_guess, nharmonics = 1,**minimize_options):
52 def min_fun(frequency):
53 return np.mean(harmonic_mag_phase(ts, xs, frequency[0], nharmonics, True)[2]**2)
55 result = minimize(min_fun, [frequency_guess], **minimize_options)
57 frequency_fit = result.x[0]
58 mag, phs = harmonic_mag_phase(ts, xs, frequency_fit)
59 return frequency_fit, mag, phs
62def digital_tracking_filter(dt, xs, frequencies, arguments, cutoff_frequency_ratio = 0.15,
63 filter_order = 2, phase_estimate = None, amplitude_estimate = None,
64 block_size = None, plot_results = False,
65 ):
66 """
67 Computes amplitudes and phases using a digital tracking filter
69 Parameters
70 ----------
71 dt : float
72 The time step of the signal
73 xs : iterable
74 The signal that will have amplitude and phase extracted.
75 frequencies : iterable
76 The instantaneous frequency at each time step.
77 arguments : iterable
78 The instantaneous argument to the sine wave at each time step.
79 cutoff_frequency_ratio : float
80 The cutoff frequency of the low-pass filter compared to the lowest
81 frequency sine tone in each block. Default is 0.15.
82 filter_order : float
83 The filter order of the low-pass butterworth filter. Default is 2.
84 phase_estimate : float
85 An estimate of the initial phase to seed the low-pass filter.
86 amplitude_estimate : float
87 An estimate of the initial amplitude to seed the low-pass filter.
88 block_size : int
89 Number of samples to use for each block. If not specified, the entire
90 signal is treated as a single block.
91 plot_results : bool
92 If True, will plot the data at multiple steps for diagnostics
94 Returns
95 -------
96 amplitude : np.ndarray
97 The amplitude at each time step
98 phase : np.ndarray
99 The phase at each time step
100 """
101 if block_size is None:
102 block_size = xs.shape[-1]
103 if plot_results:
104 fig,ax = plt.subplots(2,2,sharex=True)
105 ax[0,0].set_ylabel('Signal and Amplitude')
106 ax[0,1].set_ylabel('Phase')
107 ax[1,0].set_ylabel('Filtered COLA Signal (cos)')
108 ax[1,1].set_ylabel('Filtered COLA Signal (sin)')
109 if phase_estimate is None:
110 phase_estimate = 0
111 if amplitude_estimate is None:
112 amplitude_estimate = 0
114 xi_0_filt = None
115 xi_90_filt = None
116 xi_0 = None
117 xi_90 = None
118 frame_index = 0
119 xs = np.array(xs)
120 frequencies = np.array(frequencies)
121 arguments = np.array(arguments)
122 phases = []
123 amplitudes = []
124 while frame_index*block_size < xs.shape[-1]:
125 idx = (Ellipsis,slice(frame_index*block_size,(frame_index+1)*block_size))
126 fi = frequencies[idx]
127 b,a = butter(filter_order, cutoff_frequency_ratio*np.min(fi),fs=1/dt)
128 if xi_0_filt is None:
129 # Set up some fake data to initialize the filter to a good value
130 past_ts = np.arange(-filter_order*2-1,0)*dt
131 past_xs = amplitude_estimate * np.cos(2*np.pi*fi[0]*past_ts + phase_estimate)
132 xi_0 = np.cos(2*np.pi*fi[0]*past_ts)*past_xs
133 xi_90 = -np.sin(2*np.pi*fi[0]*past_ts)*past_xs
134 xi_0_filt = 0.5*amplitude_estimate*np.cos(phase_estimate)*np.ones(xi_0.shape)
135 xi_90_filt = 0.5*amplitude_estimate*np.sin(phase_estimate)*np.ones(xi_90.shape)
136 if plot_results:
137 ax[1,0].plot(past_ts,xi_0,'r')
138 ax[1,0].plot(past_ts,xi_0_filt,'m')
139 ax[1,1].plot(past_ts,xi_90,'r')
140 ax[1,1].plot(past_ts,xi_90_filt,'m')
141 # Set up the filter initial states
142 z0i = lfiltic(b,a,xi_0_filt[::-1],xi_0[::-1])
143 z90i = lfiltic(b,a,xi_90_filt[::-1],xi_90[::-1])
144 # Extract this portion of the signal
145 xi = xs[idx]
146 ti = np.arange(frame_index*block_size,(frame_index+1)*block_size)*dt
147 ti = ti[...,:xi.shape[-1]]
148 argsi = arguments[idx]
149 # Now set up the tracking filter
150 cola0 = np.cos(argsi)
151 cola90 = -np.sin(argsi)
152 xi_0 = cola0*xi
153 xi_90 = cola90*xi
154 xi_0_filt,z0i = lfilter(b,a,xi_0,zi=z0i)
155 xi_90_filt,z90i = lfilter(b,a,xi_90,zi=z90i)
156 phases.append(np.arctan2(xi_90_filt,xi_0_filt))
157 amplitudes.append(2*np.sqrt(xi_0_filt**2 + xi_90_filt**2))
158 frame_index += 1
159 if plot_results:
160 ax[0,0].plot(ti,xi,'b')
161 ax[0,0].plot(ti,amplitudes[-1],'g')
162 ax[0,1].plot(ti,phases[-1],'g')
163 ax[1,0].plot(ti,xi_0,'b')
164 ax[1,0].plot(ti,xi_0_filt,'g')
165 ax[1,1].plot(ti,xi_90,'b')
166 ax[1,1].plot(ti,xi_90_filt,'g')
167 amplitude = np.concatenate(amplitudes,axis=-1)
168 phase = np.concatenate(phases,axis=-1)
169 if plot_results:
170 fig.tight_layout()
171 return amplitude,phase
173def digital_tracking_filter_generator(
174 dt, cutoff_frequency_ratio = 0.15,
175 filter_order = 2, phase_estimate = None, amplitude_estimate = None,
176 plot_results = False,
177 ):
178 """
179 Computes amplitudes and phases using a digital tracking filter
181 Parameters
182 ----------
183 dt : float
184 The time step of the signal
185 cutoff_frequency_ratio : float
186 The cutoff frequency of the low-pass filter compared to the lowest
187 frequency sine tone in each block. Default is 0.15.
188 filter_order : float
189 The filter order of the low-pass butterworth filter. Default is 2.
190 phase_estimate : float
191 An estimate of the initial phase to seed the low-pass filter.
192 amplitude_estimate : float
193 An estimate of the initial amplitude to seed the low-pass filter.
194 plot_results : bool
195 If True, will plot the data at multiple steps for diagnostics
197 Sends
198 ------
199 xi : iterable
200 The next block of the signal to be filtered
201 fi : iterable
202 The frequencies at the time steps in xi
203 argsi : iterable
204 The argument to a cosine function at the time steps in xi
206 Yields
207 ------
208 amplitude : np.ndarray
209 The amplitude at each time step
210 phase : np.ndarray
211 The phase at each time step
212 """
213 if plot_results:
214 fig,ax = plt.subplots(2,2,sharex=True)
215 ax[0,0].set_ylabel('Signal and Amplitude')
216 ax[0,1].set_ylabel('Phase')
217 ax[1,0].set_ylabel('Filtered COLA Signal (cos)')
218 ax[1,1].set_ylabel('Filtered COLA Signal (sin)')
219 sample_index = 0
220 fig.tight_layout()
221 if phase_estimate is None:
222 phase_estimate = 0
223 if amplitude_estimate is None:
224 amplitude_estimate = 0
226 xi_0_filt = None
227 xi_90_filt = None
228 xi_0 = None
229 xi_90 = None
230 amplitude = None
231 phase = None
232 while True:
233 xi, fi, argsi = yield amplitude,phase
234 xi = np.array(xi)
235 fi = np.array(fi)
236 argsi = np.array(argsi)
237 b,a = butter(filter_order, cutoff_frequency_ratio*np.min(fi),fs=1/dt)
238 if xi_0_filt is None:
239 # Set up some fake data to initialize the filter to a good value
240 past_ts = np.arange(-filter_order*2-1,0)*dt
241 past_xs = amplitude_estimate * np.cos(2*np.pi*fi[0]*past_ts + phase_estimate)
242 xi_0 = np.cos(2*np.pi*fi[0]*past_ts)*past_xs
243 xi_90 = -np.sin(2*np.pi*fi[0]*past_ts)*past_xs
244 xi_0_filt = 0.5*amplitude_estimate*np.cos(phase_estimate)*np.ones(xi_0.shape)
245 xi_90_filt = 0.5*amplitude_estimate*np.sin(phase_estimate)*np.ones(xi_90.shape)
246 if plot_results:
247 ax[1,0].plot(past_ts,xi_0,'r')
248 ax[1,0].plot(past_ts,xi_0_filt,'m')
249 ax[1,1].plot(past_ts,xi_90,'r')
250 ax[1,1].plot(past_ts,xi_90_filt,'m')
251 # Set up the filter initial states
252 z0i = lfiltic(b,a,xi_0_filt[::-1],xi_0[::-1])
253 z90i = lfiltic(b,a,xi_90_filt[::-1],xi_90[::-1])
254 # Now set up the tracking filter
255 cola0 = np.cos(argsi)
256 cola90 = -np.sin(argsi)
257 xi_0 = cola0*xi
258 xi_90 = cola90*xi
259 xi_0_filt,z0i = lfilter(b,a,xi_0,zi=z0i)
260 xi_90_filt,z90i = lfilter(b,a,xi_90,zi=z90i)
261 phase = np.arctan2(xi_90_filt,xi_0_filt)
262 amplitude = 2*np.sqrt(xi_0_filt**2 + xi_90_filt**2)
263 if plot_results:
264 ti = np.arange(sample_index,sample_index + xi.shape[-1])*dt
265 ax[0,0].plot(ti,xi,'b')
266 ax[0,0].plot(ti,amplitude,'g')
267 ax[0,1].plot(ti,phase,'g')
268 ax[1,0].plot(ti,xi_0,'b')
269 ax[1,0].plot(ti,xi_0_filt,'g')
270 ax[1,1].plot(ti,xi_90,'b')
271 ax[1,1].plot(ti,xi_90_filt,'g')
272 sample_index += xi.shape[-1]
274def vold_kalman_filter(sample_rate, signal, arguments, filter_order = None,
275 bandwidth = None, method = None, return_amp_phs = False,
276 return_envelope = False, return_r = False, verbose=False):
277 """
278 Extract sinusoidal components from a signal using the second generation
279 Vold-Kalman filter.
281 Parameters
282 ----------
283 sample_rate : float
284 The sample rate of the signal in Hz.
285 signal : ndarray
286 A 1D signal containing sinusoidal components that need to be extracted
287 arguments : ndarray
288 A 2D array consisting of the arguments to the sinusoidal components of
289 the form exp(1j*argument). This is the integral over time of the
290 angular frequency, which can be approximated as
291 2*np.pi*scipy.integrate.cumulative_trapezoid(frequencies,timesteps,initial=0)
292 if frequencies is the frequency at each time step in Hz timesteps is
293 the vector of time steps in seconds. This is a 2D array where the
294 number of rows is the
295 number of different sinusoidal components that are desired to be
296 extracted, and the number of columns are the number of time steps in
297 the `signal` argument.
298 filter_order : int, optional
299 The order of the VK filter, which should be 1, 2, or 3. The default is
300 2. The low-pass filter roll-off is approximately -40 dB per times the
301 filter order.
302 bandwidth : ndarray, optional
303 The prescribed bandwidth of the filter. This is related to the filter
304 selectivity parameter `r` in the literature. This will be broadcast to
305 the same shape as the `arguments` argument. The default is the sample
306 rate divided by 1000.
307 method : str, optional
308 Can be set to either 'single' or 'multi'. In a 'single' solution, each
309 sinusoidal component will be solved independently without any coupling.
310 This can be more efficient, but will result in errors if the
311 frequencies of the sine waves cross. The 'multi' solution will solve
312 for all sinusoidal components simultaneously, resulting in a better
313 estimate of crossing frequencies. The default is 'multi'.
314 return_amp_phs : bool
315 Returns the amplitude and phase of the reconstructed signals at each
316 time step. Default is False
317 return_envelope : bool
318 Returns the complex envelope and phasors at each time step. Default is
319 False
320 return_r : bool
321 Returns the computed selectivity parameters for the filter. Default is
322 False
323 verbose : bool
324 Prints progress throughout the calculation if True, default is False.
326 Raises
327 ------
328 ValueError
329 If arguments are not the correct size or values.
331 Returns
332 -------
333 reconstructed_signals : ndarray
334 Returns a time history the same size as `signal` for each of the
335 sinusoidal components solved for.
336 reconstructed_amplitudes : ndarray
337 Returns the amplitude over time for each of the sinusoidal components
338 solved for. Only returned if return_amp_phs is True.
339 reconstructed_phases : ndarray
340 Returns the phase over time for each of the sinusoidal components
341 solved for. Only returned if return_amp_phs is True.
342 reconstructed_envelope : ndarray
343 Returns the complex envelope `x` over time for each of the sinusoidal
344 components solved for. Only returned if return_envelope is True.
345 reconstructed_phasor : ndarray
346 Returns the phasor `c` over time for each of the sinusoidal components
347 solved for. Only returned if return_envelope is True.
348 r : ndarray
349 Returns the selectivity `r` over time for each of the sinusoidal
350 components solved for. Only returned if return_r is True.
352 """
353 if verbose:
354 print('Parsing Arguments')
355 if filter_order is None:
356 filter_order = 2
358 if bandwidth is None:
359 bandwidth = sample_rate/1000
361 if verbose:
362 print('Ensuring Correct Array Sizes')
363 # Make sure input data are numpy arrays
364 signal = np.array(signal)
365 arguments = np.atleast_2d(arguments)
366 bandwidth = np.atleast_2d(bandwidth)
367 bandwidth = np.broadcast_to(bandwidth,arguments.shape)
368 relative_bandwidth = bandwidth/sample_rate
370 # Extract some sizes to make sure everything is correctly sized
371 n_samples = signal.shape[-1]
372 # n_orders_freq, n_freq = frequencies.shape
373 # if n_freq != n_samples:
374 # raise ValueError('Frequency array must have identical number of columns as samples in signal')
375 n_orders_arg, n_arg = arguments.shape
376 if n_arg != n_samples:
377 raise ValueError('Argument array must have identical number of columns as samples in signal')
379 if method is None:
380 if n_orders_arg > 1:
381 method = 'multi'
382 else:
383 method = 'single'
384 if method.lower() not in ['multi','single']:
385 raise ValueError('`method` must be either "multi" or "single"')
387 if verbose:
388 print('Constructing Phasors')
389 # Construct phasors to multiply the signals by
390 phasor = np.exp(1j*arguments)
392 if verbose:
393 print('Constructing Solution Matrices')
394 # Construct the matrices for the least squares solution
395 if filter_order == 1:
396 coefs = np.array([1,-1])
397 r = np.sqrt((np.sqrt(2)-1)/
398 (2*(1-np.cos(np.pi*relative_bandwidth))))
399 elif filter_order == 2:
400 coefs = np.array([1,-2,1])
401 r = np.sqrt((np.sqrt(2)-1)/
402 (6-8*np.cos(np.pi*relative_bandwidth)+2*np.cos(2*np.pi*relative_bandwidth)))
403 elif filter_order == 3:
404 coefs = np.array([1,-3,3,-1])
405 r = np.sqrt((np.sqrt(2)-1)/
406 (20-30*np.cos(np.pi*relative_bandwidth)+12*np.cos(2*np.pi*relative_bandwidth)-2*np.cos(3*np.pi*relative_bandwidth)))
407 else:
408 raise ValueError('filter order must be 1, 2, or 3')
410 # Construct the solution matrices
411 A = sparse.spdiags(np.tile(coefs,(n_samples,1)).T,
412 np.arange(filter_order+1),
413 n_samples-filter_order,
414 n_samples)
415 B = []
416 for rvec in r:
417 R = sparse.spdiags(rvec,0,n_samples,n_samples)
418 AR = A@R
419 B.append((AR).T@(AR) + sparse.eye(n_samples))
421 if method.lower() == 'multi':
422 if verbose:
423 print('Setting up "multi"')
424 # This solves the multiple order approach, constructing a big matrix of
425 # Bs on the diagonal and CHCs on the off-diagonals. We can set up the
426 # matrix as diagonals and upper diagonals then add the transpose to get the
427 # lower diagonals
428 B_multi_diagonal = sparse.block_diag(B)
429 # There will be number of orders**2 B matrices, and number of orders
430 # diagonals, so there will be n_orders**2-n_orders off diagonals, half on
431 # on the upper triangle. We need to fill in all of these values for all
432 # time steps.
433 num_off_diags = (n_orders_arg**2-n_orders_arg)//2
434 row_indices = np.zeros((n_samples,num_off_diags),dtype=int)
435 col_indices = np.zeros((n_samples,num_off_diags),dtype=int)
436 CHC = np.zeros((n_samples,num_off_diags),dtype='c16')
437 # Keep track of the off-diagonal index so we know which column to put the
438 # data in
439 off_diagonal_index = 0
440 # Now we need to step through the off-diagonal blocks and create the arrays
441 for row_index in range(n_orders_arg):
442 # Since we need to stay on the upper triangle, column indices will start
443 # after the diagonal entry
444 for col_index in range(row_index+1,n_orders_arg):
445 row_indices[:,off_diagonal_index] = np.arange(row_index*n_samples,(row_index+1)*n_samples)
446 col_indices[:,off_diagonal_index] = np.arange(col_index*n_samples,(col_index+1)*n_samples)
447 CHC[:,off_diagonal_index] = phasor[row_index].conj()*phasor[col_index]
448 off_diagonal_index += 1
449 # We set up the variables as multidimensional so we could store them easier,
450 # but now we need to flatten them to put them into the sparse matrix.
451 # We choose CSR because we can do math with it easier
452 B_multi_utri = sparse.csr_matrix((CHC.flatten(),(row_indices.flatten(),col_indices.flatten())),shape=B_multi_diagonal.shape)
454 # Now we can assemble the entire matrix by adding with the complex conjugate
455 # of the upper triangle to get the lower triangle
456 B_multi = B_multi_diagonal + B_multi_utri + B_multi_utri.getH()
458 # We also need to construct the right hand side of the equation. This
459 # should be a multiplication of the phasor^H with the signal
460 RHS = phasor.flatten().conj()*np.tile(signal,n_orders_arg)
461 if verbose:
462 print('Solving "multi"')
463 x_multi = linalg.spsolve(B_multi,RHS[:,np.newaxis])
464 x = 2*x_multi.reshape(n_orders_arg,-1) # Multiply by 2 to account for missing negative frequency components
465 if verbose:
466 print('Done!')
467 else:
468 if verbose:
469 print('Setting up "single"')
470 # This solves the single order approach. If the user has put in multiple
471 # orders, it will solve them all independently instead of combining them
472 # into a single larger solve.
473 x = np.zeros((n_orders_arg,n_samples),dtype=np.complex128)
474 for i,(phasor_i, B_i) in enumerate(zip(phasor,B)):
475 # We already have the left side of the equation B, now we just need the
476 # right side of the equation, which is the phasor hermetian
477 # times the signal elementwise-multiplied
478 RHS = phasor_i.conj()*signal
479 if verbose:
480 print(f'Solving "single" {i+1}')
481 x[i] = 2*linalg.spsolve(B_i,RHS)
482 if verbose:
483 print("Done!")
485 if verbose:
486 print('Constructing Return Values')
487 return_value = [np.real(x*phasor)]
488 if return_amp_phs:
489 return_value.extend([np.abs(x),np.angle(x)])
490 if return_envelope:
491 return_value.extend([x,phasor])
492 if return_r:
493 return_value.extend([r])
494 if len(return_value) == 1:
495 return return_value[0]
496 else:
497 return return_value
499def vold_kalman_filter_generator(sample_rate, num_orders, block_size, overlap,
500 filter_order = None,
501 bandwidth = None, method = None, plot_results = False,
502 verbose=False):
503 """
504 Extracts sinusoidal information using a Vold-Kalman Filter
506 This uses an windowed-overlap-and-add process to solve for the signal while
507 removing start and end effects of the filter. Each time the generator is
508 called, it will yield a further section of the results up until the overlap
509 section.
511 Parameters
512 ----------
513 sample_rate : float
514 The sample rate of the signal in Hz.
515 num_orders : int
516 The number of orders that will be found in the signal
517 block_size : int
518 The size of the blocks used in the analysis.
519 overlap : float, optional
520 Fraction of the block size to overlap when computing the results. If
521 not specified, it will default to 0.15.
522 filter_order : int, optional
523 The order of the VK filter, which should be 1, 2, or 3. The default is
524 2. The low-pass filter roll-off is approximately -40 dB per times the
525 filter order.
526 bandwidth : ndarray, optional
527 The prescribed bandwidth of the filter. This is related to the filter
528 selectivity parameter `r` in the literature. This will be broadcast to
529 the same shape as the `arguments` argument. The default is the sample
530 rate divided by 1000.
531 method : str, optional
532 Can be set to either 'single' or 'multi'. In a 'single' solution, each
533 sinusoidal component will be solved independently without any coupling.
534 This can be more efficient, but will result in errors if the
535 frequencies of the sine waves cross. The 'multi' solution will solve
536 for all sinusoidal components simultaneously, resulting in a better
537 estimate of crossing frequencies. The default is 'multi'.
538 plot_results : bool
539 If True, will plot the data at multiple steps for diagnostics
541 Raises
542 ------
543 ValueError
544 If arguments are not the correct size or values.
545 ValueError
546 If data is provided subsequently to specifying last_signal = True
548 Sends
549 -----
550 xi : iterable
551 The next block of the signal to be filtered. This should be a 1D
552 signal containing sinusoidal components that need to be extracted.
553 argsi : iterable
554 A 2D array consisting of the arguments to the sinusoidal components of
555 the form exp(1j*argsi). This is the integral over time of the
556 angular frequency, which can be approximated as
557 2*np.pi*scipy.integrate.cumulative_trapezoid(frequencies,timesteps,initial=0)
558 if frequencies is the frequency at each time step in Hz timesteps is
559 the vector of time steps in seconds. This is a 2D array where the
560 number of rows is the
561 number of different sinusoidal components that are desired to be
562 extracted, and the number of columns are the number of time steps in
563 the `signal` argument.
564 last_signal : bool
565 If True, the remainder of the data will be returned and the
566 overlap-and-add process will be finished.
568 Yields
569 -------
570 reconstructed_signals : ndarray
571 Returns a time history the same size as `signal` for each of the
572 sinusoidal components solved for.
573 reconstructed_amplitudes : ndarray
574 Returns the amplitude over time for each of the sinusoidal components
575 solved for. Only returned if return_amp_phs is True.
576 reconstructed_phases : ndarray
577 Returns the phase over time for each of the sinusoidal components
578 solved for. Only returned if return_amp_phs is True.
580 """
581 if plot_results:
582 fig,ax = plt.subplots(num_orders+1,3)
583 signal_index = 0
584 analysis_index = 0
585 if verbose:
586 print('Startup')
587 previous_envelope = None
588 reconstructed_signals = None
589 reconstructed_amplitudes = None
590 reconstructed_phases = None
591 overlap_samples = int(overlap*block_size)
592 window = windows.hann(overlap_samples*2,False)
593 start_window = window[:overlap_samples]
594 end_window = window[overlap_samples:]
595 buffer = CircularBufferWithOverlap(3*block_size, block_size, overlap_samples, data_shape=(num_orders+1,))
596 first_output = True
597 last_signal = False
598 while True:
599 if verbose:
600 print('Before Yield')
601 xi,argsi,check_last_signal = yield reconstructed_signals,reconstructed_amplitudes, reconstructed_phases
602 if last_signal and check_last_signal:
603 raise ValueError('Generator has been exhausted')
604 last_signal = check_last_signal
605 argsi = np.atleast_2d(argsi)
606 if plot_results:
607 timesteps_signal = np.arange(signal_index, signal_index + xi.shape[-1])/sample_rate
608 ax[0,0].plot(timesteps_signal,xi,'k')
609 signal_index += xi.shape[-1]
610 if verbose:
611 print('After Yield')
612 buffer_data = np.concatenate([xi[np.newaxis],argsi])
613 buffer_output = buffer.write_get_data(buffer_data, last_signal)
614 if buffer_output is not None:
615 if first_output:
616 buffer_output = buffer_output[...,overlap_samples:]
617 first_output = False
618 signal = buffer_output[0]
619 arguments = buffer_output[1:]
620 else:
621 signal = buffer_output[0]
622 arguments = buffer_output[1:]
623 signal[:overlap_samples] = signal[:overlap_samples]*start_window
624 if not last_signal:
625 signal[-overlap_samples:] = signal[-overlap_samples:]*end_window
626 if plot_results:
627 timesteps_analysis = np.arange(analysis_index, analysis_index + signal.shape[-1])/sample_rate
628 ax[0,1].plot(timesteps_analysis,signal)
629 analysis_index += signal.shape[-1] - overlap_samples
630 # Do the VK Filtering
631 vk_signal, vk_envelope, vk_phasor = vold_kalman_filter(
632 sample_rate, signal, arguments, filter_order,
633 bandwidth, method, return_envelope=True, verbose = verbose)
634 if plot_results:
635 for vks,vke,a in zip(vk_signal,vk_envelope,ax[1:]):
636 a[0].plot(timesteps_analysis,vks.copy())
637 a[1].plot(timesteps_analysis,np.abs(vke))
638 a[2].plot(timesteps_analysis,np.angle(vke))
639 # If necessary, do the overlap
640 if previous_envelope is not None:
641 vk_envelope[...,:overlap_samples] = vk_envelope[...,:overlap_samples] + previous_envelope[...,-overlap_samples:]
642 if not last_signal:
643 reconstructed_signals = np.real(vk_envelope[...,:-overlap_samples]*vk_phasor[...,:-overlap_samples])
644 reconstructed_amplitudes = np.abs(vk_envelope[...,:-overlap_samples])
645 reconstructed_phases = np.angle(vk_envelope[...,:-overlap_samples])
646 if plot_results:
647 for vks,vka,vkp,a in zip(reconstructed_signals,reconstructed_amplitudes,reconstructed_phases,ax[1:]):
648 a[0].plot(timesteps_analysis[:-overlap_samples],vks,'k')
649 a[1].plot(timesteps_analysis[:-overlap_samples],vka,'k')
650 a[2].plot(timesteps_analysis[:-overlap_samples],vkp,'k')
651 else:
652 reconstructed_signals = np.real(vk_envelope*vk_phasor)
653 reconstructed_amplitudes = np.abs(vk_envelope)
654 reconstructed_phases = np.angle(vk_envelope)
655 if plot_results:
656 for vks,vka,vkp,a in zip(reconstructed_signals,reconstructed_amplitudes,reconstructed_phases,ax[1:]):
657 a[0].plot(timesteps_analysis,vks,'k')
658 a[1].plot(timesteps_analysis,vka,'k')
659 a[2].plot(timesteps_analysis,vkp,'k')
660 previous_envelope = vk_envelope
661 else:
662 reconstructed_signals = reconstructed_amplitudes = reconstructed_phases = None