Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / random_vibration_sys_id_data_analysis.py: 15%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3This file defines the data analysis and control law for the Random Vibration 

4Environment 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24import importlib 

25import multiprocessing as mp 

26import os 

27import time 

28from enum import Enum 

29 

30import numpy as np 

31 

32from .abstract_sysid_data_analysis import ( 

33 AbstractSysIDAnalysisProcess, 

34 SysIDDataAnalysisCommands, 

35) 

36from .random_vibration_sys_id_environment import ( 

37 RandomVibrationCommands, 

38 RandomVibrationMetadata, 

39) 

40from .utilities import ( 

41 GlobalCommands, 

42 VerboseMessageQueue, 

43 flush_queue, 

44 power2db, 

45 rms_csd, 

46 rms_time, 

47) 

48 

49 

50class RandomVibrationDataAnalysisCommands(Enum): 

51 """Enumeration containing valid commands for the random data analysis process""" 

52 

53 INITIALIZE_PARAMETERS = 0 

54 PERFORM_CONTROL_PREDICTION = 1 

55 RUN_CONTROL = 2 

56 STOP_CONTROL = 3 

57 # SHUTDOWN_ACHIEVED = 4 

58 # UPDATE_INTERACTIVE_CONTROL_PARAMETERS = 5 

59 

60 

61class RandomVibrationDataAnalysisProcess(AbstractSysIDAnalysisProcess): 

62 """Control calculations for the Random Vibration environment""" 

63 

64 def __init__( 

65 self, 

66 process_name: str, 

67 command_queue: VerboseMessageQueue, 

68 data_in_queue: mp.queues.Queue, 

69 data_out_queue: mp.queues.Queue, 

70 environment_command_queue: VerboseMessageQueue, 

71 log_file_queue: mp.queues.Queue, 

72 gui_update_queue: mp.queues.Queue, 

73 environment_name: str, 

74 ): 

75 super().__init__( 

76 process_name, 

77 command_queue, 

78 data_in_queue, 

79 data_out_queue, 

80 environment_command_queue, 

81 log_file_queue, 

82 gui_update_queue, 

83 environment_name, 

84 ) 

85 self.map_command( 

86 RandomVibrationDataAnalysisCommands.INITIALIZE_PARAMETERS, 

87 self.initialize_sysid_parameters, 

88 ) 

89 self.map_command( 

90 RandomVibrationDataAnalysisCommands.PERFORM_CONTROL_PREDICTION, 

91 self.perform_control_prediction, 

92 ) 

93 self.map_command(RandomVibrationDataAnalysisCommands.RUN_CONTROL, self.run_control) 

94 self.map_command(RandomVibrationDataAnalysisCommands.STOP_CONTROL, self.stop_control) 

95 self.map_command( 

96 GlobalCommands.UPDATE_INTERACTIVE_CONTROL_PARAMETERS, 

97 self.update_interactive_control_parameters, 

98 ) 

99 self.map_command(GlobalCommands.SEND_INTERACTIVE_COMMAND, self.send_interactive_command) 

100 self.error_indices = None 

101 self.control_function = None 

102 self.response_cpsd_prediction = None 

103 self.drive_cpsd_prediction = None 

104 self.control_frf = None 

105 self.control_coherence = None 

106 self.control_frf_condition = None 

107 self.last_response_cpsd = None 

108 self.last_drive_cpsd = None 

109 self.startup = True 

110 self.has_sent_interactive_control_transfer_function_results = False 

111 self.last_interactive_parameters = None 

112 

113 def initialize_sysid_parameters(self, data: RandomVibrationMetadata): 

114 self.parameters: RandomVibrationMetadata 

115 super().initialize_sysid_parameters(data) # This defines self.parameters 

116 

117 # Find the frequency lines to perform control and compute error over 

118 # print(type(data.specification_cpsd_matrix)) 

119 # print(data.specification_cpsd_matrix) 

120 self.error_indices = ~np.all( 

121 (data.specification_cpsd_matrix == 0) | np.isnan(data.specification_cpsd_matrix), 

122 axis=(-1, -2), 

123 ) 

