1"""
2Synthetic "hardware" that allows the responses to be simulated by integrating
3linear equations of motion.
4
5Rattlesnake Vibration Control Software
6Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
7(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
8Government retains certain rights in this software.
9
10This program is free software: you can redistribute it and/or modify
11it under the terms of the GNU General Public License as published by
12the Free Software Foundation, either version 3 of the License, or
13(at your option) any later version.
14
15This program is distributed in the hope that it will be useful,
16but WITHOUT ANY WARRANTY; without even the implied warranty of
17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18GNU General Public License for more details.
19
20You should have received a copy of the GNU General Public License
21along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""
23
24import multiprocessing as mp
25import time
26from typing import List
27
28import numpy as np
29import scipy.signal as signal
30
31from .abstract_hardware import HardwareAcquisition, HardwareOutput
32from .utilities import Channel, DataAcquisitionParameters, flush_queue
33
34_direction_map = {
35 "X+": 1,
36 "X": 1,
37 "+X": 1,
38 "Y+": 2,
39 "Y": 2,
40 "+Y": 2,
41 "Z+": 3,
42 "Z": 3,
43 "+Z": 3,
44 "RX+": 4,
45 "RX": 4,
46 "+RX": 4,
47 "RY+": 5,
48 "RY": 5,
49 "+RY": 5,
50 "RZ+": 6,
51 "RZ": 6,
52 "+RZ": 6,
53 "X-": -1,
54 "-X": -1,
55 "Y-": -2,
56 "-Y": -2,
57 "Z-": -3,
58 "-Z": -3,
59 "RX-": -4,
60 "-RX": -4,
61 "RY-": -5,
62 "-RY": -5,
63 "RZ-": -6,
64 "-RZ": -6,
65 "": 0,
66 None: 0,
67}
68
69DEBUG = False
70
71if DEBUG:
72 from glob import glob
73
74 FILE_OUTPUT = "debug_data/sdynpy_hardware_{:}.npz"
75
76
77class SDynPySystemAcquisition(HardwareAcquisition):
78 """Class defining the interface between the controller and synthetic acquisition
79
80 This class defines the interfaces between the controller and the data
81 acquisition portion of the hardware. In this case, the hardware is simulated
82 by integrating state space matrices derived from a SDynPy system object.
83 It is run by the acquisition process, and must define how to get data from
84 the test hardware into the controller.
85 """
86
87 def __init__(self, system_file: str, queue: mp.queues.Queue, sleep: bool = True):
88 """
89 Loads in the SDynPy system file and sets initial parameters to null
90 values.
91
92 Parameters
93 ----------
94 system_file : str
95 Path to the file containing state space the SDynPy system object
96 queue : mp.queues.Queue
97 A queue that passes input data from the SDynPySystemOutput class to
98 this class. Normally, this data transfer would occur through
99 the physical test object: the exciters would excite the test object
100 with the specified excitation and the Acquisition would record the
101 responses to that excitation. In the synthetic case, we need to
102 pass the output data to the acquisition which does the integration.
103 sleep : bool
104 If True, the integrator will wait the amount of time the calculation
105 would have took if it were real life, which adds a realistic delay
106 to simulate an actual measurement. If False, the integration will
107 proceed as fast as possible.
108
109 Returns
110 -------
111 None.
112
113 """
114 self.sdynpy_system_data = {key: val for key, val in np.load(system_file).items()}
115 self.system = None
116 self.times = None
117 self.state = None
118 self.frame_time = None
119 self.queue = queue
120 self.force_buffer = None
121 self.integration_oversample = None
122 self.response_channels: np.ndarray
123 self.output_channels: np.ndarray
124 self.response_channels = None
125 self.output_channels = None
126 self.acquisition_delay = None
127 self.sleep = sleep
128 # Create a dictionary of channels for faster lookup
129 self.channel_indices = {
130 tuple([abs(v) for v in val]): index
131 for index, val in enumerate(self.sdynpy_system_data["coordinate"])
132 }
133
134 def set_up_data_acquisition_parameters_and_channels(
135 self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
136 ):
137 """
138 Initialize the hardware and set up channels and sampling properties
139
140 The function must create channels on the hardware corresponding to
141 the channels in the test. It must also set the sampling rates.
142
143 Parameters
144 ----------
145 test_data : DataAcquisitionParameters :
146 A container containing the data acquisition parameters for the
147 controller set by the user.
148 channel_data : List[Channel] :
149 A list of ``Channel`` objects defining the channels in the test
150
151 Returns
152 -------
153 None.
154
155 """
156 self.create_response_channels(channel_data)
157 self.set_parameters(test_data)
158
159 def create_response_channels(self, channel_data: List[Channel]):
160 """Method to set up response channels
161
162 This function takes channels from the supplied list of channels and
163 extracts the mode shape coefficients corresponding to those channels.
164
165 Parameters
166 ----------
167 channel_data : List[Channel] :
168 A list of ``Channel`` objects defining the channels in the test
169
170 """
171 # pylint: disable=invalid-name
172 # print('{:} Channels'.format(len(channel_data)))
173 self.response_channels = np.array(
174 [
175 channel.feedback_device is None or channel.feedback_device == ""
176 for channel in channel_data
177 ],
178 dtype="bool",
179 )
180 self.output_channels = ~self.response_channels
181 # Need to add a signal buffer in case the write size is not equal to
182 # the read size
183 self.force_buffer = np.zeros((0, np.sum(~self.response_channels)))
184
185 # Figure out which channels go with which indices
186 channel_indices = []
187 channel_signs = []
188 for channel in channel_data:
189 node_number = int(channel.node_number)
190 direction = _direction_map[channel.node_direction]
191 channel_index = self.channel_indices[(node_number, abs(direction))]
192 channel_indices.append(channel_index)
193 channel_signs.append(
194 np.sign(direction)
195 * np.sign(self.sdynpy_system_data["coordinate"][channel_index]["direction"])
196 )
197 channel_indices = np.array(channel_indices)
198 channel_signs = np.array(channel_signs)
199
200 # Now we need to actually go through and set up the A, B, C, D state matrices
201 M = self.sdynpy_system_data["mass"]
202 C = self.sdynpy_system_data["damping"]
203 K = self.sdynpy_system_data["stiffness"]
204
205 # Now we need to pull out the transformation matrices
206 phi = self.sdynpy_system_data["transformation"][channel_indices, :]
207 # Multiply by the signs
208 phi *= channel_signs[:, np.newaxis]
209
210 # Separate into responses and excitations; here input is into the system
211 phi_excitation = phi[self.output_channels, :].copy()
212 phi_response = phi[self.response_channels, :].copy()
213
214 # Set up some other parameters
215 ndofs = M.shape[0]
216 tdofs_response = phi_response.shape[0]
217 tdofs_input = phi_excitation.shape[0]
218
219 # Assembly the full state matrices
220
221 # A = [[ 0, I],
222 # [M^-1*K,M^-1*C]]
223
224 A_state = np.block(
225 [
226 [np.zeros((ndofs, ndofs)), np.eye(ndofs)],
227 [-np.linalg.solve(M, K), -np.linalg.solve(M, C)],
228 ]
229 )
230
231 # B = [[ 0],
232 # [ M^-1]]
233
234 B_state = np.block(
235 [[np.zeros((ndofs, tdofs_input))], [np.linalg.solve(M, phi_excitation.T)]]
236 )
237
238 # C = [[ I, 0], # Displacements
239 # [ 0, I], # Velocities
240 # [M^-1*K,M^-1*C], # Accelerations
241 # [ 0, 0]] # Forces
242
243 C_all = np.block(
244 [
245 [phi_response, np.zeros((tdofs_response, ndofs))],
246 [np.zeros((tdofs_response, ndofs)), phi_response],
247 [
248 -phi_response @ np.linalg.solve(M, K),
249 -phi_response @ np.linalg.solve(M, C),
250 ],
251 [np.zeros((tdofs_input, ndofs)), np.zeros((tdofs_input, ndofs))],
252 ]
253 )
254
255 # D = [[ 0], # Displacements
256 # [ 0], # Velocities
257 # [ M^-1], # Accelerations
258 # [ I]] # Forces
259
260 D_all = np.block(
261 [
262 [np.zeros((tdofs_response, tdofs_input))],
263 [np.zeros((tdofs_response, tdofs_input))],
264 [phi_response @ np.linalg.solve(M, phi_excitation.T)],
265 [np.eye(tdofs_input)],
266 ]
267 )
268
269 # Split into different types
270 displacement_indices = np.arange(tdofs_response)
271 velocity_indices = np.arange(tdofs_response) + tdofs_response
272 acceleration_indices = np.arange(tdofs_response) + 2 * tdofs_response
273 force_indices = np.arange(tdofs_input) + 3 * tdofs_response
274
275 C_disp = C_all[displacement_indices]
276 C_vel = C_all[velocity_indices]
277 C_accel = C_all[acceleration_indices]
278 C_force = C_all[force_indices]
279
280 D_disp = D_all[displacement_indices]
281 D_vel = D_all[velocity_indices]
282 D_accel = D_all[acceleration_indices]
283 D_force = D_all[force_indices]
284
285 # Now assemble the full response C and D matrices based on the data type
286 C_response = []
287 D_response = []
288 response_index = 0
289 for i, channel in enumerate(channel_data):
290 if self.output_channels[i]:
291 continue
292 if channel.channel_type.lower() in ["disp", "displacement"]:
293 C_response.append(C_disp[response_index])
294 D_response.append(D_disp[response_index])
295 elif channel.channel_type.lower() in ["vel", "velocity"]:
296 C_response.append(C_vel[response_index])
297 D_response.append(D_vel[response_index])
298 elif channel.channel_type.lower() in ["accel", "acceleration", "acc"]:
299 C_response.append(C_accel[response_index])
300 D_response.append(D_accel[response_index])
301 else:
302 print(f"Unknown Channel Type for Channel {i + 1}: {channel.channel_type}")
303 C_response.append(C_disp[response_index])
304 D_response.append(D_disp[response_index])
305 response_index += 1
306 C_response = np.array(C_response)
307 D_response = np.array(D_response)
308
309 # Now assemble the final C and D matrices
310 C_state = np.empty((len(channel_data), C_response.shape[-1]))
311 C_state[self.response_channels, :] = C_response
312 C_state[self.output_channels, :] = C_force
313 D_state = np.empty((len(channel_data), D_response.shape[-1]))
314 D_state[self.response_channels, :] = D_response
315 D_state[self.output_channels, :] = D_force
316 self.system = signal.StateSpace(A_state, B_state, C_state, D_state)
317 self.state = np.zeros(A_state.shape[0])
318 # np.savez('SDynPy_State.npz', A=A_state, B=B_state, C = C_state, D = D_state)
319
320 def set_parameters(self, test_data: DataAcquisitionParameters):
321 """Method to set up sampling rate and other test parameters
322
323 For the synthetic case, we will set up the integration parameters using
324 the sample rates provided.
325
326 Parameters
327 ----------
328 test_data : DataAcquisitionParameters :
329 A container containing the data acquisition parameters for the
330 controller set by the user.
331
332 """
333 self.integration_oversample = test_data.output_oversample
334 # Need to get one more sample than you would think because lsim doesn't bridge the gap
335 # between integrations
336 self.times = np.arange(test_data.samples_per_read * self.integration_oversample + 1) / (
337 test_data.sample_rate * self.integration_oversample
338 )
339 self.frame_time = test_data.samples_per_read / test_data.sample_rate
340 self.acquisition_delay = test_data.samples_per_write / test_data.output_oversample
341
342 def start(self):
343 """Method to start acquiring data.
344
345 For the synthetic case, it simply initializes the state of the system to zero"""
346 self.state[:] = 0
347
348 def get_acquisition_delay(self) -> int:
349 """
350 Get the number of samples between output and acquisition.
351
352 This function returns the number of samples that need to be read to
353 ensure that the last output is read by the acquisition. If there is
354 buffering in the output, this delay should be adjusted accordingly.
355
356 Returns
357 -------
358 int
359 Number of samples between when a dataset is written to the output
360 and when it has finished playing.
361
362 """
363 return self.acquisition_delay
364
365 def read(self):
366 """Method to read a frame of data from the hardware
367
368 This function gets the force from the output queue and adds it to the
369 buffer of time signals that represents the force. It then integrates
370 a frame of time and sends it to the acquisition.
371
372 Returns
373 -------
374 read_data :
375 2D Data read from the controller with shape ``n_channels`` x
376 ``n_samples``
377
378 """
379 start_time = time.time()
380 while self.force_buffer.shape[0] < self.times.size:
381 try:
382 forces = self.queue.get(timeout=self.frame_time)
383 except mp.queues.Empty:
384 # If we don't get an output in time, this likely means output has stopped
385 # so just put zeros.
386 print("Warning! SDynPy integrator ran out of samples!")
387 forces = np.zeros((self.force_buffer.shape[-1], self.times.size))
388 self.force_buffer = np.concatenate((self.force_buffer, forces.T), axis=0)
389
390 # Now extract a force that is the correct size
391 this_force = self.force_buffer[: self.times.size]
392 # And leave the rest for next time
393 # Note we have to keep the last force sample still on the
394 # buffer because it will be the next force sample we use
395 self.force_buffer = self.force_buffer[self.times.size - 1 :]
396
397 _, sys_out, x_out = signal.lsim(self.system, this_force, self.times, self.state)
398
399 self.state[:] = x_out[-1]
400
401 if DEBUG:
402 num_files = len(glob(FILE_OUTPUT.format("*")))
403 np.savez(
404 FILE_OUTPUT.format(num_files),
405 force_in=this_force.T,
406 response_out_full_resolution=sys_out.T[..., : -1 : self.integration_oversample],
407 response_out_downsampled=sys_out.T[..., :-1],
408 )
409
410 integration_time = time.time() - start_time
411 remaining_time = self.frame_time - integration_time
412 if remaining_time > 0.0 and self.sleep:
413 time.sleep(remaining_time)
414
415 # We don't want to return the last sample because it
416 # will be the initial state for the next sample
417 return sys_out.T[..., : -1 : self.integration_oversample]
418
419 def read_remaining(self):
420 """Method to read the rest of the data on the acquisition
421
422 This function simply returns one sample of zeros.
423
424 Returns
425 -------
426 read_data :
427 2D Data read from the controller with shape ``n_channels`` x
428 ``n_samples``
429 """
430 return np.zeros((len(self.response_channels), 1))
431
432 def stop(self):
433 """Method to stop the acquisition.
434
435 This simply sets the state to zero."""
436 self.state[:] = 0
437
438 def close(self):
439 """Method to close down the hardware"""
440
441
442class SDynPySystemOutput(HardwareOutput):
443 """Class defining the interface between the controller and synthetic output
444
445 Note that the only thing that this class does is pass data to the acquisition
446 hardware task which actually performs the integration. Therefore, many of
447 the functions here are actually empty."""
448
449 def __init__(self, queue: mp.queues.Queue):
450 """
451 Initializes the hardware by simply storing the data passing queue.
452
453 Parameters
454 ----------
455 queue : mp.queues.Queue
456 Queue used to pass data from output to acquisition for integration.
457 See ``StateSpaceAcquisition.__init__``
458
459 """
460 self.queue = queue
461
462 def set_up_data_output_parameters_and_channels(
463 self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
464 ):
465 """
466 Initialize the hardware and set up sources and sampling properties
467
468 This does nothing for the synthetic hardware
469
470 Parameters
471 ----------
472 test_data : DataAcquisitionParameters :
473 A container containing the data acquisition parameters for the
474 controller set by the user.
475 channel_data : List[Channel] :
476 A list of ``Channel`` objects defining the channels in the test
477
478 Returns
479 -------
480 None.
481
482 """
483
484 def start(self):
485 """Method to start acquiring data
486
487 Does nothing for synthetic hardware."""
488
489 def write(self, data: np.ndarray):
490 """Method to write a frame of data
491
492 For the synthetic excitation, this simply puts the data into the data-
493 passing queue.
494
495 Parameters
496 ----------
497 data : np.ndarray
498 Data to write to the output.
499
500 """
501 self.queue.put(data)
502
503 def stop(self):
504 """Method to stop the acquisition
505
506 Does nothing for synthetic hardware."""
507 flush_queue(self.queue)
508
509 def close(self):
510 """Method to close down the hardware
511
512 Does nothing for synthetic hardware."""
513
514 def ready_for_new_output(self):
515 """Signals that the hardware is ready for new output
516
517 Returns ``True`` if the data-passing queue is empty.
518 """
519 return self.queue.empty()