Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / acquisition.py: 73%

241 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Controller Subsystem that handles the reading of data from the hardware. 

4 

5Rattlesnake Vibration Control Software 

6Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

7(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

8Government retains certain rights in this software. 

9 

10This program is free software: you can redistribute it and/or modify 

11it under the terms of the GNU General Public License as published by 

12the Free Software Foundation, either version 3 of the License, or 

13(at your option) any later version. 

14 

15This program is distributed in the hope that it will be useful, 

16but WITHOUT ANY WARRANTY; without even the implied warranty of 

17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18GNU General Public License for more details. 

19 

20You should have received a copy of the GNU General Public License 

21along with this program. If not, see <https://www.gnu.org/licenses/>. 

22""" 

23 

24import multiprocessing as mp 

25import multiprocessing.sharedctypes # pylint: disable=unused-import 

26from time import sleep, time 

27 

28import numpy as np 

29 

30from .abstract_message_process import AbstractMessageProcess 

31from .utilities import ( 

32 GlobalCommands, 

33 QueueContainer, 

34 align_signals, 

35 correlation_norm_signal_spec_ratio, 

36 flush_queue, 

37) 

38 

# Developer switch.  When set to True the acquisition writes the buffers used
# while searching for each environment's first data to ``.npz`` files.
DEBUG = False
if DEBUG:
    # Only needed for debugging: used to count the dump files that already
    # exist so the next dump gets a fresh index.
    from glob import glob

    # Filename template for the dumps; the placeholder receives the file index
    # (or ``"*"`` when globbing for existing files).
    FILE_OUTPUT = "debug_data/acquisition_{:}.npz"

44 

45 

class AcquisitionProcess(AbstractMessageProcess):
    """Class defining the acquisition behavior of the controller

    This class will handle reading data from the hardware and then sending it
    to the individual environment processes.

    See AbstractMessageProcess for inherited class members.
    """

    # Seconds to wait for the output process to report that it has started
    # before an error is posted to the GUI.
    _OUTPUT_START_TIMEOUT = 30
    # Seconds to sleep between polls of the input/output synchronization queue.
    _OUTPUT_POLL_INTERVAL = 0.1

    def __init__(
        self,
        process_name: str,
        queue_container: QueueContainer,
        environments: list,
        acquisition_active: mp.sharedctypes.Synchronized,
    ):
        """
        Constructor for the AcquisitionProcess class

        Sets up the ``command_map`` and initializes all data members.

        Parameters
        ----------
        process_name : str
            The name of the process.
        queue_container : QueueContainer
            A container containing the queues used to communicate between
            controller processes
        environments : list
            A list of ``(ControlType,environment_name)`` pairs that define the
            environments in the controller.
        acquisition_active : mp.sharedctypes.Synchronized
            Shared value whose ``value`` is set to 1 while the acquisition is
            running and to 0 otherwise (see the ``acquisition_active``
            property).

        """
        super().__init__(
            process_name,
            queue_container.log_file_queue,
            queue_container.acquisition_command_queue,
            queue_container.gui_update_queue,
        )
        self.map_command(
            GlobalCommands.INITIALIZE_DATA_ACQUISITION, self.initialize_data_acquisition
        )
        self.map_command(GlobalCommands.RUN_HARDWARE, self.acquire_signal)
        self.map_command(GlobalCommands.STOP_HARDWARE, self.stop_acquisition)
        self.map_command(GlobalCommands.STOP_ENVIRONMENT, self.stop_environment)
        self.map_command(GlobalCommands.START_STREAMING, self.start_streaming)
        self.map_command(GlobalCommands.STOP_STREAMING, self.stop_streaming)

        # Communication
        self.queue_container = queue_container
        self.startup = True
        self.shutdown_flag = False
        self.any_environments_started = False
        # Sampling data
        self.sample_rate = None
        self.read_size = None
        # Environment Data
        self.environment_list = [environment[1] for environment in environments]
        self.environment_acquisition_channels = None
        # True while an environment is receiving data
        self.environment_active_flags = dict.fromkeys(self.environment_list, False)
        # True while an environment is being sent its final samples
        self.environment_last_data = dict.fromkeys(self.environment_list, False)
        self.environment_samples_remaining_to_read = dict.fromkeys(self.environment_list, 0)
        # Output signal to look for in the measurement, or None if not starting
        self.environment_first_data = dict.fromkeys(self.environment_list, None)
        # Hardware data
        self.hardware = None
        # Streaming Information
        self.streaming = False
        self.has_streamed = False
        # Persistent data
        self.read_data = None
        self.output_indices = None
        # Abort and Warning Limits
        self.abort_limits = None
        self.warning_limits = None
        self._acquisition_active = acquisition_active

    @property
    def acquisition_active(self):
        """Returns True if the acquisition is currently running"""
        return bool(self._acquisition_active.value)

    @acquisition_active.setter
    def acquisition_active(self, val):
        # The shared value stores an integer flag rather than a bool
        self._acquisition_active.value = 1 if val else 0

    @staticmethod
    def _parse_limit(level):
        """Converts a channel limit entry to a float, or ``inf`` if it is not numeric"""
        try:
            return float(level)
        except (ValueError, TypeError):
            # An unspecified limit can never be exceeded
            return float("inf")

    def initialize_data_acquisition(self, data):
        """Sets up the acquisition according to the specified parameters

        Any previously created hardware interface is closed before the new
        one is created.

        Parameters
        ----------
        data : tuple
            A tuple consisting of data acquisition parameters and the channels
            used by each environment.

        Raises
        ------
        ValueError
            If the hardware index in the parameters is not one of the
            implemented hardware types.

        """
        self.log("Initializing Data Acquisition")
        # Pull out information from the queue
        params, self.environment_acquisition_channels = data
        # Store pertinent data
        self.sample_rate = params.sample_rate
        self.read_size = params.samples_per_read
        # Release the previous hardware.  The reference is dropped as well so
        # that a failure below does not leave a closed device behind to be
        # closed a second time by ``quit``.
        if self.hardware is not None:
            self.hardware.close()
            self.hardware = None
        # Check which type of hardware we have.  The imports are performed
        # only for the selected hardware type.
        if params.hardware == 0:
            from .nidaqmx_hardware_multitask import NIDAQmxAcquisition

            self.hardware = NIDAQmxAcquisition(
                params.extra_parameters["task_trigger"],
                params.extra_parameters["task_trigger_output_channel"],
            )
        elif params.hardware == 1:
            from .lanxi_hardware_multiprocessing import LanXIAcquisition

            self.hardware = LanXIAcquisition(
                params.extra_parameters["maximum_acquisition_processes"]
            )
        elif params.hardware == 2:
            from .data_physics_hardware import DataPhysicsAcquisition

            self.hardware = DataPhysicsAcquisition(
                params.hardware_file,
                self.queue_container.single_process_hardware_queue,
            )
        elif params.hardware == 3:
            from .data_physics_dp900_hardware import DataPhysicsDP900Acquisition

            self.hardware = DataPhysicsDP900Acquisition(
                params.hardware_file,
                self.queue_container.single_process_hardware_queue,
            )
        elif params.hardware == 4:
            from .exodus_modal_solution_hardware import ExodusAcquisition

            self.hardware = ExodusAcquisition(
                params.hardware_file,
                self.queue_container.single_process_hardware_queue,
            )
        elif params.hardware == 5:
            from .state_space_virtual_hardware import StateSpaceAcquisition

            self.hardware = StateSpaceAcquisition(
                params.hardware_file,
                self.queue_container.single_process_hardware_queue,
            )
        elif params.hardware == 6:
            from .sdynpy_system_virtual_hardware import SDynPySystemAcquisition

            self.hardware = SDynPySystemAcquisition(
                params.hardware_file,
                self.queue_container.single_process_hardware_queue,
            )
        elif params.hardware == 7:
            from .sdynpy_frf_virtual_hardware import SDynPyFRFAcquisition

            self.hardware = SDynPyFRFAcquisition(
                params.hardware_file,
                self.queue_container.single_process_hardware_queue,
            )
        else:
            raise ValueError("Invalid Hardware or Hardware Not Implemented!")
        # Initialize hardware and create channels
        self.hardware.set_up_data_acquisition_parameters_and_channels(
            params, params.channel_list
        )
        # Set up warning and abort limits, one entry per channel
        self.warning_limits = np.array(
            [self._parse_limit(channel.warning_level) for channel in params.channel_list]
        )
        self.abort_limits = np.array(
            [self._parse_limit(channel.abort_level) for channel in params.channel_list]
        )
        # Channels with a non-blank feedback device
        self.output_indices = [
            index
            for index, channel in enumerate(params.channel_list)
            if channel.feedback_device is not None and channel.feedback_device.strip() != ""
        ]
        # Rolling buffer holding four of the larger of the read block and the
        # (downsampled) write block
        largest_block = max(
            params.samples_per_read,
            params.samples_per_write // params.output_oversample,
        )
        self.read_data = np.zeros((len(params.channel_list), 4 * largest_block))

    def stop_environment(self, data):
        """Sets flags stating that the specified environment will be ending.

        Parameters
        ----------
        data : str
            The environment name that should be deactivated

        """
        self.log(f"Deactivating Environment {data}")
        self.environment_active_flags[data] = False
        self.environment_last_data[data] = True
        # Keep delivering data until the hardware's acquisition delay has been read
        self.environment_samples_remaining_to_read[data] = self.hardware.get_acquisition_delay()

    def start_streaming(self, data):  # pylint: disable=unused-argument
        """Sets the flag to tell the acquisition to write data to disk

        If streaming has already taken place since the last shutdown, the
        streaming process is additionally asked to create a new stream.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        """
        self.streaming = True
        if self.has_streamed:
            self.queue_container.streaming_command_queue.put(
                self.process_name, (GlobalCommands.CREATE_NEW_STREAM, None)
            )
        else:
            self.has_streamed = True

    def stop_streaming(self, data):  # pylint: disable=unused-argument
        """Sets the flag to tell the acquisition to not write data to disk

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        """
        self.streaming = False

    def acquire_signal(self, data):  # pylint: disable=unused-argument
        """The main acquisition loop of the controller.

        If it is the first time through this loop, startup will be set to True
        and the hardware will be started.

        If it is the last time through this loop, the hardware will be shut
        down.

        The function will simply read the data from the hardware and pass it
        to any active environment and to the streaming process if the process
        is active.  Unless it is shutting down, it re-queues itself on the
        acquisition command queue so that the next block is read.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        """
        if self.startup:
            self._wait_for_output_start()
            self.log("Starting Hardware Acquisition")
            self.hardware.start()
            self.startup = False
            self.acquisition_active = True
        self.get_first_output_data()
        if self._ready_to_shut_down():
            self._finish_acquisition()
        else:
            self._acquire_and_distribute()

    def _wait_for_output_start(self):
        """Polls the sync queue until the output reports it has started or a timeout occurs"""
        # ``queue.Empty`` is the exception that ``multiprocessing.queues``
        # re-exports; importing it directly avoids relying on that submodule
        # already having been loaded as an attribute of ``multiprocessing``.
        from queue import Empty

        self.any_environments_started = False
        self.log("Waiting for Output to Start")
        sync_queue = self.queue_container.input_output_sync_queue
        start_wait_time = time()
        while True:
            # Try to get data from the measurement if we can
            try:
                environment, first_data = sync_queue.get_nowait()
            except Empty:
                if time() - start_wait_time > self._OUTPUT_START_TIMEOUT:
                    self.queue_container.gui_update_queue.put(
                        (
                            "error",
                            (
                                "Acquisition Error",
                                "Acquisition timed out waiting for output to start. "
                                "Check output task for errors!",
                            ),
                        )
                    )
                    # NOTE(review): the caller still starts the hardware after
                    # this timeout -- confirm that this is intended.
                    break
                sleep(self._OUTPUT_POLL_INTERVAL)
                continue
            # An environment of None marks the end of the startup messages
            if environment is None:
                self.log("Detected Output Started")
                break
            self.log(f"Listening for first data for environment {environment}")
            self.environment_first_data[environment] = first_data
            self.any_environments_started = True

    def _ready_to_shut_down(self):
        """Returns a truthy value if a shutdown was requested and no environment needs data"""
        return (
            self.shutdown_flag  # We're shutting down
            # All the environments are inactive
            and not any(self.environment_active_flags.values())
            # All the environments are not starting
            and all(first is None for first in self.environment_first_data.values())
            # None of the environments are expecting their last data
            and not any(self.environment_last_data.values())
        )

    def _check_limits(self, read_data, stop_on_abort):
        """Reports channel maxima to the GUI and flags warning and abort limit exceedances"""
        if read_data.shape[-1] == 0:
            return
        max_vals = np.max(np.abs(read_data), axis=-1)
        self.gui_update_queue.put(("monitor", max_vals))
        warn_channels = max_vals > self.warning_limits
        if np.any(warn_channels):
            # Channel numbers are reported 1-based
            warning_numbers = [i + 1 for i, hit in enumerate(warn_channels) if hit]
            print(f"Channels {warning_numbers} Reached Warning Limit")
            self.log(f"Channels {warning_numbers} Reached Warning Limit")
        abort_channels = max_vals > self.abort_limits
        if np.any(abort_channels):
            abort_numbers = [i + 1 for i, hit in enumerate(abort_channels) if hit]
            print(f"Channels {abort_numbers} Reached Abort Limit")
            self.log(f"Channels {abort_numbers} Reached Abort Limit")
            if stop_on_abort:
                self.gui_update_queue.put(("stop", None))

    def _finish_acquisition(self):
        """Reads the remaining data, stops the hardware, and finalizes any streaming"""
        self.log("Acquiring Remaining Data")
        read_data = self.hardware.read_remaining()
        self.add_data_to_buffer(read_data)
        # Don't stop because we're already shutting down.
        self._check_limits(read_data, stop_on_abort=False)
        self.hardware.stop()
        self.shutdown_flag = False
        self.startup = True
        if self.streaming and self.any_environments_started:
            self.queue_container.streaming_command_queue.put(
                self.process_name, (GlobalCommands.STREAMING_DATA, read_data.copy())
            )
            self.streaming = False
        if self.has_streamed and self.any_environments_started:
            self.queue_container.streaming_command_queue.put(
                self.process_name, (GlobalCommands.FINALIZE_STREAMING, None)
            )
            self.has_streamed = False
        self.acquisition_active = False
        self.log("Acquisition Shut Down")

    def _acquire_and_distribute(self):
        """Reads one block from the hardware, distributes it, and schedules the next read"""
        aquiring_environments = [
            name for name, flag in self.environment_active_flags.items() if flag
        ]
        self.log(f"Acquiring Data for {aquiring_environments} environments")
        read_data = self.hardware.read()
        self.add_data_to_buffer(read_data)
        self._check_limits(read_data, stop_on_abort=True)
        self._distribute_data(read_data)
        # Queue up the next pass through the acquisition loop
        self.queue_container.acquisition_command_queue.put(
            self.process_name, (GlobalCommands.RUN_HARDWARE, None)
        )
        if self.streaming and self.any_environments_started:
            self.queue_container.streaming_command_queue.put(
                self.process_name, (GlobalCommands.STREAMING_DATA, read_data.copy())
            )

    def _find_first_data_delay(self, environment, read_data):
        """Returns the buffer index where the environment's first output starts, or None"""
        first_data = self.environment_first_data[environment]
        if np.all(np.abs(first_data) < 1e-10):
            # An all-zero output gives nothing to correlate against, so start
            # from the beginning of the block that was just read.
            return -self.read_size
        correlation_start_time = time()
        if DEBUG:
            num_files = len(glob(FILE_OUTPUT.format("*")))
            np.savez(
                FILE_OUTPUT.format(num_files),
                read_data_buffer=self.read_data,
                read_data=read_data,
                output_indices=self.output_indices,
                first_data=first_data,
            )
        _, delay, _, _ = align_signals(
            self.read_data[self.output_indices],
            first_data,
            perform_subsample=False,
            correlation_threshold=0.5,
            correlation_metric=correlation_norm_signal_spec_ratio,
        )
        corr_time = time() - correlation_start_time
        self.log(
            f"Correlation check for environment {environment} took "
            f"{corr_time:0.2f} seconds"
        )
        return delay

    def _distribute_data(self, read_data):
        """Sends the newly read data to every environment that is starting, active, or ending"""
        for environment in self.environment_list:
            channels = self.environment_acquisition_channels[environment]
            # Check to see if we're waiting for the first data for this environment
            if self.environment_first_data[environment] is not None:
                delay = self._find_first_data_delay(environment, read_data)
                # Adding a criteria that the delay must be in the first half
                # of the buffer, otherwise we could still be increasing
                # in correlation as more data is acquired.  If it's in
                # the first half, it means that we have acquired more
                # data and the best match did not improve
                if delay is None or delay > self.read_data.shape[-1] // 2:
                    continue
                self.log(f"Found First Data for Environment {environment}")
                environment_data = self.read_data[channels, delay:]
                if DEBUG:
                    np.savez(
                        f"debug_data/environment_first_data_{environment}.npz",
                        found_data=environment_data,
                        expected_data=self.environment_first_data[environment],
                    )
                self.environment_first_data[environment] = None
                if not self.environment_last_data[environment]:
                    self.environment_active_flags[environment] = True
                else:
                    self.log(
                        f"Already received environment {environment} "
                        "shutdown signal, not starting"
                    )
            # Check to see if the environment is active
            elif (
                self.environment_active_flags[environment]
                or self.environment_last_data[environment]
            ):
                environment_data = read_data[channels].copy()
            # Otherwise the environment isn't active
            else:
                continue
            if self.environment_last_data[environment]:
                self.environment_samples_remaining_to_read[environment] -= self.read_size
                self.log(
                    f"Reading last data for {environment}, "
                    f"{self.environment_samples_remaining_to_read[environment]} samples "
                    f"remaining"
                )
            environment_finished = (
                self.environment_last_data[environment]
                and self.environment_samples_remaining_to_read[environment] <= 0
            )
            self.log(f"Sending {environment_data.shape} data to {environment} environment")
            self.queue_container.environment_data_in_queues[environment].put(
                (environment_data, environment_finished)
            )
            if environment_finished:
                self.environment_last_data[environment] = False
                self.log(f"Delivered last data to {environment}")

    def add_data_to_buffer(self, data):
        """Adds data to the end of the buffer and shifts existing in the buffer forward

        Parameters
        ----------
        data : np.ndarray
            A 2D array with shape num_channels x num_samples
        """
        # Roll the buffer with new data
        read_size = data.shape[-1]
        # Nothing to do for an empty read (``-0`` would also select the wrong slices)
        if read_size != 0:
            self.read_data[..., :-read_size] = self.read_data[..., read_size:]
            self.read_data[..., -read_size:] = data

    def get_first_output_data(self):
        """Searches through the sync queue for first data packages from the output process"""
        first_output_data = flush_queue(self.queue_container.input_output_sync_queue)
        for environment, data in first_output_data:
            self.log(f"Listening for first data for environment {environment}")
            self.environment_first_data[environment] = data
            self.any_environments_started = True

    def stop_acquisition(self, data):  # pylint: disable=unused-argument
        """Sets a flag telling the acquisition that it should start shutting down

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        """
        self.shutdown_flag = True

    def quit(self, data):  # pylint: disable=unused-argument
        """Stops the process and shuts down the hardware if necessary.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        Returns
        -------
        bool
            Always True.
        """
        # Pull any data off the queues that have been put to
        queues_to_flush = list(self.queue_container.environment_data_in_queues.values())
        queues_to_flush.append(self.queue_container.acquisition_command_queue)
        queue_flush_sum = sum(len(flush_queue(queue)) for queue in queues_to_flush)
        self.log(f"Flushed {queue_flush_sum} items out of queues")
        if self.hardware is not None:
            self.hardware.close()
        return True

580 

581 

def acquisition_process(
    queue_container: QueueContainer,
    environments: list,
    acquisition_active: mp.sharedctypes.Synchronized,
):
    """Entry point handed to multiprocessing for the acquisition process

    Builds an ``AcquisitionProcess`` named ``"Acquisition"`` and calls its
    ``run`` method.

    Parameters
    ----------
    queue_container : QueueContainer
        A container containing the queues used to communicate between
        controller processes
    environments : list
        A list of ``(ControlType,environment_name)`` pairs that define the
        environments in the controller.
    acquisition_active : mp.sharedctypes.Synchronized
        Shared value passed through to the ``AcquisitionProcess``, which sets
        its ``value`` to 1 while the acquisition is running and 0 otherwise.

    """
    AcquisitionProcess(
        process_name="Acquisition",
        queue_container=queue_container,
        environments=environments,
        acquisition_active=acquisition_active,
    ).run()