124 self.frequencies = self.parameters.frequency_spacing * np.arange(self.parameters.fft_lines) 

125 # Load in the control script 

126 _, file = os.path.split(data.control_python_script) 

127 file, _ = os.path.splitext(file) 

128 spec = importlib.util.spec_from_file_location(file, data.control_python_script) 

129 module = importlib.util.module_from_spec(spec) 

130 spec.loader.exec_module(module) 

131 # Pull out the function from the loaded module 

132 if self.parameters.control_python_function_type == 1: # Generator 

133 # Get the generator function 

134 generator_function = getattr(module, data.control_python_function)() 

135 # Get us to the first yield statement 

136 next(generator_function) 

137 # Define the control function as the generator's send function 

138 self.control_function = generator_function.send 

139 elif self.parameters.control_python_function_type == 2: # Class 

140 control_frf = ( 

141 self.control_frf if self.parameters.update_tf_during_control else self.sysid_frf 

142 ) 

143 control_coherence = ( 

144 self.control_coherence 

145 if self.parameters.update_tf_during_control 

146 else self.sysid_coherence 

147 ) 

148 self.control_function = getattr(module, data.control_python_function)( 

149 data.specification_cpsd_matrix, # Specifications 

150 data.specification_warning_matrix, # Warning levels 

151 data.specification_abort_matrix, # Abort Levels 

152 data.control_python_function_parameters, # Extra parameters for the control law 

153 control_frf, # Transfer Functions 

154 self.sysid_response_noise, # Noise levels and correlation 

155 self.sysid_reference_noise, # from the system identification 

156 self.sysid_response_cpsd, # Response levels and correlation 

157 self.sysid_reference_cpsd, # from the system identification 

158 control_coherence, # Coherence from the system identification 

159 self.frames, # Number of frames in the CPSD 

160 self.parameters.frames_in_cpsd, # Total number of frames 

161 self.last_response_cpsd, # Last Control Response for Error Correction 

162 self.last_drive_cpsd, # Last Control Excitation for Drive-based control 

163 ) 

164 elif self.parameters.control_python_function_type == 3: # Interactive 

165 control_frf = ( 

166 self.control_frf if self.parameters.update_tf_during_control else self.sysid_frf 

167 ) 

168 control_coherence = ( 

169 self.control_coherence 

170 if self.parameters.update_tf_during_control 

171 else self.sysid_coherence 

172 ) 

173 control_class = getattr(module, data.control_python_function) 

174 self.control_function = control_class( 

175 self.environment_name, 

176 self.gui_update_queue, 

177 data.specification_cpsd_matrix, # Specifications 

178 data.specification_warning_matrix, # Warning levels 

179 data.specification_abort_matrix, # Abort Levels 

180 data.control_python_function_parameters, # Extra parameters for the control law 

181 control_frf, # Transfer Functions 

182 self.sysid_response_noise, # Noise levels and correlation 

183 self.sysid_reference_noise, # from the system identification 

184 self.sysid_response_cpsd, # Response levels and correlation 

185 self.sysid_reference_cpsd, # from the system identification 

186 control_coherence, # Coherence from the system identification 

187 self.frames, # Number of frames in the CPSD 

188 self.parameters.frames_in_cpsd, # Total number of frames 

189 self.last_response_cpsd, # Last Control Response for Error Correction 

190 self.last_drive_cpsd, # Last Control Excitation for Drive-based control 

191 ) 

192 self.last_interactive_parameters = None 

193 self.has_sent_interactive_control_transfer_function_results = False 

194 else: # Function 

195 self.control_function = getattr(module, data.control_python_function) 

196 

197 def perform_control_prediction(self, data): # pylint: disable=unused-argument 

198 """Runs the control law with system identification information to predict control""" 

199 if self.sysid_frf is None: 

200 return 

201 if self.parameters.control_python_function_type == 1: # Generator 

