Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / sdynpy_system_virtual_hardware.py: 20%

148 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1""" 

2Synthetic "hardware" that allows the responses to be simulated by integrating 

3linear equations of motion. 

4 

5Rattlesnake Vibration Control Software 

6Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

7(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

8Government retains certain rights in this software. 

9 

10This program is free software: you can redistribute it and/or modify 

11it under the terms of the GNU General Public License as published by 

12the Free Software Foundation, either version 3 of the License, or 

13(at your option) any later version. 

14 

15This program is distributed in the hope that it will be useful, 

16but WITHOUT ANY WARRANTY; without even the implied warranty of 

17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18GNU General Public License for more details. 

19 

20You should have received a copy of the GNU General Public License 

21along with this program. If not, see <https://www.gnu.org/licenses/>. 

22""" 

23 

24import multiprocessing as mp 

25import time 

26from typing import List 

27 

28import numpy as np 

29import scipy.signal as signal 

30 

31from .abstract_hardware import HardwareAcquisition, HardwareOutput 

32from .utilities import Channel, DataAcquisitionParameters, flush_queue 

33 

34_direction_map = { 

35 "X+": 1, 

36 "X": 1, 

37 "+X": 1, 

38 "Y+": 2, 

39 "Y": 2, 

40 "+Y": 2, 

41 "Z+": 3, 

42 "Z": 3, 

43 "+Z": 3, 

44 "RX+": 4, 

45 "RX": 4, 

46 "+RX": 4, 

47 "RY+": 5, 

48 "RY": 5, 

49 "+RY": 5, 

50 "RZ+": 6, 

51 "RZ": 6, 

52 "+RZ": 6, 

53 "X-": -1, 

54 "-X": -1, 

55 "Y-": -2, 

56 "-Y": -2, 

57 "Z-": -3, 

58 "-Z": -3, 

59 "RX-": -4, 

60 "-RX": -4, 

61 "RY-": -5, 

62 "-RY": -5, 

63 "RZ-": -6, 

64 "-RZ": -6, 

65 "": 0, 

66 None: 0, 

67} 

68 

69DEBUG = False 

70 

71if DEBUG: 

72 from glob import glob 

73 

74 FILE_OUTPUT = "debug_data/sdynpy_hardware_{:}.npz" 

75 

76 

77class SDynPySystemAcquisition(HardwareAcquisition): 

78 """Class defining the interface between the controller and synthetic acquisition 

79 

80 This class defines the interfaces between the controller and the data 

81 acquisition portion of the hardware. In this case, the hardware is simulated 

82 by integrating state space matrices derived from a SDynPy system object. 

83 It is run by the acquisition process, and must define how to get data from 

84 the test hardware into the controller. 

85 """ 

86 

87 def __init__(self, system_file: str, queue: mp.queues.Queue, sleep: bool = True): 

88 """ 

89 Loads in the SDynPy system file and sets initial parameters to null 

90 values. 

91 

92 Parameters 

93 ---------- 

94 system_file : str 

95 Path to the file containing state space the SDynPy system object 

96 queue : mp.queues.Queue 

97 A queue that passes input data from the SDynPySystemOutput class to 

98 this class. Normally, this data transfer would occur through 

99 the physical test object: the exciters would excite the test object 

100 with the specified excitation and the Acquisition would record the 

101 responses to that excitation. In the synthetic case, we need to 

102 pass the output data to the acquisition which does the integration. 

103 sleep : bool 

104 If True, the integrator will wait the amount of time the calculation 

105 would have took if it were real life, which adds a realistic delay 

106 to simulate an actual measurement. If False, the integration will 

107 proceed as fast as possible. 

108 

109 Returns 

110 ------- 

111 None. 

112 

113 """ 

114 self.sdynpy_system_data = {key: val for key, val in np.load(system_file).items()} 

115 self.system = None 

116 self.times = None 

117 self.state = None 

118 self.frame_time = None 

119 self.queue = queue 

120 self.force_buffer = None 

121 self.integration_oversample = None 

122 self.response_channels: np.ndarray 

123 self.output_channels: np.ndarray 

124 self.response_channels = None 

125 self.output_channels = None 

126 self.acquisition_delay = None 

127 self.sleep = sleep 

128 # Create a dictionary of channels for faster lookup 

129 self.channel_indices = { 

130 tuple([abs(v) for v in val]): index 

131 for index, val in enumerate(self.sdynpy_system_data["coordinate"]) 

132 } 

133 

134 def set_up_data_acquisition_parameters_and_channels( 

135 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

136 ): 

