1# -*- coding: utf-8 -*-
2"""
3Rattlesnake Vibration Control Software
4Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
5(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
6Government retains certain rights in this software.
7
8This program is free software: you can redistribute it and/or modify
9it under the terms of the GNU General Public License as published by
10the Free Software Foundation, either version 3 of the License, or
11(at your option) any later version.
12
13This program is distributed in the hope that it will be useful,
14but WITHOUT ANY WARRANTY; without even the implied warranty of
15MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16GNU General Public License for more details.
17
18You should have received a copy of the GNU General Public License
19along with this program. If not, see <https://www.gnu.org/licenses/>.
20"""
21
22import copy
23import multiprocessing as mp
24from enum import Enum
25
26import numpy as np
27
28from .abstract_message_process import AbstractMessageProcess
29from .signal_generation import SignalGenerator
30from .utilities import VerboseMessageQueue, flush_queue, rms_time
31
32TEST_LEVEL_THRESHOLD = 1.01
33
34DEBUG = False
35if DEBUG:
36 from glob import glob
37
38 FILE_OUTPUT = "debug_data/signal_generation_{:}.npz"
39
40
41class SignalGenerationCommands(Enum):
42 """Commands that the Random Vibration Signal Generation Process can accept"""
43
44 INITIALIZE_PARAMETERS = 0
45 INITIALIZE_SIGNAL_GENERATOR = 1
46 GENERATE_SIGNALS = 2
47 START_SHUTDOWN = 3
48 SHUTDOWN = 4
49 MUTE = 5
50 ADJUST_TEST_LEVEL = 6
51 SET_TEST_LEVEL = 7
52 SHUTDOWN_ACHIEVED = 8
53
54
55class SignalGenerationMetadata:
56 """General metadata required to define the signal generation process"""
57
58 def __init__(
59 self,
60 samples_per_write,
61 level_ramp_samples,
62 output_transformation_matrix=None,
63 new_signal_sample_threshold=None,
64 disabled_signals=None,
65 ):
66 self.ramp_samples = level_ramp_samples
67 self.output_transformation_matrix = output_transformation_matrix
68 self.samples_per_write = samples_per_write
69 self.new_signal_sample_threshold = (
70 self.samples_per_write
71 if new_signal_sample_threshold is None
72 else new_signal_sample_threshold
73 )
74 self.disabled_signals = [] if disabled_signals is None else disabled_signals
75
76 def __eq__(self, other):
77 try:
78 return np.all(
79 [np.all(value == other.__dict__[field]) for field, value in self.__dict__.items()]
80 )
81 except (AttributeError, KeyError):
82 return False
83
84
85class SignalGenerationProcess(AbstractMessageProcess):
86 """Class encapsulating the signal generation process for random vibration
87
88 This class handles the signal generation of the Random Vibration environment.
89 It accepts data from the data analysis process and transforms that data into
90 time histories that are put into the data_out_queue."""
91
92 def __init__(
93 self,
94 process_name: str,
95 command_queue: VerboseMessageQueue,
96 data_in_queue: mp.queues.Queue,
97 data_out_queue: mp.queues.Queue,
98 environment_command_queue: VerboseMessageQueue,
99 log_file_queue: mp.queues.Queue,
100 gui_update_queue: mp.queues.Queue,
101 environment_name: str,
102 ):
103 """
104 Class encapsulating the signal generation process for random vibration
105
106 Parameters
107 ----------
108 process_name : str
109 A name to assign the process, primarily for logging purposes.
110 queues : RandomEnvironmentQueues
111 A list of Random Environment queues for communcation with other parts
112 of the environment and the controller
113 environment_name : str
114 The name of the environment that this process is generating signals for.
115
116 """
117 super().__init__(process_name, log_file_queue, command_queue, gui_update_queue)
118 self.map_command(SignalGenerationCommands.INITIALIZE_PARAMETERS, self.initialize_parameters)
119 self.map_command(
120 SignalGenerationCommands.INITIALIZE_SIGNAL_GENERATOR,
121 self.initialize_signal_generator,
122 )
123 self.map_command(SignalGenerationCommands.GENERATE_SIGNALS, self.generate_signals)
124 self.map_command(SignalGenerationCommands.START_SHUTDOWN, self.start_shutdown)
125 self.map_command(SignalGenerationCommands.SHUTDOWN, self.shutdown)
126 self.map_command(SignalGenerationCommands.MUTE, self.mute)
127 self.map_command(SignalGenerationCommands.ADJUST_TEST_LEVEL, self.adjust_test_level)
128 self.map_command(SignalGenerationCommands.SET_TEST_LEVEL, self.set_test_level)
129 self.environment_name = environment_name
130 self.data_in_queue = data_in_queue
131 self.data_out_queue = data_out_queue
132 self.environment_command_queue = environment_command_queue
133 self.ramp_samples = None
134 self.output_transformation_matrix = None
135 self.samples_per_write = None
136 self.new_signal_sample_threshold = None
137 self.test_level_target = 1.0
138 self.current_test_level = 0.0
139 self.test_level_change = 0.0
140 self.signal_remainder = None
141 self.startup = True
142 self.shutdown_flag = False
143 self.done_generating = False
144 self.signal_generator = None
145 self.disabled_signals = None
146
147 def initialize_parameters(self, data: SignalGenerationMetadata):
148 """Stores environment signal processing parameters to the process
149
150 Parameters
151 ----------
152 data : Metadata :
153 Signal processing parameters for the environment
154
155 """
156 self.log("Initializing Test Parameters")
157 self.ramp_samples = data.ramp_samples
158 self.output_transformation_matrix = (
159 None
160 if data.output_transformation_matrix is None
161 else np.linalg.pinv(data.output_transformation_matrix)
162 )
163 self.samples_per_write = data.samples_per_write
164 self.new_signal_sample_threshold = data.new_signal_sample_threshold
165 self.disabled_signals = data.disabled_signals
166
167 def initialize_signal_generator(self, signal_generator: SignalGenerator):
168 """Stores the signal generator that will generate the signals
169
170 Parameters
171 ----------
172 signal_generator : SignalGenerator
173 A SignalGenerator object used by the process to generate signals
174 """
175 self.signal_generator = signal_generator
176 self.signal_remainder = None
177
178 def generate_signals(self, data):
179 """Function to handle generation of signals for controlling the environment.
180
181 Parameters
182 ----------
183 data : None
184 Unused argument required due to the expectation that functions called
185 by the RandomSignalGenerationProcess.generate_signals function will have one argument
186 accepting any data passed along with the instruction.
187
188 """
189 # Check to make sure that the signal generator is defined
190 if self.signal_generator is None:
191 raise RuntimeError("Signal Generator object not yet defined!")
192 # Check to see if we are just starting up
193 if self.startup:
194 self.log("Starting up output")
195 # Check if we are ready to output immediately, otherwise, wait for
196 # data to come in.
197 if not self.signal_generator.ready_for_next_output:
198 self.log("Waiting for Input Data")
199 try:
200 data = self.data_in_queue.get(timeout=10)
201 except mp.queues.Empty:
202 self.gui_update_queue.put(
203 (
204 "error",
205 (
206 f"{self.process_name} Error",
207 f"{self.process_name} timed out while waiting for first "
208 "set of parameters",
209 ),
210 )
211 )
212 return
213 self.signal_generator.update_parameters(*data)
214 self.startup = False
215 # Check and see if there is any data in the queue that can be used to
216 # update the signal generator
217 update_data = flush_queue(self.data_in_queue)
218 if len(update_data) > 0:
219 # Assign the most recent data to the signal generator
220 self.log("Got Updated Parameters")
221 self.signal_generator.update_parameters(*update_data[-1])
222 if (
223 (
224 self.signal_remainder is None
225 or self.signal_remainder.shape[-1] < self.new_signal_sample_threshold
226 )
227 and not self.done_generating
228 and self.signal_generator.ready_for_next_output
229 ):
230 self.log("Generating Frame of Data")
231 new_signal, self.done_generating = self.signal_generator.generate_frame()
232 self.log(f"Generated Signal with RMS \n {rms_time(new_signal, axis=-1)}")
233 # During the first run through, signal_remainder will be None
234 if self.signal_remainder is None:
235 self.signal_remainder = new_signal
236 else:
237 # Otherwise we just concatenate the new data at the end
238 self.signal_remainder = np.concatenate((self.signal_remainder, new_signal), axis=-1)
239 # Now check if we need to send it to the output task
240 if (
241 self.data_out_queue.empty()
242 and self.signal_remainder is not None
243 and self.signal_remainder.shape[-1] > 0
244 ):
245 self.log("Outputting Data")
246 # Determine if this is the last output. This will be the last output
247 # if the shutdown flag is set and the current test level is zero,
248 # but also if we are done generating and there is no more signal in
249 # the measurement frame.
250 signal_to_output = self.signal_remainder[..., : self.samples_per_write]
251 self.signal_remainder = self.signal_remainder[..., self.samples_per_write :]
252 last_run = (self.shutdown_flag and self.current_test_level == 0.0) or (
253 self.done_generating and self.signal_remainder.shape[-1] == 0
254 )
255 self.output(signal_to_output, last_run)
256 # Run again
257 if last_run:
258 self.log("Received Last Run, Shutting Down")
259 self.shutdown()
260 return
261 self.command_queue.put(self.process_name, (SignalGenerationCommands.GENERATE_SIGNALS, None))
262
263 def output(self, write_data, last_signal=False):
264 """Puts data to the data_out_queue and handles test level changes
265
266 This function keeps track of the environment test level and scales the
267 output signals accordingly prior to placing them into the data_out_queue.
268 This function also handles the ramping between two test levels.
269
270 Parameters
271 ----------
272 write_data : np.ndarray
273 A numpy array containing the signals to be written.
274
275 last_signal :
276 Specifies if the signal being written is the last signal that will
277 be generated due to the signal generation shutting down. This is
278 passed to the output task to tell it that there will be no more
279 signals from this environment until it is restarted. (Default value
280 = False)
281 """
282 # np.savez('test_data/signal_generation_initial_output_data_check.npz',
283 # write_data = write_data)
284 # Perform the output transformation if necessary
285 if len(self.disabled_signals) > 0:
286 write_data = write_data.copy()
287 write_data[self.disabled_signals] = 0
288 if self.output_transformation_matrix is not None:
289 self.log("Applying Transformation")
290 write_data = self.output_transformation_matrix @ write_data
291 # Compute the test_level scaling for this dataset
292 if self.test_level_change == 0.0:
293 test_level = self.current_test_level
294 self.log(f"Test Level at {test_level}")
295 else:
296 test_level = (
297 self.current_test_level
298 + (np.arange(write_data.shape[-1]) + 1) * self.test_level_change
299 )
300 # Compute distance in steps from the target
301 # test_level and find where it is near the target
302 full_level_index = np.nonzero(
303 abs(test_level - self.test_level_target) / abs(self.test_level_change)
304 < TEST_LEVEL_THRESHOLD
305 )[0]
306 # Check if any are
307 if len(full_level_index) > 0:
308 # If so, set all test_levels after that one to the target test_level
309 test_level[full_level_index[0] + 1 :] = self.test_level_target
310 # And update that our current test_level is now the target test_level
311 self.current_test_level = self.test_level_target
312 self.test_level_change = 0.0
313 else:
314 # Otherwise, our current test_level is the last entry in the test_level scaling
315 self.current_test_level = test_level[-1]
316 self.log(f"Test level from {test_level[0]} to {test_level[-1]}")
317 # Write the test level-scaled data to the task
318 self.log("Sending data to data_out queue")
319 # self.write_index += 1
320 # np.savez('signal_generation_output_data_check_{:}.npz'.format(self.write_index),
321 # write_data = write_data,test_level = test_level)
322 self.log(f"Sending Output with RMS \n {rms_time(write_data * test_level, axis=-1)}")
323 if DEBUG:
324 num_files = len(glob(FILE_OUTPUT.format("*")))
325 np.savez(
326 FILE_OUTPUT.format(num_files),
327 write_data=write_data * test_level,
328 last_signal=last_signal,
329 )
330 self.data_out_queue.put((copy.deepcopy(write_data * test_level), last_signal))
331
332 def mute(self, data): # pylint: disable=unused-argument
333 """Immediately mute the signal generation task
334
335 This function should primarily only be called at the beginning of an
336 analysis to ensure the system ramps up from zero excitation. Muting
337 the signal generation during excitation will shock load the exciters and
338 test article and may damage those hardware.
339
340 Parameters
341 ----------
342 data : None
343 Unused argument required due to the expectation that functions called
344 by the SignalGenerationProcess.run function will have one argument
345 accepting any data passed along with the instruction.
346
347 """
348 self.current_test_level = 0.0
349 self.test_level_target = 0.0
350 self.test_level_change = 0.0
351
352 def set_test_level(self, data):
353 """Immediately set the level of the signal generation task
354
355 This function should primarily only be called at the beginning of an
356 analysis to ensure the system ramps up from zero excitation. Setting
357 the signal generation during excitation will shock load the exciters and
358 test article and may damage those hardware.
359
360 Parameters
361 ----------
362 data : level
363 The level of the signal generator
364
365 """
366 self.current_test_level = data
367 self.test_level_target = data
368 self.test_level_change = 0.0
369
370 def adjust_test_level(self, data):
371 """Sets a new target test level and computes the test level change per sample
372
373 Parameters
374 ----------
375 data : float:
376 The new test level target scale factor.
377
378 """
379 self.test_level_target = data
380 self.test_level_change = (
381 self.test_level_target - self.current_test_level
382 ) / self.ramp_samples
383 if self.test_level_change != 0.0:
384 self.log(
385 f"Changed test level from {self.current_test_level} to {self.test_level_target}, "
386 f"{self.test_level_change} change per sample"
387 )
388
389 def start_shutdown(self, data): # pylint: disable=unused-argument
390 """Starts the shutdown process for the signal generation process
391
392 This will set the shutdown flag to true and adjust the test level to
393 zero. Note that this does not immediately stop the generation because
394 the actual test level will take some time to ramp down to zero as to
395 not shock load the test system.
396
397 Parameters
398 ----------
399 data : None
400 Unused argument required due to the expectation that functions called
401 by the RandomSignalGenerationProcess.Run function will have one argument
402 accepting any data passed along with the instruction.
403
404 """
405 if self.shutdown_flag or self.startup:
406 # This means we weren't supposed to shutdown, it was an extra signal.
407 return
408 self.shutdown_flag = True
409 self.adjust_test_level(0.0)
410 # Get any commands that might be in the queue currently
411 commands = self.command_queue.flush(self.process_name)
412 commands = [command[0] for command in commands]
413 # Put the run command back onto the stack.
414 if SignalGenerationCommands.GENERATE_SIGNALS in commands:
415 self.command_queue.put(
416 self.process_name, (SignalGenerationCommands.GENERATE_SIGNALS, None)
417 )
418
419 def shutdown(self):
420 """Performs final cleanup operations when the system has shut down
421
422 This function is called when the signal generation has been instructed
423 to shut down and the test level has reached zero. The signal generation
424 is the first process in the Random Vibration environment to stop when
425 shutdown is called, so it notifies the environment process to stop the
426 acquisition and analysis tasks because it is no longer generating signals
427 """
428 self.log("Shutting Down Signal Generation")
429 self.command_queue.flush(self.process_name)
430 # Tell the other processes to shut down as well
431 self.environment_command_queue.put(
432 self.process_name, (SignalGenerationCommands.SHUTDOWN_ACHIEVED, None)
433 )
434 self.startup = True
435 self.shutdown_flag = False
436 self.done_generating = False
437
438
439def signal_generation_process(
440 environment_name: str,
441 command_queue: VerboseMessageQueue,
442 data_in_queue: mp.queues.Queue,
443 data_out_queue: mp.queues.Queue,
444 environment_command_queue: VerboseMessageQueue,
445 log_file_queue: mp.queues.Queue,
446 gui_update_queue: mp.queues.Queue,
447 process_name: str = None,
448):
449 """Signal generation process function called by multiprocessing
450
451 This function defines the Signal Generation process that
452 gets run by the multiprocessing module when it creates a new process. It
453 creates a SignalGenerationProcess object and runs it.
454
455 Parameters
456 ----------
457 environment_name : str :
458 Name of the environment associated with this signal generation process
459 """
460
461 signal_generation_instance = SignalGenerationProcess(
462 (environment_name + " Signal Generation" if process_name is None else process_name),
463 command_queue,
464 data_in_queue,
465 data_out_queue,
466 environment_command_queue,
467 log_file_queue,
468 gui_update_queue,
469 environment_name,
470 )
471
472 signal_generation_instance.run()