202 output_cpsd = self.control_function( 

203 ( 

204 self.parameters.specification_cpsd_matrix, # Specifications 

205 self.parameters.specification_warning_matrix, # Warning levels 

206 self.parameters.specification_abort_matrix, # Abort Levels 

207 self.sysid_frf, # Transfer Functions 

208 self.sysid_response_noise, # Noise levels and correlation 

209 self.sysid_reference_noise, # from the system identification 

210 self.sysid_response_cpsd, # Response levels and correlation 

211 self.sysid_reference_cpsd, # from the system identification 

212 self.sysid_coherence, # Coherence from the system identification 

213 self.parameters.sysid_averages, # Number of frames in the CPSD 

214 self.parameters.sysid_averages, # Total number of frames 

215 self.parameters.control_python_function_parameters, # Extra parameters 

216 None, # Last Control Response for Error Correction 

217 None, # Last Control Excitation for Drive-based control 

218 ) 

219 ) 

220 elif self.parameters.control_python_function_type in [ 

221 2, 

222 3, 

223 ]: # Class or Interactive 

224 if ( 

225 self.parameters.control_python_function == 2 

226 or not self.has_sent_interactive_control_transfer_function_results 

227 ): 

228 self.control_function.system_id_update( 

229 self.sysid_frf, # Transfer Functions 

230 self.sysid_response_noise, # Noise levels and correlation 

231 self.sysid_reference_noise, # from the system identification 

232 self.sysid_response_cpsd, # Response levels and correlation 

233 self.sysid_reference_cpsd, # from the system identification 

234 self.sysid_coherence, # Coherence from the system identification 

235 self.parameters.sysid_averages, # Number of frames in the CPSD 

236 self.parameters.sysid_averages, # Total number of frames 

237 ) 

238 if self.parameters.control_python_function_type == 3: 

239 self.gui_update_queue.put( 

240 ( 

241 self.environment_name, 

242 ( 

243 "interactive_control_sysid_update", 

244 ( 

245 self.sysid_frf, # Transfer Functions 

246 self.sysid_response_noise, # Noise levels and correlation 

247 self.sysid_reference_noise, # from the system identification 

248 self.sysid_response_cpsd, # Response levels and correlation 

249 self.sysid_reference_cpsd, # from the system identification 

250 self.sysid_coherence, # Coherence from the system id 

251 ), 

252 ), 

253 ) 

254 ) 

255 self.has_sent_interactive_control_transfer_function_results = True 

256 if ( 

257 self.parameters.control_python_function_type == 2 

258 or self.last_interactive_parameters is not None 

259 ): 

260 output_cpsd = self.control_function.control( 

261 self.sysid_frf, # Transfer Functions 

262 self.sysid_coherence, # Coherence from the system identification 

263 self.parameters.sysid_averages, # Number of frames in the CPSD 

264 self.parameters.sysid_averages, # Total number of frames 

265 None, 

266 None, 

267 ) 

268 else: 

269 self.log("Have not yet received control parameters from interactive control law!") 

270 self.command_queue.put( 

271 self.process_name, 

272 ( 

273 RandomVibrationDataAnalysisCommands.PERFORM_CONTROL_PREDICTION, 

274 None, 

275 ), 

276 ) 

277 time.sleep(0.25) 

278 return 

279 else: # Function 

280 output_cpsd = self.control_function( 

281 self.parameters.specification_cpsd_matrix, # Specifications 

282 self.parameters.specification_warning_matrix, # Warning levels 

283 self.parameters.specification_abort_matrix, # Abort Levels 

284 self.sysid_frf, # Transfer Functions 

285 self.sysid_response_noise, # Noise levels and correlation 

286 self.sysid_reference_noise, # from the system identification 

287 self.sysid_response_cpsd, # Response levels and correlation 

288 self.sysid_reference_cpsd, # from the system identification 

289 self.sysid_coherence, # Coherence from the system identification 

290 self.parameters.sysid_averages, # Number of frames in the CPSD 

291 self.parameters.sysid_averages, # Total number of frames 

292 self.parameters.control_python_function_parameters, # Extra parameters 

293 None, # Last Control Response for Error Correction 

294 None, # Last Control Excitation for Drive-based control 

295 ) 