137 """ 

138 Initialize the hardware and set up channels and sampling properties 

139 

140 The function must create channels on the hardware corresponding to 

141 the channels in the test. It must also set the sampling rates. 

142 

143 Parameters 

144 ---------- 

145 test_data : DataAcquisitionParameters : 

146 A container containing the data acquisition parameters for the 

147 controller set by the user. 

148 channel_data : List[Channel] : 

149 A list of ``Channel`` objects defining the channels in the test 

150 

151 Returns 

152 ------- 

153 None. 

154 

155 """ 

156 self.create_response_channels(channel_data) 

157 self.set_parameters(test_data) 

158 

159 def create_response_channels(self, channel_data: List[Channel]): 

160 """Method to set up response channels 

161 

162 This function takes channels from the supplied list of channels and 

163 extracts the mode shape coefficients corresponding to those channels. 

164 

165 Parameters 

166 ---------- 

167 channel_data : List[Channel] : 

168 A list of ``Channel`` objects defining the channels in the test 

169 

170 """ 

171 # pylint: disable=invalid-name 

172 # print('{:} Channels'.format(len(channel_data))) 

173 self.response_channels = np.array( 

174 [ 

175 channel.feedback_device is None or channel.feedback_device == "" 

176 for channel in channel_data 

177 ], 

178 dtype="bool", 

179 ) 

180 self.output_channels = ~self.response_channels 

181 # Need to add a signal buffer in case the write size is not equal to 

182 # the read size 

183 self.force_buffer = np.zeros((0, np.sum(~self.response_channels))) 

184 

185 # Figure out which channels go with which indices 

186 channel_indices = [] 

187 channel_signs = [] 

188 for channel in channel_data: 

189 node_number = int(channel.node_number) 

190 direction = _direction_map[channel.node_direction] 

191 channel_index = self.channel_indices[(node_number, abs(direction))] 

192 channel_indices.append(channel_index) 

193 channel_signs.append( 

194 np.sign(direction) 

195 * np.sign(self.sdynpy_system_data["coordinate"][channel_index]["direction"]) 

196 ) 

197 channel_indices = np.array(channel_indices) 

198 channel_signs = np.array(channel_signs) 

199 

200 # Now we need to actually go through and set up the A, B, C, D state matrices 

201 M = self.sdynpy_system_data["mass"] 

202 C = self.sdynpy_system_data["damping"] 

203 K = self.sdynpy_system_data["stiffness"] 

204 

205 # Now we need to pull out the transformation matrices 

206 phi = self.sdynpy_system_data["transformation"][channel_indices, :] 

207 # Multiply by the signs 

208 phi *= channel_signs[:, np.newaxis] 

209 

210 # Separate into responses and excitations; here input is into the system 

211 phi_excitation = phi[self.output_channels, :].copy() 

212 phi_response = phi[self.response_channels, :].copy() 

213 

214 # Set up some other parameters 

215 ndofs = M.shape[0] 

216 tdofs_response = phi_response.shape[0] 

217 tdofs_input = phi_excitation.shape[0] 

218 

219 # Assembly the full state matrices 

220 

221 # A = [[ 0, I], 

222 # [M^-1*K,M^-1*C]] 

223 

224 A_state = np.block( 

225 [ 

226 [np.zeros((ndofs, ndofs)), np.eye(ndofs)], 

227 [-np.linalg.solve(M, K), -np.linalg.solve(M, C)], 

228 ] 

229 ) 

230 

231 # B = [[ 0], 

232 # [ M^-1]] 

233 

234 B_state = np.block( 

235 [[np.zeros((ndofs, tdofs_input))], [np.linalg.solve(M, phi_excitation.T)]] 

236 ) 

237 

238 # C = [[ I, 0], # Displacements 

239 # [ 0, I], # Velocities 

240 # [M^-1*K,M^-1*C], # Accelerations 

241 # [ 0, 0]] # Forces 

242 

243 C_all = np.block( 

244 [ 

245 [phi_response, np.zeros((tdofs_response, ndofs))], 

246 [np.zeros((tdofs_response, ndofs)), phi_response], 

247 [ 

248 -phi_response @ np.linalg.solve(M, K), 

249 -phi_response @ np.linalg.solve(M, C), 

250 ], 

251 [np.zeros((tdofs_input, ndofs)), np.zeros((tdofs_input, ndofs))], 

252 ] 

253 ) 

254 

255 # D = [[ 0], # Displacements 

256 # [ 0], # Velocities 

257 # [ M^-1], # Accelerations 

258 # [ I]] # Forces 

259 

260 D_all = np.block( 

261 [ 

262 [np.zeros((tdofs_response, tdofs_input))], 

263 [np.zeros((tdofs_response, tdofs_input))], 

264 [phi_response @ np.linalg.solve(M, phi_excitation.T)], 

265 [np.eye(tdofs_input)], 

266 ] 

267 ) 

