Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / rattlesnake.py: 0%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Rattlesnake Vibration Control Software 

4Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

5(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

6Government retains certain rights in this software. 

7 

8This program is free software: you can redistribute it and/or modify 

9it under the terms of the GNU General Public License as published by 

10the Free Software Foundation, either version 3 of the License, or 

11(at your option) any later version. 

12 

13This program is distributed in the hope that it will be useful, 

14but WITHOUT ANY WARRANTY; without even the implied warranty of 

15MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16GNU General Public License for more details. 

17 

18You should have received a copy of the GNU General Public License 

19along with this program. If not, see <https://www.gnu.org/licenses/>. 

20""" 

21 

22import datetime 

23import multiprocessing as mp 

24import sys 

25 

26from qtpy import QtWidgets 

27 

28from .components import ( 

29 ControlSelect, 

30 ControlTypes, 

31 EnvironmentSelect, 

32 QueueContainer, 

33 Ui, 

34 VerboseMessageQueue, 

35 acquisition_process, 

36 all_environment_processes, 

37 log_file_task, 

38 output_process, 

39 streaming_process, 

40) 

41 

42 

43def main(): 

44 """Main Rattlesnake Application Entry Point""" 

45 mp.freeze_support() # Required to compile into an executable 

46 print("Loading Rattlesnake...") 

47 # Create the user interface application 

48 app = QtWidgets.QApplication(sys.argv) 

49 

50 # Check to see if the arguments have specified the control strategy 

51 upper_args = [arg.upper() for arg in sys.argv] 

52 control_type = None 

53 for ct in ControlTypes: 

54 if ct.name in upper_args: 

55 control_type = ct 

56 print(f"Using Control Type {ct.name} from command line") 

57 break 

58 if control_type is None: 

59 control_type, close_flag = ControlSelect.select_control() 

60 if not close_flag: 

61 sys.exit() 

62 

63 loaded_profile = None 

64 if control_type == ControlTypes.COMBINED: 

65 environment_select_results = EnvironmentSelect.select_environment() 

66 if environment_select_results[0] == 0: 

67 sys.exit() 

68 environments = environment_select_results[1] 

69 if environment_select_results[0] == -1: 

70 loaded_profile = environment_select_results[2] 

71 else: 

72 environments = [[control_type, control_type.name.title()]] 

73 

74 # Create the processes 

75 # Set up the log file process 

76 log_file_queue = mp.Queue() 

77 log_file_process = mp.Process(target=log_file_task, args=(log_file_queue,)) 

78 log_file_process.start() 

79 

80 # Set up the other command queues 

81 acquisition_command_queue = VerboseMessageQueue(log_file_queue, "Acquisition Command Queue") 

82 output_command_queue = VerboseMessageQueue(log_file_queue, "Output Command Queue") 

83 streaming_command_queue = VerboseMessageQueue(log_file_queue, "Streaming Command Queue") 

84 

85 # Set up synchronization queues 

86 input_output_sync_queue = mp.Queue() 

87 # environment_sync_queue = VerboseMessageQueue(log_file_queue,'Environment Sync Queue') 

88 single_process_hardware_queue = mp.Queue() 

89 

90 # Set up shared memory to know when acquisition and output are running 

91 acquisition_active = mp.Value("i", 0) 

92 output_active = mp.Value("i", 0) 

93 

94 # Create Queues for communication back to the GUI 

95 gui_update_queue = mp.Queue() 

96 

97 # Set up a global communication queue so the subprocesses can talk back to the controller 

98 controller_communication_queue = VerboseMessageQueue( 

99 log_file_queue, "Controller Communication Queue" 

100 ) 

101 

102 # Set up the individual environment processes and queues 

103 environment_processes = {} 

104 environment_command_queues = {} 

105 environment_data_in_queues = {} 

106 environment_data_out_queues = {} 

107 for environment_type, environment_name in environments: 

108 # Create the queue 

109 environment_command_queues[environment_name] = VerboseMessageQueue( 

110 log_file_queue, environment_name + " Command Queue" 

111 ) 

112 environment_data_in_queues[environment_name] = mp.Queue() 

113 environment_data_out_queues[environment_name] = mp.Queue() 

114 # Select the right process function 

115 process_fn = all_environment_processes[environment_type] 

116 # Start the process 

117 environment_processes[environment_name] = mp.Process( 

118 target=process_fn, 

119 args=( 

120 environment_name, 

121 environment_command_queues[environment_name], 

122 gui_update_queue, 

123 controller_communication_queue, 

124 log_file_queue, 

125 environment_data_in_queues[environment_name], 

126 environment_data_out_queues[environment_name], 

127 acquisition_active, 

128 output_active, 

129 ), 

130 ) 

131 environment_processes[environment_name].start() 

132 

133 # Put all the queues into one nice package 

134 queue_container = QueueContainer( 

135 controller_communication_queue, 

136 acquisition_command_queue, 

137 output_command_queue, 

138 streaming_command_queue, 

139 log_file_queue, 

140 input_output_sync_queue, 

141 # environment_sync_queue, 

142 single_process_hardware_queue, 

143 gui_update_queue, 

144 environment_command_queues, 

145 environment_data_in_queues, 

146 environment_data_out_queues, 

147 ) 

148 

149 # Set up the processes 

150 acquisition_proc = mp.Process( 

151 target=acquisition_process, 

152 args=(queue_container, environments, acquisition_active), 

153 ) 

154 acquisition_proc.start() 

155 

156 output_proc = mp.Process( 

157 target=output_process, args=(queue_container, environments, output_active) 

158 ) 

159 output_proc.start() 

160 

161 streaming_proc = mp.Process(target=streaming_process, args=(queue_container,)) 

162 streaming_proc.start() 

163 

164 _ = Ui(environments, queue_container, loaded_profile) 

165 

166 # Run the program 

167 app.exec_() 

168 

169 # Rejoin all proceseses 

170 log_file_queue.put(f"{datetime.datetime.now()}: Joining Acquisition Process\n") 

171 acquisition_proc.join(timeout=5) 

172 if acquisition_proc.is_alive(): 

173 log_file_queue.put(f"{datetime.datetime.now()}: Force Closing Acquisition Process\n") 

174 acquisition_proc.terminate() 

175 acquisition_proc.join() 

176 log_file_queue.put(f"{datetime.datetime.now()}: Joining Output Process\n") 

177 output_proc.join(timeout=5) 

178 if output_proc.is_alive(): 

179 log_file_queue.put(f"{datetime.datetime.now()}: Force Closing Output Process\n") 

180 output_proc.terminate() 

181 output_proc.join() 

182 log_file_queue.put(f"{datetime.datetime.now()}: Joining Streaming Process\n") 

183 streaming_proc.join(timeout=5) 

184 if streaming_proc.is_alive(): 

185 log_file_queue.put(f"{datetime.datetime.now()}: Force Closing Streaming Process\n") 

186 streaming_proc.terminate() 

187 streaming_proc.join() 

188 for environment_name, environment_process in environment_processes.items(): 

189 log_file_queue.put(f"{datetime.datetime.now()}: Joining {environment_name} Process\n") 

190 environment_process.join(timeout=5) 

191 if environment_process.is_alive(): 

192 log_file_queue.put( 

193 f"{datetime.datetime.now()}: Force Closing {environment_name} Process\n" 

194 ) 

195 environment_process.terminate() 

196 environment_process.join() 

197 log_file_queue.put(f"{datetime.datetime.now()}: Joining Log File Process\n") 

198 log_file_queue.put("quit") 

199 log_file_process.join() 

200 

201 

202if __name__ == "__main__": 

203 main()