Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / time_environment.py: 46%

334 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3This file defines a Time Signal Generator environment where a signal can be 

4loaded and played directly to the output devices. This is perhaps the simplest 

5control type that might be implemented by the controller, so start here when 

6designing new control types. 

7 

8Rattlesnake Vibration Control Software 

9Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

10(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

11Government retains certain rights in this software. 

12 

13This program is free software: you can redistribute it and/or modify 

14it under the terms of the GNU General Public License as published by 

15the Free Software Foundation, either version 3 of the License, or 

16(at your option) any later version. 

17 

18This program is distributed in the hope that it will be useful, 

19but WITHOUT ANY WARRANTY; without even the implied warranty of 

20MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

21GNU General Public License for more details. 

22 

23You should have received a copy of the GNU General Public License 

24along with this program. If not, see <https://www.gnu.org/licenses/>. 

25""" 

26 

27import copy 

28import multiprocessing as mp 

29import multiprocessing.sharedctypes # pylint: disable=unused-import 

30from multiprocessing.queues import Queue 

31 

32import netCDF4 as nc4 

33import numpy as np 

34import openpyxl 

35from qtpy import QtCore, QtWidgets, uic 

36 

37from .abstract_environment import AbstractEnvironment, AbstractMetadata, AbstractUI 

38from .environments import ( 

39 ControlTypes, 

40 environment_definition_ui_paths, 

41 environment_run_ui_paths, 

42) 

43from .ui_utilities import load_time_history, multiline_plotter 

44from .utilities import ( 

45 DataAcquisitionParameters, 

46 GlobalCommands, 

47 VerboseMessageQueue, 

48 db2scale, 

49 rms_time, 

50) 

51 

52CONTROL_TYPE = ControlTypes.TIME 

53TEST_LEVEL_THRESHOLD = 1.01 

54MAX_RESPONSES_TO_PLOT = 20 

55MAX_SAMPLES_TO_PLOT = 100000 

56 

57 

class TimeQueues:
    """Namespace bundling every queue the Time environment communicates over"""

    def __init__(
        self,
        environment_command_queue: VerboseMessageQueue,
        gui_update_queue: mp.queues.Queue,
        controller_communication_queue: VerboseMessageQueue,
        data_in_queue: mp.queues.Queue,
        data_out_queue: mp.queues.Queue,
        log_file_queue: VerboseMessageQueue,
    ):
        """
        Stores the supplied queues as attributes of the same name.

        Parameters
        ----------
        environment_command_queue : VerboseMessageQueue
            Queue from which the environment will receive instructions.
        gui_update_queue : mp.queues.Queue
            Queue to which the environment will put GUI updates.
        controller_communication_queue : VerboseMessageQueue
            Queue to which the environment will put global controller
            instructions.
        data_in_queue : mp.queues.Queue
            Queue from which the environment will receive data from acquisition.
        data_out_queue : mp.queues.Queue
            Queue to which the environment will write data for output.
        log_file_queue : VerboseMessageQueue
            Queue to which the environment will write log file messages.
        """
        # Data streaming queues
        self.data_in_queue = data_in_queue
        self.data_out_queue = data_out_queue
        # Command and messaging queues
        self.environment_command_queue = environment_command_queue
        self.controller_communication_queue = controller_communication_queue
        # Reporting queues
        self.gui_update_queue = gui_update_queue
        self.log_file_queue = log_file_queue

94 

95 

class TimeParameters(AbstractMetadata):
    """Parameter container describing a Time environment test"""

    def __init__(self, sample_rate, output_signal, cancel_rampdown_time):
        """
        Stores the signal and the timing parameters needed to play it.

        Parameters
        ----------
        sample_rate : int
            Number of samples per second that the controller runs.
        output_signal : np.ndarray
            Signal that will be generated by the sources, 2D array (n_outputs x
            n_samples)
        cancel_rampdown_time : float
            Time used to decay the signal to zero if the environment is stopped.
            Prevents "hard stops" from damaging equipment.

        """
        self.cancel_rampdown_time = cancel_rampdown_time
        self.output_signal = output_signal
        self.sample_rate = sample_rate

    @property
    def signal_samples(self):
        """Length of the last axis of the output signal (samples)"""
        signal_shape = self.output_signal.shape
        return signal_shape[-1]

    @property
    def output_channels(self):
        """Length of the first axis of the output signal (channels)"""
        signal_shape = self.output_signal.shape
        return signal_shape[0]

    @property
    def signal_time(self):
        """Duration of the signal: number of samples divided by sample rate"""
        n_samples = self.signal_samples
        return n_samples / self.sample_rate

    @property
    def cancel_rampdown_samples(self):
        """Rampdown time converted to a whole number of samples (truncated)"""
        fractional_samples = self.cancel_rampdown_time * self.sample_rate
        return int(fractional_samples)

    def store_to_netcdf(
        self, netcdf_group_handle: nc4._netCDF4.Group  # pylint: disable=c-extension-no-member
    ):
        """
        Writes the rampdown time and the output signal into a netCDF group.

        The rampdown time is stored as a group attribute; the signal is stored
        as a double-precision variable named ``output_signal`` with dimensions
        ``output_channels`` x ``signal_samples``.

        Parameters
        ----------
        netcdf_group_handle : nc4._netCDF4.Group
            Reference to the netCDF4 group in which the environment data should
            be stored.

        """
        group = netcdf_group_handle
        group.cancel_rampdown_time = self.cancel_rampdown_time
        # Dimensions must exist before the variable that uses them
        group.createDimension("output_channels", self.output_channels)
        group.createDimension("signal_samples", self.signal_samples)
        dimension_names = ("output_channels", "signal_samples")
        signal_variable = group.createVariable("output_signal", "f8", dimension_names)
        signal_variable[...] = self.output_signal

    @classmethod
    def from_ui(cls, ui):
        """Builds a TimeParameters object from the current state of the UI

        Parameters
        ----------
        ui : TimeUI
            A Time User Interface

        Returns
        -------
        test_parameters : TimeParameters
            Parameters corresponding to the data in the user interface.

        """
        definition = ui.definition_widget
        rate = definition.output_sample_rate_display.value()
        signal = ui.signal
        rampdown = definition.cancel_rampdown_selector.value()
        return cls(sample_rate=rate, output_signal=signal, cancel_rampdown_time=rampdown)

181 

182 

183class TimeUI(AbstractUI): 

184 """Class defining the user interface for a Time environment. 

185 

186 This class will contain two main UIs, the environment definition and run. 

187 The widgets corresponding to these interfaces are stored in TabWidgets in 

188 the main UI. 

189 

190 This class defines all the call backs and user interface operations required 

191 for the Time environment.""" 

192 

def __init__(
    self,
    environment_name: str,
    definition_tabwidget: QtWidgets.QTabWidget,
    system_id_tabwidget: QtWidgets.QTabWidget,  # pylint: disable=unused-argument
    test_predictions_tabwidget: QtWidgets.QTabWidget,  # pylint: disable=unused-argument
    run_tabwidget: QtWidgets.QTabWidget,
    environment_command_queue: VerboseMessageQueue,
    controller_communication_queue: VerboseMessageQueue,
    log_file_queue: Queue,
):
    """
    Constructs a Time User Interface

    Given the tab widgets from the main interface as well as communication
    queues, this class assembles the user interface components specific to
    the Time Environment

    Parameters
    ----------
    environment_name : str
        Name of the environment.  It is passed to the ``AbstractUI``
        constructor and ``self.environment_name`` is used as the label of
        the tabs added to the definition and run tab widgets.
    definition_tabwidget : QtWidgets.QTabWidget
        QTabWidget containing the environment subtabs on the Control
        Definition main tab
    system_id_tabwidget : QtWidgets.QTabWidget
        QTabWidget containing the environment subtabs on the System
        Identification main tab.  The Time Environment has no system
        identification step, so this is not used.
    test_predictions_tabwidget : QtWidgets.QTabWidget
        QTabWidget containing the environment subtabs on the Test Predictions
        main tab.  This argument is not used by the Time Environment.
    run_tabwidget : QtWidgets.QTabWidget
        QTabWidget containing the environment subtabs on the Run
        main tab.
    environment_command_queue : VerboseMessageQueue
        Queue for sending commands to the Time Environment
    controller_communication_queue : VerboseMessageQueue
        Queue for sending global commands to the controller
    log_file_queue : Queue
        Queue where log file messages can be written.

    """
    super().__init__(
        environment_name,
        environment_command_queue,
        controller_communication_queue,
        log_file_queue,
    )
    # Add the page to the control definition tabwidget
    self.definition_widget = QtWidgets.QWidget()
    uic.loadUi(environment_definition_ui_paths[CONTROL_TYPE], self.definition_widget)
    definition_tabwidget.addTab(self.definition_widget, self.environment_name)
    # Add the page to the run tabwidget
    self.run_widget = QtWidgets.QWidget()
    uic.loadUi(environment_run_ui_paths[CONTROL_TYPE], self.run_widget)
    run_tabwidget.addTab(self.run_widget, self.environment_name)

    # Set up some persistent data
    self.data_acquisition_parameters = None
    self.environment_parameters = None
    # Loaded output signal; stays None until a signal is loaded or retrieved
    self.signal = None
    self.physical_output_names = None
    self.physical_measurement_names = None
    # One checkbox per output channel, created in initialize_data_acquisition
    self.show_signal_checkboxes = None
    # Maps a plot name to the curve objects returned by multiline_plotter
    self.plot_data_items = {}

    # Both helpers need the widgets loaded above, so they run last
    self.complete_ui()
    self.connect_callbacks()

    # Complete the profile commands
    self.command_map["Set Test Level"] = self.change_test_level_from_profile
    self.command_map["Set Repeat"] = self.set_repeat_from_profile
    self.command_map["Set No Repeat"] = self.set_norepeat_from_profile

266 

def collect_environment_definition_parameters(self) -> TimeParameters:
    """Gather the environment definition currently shown in the user interface

    Returns
    -------
    TimeParameters
        Parameters object built from this user interface via
        ``TimeParameters.from_ui``.
    """
    parameters = TimeParameters.from_ui(self)
    return parameters

277 

def complete_ui(self):
    """Apply the shared grid and auto-range settings to the three plots"""
    definition, run = self.definition_widget, self.run_widget
    for widget in (
        definition.signal_display_plot,
        run.output_signal_plot,
        run.response_signal_plot,
    ):
        item = widget.getPlotItem()
        # Grid on both axes at 25% opacity
        item.showGrid(True, True, 0.25)
        item.enableAutoRange()
        view_box = item.getViewBox()
        view_box.enableAutoRange(enable=True)

291 

def connect_callbacks(self):
    """Wire the load, start and stop buttons to their handler methods"""
    bindings = (
        (self.definition_widget.load_signal_button, self.load_signal),
        (self.run_widget.start_test_button, self.start_control),
        (self.run_widget.stop_test_button, self.stop_control),
    )
    for button, handler in bindings:
        button.clicked.connect(handler)

297 

def initialize_data_acquisition(self, data_acquisition_parameters: DataAcquisitionParameters):
    """Update the user interface with data acquisition parameters

    This function is called when the Data Acquisition parameters are
    initialized.  It discards any loaded signal, rebuilds the signal
    information table with one row per output channel, fills in the sample
    rate displays, and replaces the curves on the three plots with
    zero-valued placeholders.

    Channels whose ``feedback_device`` is not None are treated as outputs;
    all other channels are treated as measurements.

    Parameters
    ----------
    data_acquisition_parameters : DataAcquisitionParameters :
        Container containing the data acquisition parameters, including
        channel table and sampling information.

    """
    self.log("Initializing Data Acquisition")
    self.signal = None
    # Split the channel table once so the counts and the names always agree
    channels = data_acquisition_parameters.channel_list
    output_channels = [channel for channel in channels if channel.feedback_device is not None]
    measurement_channels = [channel for channel in channels if channel.feedback_device is None]
    num_output = len(output_channels)
    num_measurements = len(measurement_channels)

    def channel_name(channel):
        """Build the "<type> <node><direction>" label for one channel"""
        channel_type = "" if channel.channel_type is None else channel.channel_type
        return f"{channel_type} {channel.node_number}{channel.node_direction}"

    self.physical_output_names = [channel_name(channel) for channel in output_channels]
    self.physical_measurement_names = [
        channel_name(channel) for channel in measurement_channels
    ]

    def read_only_item(text):
        """Create a table item showing ``text`` with its editable flag toggled off"""
        item = QtWidgets.QTableWidgetItem()
        item.setText(text)
        item.setFlags(item.flags() ^ QtCore.Qt.ItemIsEditable)
        return item

    # Add rows to the signal table
    table = self.definition_widget.signal_information_table
    table.setRowCount(num_output)
    self.show_signal_checkboxes = []
    for i, name in enumerate(self.physical_output_names):
        # Column 1: channel name
        table.setItem(i, 1, read_only_item(name))
        # Column 0: checkbox toggling whether the channel is plotted
        checkbox = QtWidgets.QCheckBox()
        checkbox.setChecked(True)
        checkbox.stateChanged.connect(self.show_signal)
        self.show_signal_checkboxes.append(checkbox)
        table.setCellWidget(i, 0, checkbox)
        # Columns 2 and 3: placeholders later overwritten with max and RMS
        table.setItem(i, 2, read_only_item("0.0"))
        table.setItem(i, 3, read_only_item("0.0"))
    # Fill in the info at the bottom
    self.definition_widget.sample_rate_display.setValue(data_acquisition_parameters.sample_rate)
    self.definition_widget.output_sample_rate_display.setValue(
        data_acquisition_parameters.sample_rate * data_acquisition_parameters.output_oversample
    )
    self.definition_widget.output_channels_display.setValue(num_output)

    # Clear the signal plot
    self.definition_widget.signal_display_plot.getPlotItem().clear()
    self.run_widget.output_signal_plot.getPlotItem().clear()
    self.run_widget.response_signal_plot.getPlotItem().clear()

    # Set initial lines
    self.plot_data_items["output_signal_definition"] = multiline_plotter(
        np.arange(2),
        np.zeros((num_output, 2)),
        widget=self.definition_widget.signal_display_plot,
        other_pen_options={"width": 1},
        names=self.physical_output_names,
    )
    self.plot_data_items["output_signal_measurement"] = multiline_plotter(
        np.arange(2),
        np.zeros((num_output, 2)),
        widget=self.run_widget.output_signal_plot,
        other_pen_options={"width": 1},
        names=self.physical_output_names,
    )
    # Only the first MAX_RESPONSES_TO_PLOT measurement channels get a curve
    self.plot_data_items["response_signal_measurement"] = multiline_plotter(
        np.arange(2),
        np.zeros((min(num_measurements, MAX_RESPONSES_TO_PLOT), 2)),
        widget=self.run_widget.response_signal_plot,
        other_pen_options={"width": 1},
        names=self.physical_measurement_names,
    )

    self.data_acquisition_parameters = data_acquisition_parameters

396 

def load_signal(self, clicked, filename=None):  # pylint: disable=unused-argument
    """Loads a time signal from a file and displays it

    If no file name is given, a file dialog is shown; cancelling the dialog
    leaves the user interface unchanged.

    Parameters
    ----------
    clicked :
        The clicked event that triggered the callback.  Not used.
    filename :
        Path of the signal file.  When supplied, the file dialog is skipped
        (Default value = None).

    """
    definition = self.definition_widget
    if filename is None:
        filename, _ = QtWidgets.QFileDialog.getOpenFileName(
            definition,
            "Select Signal File",
            filter="Numpy or Mat (*.npy *.npz *.mat)",
        )
        # An empty string is returned when the dialog is cancelled
        if filename == "":
            return
    definition.signal_file_name_display.setText(filename)
    rate_display = definition.output_sample_rate_display
    self.signal = load_time_history(filename, rate_display.value())
    # Report the signal length in samples and in seconds
    definition.signal_samples_display.setValue(self.signal.shape[-1])
    definition.signal_time_display.setValue(self.signal.shape[-1] / rate_display.value())
    # Per-channel peak and RMS go into columns 2 and 3 of the table
    peak_values = np.max(np.abs(self.signal), axis=-1)
    rms_values = rms_time(self.signal, axis=-1)
    table = definition.signal_information_table
    for row, (peak, rms) in enumerate(zip(peak_values, rms_values)):
        table.item(row, 2).setText(f"{peak:0.2f}")
        table.item(row, 3).setText(f"{rms:0.2f}")
    self.show_signal()

431 

def show_signal(self):
    """Shows the signal on the user interface

    Each curve of the definition plot is paired with one row of the signal
    and one checkbox.  Checked channels are drawn against time in seconds
    (sample index divided by the output sample rate); unchecked channels are
    collapsed to a single point at the origin.

    Does nothing when no signal has been loaded yet.  The checkboxes are
    connected to this method as soon as they are created, so it can be
    triggered while ``self.signal`` is still None.
    """
    if self.signal is None:
        return
    # The rate is the same for every channel, so read the widget only once
    sample_rate = self.definition_widget.output_sample_rate_display.value()
    for curve, signal, check_box in zip(
        self.plot_data_items["output_signal_definition"],
        self.signal,
        self.show_signal_checkboxes,
    ):
        if check_box.isChecked():
            x = np.arange(signal.shape[-1]) / sample_rate
            curve.setData(x, signal)
        else:
            curve.setData((0, 0), (0, 0))

447 

def initialize_environment(self) -> AbstractMetadata:
    """Collect the environment parameters and size the run plots for them

    The parameters are gathered from the user interface, and every curve on
    the output and response run plots is reset to zeros.  The number of
    points per curve is twice the signal length divided by the output
    oversample factor, capped at ``MAX_SAMPLES_TO_PLOT``.

    Returns
    -------
    environment_parameters : TimeParameters
        A TimeParameters object that contains the parameters
        defining the environment.

    Raises
    ------
    ValueError
        If no output signal has been defined.
    """
    self.log("Initializing Environment Parameters")
    parameters = self.collect_environment_definition_parameters()
    # Make sure everything is defined
    if parameters.output_signal is None:
        raise ValueError("Output Signal is not defined!")
    # Initialize the correct sizes of the arrays
    curve_groups = [
        self.plot_data_items[key]
        for key in ("output_signal_measurement", "response_signal_measurement")
    ]
    for curve_group in curve_groups:
        for curve in curve_group:
            acquisition = self.data_acquisition_parameters
            num_points = min(
                parameters.output_signal.shape[-1] // acquisition.output_oversample * 2,
                MAX_SAMPLES_TO_PLOT,
            )
            time_axis = np.arange(num_points) / acquisition.sample_rate
            curve.setData(time_axis, np.zeros(num_points))
    self.environment_parameters = parameters
    return parameters

502 

def retrieve_metadata(
    self, netcdf_handle: nc4._netCDF4.Dataset  # pylint: disable=c-extension-no-member
):
    """Restores the environment definition from a netCDF dataset.

    This is the "read" counterpart to ``TimeParameters.store_to_netcdf``.
    The output signal and the cancel rampdown time are read from the group
    named after this environment, the per-channel maximum and RMS values are
    written into the signal information table, and the signal is plotted.

    Parameters
    ----------
    netcdf_handle : nc4._netCDF4.Dataset :
        The netCDF dataset from which the data will be read.  It should have
        a group named with the environment's name.
    """
    environment_group = netcdf_handle.groups[self.environment_name]
    stored_signal = environment_group.variables["output_signal"][...]
    self.signal = stored_signal.data
    self.definition_widget.cancel_rampdown_selector.setValue(
        environment_group.cancel_rampdown_time
    )
    # Per-channel peak and RMS go into columns 2 and 3 of the table
    peak_values = np.max(np.abs(self.signal), axis=-1)
    rms_values = rms_time(self.signal, axis=-1)
    table = self.definition_widget.signal_information_table
    for row, (peak, rms) in enumerate(zip(peak_values, rms_values)):
        table.item(row, 2).setText(f"{peak:0.2f}")
        table.item(row, 3).setText(f"{rms:0.2f}")
    self.show_signal()

538 

def start_control(self):
    """Locks the run controls and sends the start commands

    The stop button is enabled and the start button, level selector and
    repeat checkbox are disabled.  The controller is then told the
    environment is starting, the environment is sent the test level (as a
    scale factor) and the repeat flag, and the controller is told the
    environment is at its target level.
    """
    run = self.run_widget
    widget_states = (
        (run.stop_test_button, True),
        (run.start_test_button, False),
        (run.test_level_selector, False),
        (run.repeat_signal_checkbox, False),
    )
    for widget, enabled in widget_states:
        widget.setEnabled(enabled)
    self.controller_communication_queue.put(
        self.log_name, (GlobalCommands.START_ENVIRONMENT, self.environment_name)
    )
    # The selector holds a dB value; the environment expects a scale factor
    test_level = db2scale(run.test_level_selector.value())
    repeat = run.repeat_signal_checkbox.isChecked()
    start_message = (GlobalCommands.START_ENVIRONMENT, (test_level, repeat))
    self.environment_command_queue.put(self.log_name, start_message)
    self.controller_communication_queue.put(
        self.log_name, (GlobalCommands.AT_TARGET_LEVEL, self.environment_name)
    )

561 

def stop_control(self):
    """Sends the stop command to the environment"""
    stop_message = (GlobalCommands.STOP_ENVIRONMENT, None)
    self.environment_command_queue.put(self.log_name, stop_message)

565 

def change_test_level_from_profile(self, test_level):
    """Applies a test level received from a profile instruction

    Parameters
    ----------
    test_level :
        Value to set the test level to.  It is converted with ``int`` before
        being written to the test level selector.
    """
    level_selector = self.run_widget.test_level_selector
    level_selector.setValue(int(test_level))

575 

def set_repeat_from_profile(self, data):  # pylint: disable=unused-argument
    """Checks the repeat-signal checkbox in response to a profile instruction

    Parameters
    ----------
    data : Ignored
        Parameter is ignored but required by the ``command_map``

    """
    repeat_checkbox = self.run_widget.repeat_signal_checkbox
    repeat_checkbox.setChecked(True)

586 

def set_norepeat_from_profile(self, data):  # pylint: disable=unused-argument
    """Unchecks the repeat-signal checkbox in response to a profile instruction

    Parameters
    ----------
    data : Ignored
        Parameter is ignored but required by the ``command_map``

    """
    repeat_checkbox = self.run_widget.repeat_signal_checkbox
    repeat_checkbox.setChecked(False)

597 

def update_gui(self, queue_data):
    """Update the graphical interface for the environment

    Three kinds of message are handled:

    * ``"time_data"`` scrolls the newly acquired samples into the response
      and output run plots, keeping each curve's length unchanged.
    * ``"enable"`` / ``"disable"`` look up the widget named by the data and
      enable or disable it.
    * Any other message is treated as a widget name, and the data is written
      to that widget according to its type (spin boxes, line edits and list
      widgets are supported; other widget types are left untouched).

    Parameters
    ----------
    queue_data :
        A 2-tuple consisting of ``(message,data)`` pairs where the message
        denotes what to change and the data contains the information needed
        to be displayed.

    Raises
    ------
    ValueError
        If a named widget exists on neither the definition nor the run
        widget.
    """
    message, data = queue_data

    def find_widget(name):
        """Return attribute ``name`` of the definition or run widget, else None"""
        for parent in (self.definition_widget, self.run_widget):
            try:
                return getattr(parent, name)
            except AttributeError:
                continue
        return None

    if message == "time_data":
        response_data, output_data = data
        plot_updates = (
            ("response_signal_measurement", response_data),
            ("output_signal_measurement", output_data),
        )
        for plot_name, channel_blocks in plot_updates:
            for curve, new_samples in zip(self.plot_data_items[plot_name], channel_blocks):
                x, y = curve.getData()
                # Drop the oldest samples and append the newest ones
                scrolled = np.concatenate(
                    (y[new_samples.size :], new_samples[-x.size :]), axis=0
                )
                curve.setData(x, scrolled)
        return

    if message == "enable":
        target = find_widget(data)
        if target is None:
            raise ValueError(f"Cannot Enable Widget {data}: not found in UI")
        target.setEnabled(True)
        return

    if message == "disable":
        target = find_widget(data)
        if target is None:
            raise ValueError(f"Cannot Disable Widget {data}: not found in UI")
        target.setEnabled(False)
        return

    # Any other message names the widget whose displayed value is updated
    target = find_widget(message)
    if target is None:
        raise ValueError(f"Cannot Update Widget {message}: not found in UI")
    if isinstance(target, (QtWidgets.QDoubleSpinBox, QtWidgets.QSpinBox)):
        target.setValue(data)
    elif isinstance(target, QtWidgets.QLineEdit):
        target.setText(data)
    elif isinstance(target, QtWidgets.QListWidget):
        target.clear()
        target.addItems([f"{d:.3f}" for d in data])

665 

666 @staticmethod 

667 def create_environment_template( 

668 environment_name: str, workbook: openpyxl.workbook.workbook.Workbook 

669 ): 

670 """Creates a template worksheet in an Excel workbook defining the 

671 environment. 

672 

673 This function creates a template worksheet in an Excel workbook that 

674 when filled out could be read by the controller to re-create the 

675 environment. 

676 

677 This function is the "write" counterpart to the 

678 ``set_parameters_from_template`` function in the ``TimeUI`` class, 

679 which reads the values from the template file to populate the user 

680 interface. 

681 

682 Parameters 

683 ---------- 

684 environment_name : str : 

685 The name of the environment that will specify the worksheet's name 

686 workbook : openpyxl.workbook.workbook.Workbook : 

687 A reference to an ``openpyxl`` workbook. 

688 

689 """ 

690 worksheet = workbook.create_sheet(environment_name) 

691 worksheet.cell(1, 1, "Control Type") 

692 worksheet.cell(1, 2, "Time") 

693 worksheet.cell( 

694 1, 

695 4, 

696 "Note: Replace cells with hash marks (#) to provide the requested parameters.", 

697 ) 

698 worksheet.cell(2, 1, "Signal File") 

699 worksheet.cell(2, 2, "# Path to the file that contains the time signal that will be output") 

700 worksheet.cell(3, 1, "Cancel Rampdown Time") 

701 worksheet.cell( 

702 3, 

703 2, 

704 "# Time for the environment to ramp to zero if the environment is cancelled.", 

705 ) 

706 

def set_parameters_from_template(self, worksheet: openpyxl.worksheet.worksheet.Worksheet):
    """
    Populates the user interface from a filled-out Excel template worksheet

    The signal file path is read from row 2, column 2 and loaded; the cancel
    rampdown time is read from row 3, column 2 and converted to a float.

    This function is the "read" counterpart to the
    ``create_environment_template`` function in the ``TimeUI`` class,
    which writes a template file that can be filled out by a user.

    Parameters
    ----------
    worksheet : openpyxl.worksheet.worksheet.Worksheet
        An openpyxl worksheet that contains the environment template.
        Cells on this worksheet should contain the parameters needed for the
        user interface.

    """
    signal_path = worksheet.cell(2, 2).value
    self.load_signal(None, signal_path)
    rampdown_time = float(worksheet.cell(3, 2).value)
    self.definition_widget.cancel_rampdown_selector.setValue(rampdown_time)

731 

732 

733class TimeEnvironment(AbstractEnvironment): 

734 """Environment defined by a generated time history signal""" 

735 

def __init__(
    self,
    environment_name: str,
    queue_container: TimeQueues,
    acquisition_active: mp.sharedctypes.Synchronized,
    output_active: mp.sharedctypes.Synchronized,
):
    """
    Time History Generation Environment Constructor

    Hands the individual queues and activity flags to the
    ``AbstractEnvironment`` constructor, registers the start command, and
    resets all run-time state to zero or null.

    Parameters
    ----------
    environment_name : str
        Name of the environment.
    queue_container : TimeQueues
        Container of queues used by the Time Environment.
    acquisition_active : mp.sharedctypes.Synchronized
        Forwarded unchanged to the ``AbstractEnvironment`` constructor.
    output_active : mp.sharedctypes.Synchronized
        Forwarded unchanged to the ``AbstractEnvironment`` constructor.

    """
    queues = queue_container
    super().__init__(
        environment_name,
        queues.environment_command_queue,
        queues.gui_update_queue,
        queues.controller_communication_queue,
        queues.log_file_queue,
        queues.data_in_queue,
        queues.data_out_queue,
        acquisition_active,
        output_active,
    )
    self.queue_container = queues
    # Starting the environment runs the main loop
    self.command_map[GlobalCommands.START_ENVIRONMENT] = self.run_environment
    # Parameters supplied later by the controller and the user interface
    self.data_acquisition_parameters = None
    self.environment_parameters = None
    # Run-state flags
    self.startup = True
    self.shutdown_flag = False
    self.repeat = False
    # Test level bookkeeping
    self.current_test_level = 0.0
    self.target_test_level = 0.0
    self.test_level_target = 0.0
    self.test_level_change = 0.0
    # Signal buffer and channel index lists, filled in once parameters arrive
    self.signal_remainder = None
    self.output_channels = None
    self.measurement_channels = None

784 

def initialize_data_acquisition_parameters(
    self, data_acquisition_parameters: DataAcquisitionParameters
):
    """Store the data acquisition parameters and classify the channels.

    Channel indices are split into two lists: channels whose
    ``feedback_device`` is None are measurement channels, all others are
    output channels.

    Parameters
    ----------
    data_acquisition_parameters : DataAcquisitionParameters :
        A container containing data acquisition parameters, including
        channels active in the environment as well as sampling parameters.
    """
    self.log("Initializing Data Acquisition Parameters")
    self.data_acquisition_parameters = data_acquisition_parameters
    measurement_indices = []
    output_indices = []
    for index, channel in enumerate(self.data_acquisition_parameters.channel_list):
        if channel.feedback_device is None:
            measurement_indices.append(index)
        else:
            output_indices.append(index)
    self.measurement_channels = measurement_indices
    self.output_channels = output_indices

811 

def initialize_environment_test_parameters(self, environment_parameters: TimeParameters):
    """
    Store the parameters that define this environment

    The environment receives its defining parameters from the user
    interface; they are logged as received and kept for later use.

    Parameters
    ----------
    environment_parameters : TimeParameters
        A container containing the parameters defining the environment

    """
    self.log("Initializing Environment Parameters")
    self.environment_parameters = environment_parameters

827 

def run_environment(self, data):
    """Run one pass of the time history environment.

    This function handles start up, running, and shutting down the
    environment.

    On the first pass after start up (``self.startup`` is set) the test
    level and repeat flag are taken from ``data`` and the output buffer
    (``self.signal_remainder``) is initialised with the full output signal.

    On every pass, any block waiting in the ``data_in_queue`` is split into
    measurement and output channels and forwarded to the GUI. If the
    ``data_out_queue`` is empty, the next ``samples_per_write`` samples of
    the buffer are written through ``self.output``. When repeating, the
    buffer is first topped up with as many copies of the output signal as
    are needed to fill a complete write. When not repeating and the buffer
    can no longer supply more than one write, or when the current test
    level is zero, the write is flagged as the last signal; the function
    then blocks until the acquisition reports its last block and calls
    ``self.shutdown``.

    Unless the environment shut down, the function finishes by putting a
    ``START_ENVIRONMENT`` command (with ``None`` as data) on the
    environment command queue so that it is called again.

    Parameters
    ----------
    data : Tuple or None
        On start up, a tuple containing the test level to run the
        environment at and a boolean specifying whether or not to repeat
        the signal. ``None`` leaves the existing values untouched.

    """
    queues = self.queue_container
    if self.startup:
        if data is not None:
            self.current_test_level, self.repeat = data
        self.log(f"Test Level set to {self.current_test_level}")
        self.signal_remainder = self.environment_parameters.output_signal
        self.startup = False
    # See if any data has come in
    try:
        acquisition_data, last_acquisition = queues.data_in_queue.get_nowait()
    except mp.queues.Empty:
        last_acquisition = False
    else:
        measurement_data = acquisition_data[self.measurement_channels]
        output_data = acquisition_data[self.output_channels]
        queues.gui_update_queue.put(
            (self.environment_name, ("time_data", (measurement_data, output_data)))
        )
    # See if we need to output data
    if queues.data_out_queue.empty():
        last_signal = False
        samples_per_write = self.data_acquisition_parameters.samples_per_write
        samples_available = self.signal_remainder.shape[-1]
        # See if there is enough in the remainder
        if samples_per_write > samples_available and self.repeat:
            signal = self.environment_parameters.output_signal
            signal_length = signal.shape[-1]
            # A zero-length signal can never fill the buffer, so skip it
            # rather than computing a repeat count with a zero divisor.
            if signal_length > 0:
                shortfall = samples_per_write - samples_available
                # Ceiling division: a signal much shorter than one write
                # needs several copies, not just one, to fill the block.
                repeats = -(-shortfall // signal_length)
                self.signal_remainder = np.concatenate(
                    (self.signal_remainder, np.tile(signal, repeats)),
                    axis=-1,
                )
        elif (
            samples_per_write >= samples_available and not self.repeat
        ) or self.current_test_level == 0.0:
            last_signal = True
        self.output(self.signal_remainder[:, :samples_per_write], last_signal)
        self.signal_remainder = self.signal_remainder[:, samples_per_write:]
        if last_signal:
            # Wait until we get the last signal from the acquisition
            while not last_acquisition:
                self.log("Waiting for Last Acquisition")
                acquisition_data, last_acquisition = queues.data_in_queue.get()
                measurement_data = acquisition_data[self.measurement_channels]
                output_data = acquisition_data[self.output_channels]
                queues.gui_update_queue.put(
                    (
                        self.environment_name,
                        ("time_data", (measurement_data, output_data)),
                    )
                )
            self.shutdown()
            return
    # Still running: ask to be called again
    queues.environment_command_queue.put(
        self.environment_name, (GlobalCommands.START_ENVIRONMENT, None)
    )

913 

def output(self, write_data, last_signal=False):
    """Scale a block of output data by the test level and queue it for output.

    This function keeps track of the environment test level and scales the
    output signals accordingly prior to placing them into the
    ``data_out_queue``. It also handles the ramping between two test
    levels: while ``self.test_level_change`` is nonzero, each sample gets
    its own level, advancing by ``test_level_change`` per sample until the
    level comes within ``TEST_LEVEL_THRESHOLD`` steps of
    ``self.test_level_target``, after which the target is held and the ramp
    is cleared.

    Parameters
    ----------
    write_data : np.ndarray
        A numpy array containing the signals to be written. Samples run
        along the last axis.
    last_signal :
        Specifies if the signal being written is the last signal that will
        be generated due to the signal generation shutting down. It is
        placed on the queue alongside the data. (Default value = False)
    """
    # Compute the test_level scaling for this dataset.  A block with no
    # samples cannot advance a ramp (and indexing its empty level array
    # would raise IndexError), so it is treated like the steady-state case.
    if self.test_level_change == 0.0 or write_data.shape[-1] == 0:
        test_level = self.current_test_level
        self.log(f"Test Level at {test_level}")
    else:
        test_level = (
            self.current_test_level
            + (np.arange(write_data.shape[-1]) + 1) * self.test_level_change
        )
        # Compute distance in steps from the target test_level
        # and find where it is near the target
        full_level_index = np.nonzero(
            abs(test_level - self.test_level_target) / abs(self.test_level_change)
            < TEST_LEVEL_THRESHOLD
        )[0]
        # Check if any are
        if len(full_level_index) > 0:
            # If so, set all test_levels after that one to the target test_level
            test_level[full_level_index[0] + 1 :] = self.test_level_target
            # And update that our current test_level is now the target test_level
            self.current_test_level = self.test_level_target
            self.test_level_change = 0.0
        else:
            # Otherwise, our current test_level is the last entry in the test_level scaling
            self.current_test_level = test_level[-1]
        self.log(f"Test level from {test_level[0]} to {test_level[-1]}")
    # Write the test level-scaled data to the task.  The product is already
    # a newly allocated array, so no additional copy is needed before it is
    # handed to the queue.
    self.log("Sending data to data_out queue")
    scaled_data = write_data * test_level
    self.queue_container.data_out_queue.put((scaled_data, last_signal))

965 

def stop_environment(self, data):
    """Stop the environment by requesting a test level of zero.

    Parameters
    ----------
    data : Ignored
        Not used by this function; it is accepted only because the
        ``command_map`` calling signature requires it.

    """
    zero_level = 0.0
    self.adjust_test_level(zero_level)

977 

def adjust_test_level(self, data):
    """Adjust the test level of the signal.

    The new target is stored in ``self.test_level_target`` and the
    per-sample increment needed to reach it over
    ``environment_parameters.cancel_rampdown_samples`` samples is stored in
    ``self.test_level_change``. If the ramp length is not positive there is
    nothing to ramp over, so the new level is applied immediately instead
    of dividing by zero.

    Parameters
    ----------
    data :
        New target test level

    """
    self.test_level_target = data
    ramp_samples = self.environment_parameters.cancel_rampdown_samples
    if ramp_samples > 0:
        self.test_level_change = (
            self.test_level_target - self.current_test_level
        ) / ramp_samples
    else:
        # No ramp available: jump straight to the target so the per-sample
        # change never becomes a division by zero (or inf/nan with NumPy
        # integer types).
        self.log(
            f"Changed test level to {self.test_level_target} from "
            f"{self.current_test_level} without ramping"
        )
        self.current_test_level = self.test_level_target
        self.test_level_change = 0.0
    if self.test_level_change != 0.0:
        self.log(
            f"Changed test level to {self.test_level_target} from "
            f"{self.current_test_level}, {self.test_level_change} change per sample"
        )

996 

def shutdown(self):
    """Perform final cleanup once the time history generation has stopped.

    ``run_environment`` calls this after the last signal has been written
    and the last acquisition data has been received. The function logs the
    shutdown, flushes the environment command queue for this environment's
    name, asks the GUI to re-enable the test level selector, the repeat
    checkbox and the start button while disabling the stop button, and
    finally sets the ``startup`` flag again.

    """
    self.log("Shutting Down Time History Generation")
    queues = self.queue_container
    queues.environment_command_queue.flush(self.environment_name)
    # GUI widget updates, sent in this order
    widget_updates = (
        ("enable", "test_level_selector"),
        ("enable", "repeat_signal_checkbox"),
        ("enable", "start_test_button"),
        ("disable", "stop_test_button"),
    )
    for widget_update in widget_updates:
        queues.gui_update_queue.put((self.environment_name, widget_update))
    self.startup = True

1023 

1024 

def time_process(
    environment_name: str,
    input_queue: VerboseMessageQueue,
    gui_update_queue: Queue,
    controller_communication_queue: VerboseMessageQueue,
    log_file_queue: Queue,
    data_in_queue: Queue,
    data_out_queue: Queue,
    acquisition_active: mp.sharedctypes.Synchronized,
    output_active: mp.sharedctypes.Synchronized,
):
    """Time signal generation environment process function called by multiprocessing

    This is the entry point that the multiprocessing module runs when it
    creates the environment's process. It bundles the queues into a
    ``TimeQueues`` container, creates a ``TimeEnvironment`` object from it
    and runs that object.

    Parameters
    ----------
    environment_name : str :
        Name of the environment
    input_queue : VerboseMessageQueue :
        Queue containing instructions for the environment
    gui_update_queue : Queue :
        Queue where GUI updates are put
    controller_communication_queue : VerboseMessageQueue :
        Queue for global communications with the controller
    log_file_queue : Queue :
        Queue for writing log file messages
    data_in_queue : Queue :
        Queue from which data will be read by the environment
    data_out_queue : Queue :
        Queue to which data will be written that will be output by the hardware.
    acquisition_active : mp.sharedctypes.Synchronized :
        Shared value handed unchanged to the ``TimeEnvironment``
        (presumably flags whether the acquisition is running -- confirm
        against ``AbstractEnvironment``).
    output_active : mp.sharedctypes.Synchronized :
        Shared value handed unchanged to the ``TimeEnvironment``
        (presumably flags whether the output is running -- confirm against
        ``AbstractEnvironment``).

    """
    # Bundle the queues; note that the log file queue goes last here even
    # though it is not the last queue argument of this function.
    queues = TimeQueues(
        input_queue,
        gui_update_queue,
        controller_communication_queue,
        data_in_queue,
        data_out_queue,
        log_file_queue,
    )

    environment = TimeEnvironment(environment_name, queues, acquisition_active, output_active)
    environment.run()