296 response_cpsd = self.sysid_frf @ output_cpsd @ self.sysid_frf.conjugate().transpose(0, 2, 1) 

297 self.drive_cpsd_prediction = output_cpsd 

298 self.response_cpsd_prediction = response_cpsd 

299 rms_drives = rms_csd(self.drive_cpsd_prediction, self.parameters.frequency_spacing) 

300 response_db_error = power2db( 

301 np.einsum("ijj->ij", self.response_cpsd_prediction[self.error_indices]).real 

302 ) - power2db( 

303 np.einsum("ijj->ij", self.parameters.specification_cpsd_matrix[self.error_indices]).real 

304 ) 

305 rms_db_error = rms_time(response_db_error, axis=0) 

306 self.gui_update_queue.put( 

307 ( 

308 self.environment_name, 

309 ( 

310 "control_predictions", 

311 ( 

312 self.frequencies, 

313 self.drive_cpsd_prediction, 

314 self.response_cpsd_prediction, 

315 self.parameters.specification_cpsd_matrix, 

316 rms_drives, 

317 rms_db_error, 

318 ), 

319 ), 

320 ) 

321 ) 

322 if self.parameters.control_python_function_type == 3: 

323 self.has_sent_interactive_control_transfer_function_results = False 

324 

325 def run_control(self, data): # pylint: disable=unused-argument 

326 """Runs the control law to generate new output CPSDs""" 

327 if self.startup: 

328 self.log("Starting Control") 

329 self.frames = 0 

330 self.data_out_queue.put([self.drive_cpsd_prediction]) 

331 self.startup = False 

332 spectral_data = flush_queue(self.data_in_queue) 

333 if len(spectral_data) > 0: 

334 self.log("Obtained Spectral Data") 

335 ( 

336 self.frames, 

337 self.frequencies, 

338 self.control_frf, 

339 self.control_coherence, 

340 self.last_response_cpsd, 

341 self.last_drive_cpsd, 

342 self.control_frf_condition, 

343 ) = spectral_data[-1] 

344 self.gui_update_queue.put( 

345 ( 

346 self.environment_name, 

347 ( 

348 "control_update", 

349 ( 

350 self.frames, 

351 self.parameters.frames_in_cpsd, 

352 self.frequencies, 

353 self.control_frf, 

354 self.control_coherence, 

355 self.last_response_cpsd, 

356 self.last_drive_cpsd, 

357 self.control_frf_condition, 

358 ), 

359 ), 

360 ) 

361 ) 

362 # Check to see if there are any aborts or warnings 

363 warning_channels = [] 

364 abort_channels = [] 

365 with np.errstate(invalid="ignore"): 

366 lines_out = (self.parameters.percent_lines_out / 100) * self.parameters.fft_lines 

367 for i in range(self.last_response_cpsd.shape[-1]): 

368 if ( 

369 sum( 

370 self.last_response_cpsd[:, i, i] 

371 > self.parameters.specification_abort_matrix[1, :, i] 

372 ) 

373 > lines_out 

374 ): 

375 abort_channels.append(i) 

376 elif ( 

377 sum( 

378 self.last_response_cpsd[:, i, i] 

379 < self.parameters.specification_abort_matrix[0, :, i] 

380 ) 

381 > lines_out 

382 ): 

383 abort_channels.append(i) 

384 elif ( 

385 sum( 

386 self.last_response_cpsd[:, i, i] 

387 > self.parameters.specification_warning_matrix[1, :, i] 

388 ) 

389 > lines_out 

390 ): 

391 warning_channels.append(i) 

392 elif ( 

393 sum( 

394 self.last_response_cpsd[:, i, i] 

395 < self.parameters.specification_warning_matrix[0, :, i] 

396 ) 

397 > lines_out 

398 ): 

399 warning_channels.append(i) 

