Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / skeleton_sys_id_environment.py: 0%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3This file defines a skeleton of an environment that utilizes system 

4identification. This file should be modified to construct a full environment. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24# flake8: noqa 

25# pylint: skip-file 

26import multiprocessing as mp 

27from enum import Enum 

28from multiprocessing.queues import Queue 

29 

30import netCDF4 as nc4 

31from qtpy import QtWidgets, uic 

32 

33from .abstract_sysid_environment import ( 

34 AbstractSysIdEnvironment, 

35 AbstractSysIdMetadata, 

36 AbstractSysIdUI, 

37) 

38from .environments import ( 

39 ControlTypes, 

40 environment_definition_ui_paths, 

41 environment_prediction_ui_paths, 

42 environment_run_ui_paths, 

43) 

44from .utilities import VerboseMessageQueue 

45 

46# Update this line to define the controller type, and add to the ControlTypes enumeration in 

47# components/environments.py 

48control_type = ControlTypes.Skeleton # noqa pylint: disable=no-member 

49 

50 

51# %% Queues 

52class SkeletonQueues: 

53 """A container class for the queues that this environment will manage.""" 

54 

55 def __init__( 

56 self, 

57 environment_name: str, 

58 environment_command_queue: VerboseMessageQueue, 

59 gui_update_queue: Queue, 

60 controller_communication_queue: VerboseMessageQueue, 

61 data_in_queue: Queue, 

62 data_out_queue: Queue, 

63 log_file_queue: VerboseMessageQueue, 

64 ): 

65 """A container class for the queues that random vibration will manage. 

66 

67 The environment uses many queues to pass data between the various pieces. 

68 This class organizes those queues into one common namespace. 

69 

70 

71 Parameters 

72 ---------- 

73 environment_name : str 

74 Name of the environment 

75 environment_command_queue : VerboseMessageQueue 

76 Queue that is read by the environment for environment commands 

77 gui_update_queue : mp.queues.Queue 

78 Queue where various subtasks put instructions for updating the 

79 widgets in the user interface 

80 controller_communication_queue : VerboseMessageQueue 

81 Queue that is read by the controller for global controller commands 

82 data_in_queue : mp.queues.Queue 

83 Multiprocessing queue that connects the acquisition subtask to the 

84 environment subtask. Each environment will retrieve acquired data 

85 from this queue. 

86 data_out_queue : mp.queues.Queue 

87 Multiprocessing queue that connects the output subtask to the 

88 environment subtask. Each environment will put data that it wants 

89 the controller to generate in this queue. 

90 log_file_queue : VerboseMessageQueue 

91 Queue for putting logging messages that will be read by the logging 

92 subtask and written to a file. 

93 """ 

94 self.environment_command_queue = environment_command_queue 

95 self.gui_update_queue = gui_update_queue 

96 self.data_analysis_command_queue = VerboseMessageQueue( 

97 log_file_queue, environment_name + " Data Analysis Command Queue" 

98 ) 

99 self.signal_generation_command_queue = VerboseMessageQueue( 

100 log_file_queue, environment_name + " Signal Generation Command Queue" 

101 ) 

102 self.spectral_command_queue = VerboseMessageQueue( 

103 log_file_queue, environment_name + " Spectral Computation Command Queue" 

104 ) 

105 self.collector_command_queue = VerboseMessageQueue( 

106 log_file_queue, environment_name + " Data Collector Command Queue" 

107 ) 

108 self.controller_communication_queue = controller_communication_queue 

109 self.data_in_queue = data_in_queue 

110 self.data_out_queue = data_out_queue 

111 self.data_for_spectral_computation_queue = mp.Queue() 

112 self.updated_spectral_quantities_queue = mp.Queue() 

113 self.time_histories_to_generate_queue = mp.Queue() 

114 self.log_file_queue = log_file_queue 

115 

116 

117# %% Metadata 

118class SkeletonMetadata(AbstractSysIdMetadata): 

119 def __init__(self): 

120 pass 

121 

122 @property 

123 def number_of_channels(self): 

124 pass 

125 

126 @property 

127 def response_channel_indices(self): 

128 pass 

129 

130 @property 

131 def reference_channel_indices(self): 

132 pass 

133 

134 @property 

135 def response_transformation_matrix(self): 

136 pass 

137 

138 @property 

139 def reference_transformation_matrix(self): 

140 pass 

141 

142 @property 

143 def sample_rate(self): 

144 pass 

145 

146 def store_to_netcdf(self, netcdf_group_handle: nc4._netCDF4.Group): 

147 super().store_to_netcdf(netcdf_group_handle) 

148 

149 

150# %% UI 

151 

152from .data_collector import ( 

153 data_collector_process, 

154) 

155from .signal_generation_process import ( 

156 signal_generation_process, 

157) 

158from .spectral_processing import ( 

159 spectral_processing_process, 

160) 

161 

162 

163class SkeletonUI(AbstractSysIdUI): 

164 def __init__( 

165 self, 

166 environment_name: str, 

167 definition_tabwidget: QtWidgets.QTabWidget, 

168 system_id_tabwidget: QtWidgets.QTabWidget, 

169 test_predictions_tabwidget: QtWidgets.QTabWidget, 

170 run_tabwidget: QtWidgets.QTabWidget, 

171 environment_command_queue: VerboseMessageQueue, 

172 controller_communication_queue: VerboseMessageQueue, 

173 log_file_queue: Queue, 

174 ): 

175 super().__init__( 

176 environment_name, 

177 environment_command_queue, 

178 controller_communication_queue, 

179 log_file_queue, 

180 system_id_tabwidget, 

181 ) 

