Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / sdynpy_frf_virtual_hardware.py: 0%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1""" 

2Synthetic "hardware" that allows the responses to be simulated by integrating 

3linear equations of motion. 

4 

5Rattlesnake Vibration Control Software 

6Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

7(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

8Government retains certain rights in this software. 

9 

10This program is free software: you can redistribute it and/or modify 

11it under the terms of the GNU General Public License as published by 

12the Free Software Foundation, either version 3 of the License, or 

13(at your option) any later version. 

14 

15This program is distributed in the hope that it will be useful, 

16but WITHOUT ANY WARRANTY; without even the implied warranty of 

17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18GNU General Public License for more details. 

19 

20You should have received a copy of the GNU General Public License 

21along with this program. If not, see <https://www.gnu.org/licenses/>. 

22""" 

23 

24import multiprocessing as mp 

25import time 

26from typing import List 

27 

28import numpy as np 

29 

30from .abstract_hardware import HardwareAcquisition, HardwareOutput 

31from .utilities import ( 

32 Channel, 

33 DataAcquisitionParameters, 

34 flush_queue, 

35 reduce_array_by_coordinate, 

36) 

37 

38try: 

39 # cupy may not exist if correct modules aren't installed 

40 import cupy as cp # type: ignore 

41 from cupyx.scipy.signal import oaconvolve # type: ignore 

42 

43 xp = cp 

44 CUDA = True 

45except ModuleNotFoundError: 

46 from scipy.signal import oaconvolve 

47 

48 xp = np 

49 CUDA = False 

50 

51_direction_map = { 

52 "X+": 1, 

53 "X": 1, 

54 "+X": 1, 

55 "Y+": 2, 

56 "Y": 2, 

57 "+Y": 2, 

58 "Z+": 3, 

59 "Z": 3, 

60 "+Z": 3, 

61 "RX+": 4, 

62 "RX": 4, 

63 "+RX": 4, 

64 "RY+": 5, 

65 "RY": 5, 

66 "+RY": 5, 

67 "RZ+": 6, 

68 "RZ": 6, 

69 "+RZ": 6, 

70 "X-": -1, 

71 "-X": -1, 

72 "Y-": -2, 

73 "-Y": -2, 

74 "Z-": -3, 

75 "-Z": -3, 

76 "RX-": -4, 

77 "-RX": -4, 

78 "RY-": -5, 

79 "-RY": -5, 

80 "RZ-": -6, 

81 "-RZ": -6, 

82 "": 0, 

83 None: 0, 

84} 

85 

86 

87class SDynPyFRFAcquisition(HardwareAcquisition): 

88 """Class defining the interface between the controller and synthetic acquisition 

89 

90 This class defines the interfaces between the controller and the data 

91 acquisition portion of the hardware. In this case, the hardware is simulated 

92 by convolving an IRF with each new frame of data, where the IRF is supplied from 

93 either a SDynPy TransferFunctionArray or ImpulseResponseFunctionArray object. 

94 It is run by the acquisition process, and must define how to get data from 

95 the test hardware into the controller. 

96 """ 

97 

98 def __init__(self, frf_file: str, queue: mp.queues.Queue): 

99 """ 

100 Loads in the SDynPy file and sets initial parameters to null 

101 values. 

102 

103 Parameters 

104 ---------- 

105 system_file : str 

106 Path to the file containing state space the SDynPy object 

107 queue : mp.queues.Queue 

108 A queue that passes input data from the SDynPyFRFOutput class to 

109 this class. Normally, this data transfer would occur through 

110 the physical test object: the exciters would excite the test object 

111 with the specified excitation and the Acquisition would record the 

112 responses to that excitation. In the synthetic case, we need to 

113 pass the output data to the acquisition which does the convolution. 

114 

115 Returns 

116 ------- 

117 None. 

118 

119 """ 

120 self.sdynpy_data, self.function_type = np.load(frf_file).values() 

121 if self.function_type.item() not in [4, 29]: 

122 raise ValueError( 

123 "File must be SDynPy TransferFunctionArray or ImpulseResponseFunctionArray" 

124 ) 

125 self.system = None 

126 self.times = None 

127 self.sample_rate = None 