400 if ( 

401 len(abort_channels) > 0 

402 and self.frames == self.parameters.frames_in_cpsd 

403 and self.parameters.allow_automatic_aborts 

404 ): 

405 print(f"Aborting due to channel indices {abort_channels}") 

406 self.log(f"Aborting due to channel indices {abort_channels}") 

407 self.environment_command_queue.put( 

408 self.process_name, (RandomVibrationCommands.STOP_CONTROL, None) 

409 ) 

410 response_db_error = power2db( 

411 np.einsum("ijj->ij", self.last_response_cpsd[self.error_indices]).real 

412 ) - power2db( 

413 np.einsum( 

414 "ijj->ij", 

415 self.parameters.specification_cpsd_matrix[self.error_indices], 

416 ).real 

417 ) 

418 rms_db_error = rms_time(response_db_error, axis=0) 

419 self.gui_update_queue.put( 

420 ( 

421 self.environment_name, 

422 ( 

423 "update_test_response_error_list", 

424 (rms_db_error, warning_channels, abort_channels), 

425 ), 

426 ) 

427 ) 

428 self.log("Controlling") 

429 # Create the new control output 

430 control_frf = ( 

431 self.control_frf if self.parameters.update_tf_during_control else self.sysid_frf 

432 ) 

433 control_coherence = ( 

434 self.control_coherence 

435 if self.parameters.update_tf_during_control 

436 else self.sysid_coherence 

437 ) 

438 if self.parameters.control_python_function_type == 1: # Generator 

439 output_cpsd = self.control_function( 

440 ( 

441 self.parameters.specification_cpsd_matrix, # Specifications 

442 self.parameters.specification_warning_matrix, # Warning levels 

443 self.parameters.specification_abort_matrix, # Abort Levels 

444 control_frf, # Transfer Functions 

445 self.sysid_response_noise, # Noise levels and correlation 

446 self.sysid_reference_noise, # from the system identification 

447 self.sysid_response_cpsd, # Response levels and correlation 

448 self.sysid_reference_cpsd, # from the system identification 

449 control_coherence, # Coherence 

450 self.frames, # Number of frames in the CPSD 

451 self.parameters.frames_in_cpsd, # Total number of frames 

452 self.parameters.control_python_function_parameters, # Extra parameters 

453 self.last_response_cpsd, # Last Control Response for Error Correction 

454 self.last_drive_cpsd, # Last Control Excitation for Drive-based control 

455 ) 

456 ) 

457 elif self.parameters.control_python_function_type in [ 

458 2, 

459 3, 

460 ]: # Class or interactive class 

461 output_cpsd = self.control_function.control( 

462 control_frf, # Transfer Functions 

463 control_coherence, # Coherence 

464 self.frames, # Number of frames in the CPSD 

465 self.parameters.frames_in_cpsd, # Total number of frames 

466 self.last_response_cpsd, # Last Control Response for Error Correction 

467 self.last_drive_cpsd, # Last Control Excitation for Drive-based control 

468 ) 

469 else: # Function 

470 output_cpsd = self.control_function( 

471 self.parameters.specification_cpsd_matrix, # Specifications 

472 self.parameters.specification_warning_matrix, # Warning levels 

473 self.parameters.specification_abort_matrix, # Abort Levels 

474 control_frf, # Transfer Functions 

475 self.sysid_response_noise, # Noise levels and correlation 

476 self.sysid_reference_noise, # from the system identification 

477 self.sysid_response_cpsd, # Response levels and correlation 

478 self.sysid_reference_cpsd, # from the system identification 

479 control_coherence, # Coherence 

480 self.frames, # Number of frames in the CPSD 

481 self.parameters.frames_in_cpsd, # Total number of frames 

482 self.parameters.control_python_function_parameters, # Extra parameters 

483 self.last_response_cpsd, # Last Control Response for Error Correction 

484 self.last_drive_cpsd, # Last Control Excitation for Drive-based control 

485 ) 

486 self.log( 

487 f"RMS Outputs from Control \n " 

488 f"{rms_csd(output_cpsd, self.parameters.frequency_spacing)}" 

489 ) 