182 # Add the page to the control definition tabwidget 

183 self.definition_widget = QtWidgets.QWidget() 

184 uic.loadUi(environment_definition_ui_paths[control_type], self.definition_widget) 

185 definition_tabwidget.addTab(self.definition_widget, self.environment_name) 

186 # Add the page to the control prediction tabwidget 

187 self.prediction_widget = QtWidgets.QWidget() 

188 uic.loadUi(environment_prediction_ui_paths[control_type], self.prediction_widget) 

189 test_predictions_tabwidget.addTab(self.prediction_widget, self.environment_name) 

190 # Add the page to the run tabwidget 

191 self.run_widget = QtWidgets.QWidget() 

192 uic.loadUi(environment_run_ui_paths[control_type], self.run_widget) 

193 run_tabwidget.addTab(self.run_widget, self.environment_name) 

194 

195 def collect_environment_definition_parameters(self): 

196 pass 

197 

198 def create_environment_template(self, environment_name, workbook): 

199 pass 

200 

201 def initialize_data_acquisition(self, data_acquisition_parameters): 

202 pass 

203 

204 def initialize_environment(self): 

205 pass 

206 

207 @property 

208 def initialized_control_names(self): 

209 pass 

210 

211 @property 

212 def initialized_output_names(self): 

213 pass 

214 

215 def retrieve_metadata(self, netcdf_handle): 

216 pass 

217 

218 def set_parameters_from_template(self, worksheet): 

219 pass 

220 

221 def start_control(self): 

222 pass 

223 

224 def stop_control(self): 

225 pass 

226 

227 def update_gui(self, queue_data): 

228 if super().update_gui(queue_data): 

229 return 

230 

231 

232# %% Environment 

233 

234 

235class SkeletonEnvironment(AbstractSysIdEnvironment): 

236 

237 def __init__(self, environment_name: str, queue_container: SkeletonQueues): 

238 super().__init__( 

239 environment_name, 

240 queue_container.environment_command_queue, 

241 queue_container.gui_update_queue, 

242 queue_container.controller_communication_queue, 

243 queue_container.log_file_queue, 

244 queue_container.collector_command_queue, 

245 queue_container.signal_generation_command_queue, 

246 queue_container.spectral_command_queue, 

247 queue_container.data_analysis_command_queue, 

248 queue_container.data_in_queue, 

249 queue_container.data_out_queue, 

250 ) 

251 

252 def start_control(self, data): 

253 pass 

254 

255 def stop_environment(self, data): 

256 pass 

257 

258 

259# %% Process 

260 

261 

262def skeleton_process( 

263 environment_name: str, 

264 input_queue: VerboseMessageQueue, 

265 gui_update_queue: Queue, 

266 controller_communication_queue: VerboseMessageQueue, 

267 log_file_queue: Queue, 

268 data_in_queue: Queue, 

269 data_out_queue: Queue, 

270): 

271 # Create vibration queues 

272 queue_container = SkeletonQueues( 

273 environment_name, 

274 input_queue, 

275 gui_update_queue, 

276 controller_communication_queue, 

277 data_in_queue, 

278 data_out_queue, 

279 log_file_queue, 

280 ) 

281 

282 spectral_proc = mp.Process( 

283 target=spectral_processing_process, 

284 args=( 

285 environment_name, 

286 queue_container.spectral_command_queue, 

287 queue_container.data_for_spectral_computation_queue, 

288 queue_container.updated_spectral_quantities_queue, 

289 queue_container.environment_command_queue, 

290 queue_container.gui_update_queue, 

291 queue_container.log_file_queue, 

292 ), 

293 ) 

294 spectral_proc.start() 

295 analysis_proc = mp.Process( 

296 # Will need to define and import a data_analysis_process 

297 target=data_analysis_process, # type: ignore 

298 args=( 

299 environment_name, 

300 queue_container.data_analysis_command_queue, 

301 queue_container.updated_spectral_quantities_queue, 

302 queue_container.time_histories_to_generate_queue, 

303 queue_container.environment_command_queue, 

304 queue_container.gui_update_queue, 

305 queue_container.log_file_queue, 

306 ), 

307 ) 

308 analysis_proc.start() 

309 siggen_proc = mp.Process( 

310 target=signal_generation_process, 

311 args=( 

312 environment_name, 

313 queue_container.signal_generation_command_queue, 

314 queue_container.time_histories_to_generate_queue, 

315 queue_container.data_out_queue, 

316 queue_container.environment_command_queue, 

317 queue_container.log_file_queue, 

318 queue_container.gui_update_queue, 

319 ), 

320 ) 

321 siggen_proc.start() 

322 collection_proc = mp.Process( 

323 target=data_collector_process, 

324 args=( 

325 environment_name, 

326 queue_container.collector_command_queue, 

327 queue_container.data_in_queue, 

328 [queue_container.data_for_spectral_computation_queue], 

329 queue_container.environment_command_queue, 

330 queue_container.log_file_queue, 

331 queue_container.gui_update_queue, 

332 ), 

333 ) 

334 

335 # collection_proc.start() 

336 process_class = SkeletonEnvironment(environment_name, queue_container) 

337 process_class.run() 

338 

339 # Rejoin all the processes 

340 process_class.log("Joining Subprocesses") 

341 process_class.log("Joining Spectral Computation") 

342 spectral_proc.join() 

343 process_class.log("Joining Data Analysis") 

344 analysis_proc.join() 

345 process_class.log("Joining Signal Generation") 

346 siggen_proc.join() 

347 process_class.log("Joining Data Collection") 

348 collection_proc.join()