128 self.samples_per_read = None 

129 self.samples_per_write = None 

130 self.frame_time = None 

131 self.convolution_samples = None 

132 self.queue = queue 

133 self.force_buffer = None 

134 self.output_signal_time = None 

135 self.sys_out = None 

136 self.integration_oversample = None 

137 self.response_channels: np.ndarray 

138 self.output_channels: np.ndarray 

139 self.response_channels = None 

140 self.output_channels = None 

141 self.irf = None 

142 self.acquisition_delay = None 

143 

144 def set_up_data_acquisition_parameters_and_channels( 

145 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

146 ): 

147 """ 

148 Initialize the hardware and set up channels and sampling properties 

149 

150 The function must create channels on the hardware corresponding to 

151 the channels in the test. It must also set the sampling rates. 

152 

153 Parameters 

154 ---------- 

155 test_data : DataAcquisitionParameters : 

156 A container containing the data acquisition parameters for the 

157 controller set by the user. 

158 channel_data : List[Channel] : 

159 A list of ``Channel`` objects defining the channels in the test 

160 

161 Returns 

162 ------- 

163 None. 

164 

165 """ 

166 self.set_parameters(test_data) 

167 self.create_response_channels(channel_data) 

168 

169 def create_response_channels(self, channel_data: List[Channel]): 

170 """Method to set up response channels 

171 

172 This function takes channels from the supplied list of channels and 

173 extracts the mode shape coefficients corresponding to those channels. 

174 

175 Parameters 

176 ---------- 

177 channel_data : List[Channel] : 

178 A list of ``Channel`` objects defining the channels in the test 

179 

180 """ 

181 self.response_channels = np.array( 

182 [ 

183 channel.feedback_device is None or channel.feedback_device == "" 

184 for channel in channel_data 

185 ], 

186 dtype="bool", 

187 ) 

188 self.output_channels = ~self.response_channels 

189 # Need to add a signal buffer in case the write size is not equal to the read size 

190 self.force_buffer = np.zeros((0, np.sum(~self.response_channels))) 

191 

192 # Figure out which channels go with which indices 

193 response_coord = [] 

194 excitation_coord = [] 

195 for channel in channel_data: 

196 node_number = int(channel.node_number) 

197 direction = _direction_map[channel.node_direction] 

198 channel_coord = (node_number, direction) 

199 if channel.feedback_device is None or channel.feedback_device == "": 

200 response_coord.append(channel_coord) 

201 else: 

202 excitation_coord.append(channel_coord) 

203 coord_dtype = np.dtype([("node", "<u8"), ("direction", "i1")]) 

204 response_coord = np.array(response_coord, dtype=coord_dtype) 

205 excitation_coord = np.array(excitation_coord, dtype=coord_dtype) 

206 

207 # check for even abscissa spacing 

208 spacing = np.diff(self.sdynpy_data["abscissa"], axis=-1) 

209 mean_spacing = np.mean(spacing) 

210 if not np.allclose(spacing, mean_spacing): 

211 raise ValueError("SDynPy array does not have evenly spaced abscissa") 

212 # index array by coordinate. `reduce_array_by_coordinate` expects frequency on first axis 

213 array = reduce_array_by_coordinate( 

214 np.moveaxis(self.sdynpy_data["ordinate"], -1, 0), 

215 self.sdynpy_data["coordinate"], 

216 response_coord, 

217 excitation_coord, 

218 ) 

219 # convert to irf if needed 

220 if self.function_type == 4: 

221 # compute irf and transpose so that shape becomes (nref, nresp, nsamples) 

222 self.irf = np.fft.irfft(array, axis=0).T 

223 num_samples = self.irf.shape[-1] 

224 dt = 1 / (self.sdynpy_data["abscissa"].max() * num_samples / np.floor(num_samples / 2)) 

225 elif self.function_type == 29: 

226 self.irf = array.T 

227 dt = mean_spacing 

228 else: 

229 raise ValueError( 

230 "SDynPy FRFs should have type TransferFunctionArray or ImpulseResponseFunctionArray" 

231 ) 

232 if CUDA: 

233 self.irf = cp.asarray(self.irf) 

234 

235 # Checking to see if the transfer function sampling rate matches the acquisition rate 