490 self.data_out_queue.put([output_cpsd]) 

491 self.log("Finished Controlling") 

492 rms_voltages = rms_csd(output_cpsd, self.parameters.frequency_spacing) 

493 self.gui_update_queue.put( 

494 (self.environment_name, ("test_output_voltage_list", rms_voltages)) 

495 ) 

496 self.command_queue.put( 

497 self.process_name, (RandomVibrationDataAnalysisCommands.RUN_CONTROL, None) 

498 ) 

499 

500 def update_interactive_control_parameters(self, interactive_control_parameters): 

501 """Updates parameters for the interactive control law""" 

502 if self.parameters.control_python_function_type == 3: # Interactive 

503 self.control_function.update_parameters(interactive_control_parameters) 

504 self.last_interactive_parameters = interactive_control_parameters 

505 else: 

506 raise ValueError( 

507 "Received an UPDATE_INTERACTIVE_CONTROL_PARAMETERS signal without an " 

508 "interactive control law. How did this happen?" 

509 ) 

510 

511 def send_interactive_command(self, command): 

512 """Sends a command to the interactive control law""" 

513 if self.parameters.control_python_function_type == 3: # Interactive 

514 self.control_function.send_command(command) 

515 else: 

516 raise ValueError( 

517 "Received an UPDATE_INTERACTIVE_CONTROL_PARAMETERS signal without an " 

518 "interactive control law. How did this happen?" 

519 ) 

520 

521 def stop_control(self, data): # pylint: disable=unused-argument 

522 """Stops the data acquisition process""" 

523 # Remove any run_transfer_function or run_control from the queue 

524 instructions = self.command_queue.flush(self.process_name) 

525 for instruction in instructions: 

526 if not instruction[0] in [RandomVibrationDataAnalysisCommands.RUN_CONTROL]: 

527 self.command_queue.put(self.process_name, instruction) 

528 flush_queue(self.data_out_queue) 

529 self.startup = True 

530 self.control_frf = None 

531 self.control_coherence = None 

532 self.control_frf_condition = None 

533 self.last_response_cpsd = None 

534 self.last_drive_cpsd = None 

535 self.environment_command_queue.put( 

536 self.process_name, (SysIDDataAnalysisCommands.SHUTDOWN_ACHIEVED, None) 

537 ) 

538 

539 

540def random_data_analysis_process( 

541 environment_name: str, 

542 command_queue: VerboseMessageQueue, 

543 data_in_queue: mp.queues.Queue, 

544 data_out_queue: mp.queues.Queue, 

545 environment_command_queue: VerboseMessageQueue, 

546 gui_update_queue: mp.queues.Queue, 

547 log_file_queue: mp.queues.Queue, 

548 process_name=None, 

549): 

550 """Process defining the random vibration control calculations and data analysis 

551 

552 Parameters 

553 ---------- 

554 environment_name : str 

555 The name of the random vibration environment 

556 command_queue : VerboseMessageQueue 

557 A message queue that will provide commands to this process 

558 data_in_queue : mp.queues.Queue 

559 A queue from which data will be received to use for control updates 

560 data_out_queue : mp.queues.Queue 

561 A queue into which the results of control calculations will be placed 

562 environment_command_queue : VerboseMessageQueue 

563 A message queue to send commands back to the main controller 

564 gui_update_queue : mp.queues.Queue 

565 A queue to send updates to the graphical user interface for updating interactive control 

566 laws 

567 log_file_queue : mp.queues.Queue 

568 A queue to send messages that will be logged in the log file 

569 process_name : str, optional 

570 A name for the process. If not specified, it will be 

571 """ 

572 data_analysis_instance = RandomVibrationDataAnalysisProcess( 

573 environment_name + " Data Analysis" if process_name is None else process_name, 

574 command_queue, 

575 data_in_queue, 

576 data_out_queue, 

577 environment_command_queue, 

578 log_file_queue, 

579 gui_update_queue, 

580 environment_name, 

581 ) 

582 

583 data_analysis_instance.run()