Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / streaming.py: 96%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

# -*- coding: utf-8 -*-
"""
Controller subsystem that handles streaming data and metadata to NetCDF4 files
on the disk.

Rattlesnake Vibration Control Software
Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
Government retains certain rights in this software.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

24 

25from typing import Dict 

26 

27import netCDF4 as nc 

28import numpy as np 

29 

30from .abstract_environment import AbstractMetadata 

31from .abstract_message_process import AbstractMessageProcess 

32from .utilities import DataAcquisitionParameters, GlobalCommands, QueueContainer 

33 

34 

class StreamingProcess(AbstractMessageProcess):
    """Class containing the functionality to stream data to disk.

    This class will handle receiving data from the acquisition and saving it
    to a netCDF file.  Commands arrive through the streaming command queue and
    are dispatched via the ``command_map`` set up in the constructor.  Time
    data are appended to a variable with an unlimited time dimension; calling
    ``create_new_stream`` starts a fresh variable/dimension pair in the same
    file so multiple acquisitions can be stored per file."""

    def __init__(self, process_name: str, queue_container: QueueContainer):
        """Constructor for the StreamingProcess class

        Sets up the ``command_map`` and initializes all data members.

        Parameters
        ----------
        process_name : str
            The name of the process.
        queue_container : QueueContainer
            A container containing the queues used to communicate between
            controller processes

        """
        super().__init__(
            process_name,
            queue_container.log_file_queue,
            queue_container.streaming_command_queue,
            queue_container.gui_update_queue,
        )
        self.map_command(GlobalCommands.INITIALIZE_STREAMING, self.initialize)
        self.map_command(GlobalCommands.STREAMING_DATA, self.write_data)
        self.map_command(GlobalCommands.FINALIZE_STREAMING, self.finalize)
        self.map_command(GlobalCommands.CREATE_NEW_STREAM, self.create_new_stream)
        # Open netCDF4 Dataset handle; None whenever no file is open
        self.netcdf_handle = None
        # Track the variable/dimension we are currently streaming data to
        self.stream_variable = "time_data"
        self.stream_dimension = "time_samples"
        # Counter used by create_new_stream to generate unique names
        self.stream_index = 0

    def initialize(self, data):
        """Creates a file with all metadata from the controller

        Creates a netCDF4 dataset and stores all the global data acquisition
        parameters as well as the parameters from each environment.

        Parameters
        ----------
        data : tuple
            Tuple containing a string filename, global DataAcquisitionParameters
            defining the controller settings, and a dictionary containing the
            environment names as keys and the environment metadata (inheriting
            from AbstractMetadata) as values for each environment.

        """
        filename: str
        global_data_parameters: DataAcquisitionParameters
        environment_metadata: Dict[str, AbstractMetadata]
        filename, global_data_parameters, environment_metadata = data
        # Close any file left open by a previous initialization so a repeated
        # INITIALIZE_STREAMING command does not leak an open netCDF handle.
        self.finalize(None)
        # Reset stream-tracking state for the new file
        self.stream_variable = "time_data"
        self.stream_dimension = "time_samples"
        self.stream_index = 0
        self.netcdf_handle = nc.Dataset(  # pylint: disable=no-member
            filename, "w", format="NETCDF4", clobber=True
        )
        # Create dimensions
        self.netcdf_handle.createDimension(
            "response_channels", len(global_data_parameters.channel_list)
        )
        self.netcdf_handle.createDimension(
            "output_channels",
            len(
                [
                    channel
                    for channel in global_data_parameters.channel_list
                    if channel.feedback_device is not None
                ]
            ),
        )
        # Unlimited dimension so time data can be appended indefinitely
        self.netcdf_handle.createDimension(self.stream_dimension, None)
        self.netcdf_handle.createDimension(
            "num_environments", len(global_data_parameters.environment_names)
        )
        # Create attributes
        self.netcdf_handle.file_version = "3.0.0"
        self.netcdf_handle.sample_rate = global_data_parameters.sample_rate
        self.netcdf_handle.time_per_write = (
            global_data_parameters.samples_per_write / global_data_parameters.output_sample_rate
        )
        self.netcdf_handle.time_per_read = (
            global_data_parameters.samples_per_read / global_data_parameters.sample_rate
        )
        self.netcdf_handle.hardware = global_data_parameters.hardware
        # netCDF attributes cannot hold None, so store the string "None"
        self.netcdf_handle.hardware_file = (
            "None"
            if global_data_parameters.hardware_file is None
            else global_data_parameters.hardware_file
        )
        self.netcdf_handle.output_oversample = global_data_parameters.output_oversample
        for name, value in global_data_parameters.extra_parameters.items():
            setattr(self.netcdf_handle, name, value)
        # Create Variables
        self.netcdf_handle.createVariable(
            self.stream_variable, "f8", ("response_channels", self.stream_dimension)
        )
        var = self.netcdf_handle.createVariable("environment_names", str, ("num_environments",))
        for i, name in enumerate(global_data_parameters.environment_names):
            var[i] = name
        var = self.netcdf_handle.createVariable(
            "environment_active_channels",
            "i1",
            ("response_channels", "num_environments"),
        )
        var[...] = global_data_parameters.environment_active_channels.astype("int8")
        # Create channel table variables; each channel attribute is stored as
        # a string variable in the /channels group
        labels = [
            ["node_number", str],
            ["node_direction", str],
            ["comment", str],
            ["serial_number", str],
            ["triax_dof", str],
            ["sensitivity", str],
            ["unit", str],
            ["make", str],
            ["model", str],
            ["expiration", str],
            ["physical_device", str],
            ["physical_channel", str],
            ["channel_type", str],
            ["minimum_value", str],
            ["maximum_value", str],
            ["coupling", str],
            ["excitation_source", str],
            ["excitation", str],
            ["feedback_device", str],
            ["feedback_channel", str],
            ["warning_level", str],
            ["abort_level", str],
        ]
        for label, netcdf_datatype in labels:
            var = self.netcdf_handle.createVariable(
                "/channels/" + label, netcdf_datatype, ("response_channels",)
            )
            channel_data = [
                getattr(channel, label) for channel in global_data_parameters.channel_list
            ]
            # NOTE: all labels above currently use str, so the "i1" branch is
            # only reached if an integer-flag column is added to the table.
            if netcdf_datatype == "i1":
                channel_data = np.array([1 if val else 0 for val in channel_data])
            else:
                # String variables cannot store None; use empty string instead
                channel_data = ["" if val is None else val for val in channel_data]
            for i, cd in enumerate(channel_data):
                var[i] = cd
        # Now write all the environment data to the netCDF file
        for environment_name, metadata in environment_metadata.items():
            group_handle = self.netcdf_handle.createGroup(environment_name)
            metadata.store_to_netcdf(group_handle)

    def write_data(self, data):
        """Writes data to an initialized netCDF file

        Appends the given block of samples to the end of the current stream
        variable, growing its unlimited time dimension.  Does nothing if no
        file is currently open.

        Parameters
        ----------
        data : np.ndarray
            Data to be written to the netCDF file, shaped
            (response_channels, samples)

        """
        if self.netcdf_handle is None:
            return
        # Start writing at the current end of the unlimited time dimension
        timesteps = slice(self.netcdf_handle.dimensions[self.stream_dimension].size, None, None)
        self.netcdf_handle.variables[self.stream_variable][:, timesteps] = data

    def create_new_stream(self, data):  # pylint: disable=unused-argument
        """Creates a new stream in the streaming file

        Starts a new, uniquely-named variable/dimension pair in the open file;
        subsequent ``write_data`` calls append to the new variable.  Does
        nothing if no file is currently open.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``
        """
        if self.netcdf_handle is None:
            return
        self.stream_index += 1
        self.stream_variable = f"time_data_{self.stream_index}"
        self.stream_dimension = f"time_samples_{self.stream_index}"
        self.netcdf_handle.createDimension(self.stream_dimension, None)
        self.netcdf_handle.createVariable(
            self.stream_variable, "f8", ("response_channels", self.stream_dimension)
        )

    def finalize(self, data):  # pylint: disable=unused-argument
        """Closes the netCDF file when data writing is complete

        Safe to call when no file is open (no-op in that case).

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``
        """
        if self.netcdf_handle is not None:
            self.netcdf_handle.close()
            self.netcdf_handle = None

    def quit(self, data):
        """Stops the process.

        Closes any open file before signalling the message loop to exit.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``
        """
        self.finalize(None)
        # Returning True tells the AbstractMessageProcess run loop to stop
        return True

242 

243 

def streaming_process(queue_container: QueueContainer):
    """Entry point handed to ``multiprocessing`` for the streaming process.

    Constructs a ``StreamingProcess`` named ``"Streaming"`` and enters its
    ``run`` loop, which services streaming commands until told to quit.

    Parameters
    ----------
    queue_container : QueueContainer
        A container containing the queues used to communicate between
        controller processes

    """
    StreamingProcess("Streaming", queue_container).run()