Coverage for / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / acquisition.py: 73%
241 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
1# -*- coding: utf-8 -*-
2"""
3Controller Subsystem that handles the reading of data from the hardware.
5Rattlesnake Vibration Control Software
6Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
7(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
8Government retains certain rights in this software.
10This program is free software: you can redistribute it and/or modify
11it under the terms of the GNU General Public License as published by
12the Free Software Foundation, either version 3 of the License, or
13(at your option) any later version.
15This program is distributed in the hope that it will be useful,
16but WITHOUT ANY WARRANTY; without even the implied warranty of
17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18GNU General Public License for more details.
20You should have received a copy of the GNU General Public License
21along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""
24import multiprocessing as mp
25import multiprocessing.sharedctypes # pylint: disable=unused-import
26from time import sleep, time
28import numpy as np
30from .abstract_message_process import AbstractMessageProcess
31from .utilities import (
32 GlobalCommands,
33 QueueContainer,
34 align_signals,
35 correlation_norm_signal_spec_ratio,
36 flush_queue,
37)
39DEBUG = False
40if DEBUG:
41 from glob import glob
43 FILE_OUTPUT = "debug_data/acquisition_{:}.npz"
class AcquisitionProcess(AbstractMessageProcess):
    """Class defining the acquisition behavior of the controller

    This class will handle reading data from the hardware and then sending it
    to the individual environment processes.

    See AbstractMessageProcess for inherited class members.
    """
    def __init__(
        self,
        process_name: str,
        queue_container: QueueContainer,
        environments: list,
        acquisition_active: mp.sharedctypes.Synchronized,
    ):
        """
        Constructor for the AcquisitionProcess class

        Sets up the ``command_map`` and initializes all data members.

        Parameters
        ----------
        process_name : str
            The name of the process.
        queue_container : QueueContainer
            A container containing the queues used to communicate between
            controller processes
        environments : list
            A list of ``(ControlType,environment_name)`` pairs that define the
            environments in the controller.
        acquisition_active : mp.sharedctypes.Synchronized
            Shared integer flag (nonzero = running) used to publish the
            acquisition state to other processes.
        """
        super().__init__(
            process_name,
            queue_container.log_file_queue,
            queue_container.acquisition_command_queue,
            queue_container.gui_update_queue,
        )
        # Route controller commands to the handler methods on this class
        self.map_command(
            GlobalCommands.INITIALIZE_DATA_ACQUISITION, self.initialize_data_acquisition
        )
        self.map_command(GlobalCommands.RUN_HARDWARE, self.acquire_signal)
        self.map_command(GlobalCommands.STOP_HARDWARE, self.stop_acquisition)
        self.map_command(GlobalCommands.STOP_ENVIRONMENT, self.stop_environment)
        self.map_command(GlobalCommands.START_STREAMING, self.start_streaming)
        self.map_command(GlobalCommands.STOP_STREAMING, self.stop_streaming)
        # Communication
        self.queue_container = queue_container
        # True until the first pass of acquire_signal starts the hardware
        self.startup = True
        # Set by stop_acquisition to begin the shutdown sequence
        self.shutdown_flag = False
        self.any_environments_started = False
        # Sampling data (filled in by initialize_data_acquisition)
        self.sample_rate = None
        self.read_size = None
        # Environment Data
        self.environment_list = [environment[1] for environment in environments]
        self.environment_acquisition_channels = None
        # Per-environment state: active, expecting final data, samples left
        # to read before final data is delivered, and the first-output-data
        # signal used for correlation alignment (None = not waiting).
        self.environment_active_flags = {
            environment: False for environment in self.environment_list
        }
        self.environment_last_data = {environment: False for environment in self.environment_list}
        self.environment_samples_remaining_to_read = {
            environment: 0 for environment in self.environment_list
        }
        self.environment_first_data = {environment: None for environment in self.environment_list}
        # Hardware data
        self.hardware = None
        # Streaming Information
        self.streaming = False
        self.has_streamed = False
        # Persistent data
        self.read_data = None
        self.output_indices = None
        # Abort and Warning Limits
        self.abort_limits = None
        self.warning_limits = None
        self._acquisition_active = acquisition_active
        # print('acquisition setup')
128 @property
129 def acquisition_active(self):
130 """Returns True if the acquisition is currently running"""
131 return bool(self._acquisition_active.value)
133 @acquisition_active.setter
134 def acquisition_active(self, val):
135 # print('output currently active: {:}'.format(self.acquisition_active))
136 # print('setting acquisition active')
137 if val:
138 self._acquisition_active.value = 1
139 else:
140 self._acquisition_active.value = 0
141 # print('set acquisition active')
143 def initialize_data_acquisition(self, data):
144 """Sets up the acquisition according to the specified parameters
146 Parameters
147 ----------
148 data : tuple
149 A tuple consisting of data acquisition parameters and the channels
150 used by each environment.
152 """
153 self.log("Initializing Data Acquisition")
154 # Pull out information from the queue
155 data_acquisition_parameters, self.environment_acquisition_channels = data
156 # Store pertinent data
157 self.sample_rate = data_acquisition_parameters.sample_rate
158 self.read_size = data_acquisition_parameters.samples_per_read
159 # Check which type of hardware we have
160 if self.hardware is not None:
161 self.hardware.close()
162 if data_acquisition_parameters.hardware == 0:
163 from .nidaqmx_hardware_multitask import NIDAQmxAcquisition
165 self.hardware = NIDAQmxAcquisition(
166 data_acquisition_parameters.extra_parameters["task_trigger"],
167 data_acquisition_parameters.extra_parameters["task_trigger_output_channel"],
168 )
169 elif data_acquisition_parameters.hardware == 1:
170 from .lanxi_hardware_multiprocessing import LanXIAcquisition
172 self.hardware = LanXIAcquisition(
173 data_acquisition_parameters.extra_parameters["maximum_acquisition_processes"]
174 )
175 elif data_acquisition_parameters.hardware == 2:
176 from .data_physics_hardware import DataPhysicsAcquisition
178 self.hardware = DataPhysicsAcquisition(
179 data_acquisition_parameters.hardware_file,
180 self.queue_container.single_process_hardware_queue,
181 )
182 elif data_acquisition_parameters.hardware == 3:
183 from .data_physics_dp900_hardware import DataPhysicsDP900Acquisition
185 self.hardware = DataPhysicsDP900Acquisition(
186 data_acquisition_parameters.hardware_file,
187 self.queue_container.single_process_hardware_queue,
188 )
189 elif data_acquisition_parameters.hardware == 4:
190 from .exodus_modal_solution_hardware import ExodusAcquisition
192 self.hardware = ExodusAcquisition(
193 data_acquisition_parameters.hardware_file,
194 self.queue_container.single_process_hardware_queue,
195 )
196 elif data_acquisition_parameters.hardware == 5:
197 from .state_space_virtual_hardware import StateSpaceAcquisition
199 self.hardware = StateSpaceAcquisition(
200 data_acquisition_parameters.hardware_file,
201 self.queue_container.single_process_hardware_queue,
202 )
203 elif data_acquisition_parameters.hardware == 6:
204 from .sdynpy_system_virtual_hardware import SDynPySystemAcquisition
206 self.hardware = SDynPySystemAcquisition(
207 data_acquisition_parameters.hardware_file,
208 self.queue_container.single_process_hardware_queue,
209 )
210 elif data_acquisition_parameters.hardware == 7:
211 from .sdynpy_frf_virtual_hardware import SDynPyFRFAcquisition
213 self.hardware = SDynPyFRFAcquisition(
214 data_acquisition_parameters.hardware_file,
215 self.queue_container.single_process_hardware_queue,
216 )
217 else:
218 raise ValueError("Invalid Hardware or Hardware Not Implemented!")
219 # Initialize hardware and create channels
220 self.hardware.set_up_data_acquisition_parameters_and_channels(
221 data_acquisition_parameters, data_acquisition_parameters.channel_list
222 )
223 # Set up warning and abort limits
224 self.abort_limits = []
225 self.warning_limits = []
226 for channel in data_acquisition_parameters.channel_list:
227 try:
228 warning_limit = float(channel.warning_level)
229 except (ValueError, TypeError):
230 warning_limit = float("inf") # Never warn on this channel
231 try:
232 abort_limit = float(channel.abort_level)
233 except (ValueError, TypeError):
234 abort_limit = float("inf") # Never abort on this channel if not specified
235 self.warning_limits.append(warning_limit)
236 self.abort_limits.append(abort_limit)
237 self.abort_limits = np.array(self.abort_limits)
238 self.warning_limits = np.array(self.warning_limits)
239 self.output_indices = [
240 index
241 for index, channel in enumerate(data_acquisition_parameters.channel_list)
242 if not (channel.feedback_device is None) and not (channel.feedback_device.strip() == "")
243 ]
244 self.read_data = np.zeros(
245 (
246 len(data_acquisition_parameters.channel_list),
247 4
248 * np.max(
249 [
250 data_acquisition_parameters.samples_per_read,
251 data_acquisition_parameters.samples_per_write
252 // data_acquisition_parameters.output_oversample,
253 ]
254 ),
255 )
256 )
258 def stop_environment(self, data):
259 """Sets flags stating that the specified environment will be ending.
261 Parameters
262 ----------
263 data : str
264 The environment name that should be deactivated
266 """
267 self.log(f"Deactivating Environment {data}")
268 self.environment_active_flags[data] = False
269 self.environment_last_data[data] = True
270 self.environment_samples_remaining_to_read[data] = self.hardware.get_acquisition_delay()
272 def start_streaming(self, data): # pylint: disable=unused-argument
273 """Sets the flag to tell the acquisition to write data to disk
275 Parameters
276 ----------
277 data : Ignored
278 This parameter is not used by the function but must be present
279 due to the calling signature of functions called through the
280 ``command_map``
282 """
283 self.streaming = True
284 if self.has_streamed:
285 self.queue_container.streaming_command_queue.put(
286 self.process_name, (GlobalCommands.CREATE_NEW_STREAM, None)
287 )
288 else:
289 self.has_streamed = True
291 def stop_streaming(self, data): # pylint: disable=unused-argument
292 """Sets the flag to tell the acquisition to not write data to disk
294 Parameters
295 ----------
296 data : Ignored
297 This parameter is not used by the function but must be present
298 due to the calling signature of functions called through the
299 ``command_map``
301 """
302 self.streaming = False
    def acquire_signal(self, data):
        """The main acquisition loop of the controller.

        If it is the first time through this loop, startup will be set to True
        and the hardware will be started.

        If it is the last time through this loop, the hardware will be shut
        down.

        The function will simply read the data from the hardware and pass it
        to any active environment and to the streaming process if the process
        is active.

        Parameters
        ----------
        data : Ignored
            This parameter is not used by the function but must be present
            due to the calling signature of functions called through the
            ``command_map``
        """
        if self.startup:
            self.any_environments_started = False
            self.log("Waiting for Output to Start")
            start_wait_time = time()
            while True:
                # Try to get data from the measurement if we can.  NOTE: this
                # rebinds ``data`` to the environment's first output signal.
                try:
                    environment, data = self.queue_container.input_output_sync_queue.get_nowait()
                except mp.queues.Empty:
                    # 30 second timeout waiting for the output process
                    if time() - start_wait_time > 30:
                        self.queue_container.gui_update_queue.put(
                            (
                                "error",
                                (
                                    "Acquisition Error",
                                    "Acquisition timed out waiting for output to start. "
                                    "Check output task for errors!",
                                ),
                            )
                        )
                        break
                    sleep(0.1)
                    continue
                # A None environment is the output process's "started" sentinel
                if environment is None:
                    self.log("Detected Output Started")
                    break
                else:
                    self.log(f"Listening for first data for environment {environment}")
                    self.environment_first_data[environment] = data
                    self.any_environments_started = True
            self.log("Starting Hardware Acquisition")
            self.hardware.start()
            self.startup = False
            self.acquisition_active = True
            # print('started acquisition')
        # Pick up any first-data packages that arrived after startup
        self.get_first_output_data()
        if (
            self.shutdown_flag  # We're shutting down
            and all(
                [not flag for environment, flag in self.environment_active_flags.items()]
            )  # All the environments are inactive
            and all(
                [flag is None for environment, flag in self.environment_first_data.items()]
            )  # All the environments are not starting
            and all(
                [not flag for environment, flag in self.environment_last_data.items()]
            )  # None of the environments are expecting their last data
        ):
            # --- Shutdown path: drain the hardware and reset for restart ---
            self.log("Acquiring Remaining Data")
            read_data = self.hardware.read_remaining()
            self.add_data_to_buffer(read_data)
            if read_data.shape[-1] != 0:
                # Per-channel peak levels for the GUI monitor and limit checks
                max_vals = np.max(np.abs(read_data), axis=-1)
                self.gui_update_queue.put(("monitor", max_vals))
                warn_channels = max_vals > self.warning_limits
                if np.any(warn_channels):
                    # Report 1-based channel numbers
                    warning_numbers = [i + 1 for i in range(len(warn_channels)) if warn_channels[i]]
                    print(f"Channels {warning_numbers} Reached Warning Limit")
                    self.log(f"Channels {warning_numbers} Reached Warning Limit")
                abort_channels = max_vals > self.abort_limits
                if np.any(abort_channels):
                    abort_numbers = [i + 1 for i in range(len(abort_channels)) if abort_channels[i]]
                    print(f"Channels {abort_numbers} Reached Abort Limit")
                    self.log(f"Channels {abort_numbers} Reached Abort Limit")
                    # Don't stop because we're already shutting down.
            self.hardware.stop()
            self.shutdown_flag = False
            self.startup = True
            # print('{:} {:}'.format(self.streaming,self.any_environments_started))
            # Flush the final block to the streaming process, then reset the
            # streaming flags unconditionally so the next run starts clean
            if self.streaming and self.any_environments_started:
                self.queue_container.streaming_command_queue.put(
                    self.process_name, (GlobalCommands.STREAMING_DATA, read_data.copy())
                )
            self.streaming = False
            if self.has_streamed and self.any_environments_started:
                self.queue_container.streaming_command_queue.put(
                    self.process_name, (GlobalCommands.FINALIZE_STREAMING, None)
                )
            self.has_streamed = False
            self.acquisition_active = False
            self.log("Acquisition Shut Down")
        else:
            # --- Normal path: read a block and distribute it ---
            aquiring_environments = [
                name for name, flag in self.environment_active_flags.items() if flag
            ]
            self.log(f"Acquiring Data for {aquiring_environments} environments")
            read_data = self.hardware.read()
            self.add_data_to_buffer(read_data)
            if read_data.shape[-1] != 0:
                max_vals = np.max(np.abs(read_data), axis=-1)
                self.gui_update_queue.put(("monitor", max_vals))
                warn_channels = max_vals > self.warning_limits
                if np.any(warn_channels):
                    warning_numbers = [i + 1 for i in range(len(warn_channels)) if warn_channels[i]]
                    print(f"Channels {warning_numbers} Reached Warning Limit")
                    self.log(f"Channels {warning_numbers} Reached Warning Limit")
                abort_channels = max_vals > self.abort_limits
                if np.any(abort_channels):
                    abort_numbers = [i + 1 for i in range(len(abort_channels)) if abort_channels[i]]
                    print(f"Channels {abort_numbers} Reached Abort Limit")
                    self.log(f"Channels {abort_numbers} Reached Abort Limit")
                    # An abort during normal operation stops the whole controller
                    self.gui_update_queue.put(("stop", None))
            # Send the data to the different channels
            for environment in self.environment_list:
                # Check to see if we're waiting for the first data for this environment
                if self.environment_first_data[environment] is not None:
                    # An all-(near-)zero first signal can't be correlated;
                    # just take the most recent read-size block of the buffer
                    if np.all(np.abs(self.environment_first_data[environment]) < 1e-10):
                        delay = -self.read_size
                    else:
                        correlation_start_time = time()
                        if DEBUG:
                            num_files = len(glob(FILE_OUTPUT.format("*")))
                            np.savez(
                                FILE_OUTPUT.format(num_files),
                                read_data_buffer=self.read_data,
                                read_data=read_data,
                                output_indices=self.output_indices,
                                first_data=self.environment_first_data[environment],
                            )
                        # Cross-correlate the output channels of the buffer
                        # against the environment's expected first data
                        _, delay, _, _ = align_signals(
                            self.read_data[self.output_indices],
                            self.environment_first_data[environment],
                            perform_subsample=False,
                            correlation_threshold=0.5,
                            correlation_metric=correlation_norm_signal_spec_ratio,
                        )
                        correlation_end_time = time()
                        corr_time = correlation_end_time - correlation_start_time
                        self.log(
                            f"Correlation check for environment {environment} took "
                            f"{corr_time:0.2f} seconds"
                        )
                    # Adding a criteria that the delay must be in the first half
                    # of the buffer, otherwise we could still be increasing
                    # in correlation as more data is acquired. If it's in
                    # the first half, it means that we have acquired more
                    # data and the best match did not improve
                    if delay is None or delay > self.read_data.shape[-1] // 2:
                        continue
                    self.log(f"Found First Data for Environment {environment}")
                    environment_data = self.read_data[
                        self.environment_acquisition_channels[environment], delay:
                    ]
                    if DEBUG:
                        np.savez(
                            f"debug_data/environment_first_data_{environment}.npz",
                            found_data=environment_data,
                            expected_data=self.environment_first_data[environment],
                        )
                    self.environment_first_data[environment] = None
                    if not self.environment_last_data[environment]:
                        self.environment_active_flags[environment] = True
                    else:
                        self.log(
                            f"Already received environment {environment} "
                            "shutdown signal, not starting"
                        )
                # Check to see if the environment is active
                elif (
                    self.environment_active_flags[environment]
                    or self.environment_last_data[environment]
                ):
                    environment_data = read_data[
                        self.environment_acquisition_channels[environment]
                    ].copy()
                # Otherwise the environment isn't active
                else:
                    continue
                if self.environment_last_data[environment]:
                    # Count down the remaining hardware-delay samples
                    self.environment_samples_remaining_to_read[environment] -= self.read_size
                    self.log(
                        f"Reading last data for {environment}, "
                        f"{self.environment_samples_remaining_to_read[environment]} samples "
                        f"remaining"
                    )
                environment_finished = (
                    self.environment_last_data[environment]
                    and self.environment_samples_remaining_to_read[environment] <= 0
                )
                self.log(f"Sending {environment_data.shape} data to {environment} environment")
                self.queue_container.environment_data_in_queues[environment].put(
                    (environment_data, environment_finished)
                )
                if environment_finished:
                    self.environment_last_data[environment] = False
                    self.log(f"Delivered last data to {environment}")
                # np.savez('test_data/acquisition_data_check.npz',
                #          read_data = self.read_data,
                #          environment_data = environment_data,
                #          environment_channels = self.environment_acquisition_channels[environment])
            # Re-queue ourselves so the acquisition loop keeps running
            self.queue_container.acquisition_command_queue.put(
                self.process_name, (GlobalCommands.RUN_HARDWARE, None)
            )
            # print('{:} {:}'.format(self.streaming,self.any_environments_started))
            if self.streaming and self.any_environments_started:
                self.queue_container.streaming_command_queue.put(
                    self.process_name, (GlobalCommands.STREAMING_DATA, read_data.copy())
                )
525 def add_data_to_buffer(self, data):
526 """Adds data to the end of the buffer and shifts existing in the buffer forward
528 Parameters
529 ----------
530 data : np.ndarray
531 A 2D array with shape num_channels x num_samples
532 """
533 # Roll the buffer with new data
534 read_size = data.shape[-1]
535 if read_size != 0:
536 self.read_data[..., :-read_size] = self.read_data[..., read_size:]
537 self.read_data[..., -read_size:] = data
539 def get_first_output_data(self):
540 """Searches through the sync queue for first data packages from the output process"""
541 first_output_data = flush_queue(self.queue_container.input_output_sync_queue)
542 for environment, data in first_output_data:
543 self.log(f"Listening for first data for environment {environment}")
544 self.environment_first_data[environment] = data
545 self.any_environments_started = True
547 def stop_acquisition(self, data): # pylint: disable=unused-argument
548 """Sets a flag telling the acquisition that it should start shutting down
550 Parameters
551 ----------
552 data : Ignored
553 This parameter is not used by the function but must be present
554 due to the calling signature of functions called through the
555 ``command_map``
557 """
558 self.shutdown_flag = True
560 def quit(self, data):
561 """Stops the process and shuts down the hardware if necessary.
563 Parameters
564 ----------
565 data : Ignored
566 This parameter is not used by the function but must be present
567 due to the calling signature of functions called through the
568 ``command_map``
569 """
570 # Pull any data off the queues that have been put to
571 queue_flush_sum = 0
572 for queue in [q for name, q in self.queue_container.environment_data_in_queues.items()] + [
573 self.queue_container.acquisition_command_queue
574 ]:
575 queue_flush_sum += len(flush_queue(queue))
576 self.log(f"Flushed {queue_flush_sum} items out of queues")
577 if self.hardware is not None:
578 self.hardware.close()
579 return True
def acquisition_process(
    queue_container: QueueContainer,
    environments: list,
    acquisition_active: mp.sharedctypes.Synchronized,
):
    """Function passed to multiprocessing as the acquisition process

    This process creates the ``AcquisitionProcess`` object and calls the ``run``
    command.

    Parameters
    ----------
    queue_container : QueueContainer
        A container containing the queues used to communicate between
        controller processes
    environments : list
        A list of ``(ControlType,environment_name)`` pairs that define the
        environments in the controller.
    acquisition_active : mp.sharedctypes.Synchronized
        Shared flag publishing whether the acquisition is currently running.
    """
    AcquisitionProcess(
        "Acquisition", queue_container, environments, acquisition_active
    ).run()