268 

269 # Split into different types 

270 displacement_indices = np.arange(tdofs_response) 

271 velocity_indices = np.arange(tdofs_response) + tdofs_response 

272 acceleration_indices = np.arange(tdofs_response) + 2 * tdofs_response 

273 force_indices = np.arange(tdofs_input) + 3 * tdofs_response 

274 

275 C_disp = C_all[displacement_indices] 

276 C_vel = C_all[velocity_indices] 

277 C_accel = C_all[acceleration_indices] 

278 C_force = C_all[force_indices] 

279 

280 D_disp = D_all[displacement_indices] 

281 D_vel = D_all[velocity_indices] 

282 D_accel = D_all[acceleration_indices] 

283 D_force = D_all[force_indices] 

284 

285 # Now assemble the full response C and D matrices based on the data type 

286 C_response = [] 

287 D_response = [] 

288 response_index = 0 

289 for i, channel in enumerate(channel_data): 

290 if self.output_channels[i]: 

291 continue 

292 if channel.channel_type.lower() in ["disp", "displacement"]: 

293 C_response.append(C_disp[response_index]) 

294 D_response.append(D_disp[response_index]) 

295 elif channel.channel_type.lower() in ["vel", "velocity"]: 

296 C_response.append(C_vel[response_index]) 

297 D_response.append(D_vel[response_index]) 

298 elif channel.channel_type.lower() in ["accel", "acceleration", "acc"]: 

299 C_response.append(C_accel[response_index]) 

300 D_response.append(D_accel[response_index]) 

301 else: 

302 print(f"Unknown Channel Type for Channel {i + 1}: {channel.channel_type}") 

303 C_response.append(C_disp[response_index]) 

304 D_response.append(D_disp[response_index]) 

305 response_index += 1 

306 C_response = np.array(C_response) 

307 D_response = np.array(D_response) 

308 

309 # Now assemble the final C and D matrices 

310 C_state = np.empty((len(channel_data), C_response.shape[-1])) 

311 C_state[self.response_channels, :] = C_response 

312 C_state[self.output_channels, :] = C_force 

313 D_state = np.empty((len(channel_data), D_response.shape[-1])) 

314 D_state[self.response_channels, :] = D_response 

315 D_state[self.output_channels, :] = D_force 

316 self.system = signal.StateSpace(A_state, B_state, C_state, D_state) 

317 self.state = np.zeros(A_state.shape[0]) 

318 # np.savez('SDynPy_State.npz', A=A_state, B=B_state, C = C_state, D = D_state) 

319 

320 def set_parameters(self, test_data: DataAcquisitionParameters): 

321 """Method to set up sampling rate and other test parameters 

322 

323 For the synthetic case, we will set up the integration parameters using 

324 the sample rates provided. 

325 

326 Parameters 

327 ---------- 

328 test_data : DataAcquisitionParameters : 

329 A container containing the data acquisition parameters for the 

330 controller set by the user. 

331 

332 """ 

333 self.integration_oversample = test_data.output_oversample 

334 # Need to get one more sample than you would think because lsim doesn't bridge the gap 

335 # between integrations 

336 self.times = np.arange(test_data.samples_per_read * self.integration_oversample + 1) / ( 

337 test_data.sample_rate * self.integration_oversample 

338 ) 

339 self.frame_time = test_data.samples_per_read / test_data.sample_rate 

340 self.acquisition_delay = test_data.samples_per_write / test_data.output_oversample 

341 

342 def start(self): 

343 """Method to start acquiring data. 

344 

345 For the synthetic case, it simply initializes the state of the system to zero""" 

346 self.state[:] = 0 

347 

348 def get_acquisition_delay(self) -> int: 

349 """ 

350 Get the number of samples between output and acquisition. 

351 

352 This function returns the number of samples that need to be read to 

353 ensure that the last output is read by the acquisition. If there is 

354 buffering in the output, this delay should be adjusted accordingly. 

355 

356 Returns 

357 ------- 

358 int 

359 Number of samples between when a dataset is written to the output 

360 and when it has finished playing. 

361 

362 """ 

363 return self.acquisition_delay 

364 

365 def read(self): 

366 """Method to read a frame of data from the hardware 

367 

368 This function gets the force from the output queue and adds it to the 

369 buffer of time signals that represents the force. It then integrates 

370 a frame of time and sends it to the acquisition. 

371 

372 Returns 

373 ------- 

374 read_data : 

375 2D Data read from the controller with shape ``n_channels`` x 

376 ``n_samples`` 

377 

378 """ 

