Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / signal_generation_process.py: 88%

164 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Rattlesnake Vibration Control Software 

4Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

5(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

6Government retains certain rights in this software. 

7 

8This program is free software: you can redistribute it and/or modify 

9it under the terms of the GNU General Public License as published by 

10the Free Software Foundation, either version 3 of the License, or 

11(at your option) any later version. 

12 

13This program is distributed in the hope that it will be useful, 

14but WITHOUT ANY WARRANTY; without even the implied warranty of 

15MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16GNU General Public License for more details. 

17 

18You should have received a copy of the GNU General Public License 

19along with this program. If not, see <https://www.gnu.org/licenses/>. 

20""" 

21 

22import copy 

23import multiprocessing as mp 

24from enum import Enum 

25 

26import numpy as np 

27 

28from .abstract_message_process import AbstractMessageProcess 

29from .signal_generation import SignalGenerator 

30from .utilities import VerboseMessageQueue, flush_queue, rms_time 

31 

32TEST_LEVEL_THRESHOLD = 1.01 

33 

34DEBUG = False 

35if DEBUG: 

36 from glob import glob 

37 

38 FILE_OUTPUT = "debug_data/signal_generation_{:}.npz" 

39 

40 

41class SignalGenerationCommands(Enum): 

42 """Commands that the Random Vibration Signal Generation Process can accept""" 

43 

44 INITIALIZE_PARAMETERS = 0 

45 INITIALIZE_SIGNAL_GENERATOR = 1 

46 GENERATE_SIGNALS = 2 

47 START_SHUTDOWN = 3 

48 SHUTDOWN = 4 

49 MUTE = 5 

50 ADJUST_TEST_LEVEL = 6 

51 SET_TEST_LEVEL = 7 

52 SHUTDOWN_ACHIEVED = 8 

53 

54 

55class SignalGenerationMetadata: 

56 """General metadata required to define the signal generation process""" 

57 

58 def __init__( 

59 self, 

60 samples_per_write, 

61 level_ramp_samples, 

62 output_transformation_matrix=None, 

63 new_signal_sample_threshold=None, 

64 disabled_signals=None, 

65 ): 

66 self.ramp_samples = level_ramp_samples 

67 self.output_transformation_matrix = output_transformation_matrix 

68 self.samples_per_write = samples_per_write 

69 self.new_signal_sample_threshold = ( 

70 self.samples_per_write 

71 if new_signal_sample_threshold is None 

72 else new_signal_sample_threshold 

73 ) 

74 self.disabled_signals = [] if disabled_signals is None else disabled_signals 

75 

76 def __eq__(self, other): 

77 try: 

78 return np.all( 

79 [np.all(value == other.__dict__[field]) for field, value in self.__dict__.items()] 

80 ) 

81 except (AttributeError, KeyError): 

82 return False 

83 

84 

85class SignalGenerationProcess(AbstractMessageProcess): 

86 """Class encapsulating the signal generation process for random vibration 

87 

88 This class handles the signal generation of the Random Vibration environment. 

89 It accepts data from the data analysis process and transforms that data into 

90 time histories that are put into the data_out_queue.""" 

91 

92 def __init__( 

93 self, 

94 process_name: str, 

95 command_queue: VerboseMessageQueue, 

96 data_in_queue: mp.queues.Queue, 

97 data_out_queue: mp.queues.Queue, 

98 environment_command_queue: VerboseMessageQueue, 

99 log_file_queue: mp.queues.Queue, 

100 gui_update_queue: mp.queues.Queue, 

101 environment_name: str, 

102 ): 

103 """ 

104 Class encapsulating the signal generation process for random vibration 

105 

106 Parameters 

107 ---------- 

108 process_name : str 

109 A name to assign the process, primarily for logging purposes. 

110 queues : RandomEnvironmentQueues 

111 A list of Random Environment queues for communcation with other parts 

112 of the environment and the controller 

113 environment_name : str 

114 The name of the environment that this process is generating signals for. 

115 

116 """ 

117 super().__init__(process_name, log_file_queue, command_queue, gui_update_queue) 

118 self.map_command(SignalGenerationCommands.INITIALIZE_PARAMETERS, self.initialize_parameters) 

119 self.map_command( 

120 SignalGenerationCommands.INITIALIZE_SIGNAL_GENERATOR, 

121 self.initialize_signal_generator, 

122 ) 

123 self.map_command(SignalGenerationCommands.GENERATE_SIGNALS, self.generate_signals) 

124 self.map_command(SignalGenerationCommands.START_SHUTDOWN, self.start_shutdown) 

125 self.map_command(SignalGenerationCommands.SHUTDOWN, self.shutdown) 

126 self.map_command(SignalGenerationCommands.MUTE, self.mute) 

127 self.map_command(SignalGenerationCommands.ADJUST_TEST_LEVEL, self.adjust_test_level) 

128 self.map_command(SignalGenerationCommands.SET_TEST_LEVEL, self.set_test_level) 

129 self.environment_name = environment_name 

130 self.data_in_queue = data_in_queue 

131 self.data_out_queue = data_out_queue 

132 self.environment_command_queue = environment_command_queue 

133 self.ramp_samples = None 

134 self.output_transformation_matrix = None 

135 self.samples_per_write = None 

136 self.new_signal_sample_threshold = None 

137 self.test_level_target = 1.0 

138 self.current_test_level = 0.0 

139 self.test_level_change = 0.0 

140 self.signal_remainder = None 

141 self.startup = True 

142 self.shutdown_flag = False 

143 self.done_generating = False 

144 self.signal_generator = None 

145 self.disabled_signals = None 

146 

147 def initialize_parameters(self, data: SignalGenerationMetadata): 

148 """Stores environment signal processing parameters to the process 

149 

150 Parameters 

151 ---------- 

152 data : Metadata : 

153 Signal processing parameters for the environment 

154 

155 """ 

156 self.log("Initializing Test Parameters") 

157 self.ramp_samples = data.ramp_samples 

158 self.output_transformation_matrix = ( 

159 None 

160 if data.output_transformation_matrix is None 

161 else np.linalg.pinv(data.output_transformation_matrix) 

162 ) 

163 self.samples_per_write = data.samples_per_write 

164 self.new_signal_sample_threshold = data.new_signal_sample_threshold 

165 self.disabled_signals = data.disabled_signals 

166 

167 def initialize_signal_generator(self, signal_generator: SignalGenerator): 

168 """Stores the signal generator that will generate the signals 

169 

170 Parameters 

171 ---------- 

172 signal_generator : SignalGenerator 

173 A SignalGenerator object used by the process to generate signals 

174 """ 

175 self.signal_generator = signal_generator 

176 self.signal_remainder = None 

177 

178 def generate_signals(self, data): 

179 """Function to handle generation of signals for controlling the environment. 

180 

181 Parameters 

182 ---------- 

183 data : None 

184 Unused argument required due to the expectation that functions called 

185 by the RandomSignalGenerationProcess.generate_signals function will have one argument 

186 accepting any data passed along with the instruction. 

187 

188 """ 

189 # Check to make sure that the signal generator is defined 

190 if self.signal_generator is None: 

191 raise RuntimeError("Signal Generator object not yet defined!") 

192 # Check to see if we are just starting up 

193 if self.startup: 

194 self.log("Starting up output") 

195 # Check if we are ready to output immediately, otherwise, wait for 

196 # data to come in. 

197 if not self.signal_generator.ready_for_next_output: 

198 self.log("Waiting for Input Data") 

199 try: 

200 data = self.data_in_queue.get(timeout=10) 

201 except mp.queues.Empty: 

202 self.gui_update_queue.put( 

203 ( 

204 "error", 

205 ( 

206 f"{self.process_name} Error", 

207 f"{self.process_name} timed out while waiting for first " 

208 "set of parameters", 

209 ), 

210 ) 

211 ) 

212 return 

213 self.signal_generator.update_parameters(*data) 

214 self.startup = False 

215 # Check and see if there is any data in the queue that can be used to 

216 # update the signal generator 

217 update_data = flush_queue(self.data_in_queue) 

218 if len(update_data) > 0: 

219 # Assign the most recent data to the signal generator 

220 self.log("Got Updated Parameters") 

221 self.signal_generator.update_parameters(*update_data[-1]) 

222 if ( 

223 ( 

224 self.signal_remainder is None 

225 or self.signal_remainder.shape[-1] < self.new_signal_sample_threshold 

226 ) 

227 and not self.done_generating 

228 and self.signal_generator.ready_for_next_output 

229 ): 

230 self.log("Generating Frame of Data") 

231 new_signal, self.done_generating = self.signal_generator.generate_frame() 

232 self.log(f"Generated Signal with RMS \n {rms_time(new_signal, axis=-1)}") 

233 # During the first run through, signal_remainder will be None 

234 if self.signal_remainder is None: 

235 self.signal_remainder = new_signal 

236 else: 

237 # Otherwise we just concatenate the new data at the end 

238 self.signal_remainder = np.concatenate((self.signal_remainder, new_signal), axis=-1) 

239 # Now check if we need to send it to the output task 

240 if ( 

241 self.data_out_queue.empty() 

242 and self.signal_remainder is not None 

243 and self.signal_remainder.shape[-1] > 0 

244 ): 

245 self.log("Outputting Data") 

246 # Determine if this is the last output. This will be the last output 

247 # if the shutdown flag is set and the current test level is zero, 

248 # but also if we are done generating and there is no more signal in 

249 # the measurement frame. 

250 signal_to_output = self.signal_remainder[..., : self.samples_per_write] 

251 self.signal_remainder = self.signal_remainder[..., self.samples_per_write :] 

252 last_run = (self.shutdown_flag and self.current_test_level == 0.0) or ( 

253 self.done_generating and self.signal_remainder.shape[-1] == 0 

254 ) 

255 self.output(signal_to_output, last_run) 

256 # Run again 

257 if last_run: 

258 self.log("Received Last Run, Shutting Down") 

259 self.shutdown() 

260 return 

261 self.command_queue.put(self.process_name, (SignalGenerationCommands.GENERATE_SIGNALS, None)) 

262 

263 def output(self, write_data, last_signal=False): 

264 """Puts data to the data_out_queue and handles test level changes 

265 

266 This function keeps track of the environment test level and scales the 

267 output signals accordingly prior to placing them into the data_out_queue. 

268 This function also handles the ramping between two test levels. 

269 

270 Parameters 

271 ---------- 

272 write_data : np.ndarray 

273 A numpy array containing the signals to be written. 

274 

275 last_signal : 

276 Specifies if the signal being written is the last signal that will 

277 be generated due to the signal generation shutting down. This is 

278 passed to the output task to tell it that there will be no more 

279 signals from this environment until it is restarted. (Default value 

280 = False) 

281 """ 

282 # np.savez('test_data/signal_generation_initial_output_data_check.npz', 

283 # write_data = write_data) 

284 # Perform the output transformation if necessary 

285 if len(self.disabled_signals) > 0: 

286 write_data = write_data.copy() 

287 write_data[self.disabled_signals] = 0 

288 if self.output_transformation_matrix is not None: 

289 self.log("Applying Transformation") 

290 write_data = self.output_transformation_matrix @ write_data 

291 # Compute the test_level scaling for this dataset 

292 if self.test_level_change == 0.0: 

293 test_level = self.current_test_level 

294 self.log(f"Test Level at {test_level}") 

295 else: 

296 test_level = ( 

297 self.current_test_level 

298 + (np.arange(write_data.shape[-1]) + 1) * self.test_level_change 

299 ) 

300 # Compute distance in steps from the target 

301 # test_level and find where it is near the target 

302 full_level_index = np.nonzero( 

303 abs(test_level - self.test_level_target) / abs(self.test_level_change) 

304 < TEST_LEVEL_THRESHOLD 

305 )[0] 

306 # Check if any are 

307 if len(full_level_index) > 0: 

308 # If so, set all test_levels after that one to the target test_level 

309 test_level[full_level_index[0] + 1 :] = self.test_level_target 

310 # And update that our current test_level is now the target test_level 

311 self.current_test_level = self.test_level_target 

312 self.test_level_change = 0.0 

313 else: 

314 # Otherwise, our current test_level is the last entry in the test_level scaling 

315 self.current_test_level = test_level[-1] 

316 self.log(f"Test level from {test_level[0]} to {test_level[-1]}") 

317 # Write the test level-scaled data to the task 

318 self.log("Sending data to data_out queue") 

319 # self.write_index += 1 

320 # np.savez('signal_generation_output_data_check_{:}.npz'.format(self.write_index), 

321 # write_data = write_data,test_level = test_level) 

322 self.log(f"Sending Output with RMS \n {rms_time(write_data * test_level, axis=-1)}") 

323 if DEBUG: 

324 num_files = len(glob(FILE_OUTPUT.format("*"))) 

325 np.savez( 

326 FILE_OUTPUT.format(num_files), 

327 write_data=write_data * test_level, 

328 last_signal=last_signal, 

329 ) 

330 self.data_out_queue.put((copy.deepcopy(write_data * test_level), last_signal)) 

331 

332 def mute(self, data): # pylint: disable=unused-argument 

333 """Immediately mute the signal generation task 

334 

335 This function should primarily only be called at the beginning of an 

336 analysis to ensure the system ramps up from zero excitation. Muting 

337 the signal generation during excitation will shock load the exciters and 

338 test article and may damage those hardware. 

339 

340 Parameters 

341 ---------- 

342 data : None 

343 Unused argument required due to the expectation that functions called 

344 by the SignalGenerationProcess.run function will have one argument 

345 accepting any data passed along with the instruction. 

346 

347 """ 

348 self.current_test_level = 0.0 

349 self.test_level_target = 0.0 

350 self.test_level_change = 0.0 

351 

352 def set_test_level(self, data): 

353 """Immediately set the level of the signal generation task 

354 

355 This function should primarily only be called at the beginning of an 

356 analysis to ensure the system ramps up from zero excitation. Setting 

357 the signal generation during excitation will shock load the exciters and 

358 test article and may damage those hardware. 

359 

360 Parameters 

361 ---------- 

362 data : level 

363 The level of the signal generator 

364 

365 """ 

366 self.current_test_level = data 

367 self.test_level_target = data 

368 self.test_level_change = 0.0 

369 

370 def adjust_test_level(self, data): 

371 """Sets a new target test level and computes the test level change per sample 

372 

373 Parameters 

374 ---------- 

375 data : float: 

376 The new test level target scale factor. 

377 

378 """ 

379 self.test_level_target = data 

380 self.test_level_change = ( 

381 self.test_level_target - self.current_test_level 

382 ) / self.ramp_samples 

383 if self.test_level_change != 0.0: 

384 self.log( 

385 f"Changed test level from {self.current_test_level} to {self.test_level_target}, " 

386 f"{self.test_level_change} change per sample" 

387 ) 

388 

389 def start_shutdown(self, data): # pylint: disable=unused-argument 

390 """Starts the shutdown process for the signal generation process 

391 

392 This will set the shutdown flag to true and adjust the test level to 

393 zero. Note that this does not immediately stop the generation because 

394 the actual test level will take some time to ramp down to zero as to 

395 not shock load the test system. 

396 

397 Parameters 

398 ---------- 

399 data : None 

400 Unused argument required due to the expectation that functions called 

401 by the RandomSignalGenerationProcess.Run function will have one argument 

402 accepting any data passed along with the instruction. 

403 

404 """ 

405 if self.shutdown_flag or self.startup: 

406 # This means we weren't supposed to shutdown, it was an extra signal. 

407 return 

408 self.shutdown_flag = True 

409 self.adjust_test_level(0.0) 

410 # Get any commands that might be in the queue currently 

411 commands = self.command_queue.flush(self.process_name) 

412 commands = [command[0] for command in commands] 

413 # Put the run command back onto the stack. 

414 if SignalGenerationCommands.GENERATE_SIGNALS in commands: 

415 self.command_queue.put( 

416 self.process_name, (SignalGenerationCommands.GENERATE_SIGNALS, None) 

417 ) 

418 

419 def shutdown(self): 

420 """Performs final cleanup operations when the system has shut down 

421 

422 This function is called when the signal generation has been instructed 

423 to shut down and the test level has reached zero. The signal generation 

424 is the first process in the Random Vibration environment to stop when 

425 shutdown is called, so it notifies the environment process to stop the 

426 acquisition and analysis tasks because it is no longer generating signals 

427 """ 

428 self.log("Shutting Down Signal Generation") 

429 self.command_queue.flush(self.process_name) 

430 # Tell the other processes to shut down as well 

431 self.environment_command_queue.put( 

432 self.process_name, (SignalGenerationCommands.SHUTDOWN_ACHIEVED, None) 

433 ) 

434 self.startup = True 

435 self.shutdown_flag = False 

436 self.done_generating = False 

437 

438 

439def signal_generation_process( 

440 environment_name: str, 

441 command_queue: VerboseMessageQueue, 

442 data_in_queue: mp.queues.Queue, 

443 data_out_queue: mp.queues.Queue, 

444 environment_command_queue: VerboseMessageQueue, 

445 log_file_queue: mp.queues.Queue, 

446 gui_update_queue: mp.queues.Queue, 

447 process_name: str = None, 

448): 

449 """Signal generation process function called by multiprocessing 

450 

451 This function defines the Signal Generation process that 

452 gets run by the multiprocessing module when it creates a new process. It 

453 creates a SignalGenerationProcess object and runs it. 

454 

455 Parameters 

456 ---------- 

457 environment_name : str : 

458 Name of the environment associated with this signal generation process 

459 """ 

460 

461 signal_generation_instance = SignalGenerationProcess( 

462 (environment_name + " Signal Generation" if process_name is None else process_name), 

463 command_queue, 

464 data_in_queue, 

465 data_out_queue, 

466 environment_command_queue, 

467 log_file_queue, 

468 gui_update_queue, 

469 environment_name, 

470 ) 

471 

472 signal_generation_instance.run()