1"""
2Synthetic "hardware" that allows the responses to be simulated by integrating
3linear equations of motion.
4
5Rattlesnake Vibration Control Software
6Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
7(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
8Government retains certain rights in this software.
9
10This program is free software: you can redistribute it and/or modify
11it under the terms of the GNU General Public License as published by
12the Free Software Foundation, either version 3 of the License, or
13(at your option) any later version.
14
15This program is distributed in the hope that it will be useful,
16but WITHOUT ANY WARRANTY; without even the implied warranty of
17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18GNU General Public License for more details.
19
20You should have received a copy of the GNU General Public License
21along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""
23
24import multiprocessing as mp
25import time
26from typing import List
27
28import numpy as np
29
30from .abstract_hardware import HardwareAcquisition, HardwareOutput
31from .utilities import (
32 Channel,
33 DataAcquisitionParameters,
34 flush_queue,
35 reduce_array_by_coordinate,
36)
37
38try:
39 # cupy may not exist if correct modules aren't installed
40 import cupy as cp # type: ignore
41 from cupyx.scipy.signal import oaconvolve # type: ignore
42
43 xp = cp
44 CUDA = True
45except ModuleNotFoundError:
46 from scipy.signal import oaconvolve
47
48 xp = np
49 CUDA = False
50
51_direction_map = {
52 "X+": 1,
53 "X": 1,
54 "+X": 1,
55 "Y+": 2,
56 "Y": 2,
57 "+Y": 2,
58 "Z+": 3,
59 "Z": 3,
60 "+Z": 3,
61 "RX+": 4,
62 "RX": 4,
63 "+RX": 4,
64 "RY+": 5,
65 "RY": 5,
66 "+RY": 5,
67 "RZ+": 6,
68 "RZ": 6,
69 "+RZ": 6,
70 "X-": -1,
71 "-X": -1,
72 "Y-": -2,
73 "-Y": -2,
74 "Z-": -3,
75 "-Z": -3,
76 "RX-": -4,
77 "-RX": -4,
78 "RY-": -5,
79 "-RY": -5,
80 "RZ-": -6,
81 "-RZ": -6,
82 "": 0,
83 None: 0,
84}
85
86
87class SDynPyFRFAcquisition(HardwareAcquisition):
88 """Class defining the interface between the controller and synthetic acquisition
89
90 This class defines the interfaces between the controller and the data
91 acquisition portion of the hardware. In this case, the hardware is simulated
92 by convolving an IRF with each new frame of data, where the IRF is supplied from
93 either a SDynPy TransferFunctionArray or ImpulseResponseFunctionArray object.
94 It is run by the acquisition process, and must define how to get data from
95 the test hardware into the controller.
96 """
97
98 def __init__(self, frf_file: str, queue: mp.queues.Queue):
99 """
100 Loads in the SDynPy file and sets initial parameters to null
101 values.
102
103 Parameters
104 ----------
105 system_file : str
106 Path to the file containing state space the SDynPy object
107 queue : mp.queues.Queue
108 A queue that passes input data from the SDynPyFRFOutput class to
109 this class. Normally, this data transfer would occur through
110 the physical test object: the exciters would excite the test object
111 with the specified excitation and the Acquisition would record the
112 responses to that excitation. In the synthetic case, we need to
113 pass the output data to the acquisition which does the convolution.
114
115 Returns
116 -------
117 None.
118
119 """
120 self.sdynpy_data, self.function_type = np.load(frf_file).values()
121 if self.function_type.item() not in [4, 29]:
122 raise ValueError(
123 "File must be SDynPy TransferFunctionArray or ImpulseResponseFunctionArray"
124 )
125 self.system = None
126 self.times = None
127 self.sample_rate = None
128 self.samples_per_read = None
129 self.samples_per_write = None
130 self.frame_time = None
131 self.convolution_samples = None
132 self.queue = queue
133 self.force_buffer = None
134 self.output_signal_time = None
135 self.sys_out = None
136 self.integration_oversample = None
137 self.response_channels: np.ndarray
138 self.output_channels: np.ndarray
139 self.response_channels = None
140 self.output_channels = None
141 self.irf = None
142 self.acquisition_delay = None
143
144 def set_up_data_acquisition_parameters_and_channels(
145 self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
146 ):
147 """
148 Initialize the hardware and set up channels and sampling properties
149
150 The function must create channels on the hardware corresponding to
151 the channels in the test. It must also set the sampling rates.
152
153 Parameters
154 ----------
155 test_data : DataAcquisitionParameters :
156 A container containing the data acquisition parameters for the
157 controller set by the user.
158 channel_data : List[Channel] :
159 A list of ``Channel`` objects defining the channels in the test
160
161 Returns
162 -------
163 None.
164
165 """
166 self.set_parameters(test_data)
167 self.create_response_channels(channel_data)
168
169 def create_response_channels(self, channel_data: List[Channel]):
170 """Method to set up response channels
171
172 This function takes channels from the supplied list of channels and
173 extracts the mode shape coefficients corresponding to those channels.
174
175 Parameters
176 ----------
177 channel_data : List[Channel] :
178 A list of ``Channel`` objects defining the channels in the test
179
180 """
181 self.response_channels = np.array(
182 [
183 channel.feedback_device is None or channel.feedback_device == ""
184 for channel in channel_data
185 ],
186 dtype="bool",
187 )
188 self.output_channels = ~self.response_channels
189 # Need to add a signal buffer in case the write size is not equal to the read size
190 self.force_buffer = np.zeros((0, np.sum(~self.response_channels)))
191
192 # Figure out which channels go with which indices
193 response_coord = []
194 excitation_coord = []
195 for channel in channel_data:
196 node_number = int(channel.node_number)
197 direction = _direction_map[channel.node_direction]
198 channel_coord = (node_number, direction)
199 if channel.feedback_device is None or channel.feedback_device == "":
200 response_coord.append(channel_coord)
201 else:
202 excitation_coord.append(channel_coord)
203 coord_dtype = np.dtype([("node", "<u8"), ("direction", "i1")])
204 response_coord = np.array(response_coord, dtype=coord_dtype)
205 excitation_coord = np.array(excitation_coord, dtype=coord_dtype)
206
207 # check for even abscissa spacing
208 spacing = np.diff(self.sdynpy_data["abscissa"], axis=-1)
209 mean_spacing = np.mean(spacing)
210 if not np.allclose(spacing, mean_spacing):
211 raise ValueError("SDynPy array does not have evenly spaced abscissa")
212 # index array by coordinate. `reduce_array_by_coordinate` expects frequency on first axis
213 array = reduce_array_by_coordinate(
214 np.moveaxis(self.sdynpy_data["ordinate"], -1, 0),
215 self.sdynpy_data["coordinate"],
216 response_coord,
217 excitation_coord,
218 )
219 # convert to irf if needed
220 if self.function_type == 4:
221 # compute irf and transpose so that shape becomes (nref, nresp, nsamples)
222 self.irf = np.fft.irfft(array, axis=0).T
223 num_samples = self.irf.shape[-1]
224 dt = 1 / (self.sdynpy_data["abscissa"].max() * num_samples / np.floor(num_samples / 2))
225 elif self.function_type == 29:
226 self.irf = array.T
227 dt = mean_spacing
228 else:
229 raise ValueError(
230 "SDynPy FRFs should have type TransferFunctionArray or ImpulseResponseFunctionArray"
231 )
232 if CUDA:
233 self.irf = cp.asarray(self.irf)
234
235 # Checking to see if the transfer function sampling rate matches the acquisition rate
236 if not np.isclose(self.sample_rate, 1 / dt):
237 raise ValueError(
238 f"The transfer function sampling rate {1 / dt} "
239 f"does not match the hardware sampling rate {self.sample_rate}."
240 )
241
242 # check that all channels from channel table will have a corresponding irf
243 _, number_responses, model_order = self.irf.shape
244 if number_responses != np.sum(self.response_channels):
245 raise ValueError(
246 f"Number of responses in FRF ({number_responses}) does not "
247 f"match channel table ({np.sum(self.response_channels)})"
248 )
249 # each frame of the convolution must include M - 1 samples of
250 # previous data to maintain causality (where M is length of impulse response)
251 self.convolution_samples = self.samples_per_read + model_order - 1
252 # initialize convolution and output arrays (read function will overwrite rather than
253 # re-allocate)
254 self.output_signal_time = xp.zeros(
255 (number_responses, self.convolution_samples), dtype=xp.float64
256 )
257 self.sys_out = np.zeros((len(channel_data), self.times.size), dtype=np.float64)
258
259 def set_parameters(self, test_data: DataAcquisitionParameters):
260 """Method to set up sampling rate and other test parameters
261
262 For the synthetic case, we will set up the integration parameters using
263 the sample rates provided.
264
265 Parameters
266 ----------
267 test_data : DataAcquisitionParameters :
268 A container containing the data acquisition parameters for the
269 controller set by the user.
270
271 """
272 self.integration_oversample = test_data.output_oversample
273 self.sample_rate = test_data.sample_rate
274 self.times = np.arange(test_data.samples_per_read) / (test_data.sample_rate)
275 self.frame_time = test_data.samples_per_read / test_data.sample_rate
276 self.acquisition_delay = test_data.samples_per_write
277 self.samples_per_read = test_data.samples_per_read
278 self.samples_per_write = test_data.samples_per_write
279
280 def start(self):
281 """Method to start acquiring data.
282
283 For the synthetic case, doesn't need to do anything"""
284
285 def get_acquisition_delay(self) -> int:
286 """
287 Get the number of samples between output and acquisition.
288
289 This function returns the number of samples that need to be read to
290 ensure that the last output is read by the acquisition. If there is
291 buffering in the output, this delay should be adjusted accordingly.
292
293 Returns
294 -------
295 int
296 Number of samples between when a dataset is written to the output
297 and when it has finished playing.
298
299 """
300 return self.acquisition_delay
301
302 def read(self):
303 """Method to read a frame of data from the hardware
304
305 This function gets the force from the output queue and adds it to the
306 buffer of time signals that represents the force. It then convolves
307 a frame of time and sends it to the acquisition.
308
309 For large datasets, computation time may exceed the acquisition
310 time in which this function is expected to return. This may result in
311 slower than real-time execution. GPU hardware acceleration is
312 available to increase computation speed if a CuPy installation is found.
313 (requires Nvidia CUDA toolkit and CUDA compatible GPU,
314 see https://docs.cupy.dev/en/stable/install.html)
315
316 Returns
317 -------
318 read_data :
319 2D Data read from the controller with shape ``n_channels`` x
320 ``n_samples``
321
322 """
323 start_time = time.time()
324 while self.force_buffer.shape[0] < self.convolution_samples:
325 try:
326 forces = self.queue.get(timeout=self.frame_time)
327 except mp.queues.Empty:
328 # If we don't get an output in time, this likely means output has stopped
329 # so just put zeros.
330 forces = np.zeros((self.force_buffer.shape[-1], self.times.size))
331 self.force_buffer = np.concatenate((self.force_buffer, forces.T), axis=0)
332
333 # Now extract a force that is the correct size (including past samples for convolution)
334 this_force = self.force_buffer[: self.convolution_samples].T
335 # And leave the rest for next time
336 self.force_buffer = self.force_buffer[self.times.size :]
337
338 if np.any(this_force):
339 if CUDA:
340 this_force = cp.asarray(this_force)
341 # reset the output signal array to zero
342 self.output_signal_time[:] = 0
343 # Setting up and doing the convolution (using GPU if possible)
344 # (see sdynpy.data.TimeHistoryArray.mimo_forward)
345 for reference_irfs, inputs in zip(self.irf, this_force):
346 self.output_signal_time += oaconvolve(reference_irfs, inputs[np.newaxis, :])[
347 :, : self.convolution_samples
348 ]
349
350 # assign latest frame of data to correct channels
351 # (transfer from GPU to CPU if necessary)
352 if CUDA:
353 self.sys_out[self.response_channels, :] = self.output_signal_time[
354 :, -self.times.size :
355 ].get()
356 self.sys_out[self.output_channels, :] = this_force[:, -self.times.size :].get()
357 else:
358 self.sys_out[self.response_channels, :] = self.output_signal_time[
359 :, -self.times.size :
360 ]
361 self.sys_out[self.output_channels, :] = this_force[:, -self.times.size :]
362 else:
363 self.sys_out[:] = 0
364
365 computation_time = time.time() - start_time
366 remaining_time = self.frame_time - computation_time
367 if remaining_time > 0.0:
368 time.sleep(remaining_time)
369
370 return self.sys_out
371
372 def read_remaining(self):
373 """Method to read the rest of the data on the acquisition
374
375 This function simply returns one sample of zeros.
376
377 Returns
378 -------
379 read_data :
380 2D Data read from the controller with shape ``n_channels`` x
381 ``n_samples``
382 """
383 return np.zeros((len(self.response_channels), 1))
384
385 def stop(self):
386 """Method to stop the acquisition."""
387
388 def close(self):
389 """Method to close down the hardware"""
390
391
392class SDynPyFRFOutput(HardwareOutput):
393 """Class defining the interface between the controller and synthetic output
394
395 Note that the only thing that this class does is pass data to the acquisition
396 hardware task which actually performs the integration. Therefore, many of
397 the functions here are actually empty."""
398
399 def __init__(self, queue: mp.queues.Queue):
400 """
401 Initializes the hardware by simply storing the data passing queue.
402
403 Parameters
404 ----------
405 queue : mp.queues.Queue
406 Queue used to pass data from output to acquisition for integration.
407 See ``StateSpaceAcquisition.__init__``
408
409 """
410 self.queue = queue
411
412 def set_up_data_output_parameters_and_channels(
413 self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
414 ):
415 """
416 Initialize the hardware and set up sources and sampling properties
417
418 This does nothing for the synthetic hardware
419
420 Parameters
421 ----------
422 test_data : DataAcquisitionParameters :
423 A container containing the data acquisition parameters for the
424 controller set by the user.
425 channel_data : List[Channel] :
426 A list of ``Channel`` objects defining the channels in the test
427
428 Returns
429 -------
430 None.
431
432 """
433
434 def start(self):
435 """Method to start acquiring data
436
437 Does nothing for synthetic hardware."""
438
439 def write(self, data: np.ndarray):
440 """Method to write a frame of data
441
442 For the synthetic excitation, this simply puts the data into the data-
443 passing queue.
444
445 Parameters
446 ----------
447 data : np.ndarray
448 Data to write to the output.
449
450 """
451 self.queue.put(data)
452
453 def stop(self):
454 """Method to stop the acquisition
455
456 Does nothing for synthetic hardware."""
457 flush_queue(self.queue)
458
459 def close(self):
460 """Method to close down the hardware
461
462 Does nothing for synthetic hardware."""
463
464 def ready_for_new_output(self):
465 """Signals that the hardware is ready for new output
466
467 Returns ``True`` if the data-passing queue is empty.
468 """
469 return self.queue.empty()