1# -*- coding: utf-8 -*-
2"""
3Synthetic "hardware" that allows the responses to be simulated by integrating
4linear equations of motion using state space matrices, A, B, C, and D.
5
6Rattlesnake Vibration Control Software
7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
9Government retains certain rights in this software.
10
11This program is free software: you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation, either version 3 of the License, or
14(at your option) any later version.
15
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License for more details.
20
21You should have received a copy of the GNU General Public License
22along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""
24
25import multiprocessing as mp
26import os
27import time
28from typing import List
29
30import numpy as np
31import scipy.signal as signal
32from scipy.io import loadmat
33
34from .abstract_hardware import HardwareAcquisition, HardwareOutput
35from .utilities import Channel, DataAcquisitionParameters, flush_queue
36
37
class StateSpaceAcquisition(HardwareAcquisition):
    """Synthetic acquisition hardware that integrates a state space model.

    This class defines the interface between the controller and the data
    acquisition portion of the hardware.  No physical device is involved:
    the responses are produced by integrating the linear system described by
    the state space matrices A, B, C, and D with ``scipy.signal.lsim``.  The
    excitation signals are received through a queue that is filled by the
    matching ``StateSpaceOutput`` object.
    """

    def __init__(self, state_space_file: str, queue: mp.queues.Queue):
        """Load the state space matrices and set the remaining state to null values.

        Parameters
        ----------
        state_space_file : str
            Path to a ``.npz`` or ``.mat`` file containing the state space
            matrices stored under the names ``A``, ``B``, ``C``, and ``D``.
        queue : mp.queues.Queue
            A queue that passes input data from the StateSpaceOutput class to
            this class.  Normally, this data transfer would occur through
            the physical test object: the exciters would excite the test
            object and the acquisition would record the responses.  In the
            synthetic case the output data must be handed to the acquisition,
            which performs the integration.

        Raises
        ------
        ValueError
            If the file extension is neither ``.npz`` nor ``.mat``.
        """
        _, extension = os.path.splitext(state_space_file)
        matrix_names = ("A", "B", "C", "D")

        if extension.lower() == ".npz":
            # np.load returns an NpzFile that keeps the archive open; read the
            # arrays inside a context manager so the file handle is released.
            with np.load(state_space_file) as data:
                a_mat, b_mat, c_mat, d_mat = (data[name] for name in matrix_names)
        elif extension.lower() == ".mat":
            data = loadmat(state_space_file)
            a_mat, b_mat, c_mat, d_mat = (data[name] for name in matrix_names)
        else:
            raise ValueError(
                f"Unknown extension to file {state_space_file}, "
                f"should be .npz or .mat, not {extension}"
            )
        self.system = signal.StateSpace(a_mat, b_mat, c_mat, d_mat)
        # One state variable per row of A, starting at rest
        self.state = np.zeros(a_mat.shape[0])
        self.queue = queue
        # The attributes below are populated by
        # set_up_data_acquisition_parameters_and_channels
        self.times = None
        self.frame_time = None
        self.force_buffer = None
        self.integration_oversample = None
        self.acquisition_delay = None
        self.response_channels = None

    def set_up_data_acquisition_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """Initialize the hardware and set up channels and sampling properties.

        Delegates to ``create_response_channels`` and ``set_parameters``.

        Parameters
        ----------
        test_data : DataAcquisitionParameters
            A container containing the data acquisition parameters for the
            controller set by the user.
        channel_data : List[Channel]
            A list of ``Channel`` objects defining the channels in the test.

        Returns
        -------
        None.
        """
        self.create_response_channels(channel_data)
        self.set_parameters(test_data)

    def create_response_channels(self, channel_data: List[Channel]):
        """Classify the channels and create an empty force buffer.

        A channel is flagged as a response channel when its
        ``feedback_device`` is ``None`` or an empty string.  All remaining
        channels each get one column in the force buffer.

        Parameters
        ----------
        channel_data : List[Channel]
            A list of ``Channel`` objects defining the channels in the test.
        """
        self.response_channels = np.array(
            [
                channel.feedback_device is None or channel.feedback_device == ""
                for channel in channel_data
            ],
            dtype="bool",
        )
        # A signal buffer is needed in case the write size is not equal to
        # the read size.  It starts with zero rows (no samples received yet).
        self.force_buffer = np.zeros((0, np.sum(~self.response_channels)))

    def set_parameters(self, test_data: DataAcquisitionParameters):
        """Set up the integration time vector and timing parameters.

        Parameters
        ----------
        test_data : DataAcquisitionParameters
            A container containing the data acquisition parameters for the
            controller set by the user.  The attributes ``sample_rate``,
            ``samples_per_read``, ``samples_per_write`` and
            ``output_oversample`` are read.
        """
        self.integration_oversample = test_data.output_oversample
        # Need to get one more sample than you would think because lsim
        # doesn't bridge the gap between integrations
        self.times = np.arange(test_data.samples_per_read * self.integration_oversample + 1) / (
            test_data.sample_rate * self.integration_oversample
        )
        self.frame_time = test_data.samples_per_read / test_data.sample_rate
        # get_acquisition_delay is documented to return an integer number of
        # samples; round up so a partial sample is never dropped.
        self.acquisition_delay = int(
            np.ceil(test_data.samples_per_write / test_data.output_oversample)
        )

    def start(self):
        """Start acquiring data.

        For the synthetic case, this simply resets the system state to zero.
        """
        self.state[:] = 0

    def get_acquisition_delay(self) -> int:
        """Get the number of samples between output and acquisition.

        This function returns the number of samples that need to be read to
        ensure that the last output is read by the acquisition.

        Returns
        -------
        int
            The value computed in ``set_parameters``
            (``samples_per_write / output_oversample`` rounded up), or
            ``None`` if the parameters have not been set yet.
        """
        return self.acquisition_delay

    def read(self) -> np.ndarray:
        """Read a frame of data from the simulated hardware.

        Forces are pulled from the output queue and appended to the force
        buffer until a full integration frame is available.  The frame is
        then integrated with ``scipy.signal.lsim`` starting from the stored
        state, and the method sleeps for whatever is left of the frame time
        before returning.

        Returns
        -------
        read_data : np.ndarray
            2D data with shape ``n_channels`` x ``n_samples``, where the
            channels are the outputs of the state space system and the
            samples have been decimated by the integration oversample factor.
        """
        start_time = time.time()
        n_times = self.times.size
        while self.force_buffer.shape[0] < n_times:
            try:
                forces = self.queue.get(timeout=self.frame_time)
            except mp.queues.Empty:
                # If we don't get an output in time, this likely means output
                # has stopped so just put zeros.
                forces = np.zeros((self.force_buffer.shape[-1], n_times))
            # Queue data is transposed so that samples run along the rows
            self.force_buffer = np.concatenate((self.force_buffer, forces.T), axis=0)

        # Now extract a force that is the correct size
        this_force = self.force_buffer[:n_times]
        # And leave the rest for next time.  The last force sample stays on
        # the buffer because it will be the first sample of the next frame.
        self.force_buffer = self.force_buffer[n_times - 1 :]

        _, sys_out, x_out = signal.lsim(self.system, this_force, self.times, self.state)

        self.state[:] = x_out[-1]

        integration_time = time.time() - start_time
        remaining_time = self.frame_time - integration_time
        if remaining_time > 0.0:
            time.sleep(remaining_time)

        # lsim squeezes its output, so a single-output system yields a 1D
        # array; restore the (n_times, n_outputs) layout so the return value
        # is always 2D.
        sys_out = np.reshape(sys_out, (n_times, -1))
        # We don't want to return the last sample because it will be the
        # initial state for the next frame
        return sys_out.T[:, : -1 : self.integration_oversample]

    def read_remaining(self) -> np.ndarray:
        """Read the rest of the data on the acquisition.

        This function simply returns one sample of zeros.

        Returns
        -------
        read_data : np.ndarray
            Zeros with one row per entry in ``response_channels`` and a
            single column.
        """
        return np.zeros((len(self.response_channels), 1))

    def stop(self):
        """Stop the acquisition.

        This simply sets the state to zero.
        """
        self.state[:] = 0

    def close(self):
        """Close down the hardware.

        Nothing needs to be released for the synthetic hardware.
        """
247
class StateSpaceOutput(HardwareOutput):
    """Synthetic output hardware paired with ``StateSpaceAcquisition``.

    This class performs no integration itself.  Its only job is to forward
    the output signals through a queue to the acquisition side, which does
    the actual simulation.  Consequently most of the methods are empty.
    """

    def __init__(self, queue: mp.queues.Queue):
        """Store the data-passing queue.

        Parameters
        ----------
        queue : mp.queues.Queue
            Queue used to pass data from output to acquisition for
            integration.  See ``StateSpaceAcquisition.__init__``
        """
        self.queue = queue

    def set_up_data_output_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """Initialize the hardware and set up sources and sampling properties.

        Nothing needs to be configured for the synthetic hardware, so both
        arguments are ignored.

        Parameters
        ----------
        test_data : DataAcquisitionParameters
            A container containing the data acquisition parameters for the
            controller set by the user.
        channel_data : List[Channel]
            A list of ``Channel`` objects defining the channels in the test.

        Returns
        -------
        None.
        """

    def start(self):
        """Start the output.

        Does nothing for synthetic hardware.
        """

    def write(self, data: np.ndarray):
        """Write a frame of output data.

        For the synthetic excitation, the data is simply placed on the
        data-passing queue.

        Parameters
        ----------
        data : np.ndarray
            Data to write to the output.
        """
        self.queue.put(data)

    def stop(self):
        """Stop the output.

        Calls ``flush_queue`` on the data-passing queue (presumably
        discarding any frames that have not yet been consumed -- see the
        helper's definition to confirm).
        """
        flush_queue(self.queue)

    def close(self):
        """Close down the hardware.

        Does nothing for synthetic hardware.
        """

    def ready_for_new_output(self):
        """Report whether the hardware is ready for new output.

        Returns
        -------
        bool
            ``True`` if the data-passing queue is empty.
        """
        return self.queue.empty()