Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / state_space_virtual_hardware.py: 37%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Synthetic "hardware" that allows the responses to be simulated by integrating 

4linear equations of motion using state space matrices, A, B, C, and D. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24 

25import multiprocessing as mp 

26import os 

27import time 

28from typing import List 

29 

30import numpy as np 

31import scipy.signal as signal 

32from scipy.io import loadmat 

33 

34from .abstract_hardware import HardwareAcquisition, HardwareOutput 

35from .utilities import Channel, DataAcquisitionParameters, flush_queue 

36 

37 

class StateSpaceAcquisition(HardwareAcquisition):
    """Class defining the interface between the controller and synthetic acquisition

    This class defines the interfaces between the controller and the
    data acquisition portion of the hardware.  In this case, the hardware is
    actually simulated by integrating state space matrices, A, B, C, and D.
    It is run by the Acquisition process, and must define how to get data from
    the test hardware into the controller.
    """

    def __init__(self, state_space_file: str, queue: mp.queues.Queue):
        """Loads in the state space file and sets initial parameters to null values

        Parameters
        ----------
        state_space_file : str
            Path to the ``.npz`` or ``.mat`` file containing state space
            matrices stored under the keys ``A``, ``B``, ``C``, and ``D``.
        queue : mp.queues.Queue
            A queue that passes input data from the StateSpaceOutput class to
            this class.  Normally, this data transfer would occur through
            the physical test object: the exciters would excite the test object
            with the specified excitation and the Acquisition would record the
            responses to that excitation.  In the synthetic case, we need to
            pass the output data to the acquisition which does the integration.

        Raises
        ------
        ValueError
            If the file extension is neither ``.npz`` nor ``.mat``.
        KeyError
            If one of the matrices is missing from the file.

        """
        matrices = self._load_state_space_matrices(state_space_file)
        self.system = signal.StateSpace(
            matrices["A"], matrices["B"], matrices["C"], matrices["D"]
        )
        self.times = None
        # One state entry per row of A; reset to zero by ``start`` and ``stop``
        self.state = np.zeros(matrices["A"].shape[0])
        self.frame_time = None
        self.queue = queue
        self.force_buffer = None
        self.integration_oversample = None
        self.acquisition_delay = None
        self.response_channels: np.ndarray = None

    @staticmethod
    def _load_state_space_matrices(state_space_file: str) -> dict:
        """Read the A, B, C, and D matrices from a ``.npz`` or ``.mat`` file

        Parameters
        ----------
        state_space_file : str
            Path to the file holding the matrices.

        Returns
        -------
        dict
            Mapping from ``"A"``, ``"B"``, ``"C"``, ``"D"`` to the loaded arrays.

        Raises
        ------
        ValueError
            If the file extension is neither ``.npz`` nor ``.mat``.
        KeyError
            If one of the matrices is missing from the file.

        """
        matrix_names = ("A", "B", "C", "D")
        _, extension = os.path.splitext(state_space_file)
        suffix = extension.lower()

        if suffix == ".npz":
            # ``np.load`` on an archive returns a lazily-read NpzFile that keeps
            # the file open; read everything we need and close it right away.
            with np.load(state_space_file) as archive:
                return {name: archive[name] for name in matrix_names}
        if suffix == ".mat":
            contents = loadmat(state_space_file)
            return {name: contents[name] for name in matrix_names}
        raise ValueError(
            f"Unknown extension to file {state_space_file}, "
            f"should be .npz or .mat, not {extension}"
        )

    def set_up_data_acquisition_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """
        Initialize the hardware and set up channels and sampling properties

        For the synthetic hardware this builds the response-channel mask and
        the force buffer from the channel list, then derives the integration
        time vector and frame timing from the sampling parameters.

        Parameters
        ----------
        test_data : DataAcquisitionParameters
            A container containing the data acquisition parameters for the
            controller set by the user.
        channel_data : List[Channel]
            A list of ``Channel`` objects defining the channels in the test

        Returns
        -------
        None.

        """
        self.create_response_channels(channel_data)
        self.set_parameters(test_data)

    def create_response_channels(self, channel_data: List[Channel]):
        """Method to set up response channels

        Builds a boolean mask with one entry per channel that is ``True`` when
        the channel has no feedback device (``None`` or an empty string), and
        creates an empty force buffer with one column per remaining channel.

        Parameters
        ----------
        channel_data : List[Channel]
            A list of ``Channel`` objects defining the channels in the test

        """
        self.response_channels = np.array(
            [
                channel.feedback_device is None or channel.feedback_device == ""
                for channel in channel_data
            ],
            dtype="bool",
        )
        # Need to add a signal buffer in case the write size is not equal to
        # the read size
        self.force_buffer = np.zeros((0, np.sum(~self.response_channels)))

    def set_parameters(self, test_data: DataAcquisitionParameters):
        """Method to set up sampling rate and other test parameters

        For the synthetic case, we will set up the integration parameters using
        the sample rates provided.

        Parameters
        ----------
        test_data : DataAcquisitionParameters
            A container containing the data acquisition parameters for the
            controller set by the user.

        """
        self.integration_oversample = test_data.output_oversample
        # Need to get one more sample than you would think because lsim doesn't bridge the gap
        # between integrations
        self.times = np.arange(test_data.samples_per_read * self.integration_oversample + 1) / (
            test_data.sample_rate * self.integration_oversample
        )
        self.frame_time = test_data.samples_per_read / test_data.sample_rate
        # NOTE(review): true division yields a float for integer inputs even
        # though ``get_acquisition_delay`` is annotated as returning ``int``;
        # confirm what callers expect before changing this.
        self.acquisition_delay = test_data.samples_per_write / test_data.output_oversample

    def start(self):
        """Method to start acquiring data.

        For the synthetic case, it simply initializes the state of the system to zero"""
        self.state[:] = 0

    def get_acquisition_delay(self) -> int:
        """
        Get the number of samples between output and acquisition.

        This function returns the number of samples that need to be read to
        ensure that the last output is read by the acquisition.  If there is
        buffering in the output, this delay should be adjusted accordingly.

        Returns
        -------
        int
            Number of samples between when a dataset is written to the output
            and when it has finished playing.  This is the value stored by
            ``set_parameters`` (``None`` until that method has been called).

        """
        return self.acquisition_delay

    def read(self):
        """Method to read a frame of data from the hardware

        This function gets the force from the output queue and adds it to the
        buffer of time signals that represents the force.  It then integrates
        a frame of time and sends it to the acquisition.  If integration
        finishes in less than one frame time, the method sleeps for the
        remainder of the frame before returning.

        Returns
        -------
        read_data :
            2D Data read from the controller with shape ``n_channels`` x
            ``n_samples``

        """
        # Use a monotonic, high-resolution clock so that wall-clock adjustments
        # cannot distort the elapsed-time measurement used for pacing.
        start_time = time.perf_counter()
        while self.force_buffer.shape[0] < self.times.size:
            try:
                forces = self.queue.get(timeout=self.frame_time)
            except mp.queues.Empty:
                # If we don't get an output in time, this likely means output
                # has stopped so just put zeros.
                forces = np.zeros((self.force_buffer.shape[-1], self.times.size))
            self.force_buffer = np.concatenate((self.force_buffer, forces.T), axis=0)

        # Now extract a force that is the correct size
        this_force = self.force_buffer[: self.times.size]
        # And leave the rest for next time
        # Note we have to keep the last force sample still on the
        # buffer because it will be the next force sample we use
        self.force_buffer = self.force_buffer[self.times.size - 1 :]

        _, sys_out, x_out = signal.lsim(self.system, this_force, self.times, self.state)

        # Carry the final state over as the initial condition of the next frame
        self.state[:] = x_out[-1]

        integration_time = time.perf_counter() - start_time
        remaining_time = self.frame_time - integration_time
        if remaining_time > 0.0:
            time.sleep(remaining_time)

        # We don't want to return the last sample because it
        # will be the initial state for the next sample
        return sys_out.T[..., : -1 : self.integration_oversample]

    def read_remaining(self):
        """Method to read the rest of the data on the acquisition

        This function simply returns one sample of zeros.

        Returns
        -------
        read_data :
            2D array of zeros with one row per entry in the response-channel
            mask and a single column.
        """
        return np.zeros((len(self.response_channels), 1))

    def stop(self):
        """Method to stop the acquisition.

        This simply sets the state to zero."""
        self.state[:] = 0

    def close(self):
        """Method to close down the hardware

        Does nothing for synthetic hardware."""

