Coverage for / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / streaming.py: 96%
78 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
1# -*- coding: utf-8 -*-
2"""
3Controller subsystem that handles streaming data and metadata to NetCDF4 files
4on the disk.
6Rattlesnake Vibration Control Software
7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
9Government retains certain rights in this software.
11This program is free software: you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation, either version 3 of the License, or
14(at your option) any later version.
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License for more details.
21You should have received a copy of the GNU General Public License
22along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""
25from typing import Dict
27import netCDF4 as nc
28import numpy as np
30from .abstract_environment import AbstractMetadata
31from .abstract_message_process import AbstractMessageProcess
32from .utilities import DataAcquisitionParameters, GlobalCommands, QueueContainer
class StreamingProcess(AbstractMessageProcess):
    """Class containing the functionality to stream data to disk.

    This class will handle receiving data from the acquisition and saving it
    to a netCDF file.  Data is appended along an unlimited dimension of the
    currently active stream variable; further streams can be started in the
    same file with ``create_new_stream``.
    """

    # Per-channel fields written under the ``channels`` group, paired with the
    # netCDF datatype used to store each one.  Every field is currently stored
    # as a string.
    _CHANNEL_TABLE_FIELDS = (
        ("node_number", str),
        ("node_direction", str),
        ("comment", str),
        ("serial_number", str),
        ("triax_dof", str),
        ("sensitivity", str),
        ("unit", str),
        ("make", str),
        ("model", str),
        ("expiration", str),
        ("physical_device", str),
        ("physical_channel", str),
        ("channel_type", str),
        ("minimum_value", str),
        ("maximum_value", str),
        ("coupling", str),
        ("excitation_source", str),
        ("excitation", str),
        ("feedback_device", str),
        ("feedback_channel", str),
        ("warning_level", str),
        ("abort_level", str),
    )

    def __init__(self, process_name: str, queue_container: QueueContainer):
        """Constructor for the StreamingProcess class

        Sets up the ``command_map`` and initializes all data members.

        Parameters
        ----------
        process_name : str
            The name of the process.
        queue_container : QueueContainer
            A container containing the queues used to communicate between
            controller processes

        """
        super().__init__(
            process_name,
            queue_container.log_file_queue,
            queue_container.streaming_command_queue,
            queue_container.gui_update_queue,
        )
        self.map_command(GlobalCommands.INITIALIZE_STREAMING, self.initialize)
        self.map_command(GlobalCommands.STREAMING_DATA, self.write_data)
        self.map_command(GlobalCommands.FINALIZE_STREAMING, self.finalize)
        self.map_command(GlobalCommands.CREATE_NEW_STREAM, self.create_new_stream)
        # No file is open until ``initialize`` is called
        self.netcdf_handle = None
        # Track the variable we are streaming data to
        self.stream_variable = "time_data"
        self.stream_dimension = "time_samples"
        self.stream_index = 0

    def initialize(self, data):
        """Creates a file with all metadata from the controller

        Creates a netCDF4 dataset and stores all the global data acquisition
        parameters as well as the parameters from each environment.  If a
        dataset from a previous initialization is still open, it is closed
        before the new file is created.

        Parameters
        ----------
        data : tuple
            Tuple containing a string filename, global DataAcquisitionParameters
            defining the controller settings, and a dictionary containing the
            environment names as keys and the environment metadata (inheriting
            from AbstractMetadata) as values for each environment.

        """
        filename: str
        global_data_parameters: DataAcquisitionParameters
        environment_metadata: Dict[str, AbstractMetadata]
        filename, global_data_parameters, environment_metadata = data
        # Close a dataset left open by an earlier initialization so its handle
        # is not leaked and the new file is not opened (possibly over the same
        # path) while the old one is still held.
        self.finalize(None)
        # Every new file starts with the first (un-numbered) stream
        self.stream_variable = "time_data"
        self.stream_dimension = "time_samples"
        self.stream_index = 0
        self.netcdf_handle = nc.Dataset(  # pylint: disable=no-member
            filename, "w", format="NETCDF4", clobber=True
        )
        channel_list = global_data_parameters.channel_list
        # Create dimensions
        self.netcdf_handle.createDimension("response_channels", len(channel_list))
        # Output channels are the channels that have a feedback device assigned
        self.netcdf_handle.createDimension(
            "output_channels",
            sum(1 for channel in channel_list if channel.feedback_device is not None),
        )
        # The streaming dimension is unlimited so that writes can append to it
        self.netcdf_handle.createDimension(self.stream_dimension, None)
        self.netcdf_handle.createDimension(
            "num_environments", len(global_data_parameters.environment_names)
        )
        # Create attributes
        self._write_global_attributes(global_data_parameters)
        # Create Variables
        self.netcdf_handle.createVariable(
            self.stream_variable, "f8", ("response_channels", self.stream_dimension)
        )
        names_variable = self.netcdf_handle.createVariable(
            "environment_names", str, ("num_environments",)
        )
        for index, name in enumerate(global_data_parameters.environment_names):
            names_variable[index] = name
        active_variable = self.netcdf_handle.createVariable(
            "environment_active_channels",
            "i1",
            ("response_channels", "num_environments"),
        )
        active_variable[...] = global_data_parameters.environment_active_channels.astype("int8")
        # Create channel table variables
        self._write_channel_table(channel_list)
        # Now write all the environment data to the netCDF file
        for environment_name, metadata in environment_metadata.items():
            group_handle = self.netcdf_handle.createGroup(environment_name)
            metadata.store_to_netcdf(group_handle)

    def _write_global_attributes(self, global_data_parameters):
        """Stores the global acquisition settings as attributes of the open file

        Parameters
        ----------
        global_data_parameters : DataAcquisitionParameters
            Global settings of the controller whose values are copied into
            the root attributes of the netCDF file.

        """
        handle = self.netcdf_handle
        handle.file_version = "3.0.0"
        handle.sample_rate = global_data_parameters.sample_rate
        handle.time_per_write = (
            global_data_parameters.samples_per_write / global_data_parameters.output_sample_rate
        )
        handle.time_per_read = (
            global_data_parameters.samples_per_read / global_data_parameters.sample_rate
        )
        handle.hardware = global_data_parameters.hardware
        # A missing hardware file is stored as the string "None"
        handle.hardware_file = (
            "None"
            if global_data_parameters.hardware_file is None
            else global_data_parameters.hardware_file
        )
        handle.output_oversample = global_data_parameters.output_oversample
        for name, value in global_data_parameters.extra_parameters.items():
            setattr(handle, name, value)

    def _write_channel_table(self, channel_list):
        """Writes one variable per channel-table field into the ``channels`` group

        Parameters
        ----------
        channel_list : iterable
            The channels from the global data acquisition parameters.  Each
            channel must expose every attribute named in
            ``_CHANNEL_TABLE_FIELDS``.

        """
        for label, netcdf_datatype in self._CHANNEL_TABLE_FIELDS:
            variable = self.netcdf_handle.createVariable(
                "/channels/" + label, netcdf_datatype, ("response_channels",)
            )
            channel_data = [getattr(channel, label) for channel in channel_list]
            if netcdf_datatype == "i1":
                # Flag-style fields are stored as 0/1 integers.  No field in
                # the table currently uses this datatype.
                channel_data = np.array([1 if val else 0 for val in channel_data])
            else:
                # Missing entries are stored as empty strings
                channel_data = ["" if val is None else val for val in channel_data]
            # Values are assigned one element at a time
            for index, value in enumerate(channel_data):
                variable[index] = value

    def write_data(self, data):
        """Writes data to an initialized netCDF file

        The data is appended after the samples already stored in the active
        stream variable.  Nothing is written if no file is open.

        Parameters
        ----------
        data : np.ndarray
            Data to be written to the netCDF file.  It is assigned to
            ``variable[:, start:]``, so it is presumably shaped
            (response channels, new samples) -- confirm against the sender.

        """
        if self.netcdf_handle is None:
            return
        # Start writing at the current length of the unlimited dimension
        start = self.netcdf_handle.dimensions[self.stream_dimension].size
        self.netcdf_handle.variables[self.stream_variable][:, start:] = data

    def create_new_stream(self, data):  # pylint: disable=unused-argument
        """Creates a new stream in the streaming file

        Increments the stream index and creates a new unlimited dimension and
        a new variable, both suffixed with that index, which subsequent calls
        to ``write_data`` will append to.  Nothing is done if no file is open.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``
        """
        if self.netcdf_handle is None:
            return
        self.stream_index += 1
        self.stream_variable = f"time_data_{self.stream_index}"
        self.stream_dimension = f"time_samples_{self.stream_index}"
        self.netcdf_handle.createDimension(self.stream_dimension, None)
        self.netcdf_handle.createVariable(
            self.stream_variable, "f8", ("response_channels", self.stream_dimension)
        )

    def finalize(self, data):  # pylint: disable=unused-argument
        """Closes the netCDF file when data writing is complete

        Safe to call when no file is open, in which case nothing is done.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``
        """
        if self.netcdf_handle is not None:
            self.netcdf_handle.close()
            self.netcdf_handle = None

    def quit(self, data):
        """Stops the process.

        Closes any open netCDF file before stopping.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        Returns
        -------
        bool
            Always True.
        """
        self.finalize(None)
        return True
def streaming_process(queue_container: QueueContainer):
    """Entry point handed to multiprocessing for the streaming process

    Builds a ``StreamingProcess`` object named "Streaming" from the supplied
    queues and calls its ``run`` method.

    Parameters
    ----------
    queue_container : QueueContainer
        A container containing the queues used to communicate between
        controller processes

    """
    StreamingProcess("Streaming", queue_container).run()