Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / output.py: 51%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Controller subsystem that handles the output from the hardware to the 

4shaker amplifiers or other excitation device. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24 

25import multiprocessing as mp 

26import multiprocessing.sharedctypes # pylint: disable=unused-import 

27 

28import numpy as np 

29 

30from .abstract_message_process import AbstractMessageProcess 

31from .utilities import GlobalCommands, QueueContainer, flush_queue, rms_time 

32 

# Name this process reports to the logging / command infrastructure
TASK_NAME = "Output"

# When True, data written to the hardware is also dumped to .npz files for
# offline inspection (see OutputProcess.output_signal); glob is only needed
# in that mode to number the dump files.
DEBUG = False
if DEBUG:
    from glob import glob

    # Template for per-write data dumps; formatted with a running file index
    FILE_OUTPUT = "debug_data/output_{:}.npz"
    # Template for each environment's first data write; formatted with the
    # environment name
    ENV_OUTPUT = "debug_data/output_first_data_{:}.npz"

41 

42 

class OutputProcess(AbstractMessageProcess):
    """Class defining the output behavior of the controller

    This class will handle collecting data from the environments and writing
    data to be output to the hardware

    See AbstractMessageProcess for inherited members.
    """

    def __init__(
        self,
        process_name: str,
        queue_container: QueueContainer,
        environments: list,
        output_active: mp.sharedctypes.Synchronized,
    ):
        """
        Constructor for the OutputProcess Class

        Sets up the ``command_map`` and initializes all data members.

        Parameters
        ----------
        process_name : str
            The name of the process.
        queue_container : QueueContainer
            A container containing the queues used to communicate between
            controller processes
        environments : list
            A list of ``(ControlType,environment_name)`` pairs that define the
            environments in the controller.
        output_active : mp.sharedctypes.Synchronized
            Shared integer flag this process sets to report whether the
            output is currently running.

        """
        super().__init__(
            process_name,
            queue_container.log_file_queue,
            queue_container.output_command_queue,
            queue_container.gui_update_queue,
        )
        self.map_command(
            GlobalCommands.INITIALIZE_DATA_ACQUISITION, self.initialize_data_acquisition
        )
        self.map_command(GlobalCommands.RUN_HARDWARE, self.output_signal)
        self.map_command(GlobalCommands.STOP_HARDWARE, self.stop_output)
        self.map_command(GlobalCommands.START_ENVIRONMENT, self.start_environment)
        # Communication
        self.queue_container = queue_container
        self.startup = True
        self.shutdown_flag = False
        # Sampling data
        self.sample_rate = None
        self.write_size = None
        self.num_outputs = None
        self.output_oversample = None
        # Environment Data
        # BUG FIX: the original had a redundant double assignment here
        # (``self.environment_list = self.environment_list = [...]``).
        self.environment_list = [environment[1] for environment in environments]
        self.environment_output_channels = None
        # Per-environment state machine flags: starting_up -> active ->
        # shutting_down -> inactive
        self.environment_active_flags = {
            environment: False for environment in self.environment_list
        }
        self.environment_starting_up_flags = {
            environment: False for environment in self.environment_list
        }
        self.environment_shutting_down_flags = {
            environment: False for environment in self.environment_list
        }
        # Leftover samples from each environment that did not fit in the last
        # hardware write
        self.environment_data_out_remainders = None
        # True while an environment's first write still needs to be sent to
        # Acquisition for syncing
        self.environment_first_data = {environment: False for environment in self.environment_list}
        # Hardware data
        self.hardware = None
        # Shared memory to record activity
        self._output_active = output_active

    @property
    def output_active(self):
        """Returns True if the output is currently active"""
        return bool(self._output_active.value)

    @output_active.setter
    def output_active(self, val):
        # Stored as 1/0 in the shared ctypes value so other processes can
        # observe the state
        self._output_active.value = 1 if val else 0

    def initialize_data_acquisition(self, data):
        """
        Sets up the output according to the specified parameters

        Selects and constructs the hardware backend, sets up its output
        channels, and precomputes each environment's output channel indices.

        Parameters
        ----------
        data : tuple
            A tuple consisting of data acquisition parameters and the channels
            used by each environment.

        Raises
        ------
        ValueError
            If the hardware index in the parameters is not recognized.

        """
        self.log("Initializing Data Acquisition")
        # Pull out information from the queue
        data_acquisition_parameters, environment_channels = data
        # Store pertinent data
        self.sample_rate = data_acquisition_parameters.sample_rate
        self.write_size = data_acquisition_parameters.samples_per_write
        self.output_oversample = data_acquisition_parameters.output_oversample
        # Check which type of hardware we have.  Imports are deferred so only
        # the selected backend's dependencies need to be installed.
        if self.hardware is not None:
            self.hardware.close()
        if data_acquisition_parameters.hardware == 0:
            from .nidaqmx_hardware_multitask import NIDAQmxOutput

            self.hardware = NIDAQmxOutput(
                data_acquisition_parameters.extra_parameters["task_trigger"],
                data_acquisition_parameters.extra_parameters["task_trigger_output_channel"],
            )
        elif data_acquisition_parameters.hardware == 1:
            from .lanxi_hardware_multiprocessing import LanXIOutput

            self.hardware = LanXIOutput(
                data_acquisition_parameters.extra_parameters["maximum_acquisition_processes"]
            )
        elif data_acquisition_parameters.hardware == 2:
            from .data_physics_hardware import DataPhysicsOutput

            self.hardware = DataPhysicsOutput(self.queue_container.single_process_hardware_queue)
        elif data_acquisition_parameters.hardware == 3:
            from .data_physics_dp900_hardware import DataPhysicsDP900Output

            self.hardware = DataPhysicsDP900Output(
                self.queue_container.single_process_hardware_queue,
            )
        elif data_acquisition_parameters.hardware == 4:
            from .exodus_modal_solution_hardware import ExodusOutput

            self.hardware = ExodusOutput(self.queue_container.single_process_hardware_queue)
        elif data_acquisition_parameters.hardware == 5:
            from .state_space_virtual_hardware import StateSpaceOutput

            self.hardware = StateSpaceOutput(self.queue_container.single_process_hardware_queue)
        elif data_acquisition_parameters.hardware == 6:
            from .sdynpy_system_virtual_hardware import SDynPySystemOutput

            self.hardware = SDynPySystemOutput(self.queue_container.single_process_hardware_queue)
        elif data_acquisition_parameters.hardware == 7:
            from .sdynpy_frf_virtual_hardware import SDynPyFRFOutput

            self.hardware = SDynPyFRFOutput(self.queue_container.single_process_hardware_queue)
        else:
            raise ValueError("Invalid Hardware or Hardware Not Implemented!")
        # Initialize hardware and create channels
        self.hardware.set_up_data_output_parameters_and_channels(
            data_acquisition_parameters, data_acquisition_parameters.channel_list
        )
        # Get the environment output channels in reference to all the output
        # channels.  A channel is an output if it has a non-blank feedback
        # device assigned.
        output_indices = [
            index
            for index, channel in enumerate(data_acquisition_parameters.channel_list)
            if channel.feedback_device is not None and channel.feedback_device.strip() != ""
        ]
        self.num_outputs = len(output_indices)
        self.environment_output_channels = {}
        self.environment_data_out_remainders = {}
        for environment, active_indices in environment_channels.items():
            common_indices, out_inds, _ = np.intersect1d(
                output_indices, active_indices, return_indices=True
            )
            self.environment_output_channels[environment] = out_inds
            self.environment_data_out_remainders[environment] = np.zeros((len(common_indices), 0))

    def output_signal(self, data):  # pylint: disable=unused-argument
        """The main output loop of the controller.

        If it is the first time through the loop, ``self.startup`` will be set
        to ``True`` and the hardware will be started.

        The output task must be notified that a given channel has started before
        it will check the channel for data.

        The function receives data from each of the environment data_out queues
        as well as a flag specifying whether the signal is the last signal. If
        so, the output will deactivate the channel and stop looking for data
        from it until the next time it is activated.

        If the output is shut down, it will activate a ``shutdown_flag``, after
        which the output will wait for all environments to pass their last data
        and be deactivated. Once all environments are deactivated, the output
        will stop and shutdown the hardware.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        """
        # Skip hardware operations if there are no output channels
        skip_hardware = self.num_outputs == 0
        # Go through each environment and collect data no matter what.
        # Start with ready to write and set to false if any environments are not
        ready_to_write = True
        for environment in self.environment_list:
            # If the task isn't active or currently shutting down, we don't need
            # more data, so just skip it.
            if (
                not self.environment_active_flags[environment]
                and not self.environment_starting_up_flags[environment]
            ) or self.environment_shutting_down_flags[environment]:
                continue
            # Check if we need more data from that environment
            if self.environment_data_out_remainders[environment].shape[-1] < self.write_size:
                if not self.environment_starting_up_flags[environment]:
                    ready_to_write = False
                try:
                    # Try to grab data from the queue and add it to the remainders.
                    environment_data, last_run = self.queue_container.environment_data_out_queues[
                        environment
                    ].get_nowait()
                except mp.queues.Empty:
                    # If data is not ready yet, simply continue to the next
                    # environment and we'll try on the next time around.
                    continue
                self.log(
                    f"Got {' x '.join([f'{shape}' for shape in environment_data.shape])} "
                    f"data from {environment} Environment"
                )
                if last_run:
                    self.log(f"Deactivating {environment} Environment")
                    self.environment_shutting_down_flags[environment] = True
                if self.environment_starting_up_flags[environment]:
                    self.environment_starting_up_flags[environment] = False
                    self.environment_active_flags[environment] = True
                    self.environment_first_data[environment] = True
                self.environment_data_out_remainders[environment] = np.concatenate(
                    (
                        self.environment_data_out_remainders[environment],
                        environment_data,
                    ),
                    axis=-1,
                )
            else:
                if self.environment_starting_up_flags[environment]:
                    self.environment_starting_up_flags[environment] = False
                    self.environment_active_flags[environment] = True
                    self.log(f"Received First Complete Data Write for {environment} Environment")
                    self.environment_first_data[environment] = True

        # If we got through that previous loop still ready to write, we can
        # output the next signal if the hardware is ready
        if ready_to_write and (
            self.startup or skip_hardware or self.hardware.ready_for_new_output()
        ):
            remainder_log = [
                (environment, remainder.shape[-1])
                for environment, remainder in self.environment_data_out_remainders.items()
                if self.environment_active_flags[environment]
            ]
            self.log(f"Ready to Write: Environment Remainders {remainder_log}")
            write_data = np.zeros((self.num_outputs, self.write_size))
            for environment in self.environment_list:
                # If the task is shutting down and all the data has been drained
                # from it, make it inactive and just skip it.
                if (
                    self.environment_shutting_down_flags[environment]
                    and self.environment_data_out_remainders[environment].shape[-1] == 0
                ):
                    self.environment_active_flags[environment] = False
                    self.environment_starting_up_flags[environment] = False
                    self.queue_container.acquisition_command_queue.put(
                        self.process_name,
                        (GlobalCommands.STOP_ENVIRONMENT, environment),
                    )
                    self.environment_shutting_down_flags[environment] = False
                    continue
                # If the task is inactive, also just skip it
                elif not self.environment_active_flags[environment]:
                    continue
                # Get the indices corresponding to the output channels
                output_indices = self.environment_output_channels[environment]
                # Determine how many time steps are available to write
                output_timesteps = min(
                    self.environment_data_out_remainders[environment].shape[-1],
                    self.write_size,
                )
                # Accumulate this environment's contribution into write_data and
                # keep whatever did not fit as the new remainder
                write_data[
                    output_indices, :output_timesteps
                ] += self.environment_data_out_remainders[environment][:, :output_timesteps]
                self.environment_data_out_remainders[environment] = (
                    self.environment_data_out_remainders[environment][:, output_timesteps:]
                )
            # Now that we have each environment accounted for in the output we
            # can write to the hardware
            self.log(
                f"Writing {' x '.join(f'{shape}' for shape in write_data.shape)} data to hardware"
            )
            if not skip_hardware:
                self.log(
                    f"Output Writing Data to Hardware RMS: \n {rms_time(write_data, axis=-1)}"
                )
                for environment in self.environment_first_data:
                    if self.environment_first_data[environment]:
                        self.log(
                            f"Sending first data for environment {environment} to Acquisition "
                            "for syncing"
                        )
                        self.queue_container.input_output_sync_queue.put(
                            (
                                environment,
                                write_data[..., :: self.output_oversample].copy(),
                            )
                        )
                        self.environment_first_data[environment] = False
                        if DEBUG:
                            np.savez(ENV_OUTPUT.format(environment), write_data=write_data)
                if DEBUG:
                    num_files = len(glob(FILE_OUTPUT.format("*")))
                    np.savez(FILE_OUTPUT.format(num_files), write_data=write_data)
                self.hardware.write(write_data.copy())
            else:
                # BUG FIX: the original read ``environment_first_data`` using the
                # stale ``environment`` loop variable leaked from the loop above,
                # so with hardware skipped only one arbitrary environment was
                # ever checked (and a NameError was possible if no loop had run).
                # Check every environment's first-data flag instead, mirroring
                # the hardware branch.
                for environment in self.environment_first_data:
                    if self.environment_first_data[environment]:
                        self.queue_container.input_output_sync_queue.put((environment, 0))
                        self.environment_first_data[environment] = False
            # Now check and see if we are starting up and start the hardware if so
            if self.startup:
                self.log("Starting Hardware Output")
                if not skip_hardware:
                    self.hardware.start()
                # Send something to the sync queue to tell acquisition to start
                # now.  We will send None because it is unique and we won't have
                # an environment with that name
                self.queue_container.input_output_sync_queue.put((None, True))
                self.startup = False
                self.output_active = True
        # Now check if we need to shut down.
        if (
            self.shutdown_flag  # Time to shut down
            # Check that all environments are not active
            and not any(self.environment_active_flags.values())
            # Check that all environments are not starting up
            and not any(self.environment_starting_up_flags.values())
            # Check that all data is written
            and all(
                remainder.shape[-1] == 0
                for remainder in self.environment_data_out_remainders.values()
            )
        ):
            self.log("Stopping Hardware")
            if not skip_hardware:
                self.hardware.stop()
            self.startup = True
            self.shutdown_flag = False
            flush_queue(self.queue_container.input_output_sync_queue)
            self.output_active = False
        else:
            # Otherwise keep going: re-queue RUN_HARDWARE so this loop runs again
            self.queue_container.output_command_queue.put(
                self.process_name, (GlobalCommands.RUN_HARDWARE, None)
            )

    def stop_output(self, data):  # pylint: disable=unused-argument
        """Sets a flag telling the output that it should start shutting down

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        """
        self.log("Starting Shutdown Procedure")
        self.shutdown_flag = True

    def start_environment(self, data):
        """Sets the flag stating the specified environment is active

        Parameters
        ----------
        data : str
            The environment name that should be activated.

        """
        self.log(f"Started Environment {data}")
        # Put the environment into the "starting up" state; it becomes active
        # once its first data arrives in output_signal
        self.environment_starting_up_flags[data] = True
        self.environment_shutting_down_flags[data] = False
        self.environment_active_flags[data] = False

    def quit(self, data):  # pylint: disable=unused-argument
        """Stops the process and shuts down the hardware if necessary.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        Returns
        -------
        bool
            Always True, signalling the message loop to exit.
        """
        # Pull any data off the queues that have been put to
        queue_flush_sum = 0
        for queue in list(self.queue_container.environment_data_out_queues.values()) + [
            self.queue_container.output_command_queue,
            self.queue_container.input_output_sync_queue,
            self.queue_container.single_process_hardware_queue,
        ]:
            queue_flush_sum += len(flush_queue(queue))
        self.log(f"Flushed {queue_flush_sum} items out of queues")
        if self.hardware is not None:
            self.hardware.close()
        return True

462 

463 

def output_process(
    queue_container: QueueContainer, environments: list, output_active: mp.sharedctypes.Synchronized
):
    """Entry point handed to multiprocessing as the output process

    Constructs the ``OutputProcess`` object and immediately enters its
    ``run`` loop, which services commands until the controller shuts down.

    Parameters
    ----------
    queue_container : QueueContainer
        A container containing the queues used to communicate between
        controller processes
    environments : list
        A list of ``(ControlType,environment_name)`` pairs that define the
        environments in the controller.
    output_active : mp.sharedctypes.Synchronized
        Shared flag recording whether the output is currently active.

    """
    OutputProcess(TASK_NAME, queue_container, environments, output_active).run()