Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / abstract_message_process.py: 100%

53 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Defines abstract processes that can be used as subprocesses in the controller. 

4Uses the message,data producer and consumer paradigm. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24 

25import os 

26import traceback 

27from abc import ABC 

28from datetime import datetime 

29from multiprocessing.queues import Queue 

30 

31from .utilities import GlobalCommands, VerboseMessageQueue 

32 

33 

class AbstractMessageProcess(ABC):
    """Abstract class for a subprocess of an environment.

    This class operates similarly to an AbstractEnvironment class but is
    designed to be a sub-process of the environment.

    Instructions arrive on the ``command_queue`` as ``(message, data)`` pairs
    and are dispatched through the ``command_map`` to bound methods of the
    class.  Subclasses register additional handlers with ``map_command``.
    """

    def __init__(
        self,
        process_name: str,
        log_file_queue: Queue,
        command_queue: "VerboseMessageQueue",
        gui_update_queue: Queue,
    ):
        """
        Constructor for the AbstractMessageProcess class.

        Sets up private data members for the properties. Initializes the
        ``command_map`` with the ``GlobalCommands.QUIT`` message

        Parameters
        ----------
        process_name : str
            Name of the process
        log_file_queue : Queue
            Queue to which log file messages should be written.
        command_queue : VerboseMessageQueue
            Queue from which instructions for the process will be pulled
        gui_update_queue : Queue
            Queue to which GUI update instructions will be written.

        """
        self._process_name = process_name
        self._log_file_queue = log_file_queue
        self._gui_update_queue = gui_update_queue
        self._command_queue = command_queue
        # Every process understands QUIT; subclasses add more via map_command.
        self._command_map = {GlobalCommands.QUIT: self.quit}

    def log(self, message):
        """Write a message to the log file

        This function puts a message onto the ``log_file_queue`` so it will
        eventually be written to the log file.

        The queued text consists of the date and time at which the message
        was queued, the name of the process, and then the message itself,
        terminated by a newline.

        Parameters
        ----------
        message : str
            A message that will be written to the log file.

        """
        self.log_file_queue.put(f"{datetime.now()}: {self.process_name} -- {message}\n")

    @property
    def process_name(self) -> str:
        """Property containing the name of the process used when writing log messages"""
        return self._process_name

    @property
    def command_map(self) -> dict:
        """Dictionary mapping instructions to functions of the class"""
        return self._command_map

    @property
    def gui_update_queue(self) -> Queue:
        """Queue to which GUI update instructions will be written."""
        return self._gui_update_queue

    def map_command(self, key, function):
        """Maps commands to instructions

        Maps the instruction ``key`` to the function ``function`` so when
        ``(key,data)`` pairs are pulled from the ``command_queue``, the function
        ``function`` is called with argument ``data``.  Mapping a ``key`` that
        is already present replaces the previous function.

        Parameters
        ----------
        key :
            Instruction pulled from the command queue

        function :
            Function to be called when the given ``key`` is pulled from the
            ``command_queue``

        """
        self._command_map[key] = function

    @property
    def command_queue(self) -> "VerboseMessageQueue":
        """Queue from which instructions for the process will be pulled"""
        return self._command_queue

    @property
    def log_file_queue(self) -> Queue:
        """Queue to which log file messages should be written."""
        return self._log_file_queue

    def run(self):
        """The main function that is run by the process

        A function that is called by the process function that
        sits in a while loop waiting for instructions on the command queue.

        When the instructions are received, they are separated into
        ``(message,data)`` pairs. The ``message`` is used in conjunction with
        the ``command_map`` to identify which function should be called, and
        the ``data`` is passed to that function as the argument. If the
        function returns a truthy value, it signals to the ``run`` function
        that it is time to stop the loop and exit.

        A ``message`` with no entry in the ``command_map`` is logged and
        skipped.  An exception raised by a mapped function is logged, reported
        on the ``gui_update_queue`` as an ``"error"`` item, and the loop keeps
        running.

        """
        self.log(f"Starting Process with PID {os.getpid()}")
        while True:
            # Get the message from the queue; the process name is handed to
            # the queue's ``get`` (presumably for its verbose logging -- see
            # VerboseMessageQueue).
            message, data = self.command_queue.get(self.process_name)
            # Call the function corresponding to that message with the data as argument
            try:
                function = self.command_map[message]
            except KeyError:
                self.log(
                    f"Undefined Message {message}, acceptable messages are {list(self.command_map)}"
                )
                continue
            try:
                halt_flag = function(data)
            except Exception:  # pylint: disable=broad-exception-caught
                # A failing handler must not take the whole process down:
                # record the traceback, surface it to the GUI, and carry on.
                tb = traceback.format_exc()
                self.log(f"ERROR\n\n {tb}")
                self.gui_update_queue.put(
                    (
                        "error",
                        (
                            f"{self.process_name} Error",
                            f"!!!UNKNOWN ERROR!!!\n\n{tb}",
                        ),
                    )
                )
                halt_flag = False
            # If we get a true value, stop.
            if halt_flag:
                self.log("Stopping Process")
                break

    def quit(self, data):  # pylint: disable=unused-argument
        """Returns True to stop the ``run`` while loop and exit the process

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        Returns
        -------
        True :
            This function returns True to signal to the ``run`` while loop
            that it is time to shut the process down.

        """
        return True

198 """ 

199 return True