Coverage for / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / data_collector.py: 79%
356 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
1# -*- coding: utf-8 -*-
2"""
3A general subprocess that collects data, splits it into measurement frames, then
4sends it to the environment.
6Rattlesnake Vibration Control Software
7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
9Government retains certain rights in this software.
11This program is free software: you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation, either version 3 of the License, or
14(at your option) any later version.
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License for more details.
21You should have received a copy of the GNU General Public License
22along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""
25import copy
26import multiprocessing as mp
27from enum import Enum
28from time import sleep
29from typing import List
31import numpy as np
32import scipy.signal as sig
33from scipy.fft import rfft
35from .abstract_message_process import AbstractMessageProcess
36from .utilities import VerboseMessageQueue, flush_queue, load_python_module, rms_time
# Module-level switch for debug dumps. When True, the collector process in this file
# writes pickled metadata / frame buffers and .npy copies of the data it handles to
# files under "debug_data/".
DEBUG = False

if DEBUG:
    # pickle is only used by the debug dumps, so it is only imported when they are enabled
    import pickle
class FrameBuffer:
    """A class that stores most recently acquired data in a buffer to facilitate overlapping,
    triggering, and spectral processing

    Trigger and reset positions are tracked as a number of samples counted back from the
    end of the buffer, so they grow by the number of samples added each time new data
    arrives.
    """

    def __init__(
        self,
        num_channels,
        trigger_index,
        pretrigger,
        positive_slope,
        trigger_level,
        hysteresis_level,
        hysteresis_samples,
        samples_per_frame,
        maximum_overlap,
        manual_accept,
        trigger_enabled,
        trigger_only_first,
        wait_samples,
        dtype="float64",
        starting_value=np.nan,
        buffer_size_frame_multiplier=2,
    ):
        """Initializes a framebuffer object

        Parameters
        ----------
        num_channels : int
            The number of signals in the buffer
        trigger_index : int
            The signal index to use for a trigger.  Only used if trigger_enabled is True
        pretrigger : float
            The fraction of the frame size used for a pretrigger.  Ignored (treated as
            zero) if trigger_enabled is False
        positive_slope : bool
            If True, the trigger is detected with a positive slope.  If False, a negative
            slope.
        trigger_level : float
            The value of the signal required to activate the trigger
        hysteresis_level : float
            The value of the signal required to reset the trigger
        hysteresis_samples : int
            The number of samples required to satisfy the hysteresis level to reset the trigger
        samples_per_frame : int
            The number of samples per measurement frame
        maximum_overlap : float
            The fraction of the frame overlapping with the next frame
        manual_accept : bool
            If True, wait for an acceptance before returning data
        trigger_enabled : bool
            If True, data will only be returned after a trigger.  If False, all data will be
            returned in a "free run"
        trigger_only_first : bool
            If True, only the first frame requires a trigger, and all remaining frames will be
            "free run"
        wait_samples : int
            The number of samples to wait before returning a frame; for example, to wait until a
            system is at steady state
        dtype : str, optional
            A dtype designator in string format for the type of data in the buffer, by default
            "float64"
        starting_value : float, optional
            Initial values in the buffer, by default np.nan
        buffer_size_frame_multiplier : int, optional
            Buffer size as specified by a multiplier on the frame size, by default 2
        """
        self.samples_per_frame = samples_per_frame
        self.trigger_index = trigger_index
        self.pretrigger_samples = int(pretrigger * samples_per_frame) if trigger_enabled else 0
        self.positive_slope = positive_slope
        self.trigger_level = trigger_level
        self.hysteresis_level = hysteresis_level
        self.hysteresis_samples = hysteresis_samples
        # Despite the name, this is the number of *new* samples between the starts of two
        # consecutive frames (frame size minus the overlapped part)
        self.overlap_samples = samples_per_frame - int(maximum_overlap * samples_per_frame)
        self.manual_accept = manual_accept
        self.waiting_for_accept = False
        self._buffer = starting_value * np.ones(
            (
                num_channels,
                int(np.ceil(buffer_size_frame_multiplier * samples_per_frame)),
            ),
            dtype=dtype,
        )
        self.buffer_size_frame_multiplier = buffer_size_frame_multiplier
        self.wait_samples = wait_samples
        self.last_trigger = self.overlap_samples - self.wait_samples
        # NOTE(review): the reset is seeded one sample *before* the last trigger here
        # (+1), while reset_trigger() and accept() seed it one sample *after* (-1).  With
        # +1 the very first signal trigger needs a real reset in the data; confirm this
        # asymmetry is intended.
        self.last_reset = self.overlap_samples + 1 - self.wait_samples
        self.trigger_enabled = trigger_enabled
        self.trigger_only_first = trigger_only_first
        self.first_trigger = True

    @property
    def buffer_data(self):
        """Gets the data currently in the buffer"""
        return self._buffer

    def add_data(self, data):
        """Adds the provided data to the buffer

        The newest samples are placed at the end of the buffer and the oldest samples are
        dropped.  The stored trigger and reset positions are moved back by the number of
        samples received.

        Parameters
        ----------
        data : np.ndarray
            Data to add to the buffer.  The last axis is the sample axis.
        """
        data = np.array(data)
        self.last_trigger += data.shape[-1]
        self.last_reset += data.shape[-1]
        # Make sure the data will fit into the buffer
        data = data[..., -self.buffer_data.shape[-1] :]
        # Shift the existing samples left and append the new ones
        self.buffer_data[:] = np.concatenate(
            (self.buffer_data[..., data.shape[-1] :], data), axis=-1
        )

    def find_triggers(self):
        """Goes through the buffer and finds triggers to denote a measurement frame

        Returns
        -------
        list
            Trigger positions, each expressed as a number of samples back from the end of
            the buffer.  Empty if no frame is available or if the buffer is waiting for a
            manual acceptance.
        """
        if self.manual_accept and self.waiting_for_accept:
            return []
        # A signal-based trigger is needed for every frame, or only for the first frame
        # when trigger_only_first is set
        needs_signal_trigger = self.trigger_enabled and (
            self.first_trigger or not self.trigger_only_first
        )
        if needs_signal_trigger:
            return self._find_signal_triggers()
        return self._find_spaced_triggers()

    def _find_signal_triggers(self):
        """Finds triggers from level crossings of the trigger channel, honoring hysteresis"""
        buffer_length = self.buffer_size_frame_multiplier * self.samples_per_frame
        trigger_data = self.buffer_data[
            self.trigger_index,
            self.pretrigger_samples : self.samples_per_frame + self.pretrigger_samples,
        ]
        if self.positive_slope:
            crossings = (trigger_data[:-1] < self.trigger_level) & (
                trigger_data[1:] > self.trigger_level
            )
            reset_mask = trigger_data < self.hysteresis_level
        else:
            crossings = (trigger_data[:-1] > self.trigger_level) & (
                trigger_data[1:] < self.trigger_level
            )
            reset_mask = trigger_data > self.hysteresis_level
        if self.hysteresis_samples > 1:
            # Find the start/stop of every run of consecutive samples satisfying the
            # hysteresis level, and keep the last sample of each sufficiently long run
            padded = np.concatenate(([0], reset_mask.astype(np.int8), [0]))
            run_edges = np.where(np.abs(np.diff(padded)) == 1)[0].reshape(-1, 2)
            reset_indices = np.array(
                [
                    stop - 1
                    for start, stop in run_edges
                    if stop - start > self.hysteresis_samples - 2
                ]
            )
        else:
            # Every sample satisfying the hysteresis level is a reset.  The mask must be
            # converted to sample indices; subtracting the boolean mask directly would
            # treat it as 0/1 and report a reset at the start of the buffer for every
            # sample, which disables the hysteresis check entirely.
            reset_indices = np.where(reset_mask)[0]
        # Convert indices in the search window to "samples back from the end of the buffer"
        triggers = list(buffer_length - (np.where(crossings)[0] + 1 + self.pretrigger_samples))
        resets = np.concatenate(
            [
                [self.last_reset],
                buffer_length - reset_indices - self.pretrigger_samples,
            ]
        )

        final_triggers = []
        for trigger in triggers:
            # Check to see if the trigger is far enough away from the last one
            if self.last_trigger - trigger < self.overlap_samples:
                continue
            # Check to see if there has been a reset since the last trigger
            if not np.any((resets < self.last_trigger) & (resets > trigger)):
                continue
            if self.trigger_only_first and not self.first_trigger:
                continue
            final_triggers.append(trigger)
            self.first_trigger = False
            self.last_trigger = trigger

        # The smallest value is the most recent reset
        self.last_reset = resets.min()

        if self.manual_accept and len(final_triggers) > 0:
            # Only hand out one frame and hold further triggers until accept() is called
            self.last_trigger = self.overlap_samples
            self.last_reset = self.overlap_samples - 1
            self.waiting_for_accept = True
            return [final_triggers[0]]
        return final_triggers

    def _find_spaced_triggers(self):
        """Finds "free run" triggers spaced evenly after the last trigger"""
        last_trigger_rectified = (
            self.buffer_size_frame_multiplier * self.samples_per_frame - self.last_trigger
        )
        triggers_available = int(
            (self.samples_per_frame - last_trigger_rectified) / self.overlap_samples
        )
        final_triggers = [
            self.last_trigger - (i + 1) * self.overlap_samples for i in range(triggers_available)
        ]
        if len(final_triggers) > 0:
            self.last_trigger = final_triggers[-1]
        return final_triggers

    def reset_trigger(self):
        """Resets the last trigger in the buffer"""
        self.last_trigger = self.overlap_samples - self.wait_samples
        self.last_reset = self.overlap_samples - 1 - self.wait_samples

    def accept(self):
        """Manually accept the last frame"""
        self.last_trigger = self.overlap_samples
        self.last_reset = self.overlap_samples - 1
        self.waiting_for_accept = False

    def add_data_get_frame(self, data):
        """Add data and get the next measurement frame

        Parameters
        ----------
        data : np.ndarray
            Data to add to the buffer.  The last axis is the sample axis.

        Returns
        -------
        np.ndarray
            Array with shape (number of frames, number of channels, samples per frame).
            The first dimension is zero if no frame is available.
        """
        self.add_data(data)
        triggers = self.find_triggers()
        # One row of buffer indices per trigger
        frame_indices = (
            self.buffer_size_frame_multiplier * self.samples_per_frame
            - np.array(triggers)[:, np.newaxis]
            + np.arange(self.samples_per_frame)
            - self.pretrigger_samples
        ).astype(int)
        return np.moveaxis(self.buffer_data[:, frame_indices], 1, 0)

    def __getitem__(self, key):
        return self._buffer[key]

    def __setitem__(self, key, val):
        self._buffer[key] = val
class KurtosisBuffer:
    """A buffer that computes a running kurtosis value

    Per-frame power sums are stored for the most recent frames in a circular buffer, and
    the kurtosis is computed from the totals over all stored frames.
    """

    def __init__(self, n_channels: int, averages: int = 100) -> None:
        """Initializes the kurtosis buffer

        Parameters
        ----------
        n_channels : int
            The number of channels for which a kurtosis is tracked
        averages : int, optional
            The number of frames kept for the kurtosis calculation, by default 100
        """
        # this will keep track of our place in the buffers (alternative to using np.roll, this is
        # more efficient since we don't actually care what order the buffer is in)
        self.idx = 0
        self.averages = averages  # number of frames to keep for kurtosis calculation
        self.g0 = np.zeros((n_channels, averages))  # number of samples per frame
        self.g1 = np.zeros((n_channels, averages))  # sum of samples per frame
        self.g2 = np.zeros((n_channels, averages))  # sum of squared samples per frame
        self.g3 = np.zeros((n_channels, averages))  # sum of cubed samples per frame
        self.g4 = np.zeros((n_channels, averages))  # sum of samples**4 per frame

    def clear(self) -> None:
        """Clears the kurtosis buffer"""
        self.idx = 0
        for gamma in (self.g0, self.g1, self.g2, self.g3, self.g4):
            gamma[:] = 0.0

    def add_data(self, arr: np.ndarray, axis=-1) -> None:
        """Adds data to the kurtosis buffer

        Choi M, Sweetman B. Efficient Calculation of Statistical Moments for Structural Health
        Monitoring. Structural Health Monitoring. 2009;9(1):13-24. doi:10.1177/1475921709341014
        (https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance)

        Implements a method of moments approach used for kurtosis calculation. Gamma values
        are calculated using raw moments and sample length. These gamma values can be combined
        additively as new data appears, at which point the raw moments can be backed out. Using
        known relationships between raw moments and central moments, we can calculate kurtosis.

        The new frame overwrites the oldest stored frame once the buffer is full.

        Parameters
        ----------
        arr : np.ndarray
            A numpy array (or array-like) containing the data to add
        axis : int, optional
            The axis across which the buffer will be computed, by default -1
        """
        # Accept array-likes as well; the sample count below needs a .shape
        arr = np.asarray(arr)

        # Calculate gamma values of new data (raw moment * sample length, equivalent to sum of
        # moments if time delta is constant)
        slot = self.idx
        # gamma_0 is taken to be number of points (assuming constant time delta)
        self.g0[:, slot] = arr.shape[axis]
        self.g1[:, slot] = np.sum(arr, axis=axis)
        self.g2[:, slot] = np.sum(arr**2, axis=axis)
        self.g3[:, slot] = np.sum(arr**3, axis=axis)
        self.g4[:, slot] = np.sum(arr**4, axis=axis)

        # increment our index (wrap around if buffer is full)
        self.idx = (slot + 1) % self.averages

    def get_kurtosis(self, fisher=False) -> np.ndarray:
        """Gets the current kurtosis values

        Parameters
        ----------
        fisher : bool, optional
            If True, the fisher kurtosis (excess kurtosis) is presented.  If False, the regular
            kurtosis is presented (e.g. 3 for a normal distribution), by default False

        Returns
        -------
        np.ndarray
            kurtosis values for each channel.  The values are not finite when the buffer
            holds no data or a channel has zero variance, because the computation divides
            by the sample count and by the squared variance.
        """
        # sum the gamma values that are in the buffer
        g0 = np.sum(self.g0, axis=-1)
        g1 = np.sum(self.g1, axis=-1)
        g2 = np.sum(self.g2, axis=-1)
        g3 = np.sum(self.g3, axis=-1)
        g4 = np.sum(self.g4, axis=-1)

        # back out raw moments from gamma values
        m1 = g1 / g0
        m2 = g2 / g0
        m3 = g3 / g0
        m4 = g4 / g0

        # compute central moments from raw moments (the third central moment is not needed)
        c2 = m2 - (m1**2)
        c4 = m4 - (4 * m1 * m3) + (6 * (m1**2) * m2) - (3 * (m1**4))

        # compute kurtosis
        k = c4 / (c2**2)
        return k - 3 if fisher else k
class DataCollectorCommands(Enum):
    """Messages exchanged with the data collector process

    Most members are instructions the collector accepts on its command queue; ACCEPTED
    and SHUTDOWN_ACHIEVED are notifications the collector sends to the environment.
    """

    # Set up the collector from metadata if the metadata has changed
    INITIALIZE_COLLECTOR = 1
    # Set up the collector from metadata unconditionally
    FORCE_INITIALIZE_COLLECTOR = 2
    # Read incoming data and emit measurement frames
    ACQUIRE = 3
    # Stop collecting and flush the queues
    STOP = 4
    # Accept or reject the frame that is waiting for manual acceptance
    ACCEPT = 5
    # Change the test level and the number of frames to skip
    SET_TEST_LEVEL = 6
    # Sent by the collector after an ACCEPT has been handled
    ACCEPTED = 7
    # Sent by the collector after a STOP has been handled
    SHUTDOWN_ACHIEVED = 8
    # Empty the running kurtosis buffer
    CLEAR_KURTOSIS_BUFFER = 9
class AcquisitionType(Enum):
    """Triggering strategies available to the data collector"""

    # No trigger: frames are returned continuously
    FREE_RUN = 0
    # Every frame must be started by a trigger
    TRIGGER_EVERY_FRAME = 1
    # Only the first frame must be started by a trigger
    TRIGGER_FIRST_FRAME = 2
class Acceptance(Enum):
    """Frame acceptance strategies available to the data collector"""

    # Frames are held until they are explicitly accepted or rejected
    MANUAL = 0
    # Frames are accepted without waiting for an explicit acceptance
    AUTOMATIC = 1
class TriggerSlope(Enum):
    """Slope directions that a trigger can be detected on"""

    # Trigger on a rising crossing of the trigger level
    POSITIVE = 0
    # Trigger on a falling crossing of the trigger level
    NEGATIVE = 1
class Window(Enum):
    """Window functions that can be applied to a measurement frame"""

    # No windowing
    RECTANGLE = 0
    HANN = 1
    HAMMING = 2
    FLATTOP = 3
    # Uses window parameter 1
    TUKEY = 4
    BLACKMANHARRIS = 5
    # Uses window parameters 1 and 2
    EXPONENTIAL = 6
    # Uses window parameters 2 and 3 for the exponential shape and parameter 1 as the
    # fraction of the frame kept in the reference (force) window
    EXPONENTIAL_FORCE = 7
class CollectorMetadata:
    """Metadata associated with the data collector

    Two metadata objects compare equal when every attribute of the left-hand object has an
    equal counterpart on the right-hand object; array-valued attributes are compared
    element by element.
    """

    def __init__(
        self,
        num_channels,
        response_channel_indices,
        reference_channel_indices,
        acquisition_type,
        acceptance,
        acceptance_function,
        overlap_fraction,
        trigger_channel_index,
        trigger_slope,
        trigger_level,
        trigger_hysteresis,
        trigger_hysteresis_samples,
        pretrigger_fraction,
        frame_size,
        window,
        window_parameter_1=0,
        window_parameter_2=0,
        window_parameter_3=0,
        wait_samples=0,
        kurtosis_buffer_length=None,
        response_transformation_matrix=None,
        reference_transformation_matrix=None,
    ):
        """Initializes data collector metadata

        Parameters
        ----------
        num_channels : int
            The number of channels in the data acquisition
        response_channel_indices : np.ndarray
            Indices associated with control or response channels
        reference_channel_indices : np.ndarray
            Indices associated with drive or reference channels
        acquisition_type : AcquisitionType
            The type of acquisition used by the collector
        acceptance : Acceptance
            The type of frame acceptance used by the collector
        acceptance_function : tuple
            A tuple containing a path to a Python module and a function name in that module
            that is used to automatically determine if a frame should be accepted.  If
            None, every frame is accepted.
        overlap_fraction : float
            The fraction of the frame to overlap in the data collection
        trigger_channel_index : int
            The index of the channel used as the trigger
        trigger_slope : TriggerSlope
            The slope of the trigger
        trigger_level : float
            The trigger level
        trigger_hysteresis : float
            The level below which the trigger must return before another trigger can be obtained
        trigger_hysteresis_samples : int
            The number of samples the trigger must be below the hysteresis level before another
            trigger can be obtained
        pretrigger_fraction : float
            The fraction of the frame used as a pretrigger
        frame_size : int
            The number of samples in a measurement frame
        window : Window
            The window function used by the collector
        window_parameter_1 : float, optional
            Optional parameters required by the window function, by default 0
        window_parameter_2 : float, optional
            Optional parameters required by the window function, by default 0
        window_parameter_3 : float, optional
            Optional parameters required by the window function, by default 0
        wait_samples : int, optional
            Number of samples to wait before returning a frame, by default 0
        kurtosis_buffer_length : int, optional
            The number of frames kept in the running kurtosis calculation.  If None (the
            default), no kurtosis is computed.
        response_transformation_matrix : np.ndarray, optional
            A transformation applied to the response channels, by default None
        reference_transformation_matrix : np.ndarray, optional
            A transformation applied to the reference channels, by default None
        """
        self.num_channels = num_channels
        self.response_channel_indices = response_channel_indices
        self.reference_channel_indices = reference_channel_indices
        self.acquisition_type = acquisition_type
        self.acceptance = acceptance
        self.acceptance_function = acceptance_function
        self.overlap_fraction = overlap_fraction
        self.trigger_channel_index = trigger_channel_index
        self.trigger_slope = trigger_slope
        self.trigger_level = trigger_level
        self.trigger_hysteresis = trigger_hysteresis
        self.trigger_hysteresis_samples = trigger_hysteresis_samples
        self.pretrigger_fraction = pretrigger_fraction
        self.frame_size = frame_size
        self.window = window
        self.window_parameter_1 = window_parameter_1
        self.window_parameter_2 = window_parameter_2
        self.window_parameter_3 = window_parameter_3
        self.response_transformation_matrix = response_transformation_matrix
        self.reference_transformation_matrix = reference_transformation_matrix
        self.wait_samples = wait_samples
        self.kurtosis_buffer_length = kurtosis_buffer_length

    @staticmethod
    def _values_match(mine, theirs):
        """Returns True if two attribute values are equal, comparing arrays elementwise"""
        try:
            return bool(np.all(mine == theirs))
        except ValueError:
            # NumPy raises when two arrays have shapes that cannot be broadcast together;
            # such values are simply not equal
            return False

    def __eq__(self, other):
        """Compares all attributes of this object against the same attributes of `other`

        Returns False, rather than raising, if `other` has no attribute dictionary, is
        missing an attribute, or holds arrays with incompatible shapes.
        """
        try:
            other_fields = other.__dict__
            return all(
                self._values_match(value, other_fields[field])
                for field, value in self.__dict__.items()
            )
        except (AttributeError, KeyError):
            return False
class DataCollectorProcess(AbstractMessageProcess):
    """Class that takes data from the data_in_queue and distributes to the environment

    This class keeps track of the test level used when acquiring data so the
    data can be scaled back to full level for control.  It will also skip
    frames that are acquired while the system is ramping."""

    def __init__(
        self,
        process_name: str,
        command_queue: VerboseMessageQueue,
        data_in_queue: mp.queues.Queue,
        data_out_queues: List[mp.queues.Queue],
        environment_command_queue: VerboseMessageQueue,
        log_file_queue: mp.queues.Queue,
        gui_update_queue: mp.queues.Queue,
        environment_name,
    ):
        """
        Constructs the data collector class

        Parameters
        ----------
        process_name : str
            A name to assign the process, primarily for logging purposes.
        command_queue : VerboseMessageQueue
            Queue from which this process receives its commands.
        data_in_queue : mp.queues.Queue
            Queue from which acquired data is read.
        data_out_queues : List[mp.queues.Queue]
            Queues that each receive a copy of the spectra of every accepted frame.
        environment_command_queue : VerboseMessageQueue
            Queue used to notify the environment that a frame was accepted or that
            the collector has shut down.
        log_file_queue : mp.queues.Queue
            Queue handed to the base class (presumably for log messages).
        gui_update_queue : mp.queues.Queue
            Queue on which time frames and kurtosis values are published.
        environment_name : str
            The name of the environment that this process is collecting data for.

        """
        super().__init__(process_name, log_file_queue, command_queue, gui_update_queue)
        self.map_command(DataCollectorCommands.INITIALIZE_COLLECTOR, self.initialize_collector)
        self.map_command(
            DataCollectorCommands.FORCE_INITIALIZE_COLLECTOR,
            self.force_initialize_collector,
        )
        self.map_command(DataCollectorCommands.ACQUIRE, self.acquire)
        self.map_command(DataCollectorCommands.STOP, self.stop)
        self.map_command(DataCollectorCommands.ACCEPT, self.accept)
        self.map_command(DataCollectorCommands.SET_TEST_LEVEL, self.set_test_level)
        self.map_command(DataCollectorCommands.CLEAR_KURTOSIS_BUFFER, self.clear_kurtosis_buffer)
        self.environment_command_queue = environment_command_queue
        self.environment_name = environment_name
        self.collector_metadata = None
        self.frame_buffer = None
        self.kurtosis_buffer = None
        self.reference_window = None
        self.response_window = None
        self.window_correction_factor = None
        self.acceptance_function = None
        self.skip_frames = 0
        self.test_level = None
        self.data_in_queue = data_in_queue
        self.data_out_queues = data_out_queues
        self.last_frame = None
        self.window_correction = None
        if DEBUG:
            self.received_data_index = 0

    def initialize_collector(self, data: CollectorMetadata):
        """Initializes the collector with the provided metadata

        Nothing is done if the provided metadata is equal to the metadata currently in use.

        Parameters
        ----------
        data : CollectorMetadata
            An object containing metadata to define the collector
        """
        if not self.collector_metadata == data:
            self.force_initialize_collector(data)

    def force_initialize_collector(self, data: CollectorMetadata):
        """Initializes the collector with the provided metadata even if the
        metadata is already equivalent.

        Parameters
        ----------
        data : CollectorMetadata
            An object containing metadata to define the collector

        Raises
        ------
        ValueError
            If the window type in the metadata is not a known window
        """
        # Flush the outputs to make sure that there's nothing hanging out on
        # the queue when we start up.
        for queue in self.data_out_queues:
            flush_queue(queue)
        self.collector_metadata = data
        metadata = self.collector_metadata
        self.frame_buffer = FrameBuffer(
            metadata.num_channels,
            metadata.trigger_channel_index,
            metadata.pretrigger_fraction,
            metadata.trigger_slope == TriggerSlope.POSITIVE,
            metadata.trigger_level,
            metadata.trigger_hysteresis,
            metadata.trigger_hysteresis_samples,
            metadata.frame_size,
            metadata.overlap_fraction,
            metadata.acceptance == Acceptance.MANUAL,
            metadata.acquisition_type != AcquisitionType.FREE_RUN,
            metadata.acquisition_type == AcquisitionType.TRIGGER_FIRST_FRAME,
            metadata.wait_samples,
        )
        if metadata.kurtosis_buffer_length is not None:
            self.kurtosis_buffer = KurtosisBuffer(
                metadata.num_channels,
                metadata.kurtosis_buffer_length,
            )
        else:
            # Do not keep a buffer from a previous configuration around
            self.kurtosis_buffer = None
        if metadata.acceptance_function is None:
            self.acceptance_function = lambda x: True
        else:
            module = load_python_module(metadata.acceptance_function[0])
            self.acceptance_function = getattr(module, metadata.acceptance_function[1])
            self.log("Loaded acceptance function")
        self.reference_window, self.response_window = self._build_windows()
        self.window_correction = np.sqrt(1 / np.mean(self.response_window**2))
        if DEBUG:
            with open("debug_data/collector_metadata.pkl", "wb") as f:
                pickle.dump(self.collector_metadata, f)
            with open("debug_data/framebuffer.pkl", "wb") as f:
                pickle.dump(self.frame_buffer, f)
            self.received_data_index = 0

    def _build_windows(self):
        """Builds the (reference, response) window functions described by the metadata"""
        metadata = self.collector_metadata
        frame_size = metadata.frame_size
        window = metadata.window
        if window == Window.RECTANGLE:
            return 1, 1
        if window == Window.HANN:
            window_spec = "hann"
        elif window == Window.HAMMING:
            window_spec = "hamming"
        elif window == Window.FLATTOP:
            window_spec = "flattop"
        elif window == Window.TUKEY:
            window_spec = ("tukey", metadata.window_parameter_1)
        elif window == Window.BLACKMANHARRIS:
            window_spec = "blackmanharris"
        elif window == Window.EXPONENTIAL:
            window_spec = (
                "exponential",
                metadata.window_parameter_1,
                metadata.window_parameter_2,
            )
        elif window == Window.EXPONENTIAL_FORCE:
            window_spec = (
                "exponential",
                metadata.window_parameter_2,
                metadata.window_parameter_3,
            )
        else:
            raise ValueError("Invalid Window Type")
        reference_window = sig.get_window(window_spec, frame_size)
        response_window = reference_window.copy()
        if window == Window.EXPONENTIAL_FORCE:
            # The reference window is cut to zero after the fraction of the frame given by
            # window parameter 1; the response window keeps the full exponential
            non_pulse_samples = (np.arange(frame_size) + 1) / frame_size > (
                metadata.window_parameter_1
            )
            reference_window[non_pulse_samples] = 0
        return reference_window, response_window

    def _prepare_frames(self, frame):
        """Splits a frame into response and reference channels, applies the transformation
        matrices if defined, then applies the windows and divides by the test level

        The input frame is not modified.
        """
        metadata = self.collector_metadata
        response_frame = frame[metadata.response_channel_indices]
        reference_frame = frame[metadata.reference_channel_indices]
        if metadata.response_transformation_matrix is not None:
            response_frame = metadata.response_transformation_matrix @ response_frame
        if metadata.reference_transformation_matrix is not None:
            reference_frame = metadata.reference_transformation_matrix @ reference_frame
        self.log(
            f"Received output from framebuffer with RMS: \n  "
            f"{rms_time(reference_frame, axis=-1)}"
        )
        # Apply window functions
        response_frame = response_frame * (self.response_window / self.test_level)
        reference_frame = reference_frame * (self.reference_window / self.test_level)
        return response_frame, reference_frame

    def _send_spectra(self, response_frame, reference_frame):
        """Computes window-corrected spectra of the frames and puts them on every output queue"""
        response_fft = rfft(response_frame, axis=-1) * self.window_correction
        reference_fft = rfft(reference_frame, axis=-1) * self.window_correction
        for queue in self.data_out_queues:
            queue.put(copy.deepcopy((response_fft, reference_fft)))
        self.log("Sent Data")

    def acquire(self, data):  # pylint: disable=unused-argument
        """Acquires data from the data_in_queue and sends to the environment

        This function will take data and scale it by the test level, or skip
        sending the data if the test level is currently changing.  It will
        also apply the transformation matrices if they are defined.

        It will stop itself if the last data is acquired.

        Parameters
        ----------
        data : Ignored
            Unused argument required due to the expectation that functions called
            by the RandomDataCollector.run function will have one argument
            accepting any data passed along with the instruction.
        """
        try:
            acquisition_data, last_data = self.data_in_queue.get(timeout=10)
            self.log(f"Acquired Data with shape {acquisition_data.shape} and Last Data {last_data}")
            self.log(f"Data Average RMS: {rms_time(acquisition_data):0.4f}")
        except mp.queues.Empty:
            # Keep running until stopped
            self.command_queue.put(self.process_name, (DataCollectorCommands.ACQUIRE, None))
            return
        # Add data to buffer
        self.log("Putting Data to Buffer")
        output_frames = self.frame_buffer.add_data_get_frame(acquisition_data)
        if DEBUG:
            np.save(
                f"debug_data/acquisition_data_{self.received_data_index:05d}.npy",
                acquisition_data,
            )
            np.save(
                f"debug_data/output_frames_{self.received_data_index:05d}.npy",
                output_frames,
            )
            with open(
                f"debug_data/framebuffer_{self.received_data_index:05d}.pkl",
                "wb",
            ) as f:
                pickle.dump(self.frame_buffer, f)
            self.received_data_index += 1
        if output_frames.shape[0] > 0:
            self.log(f"Measurement Frames Received ({output_frames.shape[0]})")
        for frame in output_frames:
            if self.skip_frames > 0:
                self.skip_frames -= 1
                self.log(f"Skipped Frame, {self.skip_frames} left to skip")
                # The frame buffer trigger is deliberately not reset here
                continue
            frame = np.copy(frame)
            accepted = self.acceptance_function(frame)
            response_frame, reference_frame = self._prepare_frames(frame)
            if self.frame_buffer.manual_accept:
                # Hold the frame until an ACCEPT command says whether to keep it
                self.last_frame = frame
                self.gui_update_queue.put((self.environment_name, ("time_frame", (frame, False))))
            elif accepted:
                self.gui_update_queue.put((self.environment_name, ("time_frame", (frame, True))))
                self.log("Sending data")
                if self.collector_metadata.kurtosis_buffer_length is not None:
                    self.kurtosis_buffer.add_data(frame)
                    self.gui_update_queue.put(
                        (
                            self.environment_name,
                            ("kurtosis", self.kurtosis_buffer.get_kurtosis()),
                        )
                    )
                self._send_spectra(response_frame, reference_frame)
            else:
                self.gui_update_queue.put((self.environment_name, ("time_frame", (frame, False))))
        # Keep running until stopped
        if not last_data:
            self.command_queue.put(self.process_name, (DataCollectorCommands.ACQUIRE, None))
        else:
            self.stop(None)

    def accept(self, keep_frame):
        """Accepts or rejects the data

        An accepted frame goes through the same transformation, windowing and test level
        scaling as a frame that is accepted automatically before its spectra are sent.

        Parameters
        ----------
        keep_frame : bool
            If True, the frame will be accepted.  If False, it will be rejected.
        """
        self.log(f"Received Accept Signal {keep_frame}")
        self.frame_buffer.accept()
        if keep_frame and self.last_frame is None:
            self.log("No frame is waiting for acceptance, nothing to send")
        elif keep_frame:
            self.log("Sending data manually")
            self.gui_update_queue.put(
                (self.environment_name, ("time_frame", (self.last_frame, True)))
            )
            response_frame, reference_frame = self._prepare_frames(self.last_frame)
            self._send_spectra(response_frame, reference_frame)
        self.last_frame = None
        self.environment_command_queue.put(
            self.process_name, (DataCollectorCommands.ACCEPTED, keep_frame)
        )

    def stop(self, data):  # pylint: disable=unused-argument
        """Stops acquiring data from the data_in_queue and flushes queues.

        Parameters
        ----------
        data : Ignored
            Unused argument required due to the expectation that functions called
            by the RandomDataCollector.run function will have one argument
            accepting any data passed along with the instruction.
        """
        sleep(0.05)
        self.log("Stopping Data Collection")
        for queue in self.data_out_queues:
            flush_queue(queue)
        self.command_queue.flush(self.process_name)
        # The collector may be stopped before it has ever been initialized
        if self.frame_buffer is not None:
            self.frame_buffer.reset_trigger()
        self.environment_command_queue.put(
            self.process_name, (DataCollectorCommands.SHUTDOWN_ACHIEVED, None)
        )

    def set_test_level(self, data):
        """Updates the value of the current test level and sets the number
        of frames to skip.

        Parameters
        ----------
        data : tuple
            Tuple containing the number of frames to skip and the new test
            level

        """
        self.skip_frames, self.test_level = data
        self.log(
            f"Setting Test Level to {self.test_level}, skipping next {self.skip_frames} frames"
        )

    def clear_kurtosis_buffer(self, data):  # pylint: disable=unused-argument
        """Clears the kurtosis buffer

        Parameters
        ----------
        data : ignored
            The argument is not used, but is required by the calling signature of functions that
            get called via the command map.
        """
        if self.kurtosis_buffer is not None:
            self.kurtosis_buffer.clear()
def data_collector_process(
    environment_name: str,
    command_queue: VerboseMessageQueue,
    data_in_queue: mp.queues.Queue,
    data_out_queues: List[mp.queues.Queue],
    environment_command_queue: VerboseMessageQueue,
    log_file_queue: mp.queues.Queue,
    gui_update_queue: mp.queues.Queue,
    process_name: str = None,
):
    """Data collector process function called by multiprocessing

    This function is the entry point that gets run by the multiprocessing module
    when it creates a new process.  It creates a DataCollectorProcess object and
    runs it.

    Parameters
    ----------
    environment_name : str
        Name of the environment associated with this data collector process
    command_queue : VerboseMessageQueue
        Passed to the DataCollectorProcess as its command queue
    data_in_queue : mp.queues.Queue
        Passed to the DataCollectorProcess as its source of incoming data
    data_out_queues : List[mp.queues.Queue]
        Passed to the DataCollectorProcess as its output queues
    environment_command_queue : VerboseMessageQueue
        Passed to the DataCollectorProcess as the queue for messages to the environment
    log_file_queue : mp.queues.Queue
        Passed to the DataCollectorProcess as its log file queue
    gui_update_queue : mp.queues.Queue
        Passed to the DataCollectorProcess as its GUI update queue
    process_name : str, optional
        Name given to the process.  If None, the environment name followed by
        " Data Collector" is used.
    """
    if process_name is None:
        collector_name = environment_name + " Data Collector"
    else:
        collector_name = process_name

    collector = DataCollectorProcess(
        collector_name,
        command_queue,
        data_in_queue,
        data_out_queues,
        environment_command_queue,
        log_file_queue,
        gui_update_queue,
        environment_name,
    )
    collector.run()