379 start_time = time.time() 

380 while self.force_buffer.shape[0] < self.times.size: 

381 try: 

382 forces = self.queue.get(timeout=self.frame_time) 

383 except mp.queues.Empty: 

384 # If we don't get an output in time, this likely means output has stopped 

385 # so just put zeros. 

386 print("Warning! SDynPy integrator ran out of samples!") 

387 forces = np.zeros((self.force_buffer.shape[-1], self.times.size)) 

388 self.force_buffer = np.concatenate((self.force_buffer, forces.T), axis=0) 

389 

390 # Now extract a force that is the correct size 

391 this_force = self.force_buffer[: self.times.size] 

392 # And leave the rest for next time 

393 # Note we have to keep the last force sample still on the 

394 # buffer because it will be the next force sample we use 

395 self.force_buffer = self.force_buffer[self.times.size - 1 :] 

396 

397 _, sys_out, x_out = signal.lsim(self.system, this_force, self.times, self.state) 

398 

399 self.state[:] = x_out[-1] 

400 

401 if DEBUG: 

402 num_files = len(glob(FILE_OUTPUT.format("*"))) 

403 np.savez( 

404 FILE_OUTPUT.format(num_files), 

405 force_in=this_force.T, 

406 response_out_full_resolution=sys_out.T[..., : -1 : self.integration_oversample], 

407 response_out_downsampled=sys_out.T[..., :-1], 

408 ) 

409 

410 integration_time = time.time() - start_time 

411 remaining_time = self.frame_time - integration_time 

412 if remaining_time > 0.0 and self.sleep: 

413 time.sleep(remaining_time) 

414 

415 # We don't want to return the last sample because it 

416 # will be the initial state for the next sample 

417 return sys_out.T[..., : -1 : self.integration_oversample] 

418 

419 def read_remaining(self): 

420 """Method to read the rest of the data on the acquisition 

421 

422 This function simply returns one sample of zeros. 

423 

424 Returns 

425 ------- 

426 read_data : 

427 2D Data read from the controller with shape ``n_channels`` x 

428 ``n_samples`` 

429 """ 

430 return np.zeros((len(self.response_channels), 1)) 

431 

432 def stop(self): 

433 """Method to stop the acquisition. 

434 

435 This simply sets the state to zero.""" 

436 self.state[:] = 0 

437 

438 def close(self): 

439 """Method to close down the hardware""" 

440 

441 

442class SDynPySystemOutput(HardwareOutput): 

443 """Class defining the interface between the controller and synthetic output 

444 

445 Note that the only thing that this class does is pass data to the acquisition 

446 hardware task which actually performs the integration. Therefore, many of 

447 the functions here are actually empty.""" 

448 

449 def __init__(self, queue: mp.queues.Queue): 

450 """ 

451 Initializes the hardware by simply storing the data passing queue. 

452 

453 Parameters 

454 ---------- 

455 queue : mp.queues.Queue 

456 Queue used to pass data from output to acquisition for integration. 

457 See ``StateSpaceAcquisition.__init__`` 

458 

459 """ 

460 self.queue = queue 

461 

462 def set_up_data_output_parameters_and_channels( 

463 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

464 ): 

465 """ 

466 Initialize the hardware and set up sources and sampling properties 

467 

468 This does nothing for the synthetic hardware 

469 

470 Parameters 

471 ---------- 

472 test_data : DataAcquisitionParameters : 

473 A container containing the data acquisition parameters for the 

474 controller set by the user. 

475 channel_data : List[Channel] : 

476 A list of ``Channel`` objects defining the channels in the test 

477 

478 Returns 

479 ------- 

480 None. 

481 

482 """ 

483 

484 def start(self): 

485 """Method to start acquiring data 

486 

487 Does nothing for synthetic hardware.""" 

488 

489 def write(self, data: np.ndarray): 

490 """Method to write a frame of data 

491 

492 For the synthetic excitation, this simply puts the data into the data- 

493 passing queue. 

494 

495 Parameters 

496 ---------- 

497 data : np.ndarray 

498 Data to write to the output. 

499 

500 """ 

501 self.queue.put(data) 

502 

503 def stop(self): 

504 """Method to stop the acquisition 

505 

506 Does nothing for synthetic hardware.""" 

507 flush_queue(self.queue) 

508 

509 def close(self): 

510 """Method to close down the hardware 

511 

512 Does nothing for synthetic hardware.""" 

513 

514 def ready_for_new_output(self): 

515 """Signals that the hardware is ready for new output 

516 

517 Returns ``True`` if the data-passing queue is empty. 

518 """ 

519 return self.queue.empty()