Coverage for / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / output.py: 51%
180 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
1# -*- coding: utf-8 -*-
2"""
3Controller subsystem that handles the output from the hardware to the
4shaker amplifiers or other excitation device.
6Rattlesnake Vibration Control Software
7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
9Government retains certain rights in this software.
11This program is free software: you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation, either version 3 of the License, or
14(at your option) any later version.
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License for more details.
21You should have received a copy of the GNU General Public License
22along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""
25import multiprocessing as mp
26import multiprocessing.sharedctypes # pylint: disable=unused-import
28import numpy as np
30from .abstract_message_process import AbstractMessageProcess
31from .utilities import GlobalCommands, QueueContainer, flush_queue, rms_time
# Name handed to ``OutputProcess`` as its process name by ``output_process``.
TASK_NAME = "Output"

# Developer switch: when enabled, ``OutputProcess.output_signal`` saves the
# blocks it writes to ``.npz`` files using the templates defined below.
DEBUG = False
if DEBUG:
    from glob import glob

    # ``{:}`` is filled with a running file count for every written block ...
    FILE_OUTPUT = "debug_data/output_{:}.npz"
    # ... and with the environment name for each environment's first block.
    ENV_OUTPUT = "debug_data/output_first_data_{:}.npz"
class OutputProcess(AbstractMessageProcess):
    """Class defining the output behavior of the controller

    This class collects output data from the environments' data-out queues,
    sums the contributions of all active environments into a single block of
    ``write_size`` samples and writes that block to the output hardware.

    See AbstractMessageProcess for inherited members.
    """

    def __init__(
        self,
        process_name: str,
        queue_container: QueueContainer,
        environments: list,
        output_active: mp.sharedctypes.Synchronized,
    ):
        """
        Constructor for the OutputProcess Class

        Sets up the ``command_map`` and initializes all data members.

        Parameters
        ----------
        process_name : str
            The name of the process.
        queue_container : QueueContainer
            A container containing the queues used to communicate between
            controller processes
        environments : list
            A list of ``(ControlType,environment_name)`` pairs that define the
            environments in the controller.
        output_active : mp.sharedctypes.Synchronized
            Shared value used to record whether the output is running.  Its
            ``value`` is set to 1 or 0 through the ``output_active`` property.

        """
        super().__init__(
            process_name,
            queue_container.log_file_queue,
            queue_container.output_command_queue,
            queue_container.gui_update_queue,
        )
        self.map_command(
            GlobalCommands.INITIALIZE_DATA_ACQUISITION, self.initialize_data_acquisition
        )
        self.map_command(GlobalCommands.RUN_HARDWARE, self.output_signal)
        self.map_command(GlobalCommands.STOP_HARDWARE, self.stop_output)
        self.map_command(GlobalCommands.START_ENVIRONMENT, self.start_environment)
        # Communication
        self.queue_container = queue_container
        self.startup = True
        self.shutdown_flag = False
        # Sampling data (filled in by initialize_data_acquisition)
        self.sample_rate = None
        self.write_size = None
        self.num_outputs = None
        self.output_oversample = None
        # Environment Data
        # Only the environment name (second item of each pair) is needed here.
        self.environment_list = [environment[1] for environment in environments]
        self.environment_output_channels = None
        self.environment_active_flags = {
            environment: False for environment in self.environment_list
        }
        self.environment_starting_up_flags = {
            environment: False for environment in self.environment_list
        }
        self.environment_shutting_down_flags = {
            environment: False for environment in self.environment_list
        }
        self.environment_data_out_remainders = None
        # True while an environment's first written block still has to be
        # reported on the input/output sync queue.
        self.environment_first_data = {environment: False for environment in self.environment_list}
        # Hardware data
        self.hardware = None
        # Shared memory to record activity
        self._output_active = output_active

    @property
    def output_active(self):
        """Returns True if the output is currently active"""
        return bool(self._output_active.value)

    @output_active.setter
    def output_active(self, val):
        # The shared value stores the flag as an integer, 1 or 0.
        self._output_active.value = 1 if val else 0

    def initialize_data_acquisition(self, data):
        """
        Sets up the output according to the specified parameters

        Any previously created hardware object is closed before a new one is
        created for the requested hardware type.

        Parameters
        ----------
        data : tuple
            A tuple consisting of data acquisition parameters and the channels
            used by each environment.

        Raises
        ------
        ValueError
            If the hardware index in the data acquisition parameters is not
            one of the supported values.

        """
        self.log("Initializing Data Acquisition")
        # Pull out information from the queue
        data_acquisition_parameters, environment_channels = data
        # Store pertinent data
        self.sample_rate = data_acquisition_parameters.sample_rate
        self.write_size = data_acquisition_parameters.samples_per_write
        self.output_oversample = data_acquisition_parameters.output_oversample
        # Release the previous hardware before creating the new one
        if self.hardware is not None:
            self.hardware.close()
        self.hardware = self._create_hardware(data_acquisition_parameters)
        # Initialize hardware and create channels
        self.hardware.set_up_data_output_parameters_and_channels(
            data_acquisition_parameters, data_acquisition_parameters.channel_list
        )
        # Output channels are the channels with a non-blank feedback device
        output_indices = [
            index
            for index, channel in enumerate(data_acquisition_parameters.channel_list)
            if channel.feedback_device is not None and channel.feedback_device.strip() != ""
        ]
        self.num_outputs = len(output_indices)
        self.environment_output_channels = {}
        self.environment_data_out_remainders = {}
        for environment, active_indices in environment_channels.items():
            # ``out_inds`` are positions within ``output_indices``, i.e. the
            # rows of the write buffer that this environment drives.
            common_indices, out_inds, _ = np.intersect1d(
                output_indices, active_indices, return_indices=True
            )
            self.environment_output_channels[environment] = out_inds
            # Start every environment with an empty (channels x 0) backlog
            self.environment_data_out_remainders[environment] = np.zeros((len(common_indices), 0))

    def _create_hardware(self, data_acquisition_parameters):
        """Creates the output hardware object selected by the parameters

        The hardware modules are imported lazily so that only the module for
        the selected hardware is imported.

        Parameters
        ----------
        data_acquisition_parameters
            Data acquisition parameters; ``hardware`` selects the hardware
            type and ``extra_parameters`` supplies hardware-specific options.

        Returns
        -------
        The newly created hardware output object.

        Raises
        ------
        ValueError
            If ``hardware`` is not one of the supported indices (0-7).

        """
        hardware_index = data_acquisition_parameters.hardware
        if hardware_index == 0:
            from .nidaqmx_hardware_multitask import NIDAQmxOutput

            return NIDAQmxOutput(
                data_acquisition_parameters.extra_parameters["task_trigger"],
                data_acquisition_parameters.extra_parameters["task_trigger_output_channel"],
            )
        if hardware_index == 1:
            from .lanxi_hardware_multiprocessing import LanXIOutput

            return LanXIOutput(
                data_acquisition_parameters.extra_parameters["maximum_acquisition_processes"]
            )
        if hardware_index == 2:
            from .data_physics_hardware import DataPhysicsOutput

            return DataPhysicsOutput(self.queue_container.single_process_hardware_queue)
        if hardware_index == 3:
            from .data_physics_dp900_hardware import DataPhysicsDP900Output

            return DataPhysicsDP900Output(
                self.queue_container.single_process_hardware_queue,
            )
        if hardware_index == 4:
            from .exodus_modal_solution_hardware import ExodusOutput

            return ExodusOutput(self.queue_container.single_process_hardware_queue)
        if hardware_index == 5:
            from .state_space_virtual_hardware import StateSpaceOutput

            return StateSpaceOutput(self.queue_container.single_process_hardware_queue)
        if hardware_index == 6:
            from .sdynpy_system_virtual_hardware import SDynPySystemOutput

            return SDynPySystemOutput(self.queue_container.single_process_hardware_queue)
        if hardware_index == 7:
            from .sdynpy_frf_virtual_hardware import SDynPyFRFOutput

            return SDynPyFRFOutput(self.queue_container.single_process_hardware_queue)
        raise ValueError("Invalid Hardware or Hardware Not Implemented!")

    def output_signal(self, data):  # pylint: disable=unused-argument
        """The main output loop of the controller.

        Each call performs one pass of the loop and, unless the output has
        finished shutting down, re-queues ``RUN_HARDWARE`` so that the next
        pass runs.

        An environment must first be announced through ``start_environment``
        before its data_out queue is polled.  Each queue item is a pair of
        data and a flag specifying whether it is the last data; after the last
        data has been drained the environment is deactivated and
        ``STOP_ENVIRONMENT`` is sent to the acquisition command queue.

        When a block is written while ``self.startup`` is ``True``, the
        hardware is started afterwards and ``(None, True)`` is put on the
        input/output sync queue to tell acquisition to start.

        After ``stop_output`` has set ``shutdown_flag``, the output keeps
        running until all environments are inactive and all their data has
        been written; it then stops the hardware and resets itself for the
        next start.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        """
        # Skip hardware operations if there are no channels
        skip_hardware = self.num_outputs == 0
        # Go through each environment and collect data no matter what.
        # Start with ready to write and set to false if any environments are not
        ready_to_write = True
        for environment in self.environment_list:
            # If the task isn't active or currently shutting down, we don't need more data,
            # so just skip it.
            if (
                not self.environment_active_flags[environment]
                and not self.environment_starting_up_flags[environment]
            ) or self.environment_shutting_down_flags[environment]:
                continue
            # Check if we need more data from that environment
            if self.environment_data_out_remainders[environment].shape[-1] < self.write_size:
                # An environment that is still starting up does not hold up
                # the write; it simply contributes nothing yet.
                if not self.environment_starting_up_flags[environment]:
                    ready_to_write = False
                try:
                    # Try to grab data from the queue and add it to the remainders.
                    environment_data, last_run = self.queue_container.environment_data_out_queues[
                        environment
                    ].get_nowait()
                except mp.queues.Empty:
                    # If data is not ready yet, simply continue to the next environment and we'll
                    # try on the next time around.
                    continue
                self.log(
                    f"Got {' x '.join([f'{shape}' for shape in environment_data.shape])} "
                    f"data from {environment} Environment"
                )
                if last_run:
                    self.log(f"Deactivating {environment} Environment")
                    self.environment_shutting_down_flags[environment] = True
                    # No more data will arrive, so an environment that is
                    # still starting up is activated now to drain what it has.
                    if self.environment_starting_up_flags[environment]:
                        self.environment_starting_up_flags[environment] = False
                        self.environment_active_flags[environment] = True
                        self.environment_first_data[environment] = True
                self.environment_data_out_remainders[environment] = np.concatenate(
                    (
                        self.environment_data_out_remainders[environment],
                        environment_data,
                    ),
                    axis=-1,
                )
            else:
                # A full write is buffered: a starting environment becomes active
                if self.environment_starting_up_flags[environment]:
                    self.environment_starting_up_flags[environment] = False
                    self.environment_active_flags[environment] = True
                    self.log(f"Received First Complete Data Write for {environment} Environment")
                    self.environment_first_data[environment] = True

        # If we got through that previous loop still ready to write, we can
        # output the next signal if the hardware is ready
        if ready_to_write and (
            self.startup or skip_hardware or self.hardware.ready_for_new_output()
        ):
            remainder_log = [
                (environment, remainder.shape[-1])
                for environment, remainder in self.environment_data_out_remainders.items()
                if self.environment_active_flags[environment]
            ]
            self.log(f"Ready to Write: Environment Remainders {remainder_log}")
            write_data = np.zeros((self.num_outputs, self.write_size))
            for environment in self.environment_list:
                # If the task is shutting down and all the data has been drained from it,
                # make it inactive and just skip it.
                if (
                    self.environment_shutting_down_flags[environment]
                    and self.environment_data_out_remainders[environment].shape[-1] == 0
                ):
                    self.environment_active_flags[environment] = False
                    self.environment_starting_up_flags[environment] = False
                    self.queue_container.acquisition_command_queue.put(
                        self.process_name,
                        (GlobalCommands.STOP_ENVIRONMENT, environment),
                    )
                    self.environment_shutting_down_flags[environment] = False
                    continue
                # If the task is inactive, also just skip it
                if not self.environment_active_flags[environment]:
                    continue
                # Get the indices corresponding to the output channels
                output_indices = self.environment_output_channels[environment]
                # Determine how many time steps are available to write
                output_timesteps = min(
                    self.environment_data_out_remainders[environment].shape[-1],
                    self.write_size,
                )
                # Add one portion of the environment output to write_data and
                # keep whatever is left over for the next write
                write_data[
                    output_indices, :output_timesteps
                ] += self.environment_data_out_remainders[environment][:, :output_timesteps]
                self.environment_data_out_remainders[environment] = (
                    self.environment_data_out_remainders[environment][:, output_timesteps:]
                )
            # Now that we have each environment accounted for in the output we
            # can write to the hardware
            self.log(
                f"Writing {' x '.join(f'{shape}' for shape in write_data.shape)} data to hardware"
            )
            if not skip_hardware:
                self.log(
                    f"Output Writing Data to Hardware RMS: \n {rms_time(write_data, axis=-1)}"
                )
                for environment in self.environment_first_data:
                    if self.environment_first_data[environment]:
                        self.log(
                            f"Sending first data for environment {environment} to Acquisition "
                            "for syncing"
                        )
                        # Acquisition receives the block with the output
                        # oversampling removed.
                        self.queue_container.input_output_sync_queue.put(
                            (
                                environment,
                                write_data[..., :: self.output_oversample].copy(),
                            )
                        )
                        self.environment_first_data[environment] = False
                        if DEBUG:
                            np.savez(ENV_OUTPUT.format(environment), write_data=write_data)
                if DEBUG:
                    num_files = len(glob(FILE_OUTPUT.format("*")))
                    np.savez(FILE_OUTPUT.format(num_files), write_data=write_data)
                self.hardware.write(write_data.copy())
            else:
                # No output channels: there is no signal to send, so 0 is sent
                # in its place.  Every environment with pending first data must
                # be checked; relying on the loop variable left over from the
                # loop above would only ever check the last environment (and
                # fail outright when there are no environments).
                for environment, first_data_pending in self.environment_first_data.items():
                    if first_data_pending:
                        self.queue_container.input_output_sync_queue.put((environment, 0))
                        self.environment_first_data[environment] = False
            # Now check and see if we are starting up and start the hardware if so
            if self.startup:
                self.log("Starting Hardware Output")
                if not skip_hardware:
                    self.hardware.start()
                # Send something to the sync queue to tell acquisition to start now
                # We will send None because it is unique and we won't have an
                # environment with that name
                self.queue_container.input_output_sync_queue.put((None, True))
                self.startup = False
                self.output_active = True
        # Now check if we need to shut down.
        if self._ready_to_shut_down():
            self.log("Stopping Hardware")
            if not skip_hardware:
                self.hardware.stop()
            # Reset so that the next RUN_HARDWARE starts from scratch
            self.startup = True
            self.shutdown_flag = False
            flush_queue(self.queue_container.input_output_sync_queue)
            self.output_active = False
        else:
            # Otherwise keep going
            self.queue_container.output_command_queue.put(
                self.process_name, (GlobalCommands.RUN_HARDWARE, None)
            )

    def _ready_to_shut_down(self):
        """Returns True if a requested shutdown can be completed now

        This requires that shutdown was requested, that no environment is
        active or starting up, and that every environment's buffered data has
        been written.
        """
        return (
            self.shutdown_flag  # Time to shut down
            # Check that all environments are not active
            and not any(self.environment_active_flags.values())
            # Check that all environments are not starting up
            and not any(self.environment_starting_up_flags.values())
            # Check that all data is written
            and all(
                remainder.shape[-1] == 0
                for remainder in self.environment_data_out_remainders.values()
            )
        )

    def stop_output(self, data):  # pylint: disable=unused-argument
        """Sets a flag telling the output that it should start shutting down

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        """
        self.log("Starting Shutdown Procedure")
        self.shutdown_flag = True

    def start_environment(self, data):
        """Marks the specified environment as starting up

        The environment becomes active once ``output_signal`` has buffered a
        full write of its data, or has received its last data.

        Parameters
        ----------
        data : str
            The environment name that should be activated.

        """
        self.log(f"Started Environment {data}")
        self.environment_starting_up_flags[data] = True
        self.environment_shutting_down_flags[data] = False
        self.environment_active_flags[data] = False

    def quit(self, data):  # pylint: disable=unused-argument
        """Stops the process and shuts down the hardware if necessary.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``

        Returns
        -------
        bool
            Always ``True``.
            NOTE(review): presumably tells the message loop in
            AbstractMessageProcess to exit -- confirm.
        """
        # Pull any data off the queues that have been put to
        queues_to_flush = list(self.queue_container.environment_data_out_queues.values()) + [
            self.queue_container.output_command_queue,
            self.queue_container.input_output_sync_queue,
            self.queue_container.single_process_hardware_queue,
        ]
        queue_flush_sum = sum(len(flush_queue(pending)) for pending in queues_to_flush)
        self.log(f"Flushed {queue_flush_sum} items out of queues")
        if self.hardware is not None:
            self.hardware.close()
        return True
def output_process(
    queue_container: QueueContainer, environments: list, output_active: mp.sharedctypes.Synchronized
):
    """Entry point handed to multiprocessing for the output process

    Builds an ``OutputProcess`` named ``TASK_NAME`` from the arguments and
    calls its ``run`` method.

    Parameters
    ----------
    queue_container : QueueContainer
        A container containing the queues used to communicate between
        controller processes
    environments : list
        A list of ``(ControlType,environment_name)`` pairs that define the
        environments in the controller.
    output_active : mp.sharedctypes.Synchronized
        Shared value passed on to the ``OutputProcess`` constructor.

    """
    OutputProcess(TASK_NAME, queue_container, environments, output_active).run()