Coverage for / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / rattlesnake.py: 0%
91 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
1# -*- coding: utf-8 -*-
2"""
3Rattlesnake Vibration Control Software
4Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
5(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
6Government retains certain rights in this software.
8This program is free software: you can redistribute it and/or modify
9it under the terms of the GNU General Public License as published by
10the Free Software Foundation, either version 3 of the License, or
11(at your option) any later version.
13This program is distributed in the hope that it will be useful,
14but WITHOUT ANY WARRANTY; without even the implied warranty of
15MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16GNU General Public License for more details.
18You should have received a copy of the GNU General Public License
19along with this program. If not, see <https://www.gnu.org/licenses/>.
20"""
22import datetime
23import multiprocessing as mp
24import sys
26from qtpy import QtWidgets
28from .components import (
29 ControlSelect,
30 ControlTypes,
31 EnvironmentSelect,
32 QueueContainer,
33 Ui,
34 VerboseMessageQueue,
35 acquisition_process,
36 all_environment_processes,
37 log_file_task,
38 output_process,
39 streaming_process,
40)
def _join_or_terminate(process, process_name, log_file_queue, timeout=5):
    """Join a child process, force-closing it if it does not exit in time.

    Parameters
    ----------
    process : multiprocessing.Process
        The child process to shut down.
    process_name : str
        Name used in the log messages written to ``log_file_queue``.
    log_file_queue : multiprocessing.Queue
        Queue that receives the timestamped log messages.
    timeout : float, optional
        Seconds to wait for the process to exit on its own before it is
        terminated. The default is 5.
    """
    log_file_queue.put(f"{datetime.datetime.now()}: Joining {process_name} Process\n")
    process.join(timeout=timeout)
    if process.is_alive():
        log_file_queue.put(f"{datetime.datetime.now()}: Force Closing {process_name} Process\n")
        process.terminate()
        process.join()


def main():
    """Main Rattlesnake Application Entry Point

    Selects the control type (from the command line arguments if one of the
    ``ControlTypes`` names is present, otherwise through
    ``ControlSelect.select_control``), starts the log file, environment,
    acquisition, output, and streaming processes, builds the user interface,
    and runs the Qt event loop.  Once the event loop returns (or raises), every
    child process is joined, and force closed if it does not exit within the
    timeout, before the log file process is told to quit.
    """
    mp.freeze_support()  # Required to compile into an executable
    print("Loading Rattlesnake...")
    # Create the user interface application
    app = QtWidgets.QApplication(sys.argv)

    # Check to see if the arguments have specified the control strategy
    upper_args = {arg.upper() for arg in sys.argv}
    control_type = None
    for ct in ControlTypes:
        if ct.name in upper_args:
            control_type = ct
            print(f"Using Control Type {ct.name} from command line")
            break
    if control_type is None:
        control_type, close_flag = ControlSelect.select_control()
        if not close_flag:
            sys.exit()

    loaded_profile = None
    if control_type == ControlTypes.COMBINED:
        environment_select_results = EnvironmentSelect.select_environment()
        # A status of 0 exits; a status of -1 additionally carries a profile
        if environment_select_results[0] == 0:
            sys.exit()
        environments = environment_select_results[1]
        if environment_select_results[0] == -1:
            loaded_profile = environment_select_results[2]
    else:
        environments = [[control_type, control_type.name.title()]]

    # Create the processes
    # Set up the log file process
    log_file_queue = mp.Queue()
    log_file_process = mp.Process(target=log_file_task, args=(log_file_queue,))
    log_file_process.start()

    # Set up the other command queues
    acquisition_command_queue = VerboseMessageQueue(log_file_queue, "Acquisition Command Queue")
    output_command_queue = VerboseMessageQueue(log_file_queue, "Output Command Queue")
    streaming_command_queue = VerboseMessageQueue(log_file_queue, "Streaming Command Queue")

    # Set up synchronization queues
    input_output_sync_queue = mp.Queue()
    single_process_hardware_queue = mp.Queue()

    # Set up shared memory to know when acquisition and output are running
    acquisition_active = mp.Value("i", 0)
    output_active = mp.Value("i", 0)

    # Create Queues for communication back to the GUI
    gui_update_queue = mp.Queue()

    # Set up a global communication queue so the subprocesses can talk back to the controller
    controller_communication_queue = VerboseMessageQueue(
        log_file_queue, "Controller Communication Queue"
    )

    # Set up the individual environment processes and queues
    environment_processes = {}
    environment_command_queues = {}
    environment_data_in_queues = {}
    environment_data_out_queues = {}
    for environment_type, environment_name in environments:
        # Create the queues
        environment_command_queues[environment_name] = VerboseMessageQueue(
            log_file_queue, environment_name + " Command Queue"
        )
        environment_data_in_queues[environment_name] = mp.Queue()
        environment_data_out_queues[environment_name] = mp.Queue()
        # Select the right process function
        process_fn = all_environment_processes[environment_type]
        # Start the process
        environment_processes[environment_name] = mp.Process(
            target=process_fn,
            args=(
                environment_name,
                environment_command_queues[environment_name],
                gui_update_queue,
                controller_communication_queue,
                log_file_queue,
                environment_data_in_queues[environment_name],
                environment_data_out_queues[environment_name],
                acquisition_active,
                output_active,
            ),
        )
        environment_processes[environment_name].start()

    # Put all the queues into one nice package
    queue_container = QueueContainer(
        controller_communication_queue,
        acquisition_command_queue,
        output_command_queue,
        streaming_command_queue,
        log_file_queue,
        input_output_sync_queue,
        single_process_hardware_queue,
        gui_update_queue,
        environment_command_queues,
        environment_data_in_queues,
        environment_data_out_queues,
    )

    # Set up the processes
    acquisition_proc = mp.Process(
        target=acquisition_process,
        args=(queue_container, environments, acquisition_active),
    )
    acquisition_proc.start()

    output_proc = mp.Process(
        target=output_process, args=(queue_container, environments, output_active)
    )
    output_proc.start()

    streaming_proc = mp.Process(target=streaming_process, args=(queue_container,))
    streaming_proc.start()

    try:
        # Hold a reference to the user interface for the life of the event loop
        _ = Ui(environments, queue_container, loaded_profile)

        # Run the program
        app.exec_()
    finally:
        # Rejoin all processes.  This runs even if the user interface raised so
        # that the non-daemonic children (and the log file process, which only
        # stops when it receives "quit") cannot keep the interpreter alive.
        _join_or_terminate(acquisition_proc, "Acquisition", log_file_queue)
        _join_or_terminate(output_proc, "Output", log_file_queue)
        _join_or_terminate(streaming_proc, "Streaming", log_file_queue)
        for environment_name, environment_process in environment_processes.items():
            _join_or_terminate(environment_process, environment_name, log_file_queue)
        log_file_queue.put(f"{datetime.datetime.now()}: Joining Log File Process\n")
        log_file_queue.put("quit")
        log_file_process.join()
# Start the application only when this file is executed as a script
if __name__ == "__main__":
    main()