236 if not np.isclose(self.sample_rate, 1 / dt): 

237 raise ValueError( 

238 f"The transfer function sampling rate {1 / dt} " 

239 f"does not match the hardware sampling rate {self.sample_rate}." 

240 ) 

241 

242 # check that all channels from channel table will have a corresponding irf 

243 _, number_responses, model_order = self.irf.shape 

244 if number_responses != np.sum(self.response_channels): 

245 raise ValueError( 

246 f"Number of responses in FRF ({number_responses}) does not " 

247 f"match channel table ({np.sum(self.response_channels)})" 

248 ) 

249 # each frame of the convolution must include M - 1 samples of 

250 # previous data to maintain causality (where M is length of impulse response) 

251 self.convolution_samples = self.samples_per_read + model_order - 1 

252 # initialize convolution and output arrays (read function will overwrite rather than 

253 # re-allocate) 

254 self.output_signal_time = xp.zeros( 

255 (number_responses, self.convolution_samples), dtype=xp.float64 

256 ) 

257 self.sys_out = np.zeros((len(channel_data), self.times.size), dtype=np.float64) 

258 

259 def set_parameters(self, test_data: DataAcquisitionParameters): 

260 """Method to set up sampling rate and other test parameters 

261 

262 For the synthetic case, we will set up the integration parameters using 

263 the sample rates provided. 

264 

265 Parameters 

266 ---------- 

267 test_data : DataAcquisitionParameters : 

268 A container containing the data acquisition parameters for the 

269 controller set by the user. 

270 

271 """ 

272 self.integration_oversample = test_data.output_oversample 

273 self.sample_rate = test_data.sample_rate 

274 self.times = np.arange(test_data.samples_per_read) / (test_data.sample_rate) 

275 self.frame_time = test_data.samples_per_read / test_data.sample_rate 

276 self.acquisition_delay = test_data.samples_per_write 

277 self.samples_per_read = test_data.samples_per_read 

278 self.samples_per_write = test_data.samples_per_write 

279 

280 def start(self): 

281 """Method to start acquiring data. 

282 

283 For the synthetic case, doesn't need to do anything""" 

284 

285 def get_acquisition_delay(self) -> int: 

286 """ 

287 Get the number of samples between output and acquisition. 

288 

289 This function returns the number of samples that need to be read to 

290 ensure that the last output is read by the acquisition. If there is 

291 buffering in the output, this delay should be adjusted accordingly. 

292 

293 Returns 

294 ------- 

295 int 

296 Number of samples between when a dataset is written to the output 

297 and when it has finished playing. 

298 

299 """ 

300 return self.acquisition_delay 

301 

302 def read(self): 

303 """Method to read a frame of data from the hardware 

304 

305 This function gets the force from the output queue and adds it to the 

306 buffer of time signals that represents the force. It then convolves 

307 a frame of time and sends it to the acquisition. 

308 

309 For large datasets, computation time may exceed the acquisition 

310 time in which this function is expected to return. This may result in 

311 slower than real-time execution. GPU hardware acceleration is 

312 available to increase computation speed if a CuPy installation is found. 

313 (requires Nvidia CUDA toolkit and CUDA compatible GPU, 

314 see https://docs.cupy.dev/en/stable/install.html) 

315 

316 Returns 

317 ------- 

318 read_data : 

319 2D Data read from the controller with shape ``n_channels`` x 

320 ``n_samples`` 

321 

322 """ 

323 start_time = time.time() 

324 while self.force_buffer.shape[0] < self.convolution_samples: 

325 try: 

326 forces = self.queue.get(timeout=self.frame_time) 

327 except mp.queues.Empty: 

328 # If we don't get an output in time, this likely means output has stopped 

329 # so just put zeros. 

330 forces = np.zeros((self.force_buffer.shape[-1], self.times.size)) 

331 self.force_buffer = np.concatenate((self.force_buffer, forces.T), axis=0) 

332 

333 # Now extract a force that is the correct size (including past samples for convolution) 

334 this_force = self.force_buffer[: self.convolution_samples].T 

335 # And leave the rest for next time 

336 self.force_buffer = self.force_buffer[self.times.size :] 

337 