246 

247 

class StateSpaceOutput(HardwareOutput):
    """Synthetic output side of the state space virtual hardware

    This class performs no signal generation of its own.  Its only job is to
    forward the frames it is asked to write onto a queue; the acquisition
    class reads from that queue and performs the integration.  Most of the
    methods are therefore intentionally empty."""

    def __init__(self, queue: mp.queues.Queue):
        """
        Store the queue used to hand output data to the acquisition.

        Parameters
        ----------
        queue : mp.queues.Queue
            Queue onto which written output frames are placed.
            See ``StateSpaceAcquisition.__init__``

        """
        self.queue = queue

    def set_up_data_output_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """
        Initialize the hardware and set up sources and sampling properties

        Nothing needs to be configured for the synthetic output, so both
        arguments are ignored.

        Parameters
        ----------
        test_data : DataAcquisitionParameters
            A container containing the data acquisition parameters for the
            controller set by the user.
        channel_data : List[Channel]
            A list of ``Channel`` objects defining the channels in the test

        Returns
        -------
        None.

        """

    def start(self):
        """Method to start the output

        Nothing to do for synthetic hardware."""

    def write(self, data: np.ndarray):
        """Method to write a frame of data

        The frame is placed on the data-passing queue unchanged.

        Parameters
        ----------
        data : np.ndarray
            Data to write to the output.

        """
        transfer_queue = self.queue
        transfer_queue.put(data)

    def stop(self):
        """Method to stop the output

        Hands the data-passing queue to ``flush_queue`` (presumably so that
        frames not yet consumed are discarded -- confirm against
        ``utilities.flush_queue``)."""
        pending_queue = self.queue
        flush_queue(pending_queue)

    def close(self):
        """Method to close down the hardware

        Nothing to do for synthetic hardware."""

    def ready_for_new_output(self):
        """Signals that the hardware is ready for new output

        Returns
        -------
        bool
            The result of calling ``empty()`` on the data-passing queue.
        """
        queue_is_empty = self.queue.empty()
        return queue_is_empty