Coverage for / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / time_environment.py: 46%
334 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
1# -*- coding: utf-8 -*-
2"""
3This file defines a Time Signal Generator environment where a signal can be
4loaded and played directly to the output devices. This is perhaps the simplest
5control type that might be implemented by the controller, so start here when
6designing new control types.
8Rattlesnake Vibration Control Software
9Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
10(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
11Government retains certain rights in this software.
13This program is free software: you can redistribute it and/or modify
14it under the terms of the GNU General Public License as published by
15the Free Software Foundation, either version 3 of the License, or
16(at your option) any later version.
18This program is distributed in the hope that it will be useful,
19but WITHOUT ANY WARRANTY; without even the implied warranty of
20MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21GNU General Public License for more details.
23You should have received a copy of the GNU General Public License
24along with this program. If not, see <https://www.gnu.org/licenses/>.
25"""
27import copy
28import multiprocessing as mp
29import multiprocessing.sharedctypes # pylint: disable=unused-import
30from multiprocessing.queues import Queue
32import netCDF4 as nc4
33import numpy as np
34import openpyxl
35from qtpy import QtCore, QtWidgets, uic
37from .abstract_environment import AbstractEnvironment, AbstractMetadata, AbstractUI
38from .environments import (
39 ControlTypes,
40 environment_definition_ui_paths,
41 environment_run_ui_paths,
42)
43from .ui_utilities import load_time_history, multiline_plotter
44from .utilities import (
45 DataAcquisitionParameters,
46 GlobalCommands,
47 VerboseMessageQueue,
48 db2scale,
49 rms_time,
50)
52CONTROL_TYPE = ControlTypes.TIME
53TEST_LEVEL_THRESHOLD = 1.01
54MAX_RESPONSES_TO_PLOT = 20
55MAX_SAMPLES_TO_PLOT = 100000
58class TimeQueues:
59 """A set of queues used by the Time environment"""
61 def __init__(
62 self,
63 environment_command_queue: VerboseMessageQueue,
64 gui_update_queue: mp.queues.Queue,
65 controller_communication_queue: VerboseMessageQueue,
66 data_in_queue: mp.queues.Queue,
67 data_out_queue: mp.queues.Queue,
68 log_file_queue: VerboseMessageQueue,
69 ):
70 """
71 Creates a namespace to store all the queues used by the Time Environment
73 Parameters
74 ----------
75 environment_command_queue : VerboseMessageQueue
76 Queue from which the environment will receive instructions.
77 gui_update_queue : mp.queues.Queue
78 Queue to which the environment will put GUI updates.
79 controller_communication_queue : VerboseMessageQueue
80 Queue to which the environment will put global contorller instructions.
81 data_in_queue : mp.queues.Queue
82 Queue from which the environment will receive data from acquisition.
83 data_out_queue : mp.queues.Queue
84 Queue to which the environment will write data for output.
85 log_file_queue : VerboseMessageQueue
86 Queue to which the environment will write log file messages.
87 """
88 self.environment_command_queue = environment_command_queue
89 self.gui_update_queue = gui_update_queue
90 self.controller_communication_queue = controller_communication_queue
91 self.data_in_queue = data_in_queue
92 self.data_out_queue = data_out_queue
93 self.log_file_queue = log_file_queue
96class TimeParameters(AbstractMetadata):
97 """Storage container for parameters used by the Time Environment"""
99 def __init__(self, sample_rate, output_signal, cancel_rampdown_time):
100 """
101 Container to hold signal processing parameters for the Time environment
103 Parameters
104 ----------
105 sample_rate : int
106 Number of samples per second that the controller runs.
107 output_signal : np.ndarray
108 Signal that will be generated by the sources, 2D array (n_outputs x
109 n_samples)
110 cancel_rampdown_time : float
111 Time used to decay the signal to zero if the environment is stopped.
112 Prevents "hard stops" from damaging equipment.
114 """
115 self.sample_rate = sample_rate
116 self.output_signal = output_signal
117 self.cancel_rampdown_time = cancel_rampdown_time
119 @property
120 def signal_samples(self):
121 """The number of samples in the signal"""
122 return self.output_signal.shape[-1]
124 @property
125 def output_channels(self):
126 """The number of output channels in the signal"""
127 return self.output_signal.shape[0]
129 @property
130 def signal_time(self):
131 """The length of the signal in seconds"""
132 return self.signal_samples / self.sample_rate
134 @property
135 def cancel_rampdown_samples(self):
136 """The number of samples required to ramp down the signal when cancelled"""
137 return int(self.cancel_rampdown_time * self.sample_rate)
139 def store_to_netcdf(
140 self, netcdf_group_handle: nc4._netCDF4.Group # pylint: disable=c-extension-no-member
141 ):
142 """
143 Stores parameters to a netCDF group so they can be recovered.
145 Parameters
146 ----------
147 netcdf_group_handle : nc4._netCDF4.Group
148 Reference to the netCDF4 group in which the environment data should
149 be stored.
151 """
152 netcdf_group_handle.cancel_rampdown_time = self.cancel_rampdown_time
153 # Save the output signal
154 netcdf_group_handle.createDimension("output_channels", self.output_channels)
155 netcdf_group_handle.createDimension("signal_samples", self.signal_samples)
156 var = netcdf_group_handle.createVariable(
157 "output_signal", "f8", ("output_channels", "signal_samples")
158 )
159 var[...] = self.output_signal
161 @classmethod
162 def from_ui(cls, ui):
163 """Creates a TimeParameters object from the user interface
165 Parameters
166 ----------
167 ui : TimeUI
168 A Time User Interface
170 Returns
171 -------
172 test_parameters : TimeParameters
173 Parameters corresponding to the data in the user interface.
175 """
176 return cls(
177 sample_rate=ui.definition_widget.output_sample_rate_display.value(),
178 output_signal=ui.signal,
179 cancel_rampdown_time=ui.definition_widget.cancel_rampdown_selector.value(),
180 )
183class TimeUI(AbstractUI):
184 """Class defining the user interface for a Random Vibration environment.
186 This class will contain two main UIs, the environment definition and run.
187 The widgets corresponding to these interfaces are stored in TabWidgets in
188 the main UI.
190 This class defines all the call backs and user interface operations required
191 for the Time environment."""
193 def __init__(
194 self,
195 environment_name: str,
196 definition_tabwidget: QtWidgets.QTabWidget,
197 system_id_tabwidget: QtWidgets.QTabWidget, # pylint: disable=unused-argument
198 test_predictions_tabwidget: QtWidgets.QTabWidget, # pylint: disable=unused-argument
199 run_tabwidget: QtWidgets.QTabWidget,
200 environment_command_queue: VerboseMessageQueue,
201 controller_communication_queue: VerboseMessageQueue,
202 log_file_queue: Queue,
203 ):
204 """
205 Constructs a Time User Interfae
207 Given the tab widgets from the main interface as well as communication
208 queues, this class assembles the user interface components specific to
209 the Time Environment
211 Parameters
212 ----------
213 definition_tabwidget : QtWidgets.QTabWidget
214 QTabWidget containing the environment subtabs on the Control
215 Definition main tab
216 system_id_tabwidget : QtWidgets.QTabWidget
217 QTabWidget containing the environment subtabs on the System
218 Identification main tab. The Time Environment has no system
219 identification step, so this is not used.
220 test_predictions_tabwidget : QtWidgets.QTabWidget
221 QTabWidget containing the environment subtabs on the Test Predictions
222 main tab. The Time Environment has no system identification
223 step, so this is not used.
224 run_tabwidget : QtWidgets.QTabWidget
225 QTabWidget containing the environment subtabs on the Run
226 main tab.
227 environment_command_queue : VerboseMessageQueue
228 Queue for sending commands to the Random Vibration Environment
229 controller_communication_queue : VerboseMessageQueue
230 Queue for sending global commands to the controller
231 log_file_queue : Queue
232 Queue where log file messages can be written.
234 """
235 super().__init__(
236 environment_name,
237 environment_command_queue,
238 controller_communication_queue,
239 log_file_queue,
240 )
241 # Add the page to the control definition tabwidget
242 self.definition_widget = QtWidgets.QWidget()
243 uic.loadUi(environment_definition_ui_paths[CONTROL_TYPE], self.definition_widget)
244 definition_tabwidget.addTab(self.definition_widget, self.environment_name)
245 # Add the page to the run tabwidget
246 self.run_widget = QtWidgets.QWidget()
247 uic.loadUi(environment_run_ui_paths[CONTROL_TYPE], self.run_widget)
248 run_tabwidget.addTab(self.run_widget, self.environment_name)
250 # Set up some persistent data
251 self.data_acquisition_parameters = None
252 self.environment_parameters = None
253 self.signal = None
254 self.physical_output_names = None
255 self.physical_measurement_names = None
256 self.show_signal_checkboxes = None
257 self.plot_data_items = {}
259 self.complete_ui()
260 self.connect_callbacks()
262 # Complete the profile commands
263 self.command_map["Set Test Level"] = self.change_test_level_from_profile
264 self.command_map["Set Repeat"] = self.set_repeat_from_profile
265 self.command_map["Set No Repeat"] = self.set_norepeat_from_profile
267 def collect_environment_definition_parameters(self) -> TimeParameters:
268 """Collect the parameters from the user interface defining the environment
270 Returns
271 -------
272 TimeParameters
273 A metadata or parameters object containing the parameters defining
274 the corresponding environment.
275 """
276 return TimeParameters.from_ui(self)
278 def complete_ui(self):
279 """Helper Function to continue setting up the user interface"""
280 # Set common look and feel for plots
281 plot_widgets = [
282 self.definition_widget.signal_display_plot,
283 self.run_widget.output_signal_plot,
284 self.run_widget.response_signal_plot,
285 ]
286 for plot_widget in plot_widgets:
287 plot_item = plot_widget.getPlotItem()
288 plot_item.showGrid(True, True, 0.25)
289 plot_item.enableAutoRange()
290 plot_item.getViewBox().enableAutoRange(enable=True)
292 def connect_callbacks(self):
293 """Helper function to connect callbacks to functions in the class"""
294 self.definition_widget.load_signal_button.clicked.connect(self.load_signal)
295 self.run_widget.start_test_button.clicked.connect(self.start_control)
296 self.run_widget.stop_test_button.clicked.connect(self.stop_control)
298 def initialize_data_acquisition(self, data_acquisition_parameters: DataAcquisitionParameters):
299 """Update the user interface with data acquisition parameters
301 This function is called when the Data Acquisition parameters are
302 initialized. This function should set up the environment user interface
303 accordingly.
305 Parameters
306 ----------
307 data_acquisition_parameters : DataAcquisitionParameters :
308 Container containing the data acquisition parameters, including
309 channel table and sampling information.
311 """
312 self.log("Initializing Data Acquisition")
313 self.signal = None
314 # Get channel information
315 channels = data_acquisition_parameters.channel_list
316 num_measurements = len([channel for channel in channels if channel.feedback_device is None])
317 num_output = len([channel for channel in channels if channel.feedback_device is not None])
318 self.physical_output_names = [
319 f"{'' if channel.channel_type is None else channel.channel_type} "
320 f"{channel.node_number}{channel.node_direction}"
321 for channel in channels
322 if channel.feedback_device
323 ]
324 self.physical_measurement_names = [
325 f"{'' if channel.channel_type is None else channel.channel_type} "
326 "{channel.node_number}{channel.node_direction}"
327 for channel in channels
328 if channel.feedback_device is None
329 ]
330 # Add rows to the signal table
331 self.definition_widget.signal_information_table.setRowCount(num_output)
332 self.show_signal_checkboxes = []
333 for i, name in enumerate(self.physical_output_names):
334 item = QtWidgets.QTableWidgetItem()
335 item.setText(name)
336 item.setFlags(item.flags() ^ QtCore.Qt.ItemIsEditable)
337 self.definition_widget.signal_information_table.setItem(i, 1, item)
338 checkbox = QtWidgets.QCheckBox()
339 checkbox.setChecked(True)
340 checkbox.stateChanged.connect(self.show_signal)
341 self.show_signal_checkboxes.append(checkbox)
342 self.definition_widget.signal_information_table.setCellWidget(i, 0, checkbox)
343 item = QtWidgets.QTableWidgetItem()
344 item.setText("0.0")
345 item.setFlags(item.flags() ^ QtCore.Qt.ItemIsEditable)
346 self.definition_widget.signal_information_table.setItem(i, 2, item)
347 item = QtWidgets.QTableWidgetItem()
348 item.setText("0.0")
349 item.setFlags(item.flags() ^ QtCore.Qt.ItemIsEditable)
350 self.definition_widget.signal_information_table.setItem(i, 3, item)
351 # Fill in the info at the bottom
352 self.definition_widget.sample_rate_display.setValue(data_acquisition_parameters.sample_rate)
353 self.definition_widget.output_sample_rate_display.setValue(
354 data_acquisition_parameters.sample_rate * data_acquisition_parameters.output_oversample
355 )
356 self.definition_widget.output_channels_display.setValue(num_output)
358 # Clear the signal plot
359 self.definition_widget.signal_display_plot.getPlotItem().clear()
360 self.run_widget.output_signal_plot.getPlotItem().clear()
361 self.run_widget.response_signal_plot.getPlotItem().clear()
363 # Set initial lines
364 self.plot_data_items["output_signal_definition"] = multiline_plotter(
365 np.arange(2),
366 np.zeros((num_output, 2)),
367 widget=self.definition_widget.signal_display_plot,
368 other_pen_options={"width": 1},
369 names=self.physical_output_names,
370 )
371 self.plot_data_items["output_signal_measurement"] = multiline_plotter(
372 np.arange(2),
373 np.zeros((num_output, 2)),
374 widget=self.run_widget.output_signal_plot,
375 other_pen_options={"width": 1},
376 names=self.physical_output_names,
377 )
378 self.plot_data_items["response_signal_measurement"] = multiline_plotter(
379 np.arange(2),
380 np.zeros(
381 (
382 (
383 num_measurements
384 if num_measurements < MAX_RESPONSES_TO_PLOT
385 else MAX_RESPONSES_TO_PLOT
386 ),
387 2,
388 )
389 ),
390 widget=self.run_widget.response_signal_plot,
391 other_pen_options={"width": 1},
392 names=self.physical_measurement_names,
393 )
395 self.data_acquisition_parameters = data_acquisition_parameters
397 def load_signal(self, clicked, filename=None): # pylint: disable=unused-argument
398 """Loads a time signal using a dialog or the specified filename
400 Parameters
401 ----------
402 clicked :
403 The clicked event that triggered the callback.
404 filename :
405 File name defining the specification for bypassing the callback when
406 loading from a file (Default value = None).
408 """
409 if filename is None:
410 filename, _ = QtWidgets.QFileDialog.getOpenFileName(
411 self.definition_widget,
412 "Select Signal File",
413 filter="Numpy or Mat (*.npy *.npz *.mat)",
414 )
415 if filename == "":
416 return
417 self.definition_widget.signal_file_name_display.setText(filename)
418 self.signal = load_time_history(
419 filename, self.definition_widget.output_sample_rate_display.value()
420 )
421 self.definition_widget.signal_samples_display.setValue(self.signal.shape[-1])
422 self.definition_widget.signal_time_display.setValue(
423 self.signal.shape[-1] / self.definition_widget.output_sample_rate_display.value()
424 )
425 maxs = np.max(np.abs(self.signal), axis=-1)
426 rmss = rms_time(self.signal, axis=-1)
427 for i, (mx, rms) in enumerate(zip(maxs, rmss)):
428 self.definition_widget.signal_information_table.item(i, 2).setText(f"{mx:0.2f}")
429 self.definition_widget.signal_information_table.item(i, 3).setText(f"{rms:0.2f}")
430 self.show_signal()
432 def show_signal(self):
433 """Shows the signal on the user interface"""
434 for curve, signal, check_box in zip(
435 self.plot_data_items["output_signal_definition"],
436 self.signal,
437 self.show_signal_checkboxes,
438 ):
439 if check_box.isChecked():
440 x = (
441 np.arange(signal.shape[-1])
442 / self.definition_widget.output_sample_rate_display.value()
443 )
444 curve.setData(x, signal)
445 else:
446 curve.setData((0, 0), (0, 0))
448 def initialize_environment(self) -> AbstractMetadata:
449 """Update the user interface with environment parameters
451 This function is called when the Environment parameters are initialized.
452 This function should set up the user interface accordingly. It must
453 return the parameters class of the environment that inherits from
454 AbstractMetadata.
456 Returns
457 -------
458 environment_parameters : TimeParameters
459 A TimeParameters object that contains the parameters
460 defining the environment.
461 """
462 self.log("Initializing Environment Parameters")
463 data = self.collect_environment_definition_parameters()
464 # Make sure everything is defined
465 if data.output_signal is None:
466 raise ValueError("Output Signal is not defined!")
467 # Initialize the correct sizes of the arrays
468 for plot_items in [
469 self.plot_data_items["output_signal_measurement"],
470 self.plot_data_items["response_signal_measurement"],
471 ]:
472 for curve in plot_items:
473 curve.setData(
474 np.arange(
475 (
476 data.output_signal.shape[-1]
477 // self.data_acquisition_parameters.output_oversample
478 * 2
479 if data.output_signal.shape[-1]
480 // self.data_acquisition_parameters.output_oversample
481 * 2
482 < MAX_SAMPLES_TO_PLOT
483 else MAX_SAMPLES_TO_PLOT
484 )
485 )
486 / self.data_acquisition_parameters.sample_rate,
487 np.zeros(
488 (
489 data.output_signal.shape[-1]
490 // self.data_acquisition_parameters.output_oversample
491 * 2
492 if data.output_signal.shape[-1]
493 // self.data_acquisition_parameters.output_oversample
494 * 2
495 < MAX_SAMPLES_TO_PLOT
496 else MAX_SAMPLES_TO_PLOT
497 )
498 ),
499 )
500 self.environment_parameters = data
501 return data
503 def retrieve_metadata(
504 self, netcdf_handle: nc4._netCDF4.Dataset # pylint: disable=c-extension-no-member
505 ):
506 """Collects environment parameters from a netCDF dataset.
508 This function retrieves parameters from a netCDF dataset that was written
509 by the controller during streaming. It must populate the widgets
510 in the user interface with the proper information.
512 This function is the "read" counterpart to the store_to_netcdf
513 function in the TimeParameters class, which will write
514 parameters to the netCDF file to document the metadata.
516 Note that the entire dataset is passed to this function, so the function
517 should collect parameters pertaining to the environment from a Group
518 in the dataset sharing the environment's name, e.g.
520 ``group = netcdf_handle.groups[self.environment_name]``
521 ``self.definition_widget.parameter_selector.setValue(group.parameter)``
523 Parameters
524 ----------
525 netcdf_handle : nc4._netCDF4.Dataset :
526 The netCDF dataset from which the data will be read. It should have
527 a group name with the enviroment's name.
528 """
529 group = netcdf_handle.groups[self.environment_name]
530 self.signal = group.variables["output_signal"][...].data
531 self.definition_widget.cancel_rampdown_selector.setValue(group.cancel_rampdown_time)
532 maxs = np.max(np.abs(self.signal), axis=-1)
533 rmss = rms_time(self.signal, axis=-1)
534 for i, (mx, rms) in enumerate(zip(maxs, rmss)):
535 self.definition_widget.signal_information_table.item(i, 2).setText(f"{mx:0.2f}")
536 self.definition_widget.signal_information_table.item(i, 3).setText(f"{rms:0.2f}")
537 self.show_signal()
539 def start_control(self):
540 """Starts running the environment"""
541 self.run_widget.stop_test_button.setEnabled(True)
542 self.run_widget.start_test_button.setEnabled(False)
543 self.run_widget.test_level_selector.setEnabled(False)
544 self.run_widget.repeat_signal_checkbox.setEnabled(False)
545 self.controller_communication_queue.put(
546 self.log_name, (GlobalCommands.START_ENVIRONMENT, self.environment_name)
547 )
548 self.environment_command_queue.put(
549 self.log_name,
550 (
551 GlobalCommands.START_ENVIRONMENT,
552 (
553 db2scale(self.run_widget.test_level_selector.value()),
554 self.run_widget.repeat_signal_checkbox.isChecked(),
555 ),
556 ),
557 )
558 self.controller_communication_queue.put(
559 self.log_name, (GlobalCommands.AT_TARGET_LEVEL, self.environment_name)
560 )
562 def stop_control(self):
563 """Stops running the environment"""
564 self.environment_command_queue.put(self.log_name, (GlobalCommands.STOP_ENVIRONMENT, None))
566 def change_test_level_from_profile(self, test_level):
567 """Sets the test level from a profile instruction
569 Parameters
570 ----------
571 test_level :
572 Value to set the test level to.
573 """
574 self.run_widget.test_level_selector.setValue(int(test_level))
576 def set_repeat_from_profile(self, data): # pylint: disable=unused-argument
577 """Sets the the signal to repeat from a profile instruction
579 Parameters
580 ----------
581 data : Ignored
582 Parameter is ignored but required by the ``command_map``
584 """
585 self.run_widget.repeat_signal_checkbox.setChecked(True)
587 def set_norepeat_from_profile(self, data): # pylint: disable=unused-argument
588 """Sets the the signal to not repeat from a profile instruction
590 Parameters
591 ----------
592 data : Ignored
593 Parameter is ignored but required by the ``command_map``
595 """
596 self.run_widget.repeat_signal_checkbox.setChecked(False)
598 def update_gui(self, queue_data):
599 """Update the graphical interface for the environment
601 Parameters
602 ----------
603 queue_data :
604 A 2-tuple consisting of ``(message,data)`` pairs where the message
605 denotes what to change and the data contains the information needed
606 to be displayed.
607 """
608 message, data = queue_data
609 if message == "time_data":
610 response_data, output_data = data
611 for curve, this_data in zip(
612 self.plot_data_items["response_signal_measurement"], response_data
613 ):
614 x, y = curve.getData()
615 y = np.concatenate((y[this_data.size :], this_data[-x.size :]), axis=0)
616 curve.setData(x, y)
617 # Display the data
618 for curve, this_output in zip(
619 self.plot_data_items["output_signal_measurement"], output_data
620 ):
621 x, y = curve.getData()
622 y = np.concatenate((y[this_output.size :], this_output[-x.size :]), axis=0)
623 curve.setData(x, y)
624 elif message == "enable":
625 widget = None
626 for parent in [self.definition_widget, self.run_widget]:
627 try:
628 widget = getattr(parent, data)
629 break
630 except AttributeError:
631 continue
632 if widget is None:
633 raise ValueError(f"Cannot Enable Widget {data}: not found in UI")
634 widget.setEnabled(True)
635 elif message == "disable":
636 widget = None
637 for parent in [self.definition_widget, self.run_widget]:
638 try:
639 widget = getattr(parent, data)
640 break
641 except AttributeError:
642 continue
643 if widget is None:
644 raise ValueError(f"Cannot Disable Widget {data}: not found in UI")
645 widget.setEnabled(False)
646 else:
647 widget = None
648 for parent in [self.definition_widget, self.run_widget]:
649 try:
650 widget = getattr(parent, message)
651 break
652 except AttributeError:
653 continue
654 if widget is None:
655 raise ValueError(f"Cannot Update Widget {message}: not found in UI")
656 if isinstance(widget, QtWidgets.QDoubleSpinBox):
657 widget.setValue(data)
658 elif isinstance(widget, QtWidgets.QSpinBox):
659 widget.setValue(data)
660 elif isinstance(widget, QtWidgets.QLineEdit):
661 widget.setText(data)
662 elif isinstance(widget, QtWidgets.QListWidget):
663 widget.clear()
664 widget.addItems([f"{d:.3f}" for d in data])
666 @staticmethod
667 def create_environment_template(
668 environment_name: str, workbook: openpyxl.workbook.workbook.Workbook
669 ):
670 """Creates a template worksheet in an Excel workbook defining the
671 environment.
673 This function creates a template worksheet in an Excel workbook that
674 when filled out could be read by the controller to re-create the
675 environment.
677 This function is the "write" counterpart to the
678 ``set_parameters_from_template`` function in the ``TimeUI`` class,
679 which reads the values from the template file to populate the user
680 interface.
682 Parameters
683 ----------
684 environment_name : str :
685 The name of the environment that will specify the worksheet's name
686 workbook : openpyxl.workbook.workbook.Workbook :
687 A reference to an ``openpyxl`` workbook.
689 """
690 worksheet = workbook.create_sheet(environment_name)
691 worksheet.cell(1, 1, "Control Type")
692 worksheet.cell(1, 2, "Time")
693 worksheet.cell(
694 1,
695 4,
696 "Note: Replace cells with hash marks (#) to provide the requested parameters.",
697 )
698 worksheet.cell(2, 1, "Signal File")
699 worksheet.cell(2, 2, "# Path to the file that contains the time signal that will be output")
700 worksheet.cell(3, 1, "Cancel Rampdown Time")
701 worksheet.cell(
702 3,
703 2,
704 "# Time for the environment to ramp to zero if the environment is cancelled.",
705 )
707 def set_parameters_from_template(self, worksheet: openpyxl.worksheet.worksheet.Worksheet):
708 """
709 Collects parameters for the user interface from the Excel template file
711 This function reads a filled out template worksheet to create an
712 environment. Cells on this worksheet contain parameters needed to
713 specify the environment, so this function should read those cells and
714 update the UI widgets with those parameters.
716 This function is the "read" counterpart to the
717 ``create_environment_template`` function in the ``TimeUI`` class,
718 which writes a template file that can be filled out by a user.
721 Parameters
722 ----------
723 worksheet : openpyxl.worksheet.worksheet.Worksheet
724 An openpyxl worksheet that contains the environment template.
725 Cells on this worksheet should contain the parameters needed for the
726 user interface.
728 """
729 self.load_signal(None, worksheet.cell(2, 2).value)
730 self.definition_widget.cancel_rampdown_selector.setValue(float(worksheet.cell(3, 2).value))
733class TimeEnvironment(AbstractEnvironment):
734 """Environment defined by a generated time history signal"""
736 def __init__(
737 self,
738 environment_name: str,
739 queue_container: TimeQueues,
740 acquisition_active: mp.sharedctypes.Synchronized,
741 output_active: mp.sharedctypes.Synchronized,
742 ):
743 """
744 Time History Generation Environment Constructor
746 This function fills out the command map and initializes parameters to
747 zero or null.
749 Parameters
750 ----------
751 environment_name : str
752 Name of the environment.
753 queue_container : TimeQueues
754 Container of queues used by the Time Environment.
756 """
757 super().__init__(
758 environment_name,
759 queue_container.environment_command_queue,
760 queue_container.gui_update_queue,
761 queue_container.controller_communication_queue,
762 queue_container.log_file_queue,
763 queue_container.data_in_queue,
764 queue_container.data_out_queue,
765 acquisition_active,
766 output_active,
767 )
768 self.queue_container = queue_container
769 # Define command map
770 self.command_map[GlobalCommands.START_ENVIRONMENT] = self.run_environment
771 # Persistent data
772 self.data_acquisition_parameters = None
773 self.environment_parameters = None
774 self.startup = True
775 self.shutdown_flag = False
776 self.current_test_level = 0.0
777 self.target_test_level = 0.0
778 self.test_level_target = 0.0
779 self.test_level_change = 0.0
780 self.repeat = False
781 self.signal_remainder = None
782 self.output_channels = None
783 self.measurement_channels = None
785 def initialize_data_acquisition_parameters(
786 self, data_acquisition_parameters: DataAcquisitionParameters
787 ):
788 """Initialize the data acquisition parameters in the environment.
790 The environment will receive the global data acquisition parameters from
791 the controller, and must set itself up accordingly.
793 Parameters
794 ----------
795 data_acquisition_parameters : DataAcquisitionParameters :
796 A container containing data acquisition parameters, including
797 channels active in the environment as well as sampling parameters.
798 """
799 self.log("Initializing Data Acquisition Parameters")
800 self.data_acquisition_parameters = data_acquisition_parameters
801 self.measurement_channels = [
802 index
803 for index, channel in enumerate(self.data_acquisition_parameters.channel_list)
804 if channel.feedback_device is None
805 ]
806 self.output_channels = [
807 index
808 for index, channel in enumerate(self.data_acquisition_parameters.channel_list)
809 if channel.feedback_device is not None
810 ]
812 def initialize_environment_test_parameters(self, environment_parameters: TimeParameters):
813 """
814 Initialize the environment parameters specific to this environment
816 The environment will recieve parameters defining itself from the
817 user interface and must set itself up accordingly.
819 Parameters
820 ----------
821 environment_parameters : TimeParameters
822 A container containing the parameters defining the environment
824 """
825 self.log("Initializing Environment Parameters")
826 self.environment_parameters = environment_parameters
828 def run_environment(self, data):
829 """Runs the time history environment.
831 This function handles start up, running, and shutting down the environment
833 During startup, the function will initialize a buffer that it will
834 write from that consists of the output signal.
836 When running, it will collect data that comes in from the ``data_in_queue``
837 and tell the GUI to display it. This will also tell
838 the environment whether or not the signal is the last signal and that
839 it should shut down. The function determines if a
840 new signal is required by checking if the data_out_queue is empty.
841 If it is empty, we write the next portion of the buffer.
842 If the buffer runs dry and the signal is not repeating, it will set the
843 last_signal flag which tells the process to begin shutting down. If
844 shutdown is occuring, the process continues to get data until the last
845 signal is received.
847 Parameters
848 ----------
849 data : Tuple
850 A tuple containing the test level to run the environment at and
851 a boolean specifying whether or not to repeat the signal.
853 """
854 if self.startup:
855 if data is not None:
856 self.current_test_level, self.repeat = data
857 self.log(f"Test Level set to {self.current_test_level}")
858 self.signal_remainder = self.environment_parameters.output_signal
859 self.startup = False
860 # See if any data has come in
861 try:
862 acquisition_data, last_acquisition = self.queue_container.data_in_queue.get_nowait()
863 measurement_data = acquisition_data[self.measurement_channels]
864 output_data = acquisition_data[self.output_channels]
865 self.queue_container.gui_update_queue.put(
866 (self.environment_name, ("time_data", (measurement_data, output_data)))
867 )
868 except mp.queues.Empty:
869 last_acquisition = False
870 # See if we need to output data
871 if self.queue_container.data_out_queue.empty():
872 last_signal = False
873 # See if there is enough in the remainder
874 if (
875 self.data_acquisition_parameters.samples_per_write > self.signal_remainder.shape[-1]
876 and self.repeat
877 ):
878 self.signal_remainder = np.concatenate(
879 (self.signal_remainder, self.environment_parameters.output_signal),
880 axis=-1,
881 )
882 elif (
883 self.data_acquisition_parameters.samples_per_write
884 >= self.signal_remainder.shape[-1]
885 and not self.repeat
886 ) or self.current_test_level == 0.0:
887 last_signal = True
888 self.output(
889 self.signal_remainder[:, : self.data_acquisition_parameters.samples_per_write],
890 last_signal,
891 )
892 self.signal_remainder = self.signal_remainder[
893 :, self.data_acquisition_parameters.samples_per_write :
894 ]
895 if last_signal:
896 # Wait until we get the last signal from the acquisition
897 while not last_acquisition:
898 self.log("Waiting for Last Acquisition")
899 acquisition_data, last_acquisition = self.queue_container.data_in_queue.get()
900 measurement_data = acquisition_data[self.measurement_channels]
901 output_data = acquisition_data[self.output_channels]
902 self.queue_container.gui_update_queue.put(
903 (
904 self.environment_name,
905 ("time_data", (measurement_data, output_data)),
906 )
907 )
908 self.shutdown()
909 return
910 self.queue_container.environment_command_queue.put(
911 self.environment_name, (GlobalCommands.START_ENVIRONMENT, None)
912 )
914 def output(self, write_data, last_signal=False):
915 """Puts data to the data_out_queue and handles test level changes
917 This function keeps track of the environment test level and scales the
918 output signals accordingly prior to placing them into the data_out_queue.
919 This function also handles the ramping between two test levels.
921 Parameters
922 ----------
923 write_data : np.ndarray
924 A numpy array containing the signals to be written.
926 last_signal :
927 Specifies if the signal being written is the last signal that will
928 be generated due to the signal generation shutting down. This is
929 passed to the output task to tell it that there will be no more
930 signals from this environment until it is restarted. (Default value
931 = False)
932 """
933 # Perform the output transformation if necessary
934 # Compute the test_level scaling for this dataset
935 if self.test_level_change == 0.0:
936 test_level = self.current_test_level
937 self.log(f"Test Level at {test_level}")
938 else:
939 test_level = (
940 self.current_test_level
941 + (np.arange(write_data.shape[-1]) + 1) * self.test_level_change
942 )
943 # Compute distance in steps from the target test_level
944 # and find where it is near the target
945 full_level_index = np.nonzero(
946 abs(test_level - self.test_level_target) / abs(self.test_level_change)
947 < TEST_LEVEL_THRESHOLD
948 )[0]
949 # Check if any are
950 if len(full_level_index) > 0:
951 # If so, set all test_levels after that one to the target test_level
952 test_level[full_level_index[0] + 1 :] = self.test_level_target
953 # And update that our current test_level is now the target test_level
954 self.current_test_level = self.test_level_target
955 self.test_level_change = 0.0
956 else:
957 # Otherwise, our current test_level is the last entry in the test_level scaling
958 self.current_test_level = test_level[-1]
959 self.log(f"Test level from {test_level[0]} to {test_level[-1]}")
960 # Write the test level-scaled data to the task
961 self.log("Sending data to data_out queue")
962 self.queue_container.data_out_queue.put(
963 (copy.deepcopy(write_data * test_level), last_signal)
964 )
966 def stop_environment(self, data):
967 """Stops the environment by setting the test level to zero.
969 Parameters
970 ----------
971 data : Ignored
972 This parameter is not used by the function but is required for the
973 ``command_map`` calling signature.
975 """
976 self.adjust_test_level(0.0)
978 def adjust_test_level(self, data):
979 """Adjusts the test level of the signal
981 Parameters
982 ----------
983 data :
984 New target test level
986 """
987 self.test_level_target = data
988 self.test_level_change = (
989 self.test_level_target - self.current_test_level
990 ) / self.environment_parameters.cancel_rampdown_samples
991 if self.test_level_change != 0.0:
992 self.log(
993 f"Changed test level to {self.test_level_target} from "
994 f"{self.current_test_level}, {self.test_level_change} change per sample"
995 )
997 def shutdown(self):
998 """Performs final cleanup operations when the system has shut down
1000 This function is called when the environment has been instructed
1001 to shut down and the last acquisition data has been received. The signal generation
1002 is the first process in the Random Vibration environment to stop when
1003 shutdown is called, so it notifies the environment process to stop the
1004 acquisition and analysis tasks because it is no longer generating signals
1006 """
1007 self.log("Shutting Down Time History Generation")
1008 self.queue_container.environment_command_queue.flush(self.environment_name)
1009 # Enable the volume controls
1010 self.queue_container.gui_update_queue.put(
1011 (self.environment_name, ("enable", "test_level_selector"))
1012 )
1013 self.queue_container.gui_update_queue.put(
1014 (self.environment_name, ("enable", "repeat_signal_checkbox"))
1015 )
1016 self.queue_container.gui_update_queue.put(
1017 (self.environment_name, ("enable", "start_test_button"))
1018 )
1019 self.queue_container.gui_update_queue.put(
1020 (self.environment_name, ("disable", "stop_test_button"))
1021 )
1022 self.startup = True
1025def time_process(
1026 environment_name: str,
1027 input_queue: VerboseMessageQueue,
1028 gui_update_queue: Queue,
1029 controller_communication_queue: VerboseMessageQueue,
1030 log_file_queue: Queue,
1031 data_in_queue: Queue,
1032 data_out_queue: Queue,
1033 acquisition_active: mp.sharedctypes.Synchronized,
1034 output_active: mp.sharedctypes.Synchronized,
1035):
1036 """Time signal generation environment process function called by multiprocessing
1038 This function defines the environment process that
1039 gets run by the multiprocessing module when it creates a new process. It
1040 creates a TimeEnviornment object and runs it.
1042 Parameters
1043 ----------
1044 environment_name : str :
1045 Name of the environment
1046 input_queue : VerboseMessageQueue :
1047 Queue containing instructions for the environment
1048 gui_update_queue : Queue :
1049 Queue where GUI updates are put
1050 controller_communication_queue : Queue :
1051 Queue for global communications with the controller
1052 log_file_queue : Queue :
1053 Queue for writing log file messages
1054 data_in_queue : Queue :
1055 Queue from which data will be read by the environment
1056 data_out_queue : Queue :
1057 Queue to which data will be written that will be output by the hardware.
1059 """
1061 # Create vibration queues
1062 queue_container = TimeQueues(
1063 input_queue,
1064 gui_update_queue,
1065 controller_communication_queue,
1066 data_in_queue,
1067 data_out_queue,
1068 log_file_queue,
1069 )
1071 process_class = TimeEnvironment(
1072 environment_name, queue_container, acquisition_active, output_active
1073 )
1074 process_class.run()