338 if np.any(this_force): 

339 if CUDA: 

340 this_force = cp.asarray(this_force) 

341 # reset the output signal array to zero 

342 self.output_signal_time[:] = 0 

343 # Setting up and doing the convolution (using GPU if possible) 

344 # (see sdynpy.data.TimeHistoryArray.mimo_forward) 

345 for reference_irfs, inputs in zip(self.irf, this_force): 

346 self.output_signal_time += oaconvolve(reference_irfs, inputs[np.newaxis, :])[ 

347 :, : self.convolution_samples 

348 ] 

349 

350 # assign latest frame of data to correct channels 

351 # (transfer from GPU to CPU if necessary) 

352 if CUDA: 

353 self.sys_out[self.response_channels, :] = self.output_signal_time[ 

354 :, -self.times.size : 

355 ].get() 

356 self.sys_out[self.output_channels, :] = this_force[:, -self.times.size :].get() 

357 else: 

358 self.sys_out[self.response_channels, :] = self.output_signal_time[ 

359 :, -self.times.size : 

360 ] 

361 self.sys_out[self.output_channels, :] = this_force[:, -self.times.size :] 

362 else: 

363 self.sys_out[:] = 0 

364 

365 computation_time = time.time() - start_time 

366 remaining_time = self.frame_time - computation_time 

367 if remaining_time > 0.0: 

368 time.sleep(remaining_time) 

369 

370 return self.sys_out 

371 

372 def read_remaining(self): 

373 """Method to read the rest of the data on the acquisition 

374 

375 This function simply returns one sample of zeros. 

376 

377 Returns 

378 ------- 

379 read_data : 

380 2D Data read from the controller with shape ``n_channels`` x 

381 ``n_samples`` 

382 """ 

383 return np.zeros((len(self.response_channels), 1)) 

384 

385 def stop(self): 

386 """Method to stop the acquisition.""" 

387 

388 def close(self): 

389 """Method to close down the hardware""" 

390 

391 

392class SDynPyFRFOutput(HardwareOutput): 

393 """Class defining the interface between the controller and synthetic output 

394 

395 Note that the only thing that this class does is pass data to the acquisition 

396 hardware task which actually performs the integration. Therefore, many of 

397 the functions here are actually empty.""" 

398 

399 def __init__(self, queue: mp.queues.Queue): 

400 """ 

401 Initializes the hardware by simply storing the data passing queue. 

402 

403 Parameters 

404 ---------- 

405 queue : mp.queues.Queue 

406 Queue used to pass data from output to acquisition for integration. 

407 See ``StateSpaceAcquisition.__init__`` 

408 

409 """ 

410 self.queue = queue 

411 

412 def set_up_data_output_parameters_and_channels( 

413 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

414 ): 

415 """ 

416 Initialize the hardware and set up sources and sampling properties 

417 

418 This does nothing for the synthetic hardware 

419 

420 Parameters 

421 ---------- 

422 test_data : DataAcquisitionParameters : 

423 A container containing the data acquisition parameters for the 

424 controller set by the user. 

425 channel_data : List[Channel] : 

426 A list of ``Channel`` objects defining the channels in the test 

427 

428 Returns 

429 ------- 

430 None. 

431 

432 """ 

433 

434 def start(self): 

435 """Method to start acquiring data 

436 

437 Does nothing for synthetic hardware.""" 

438 

439 def write(self, data: np.ndarray): 

440 """Method to write a frame of data 

441 

442 For the synthetic excitation, this simply puts the data into the data- 

443 passing queue. 

444 

445 Parameters 

446 ---------- 

447 data : np.ndarray 

448 Data to write to the output. 

449 

450 """ 

451 self.queue.put(data) 

452 

453 def stop(self): 

454 """Method to stop the acquisition 

455 

456 Does nothing for synthetic hardware.""" 

457 flush_queue(self.queue) 

458 

459 def close(self): 

460 """Method to close down the hardware 

461 

462 Does nothing for synthetic hardware.""" 

463 

464 def ready_for_new_output(self): 

465 """Signals that the hardware is ready for new output 

466 

467 Returns ``True`` if the data-passing queue is empty. 

468 """ 

469 return self.queue.empty()