Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / data_physics_hardware.py: 22%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Hardware definition that allows for the Data Physics Quattro Device to be run 

4with Rattlesnake. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24 

25import multiprocessing as mp 

26import time 

27from typing import List 

28 

29import numpy as np 

30 

31from .abstract_hardware import HardwareAcquisition, HardwareOutput 

32from .data_physics_interface import DPQuattro, QuattroCoupling, QuattroStatus 

33from .utilities import Channel, DataAcquisitionParameters 

34 

35BUFFER_SIZE_FACTOR = 3 

36SLEEP_FACTOR = 10 

37 

38 

39class DataPhysicsAcquisition(HardwareAcquisition): 

40 """Class defining the interface between the controller and Data Physics 

41 hardware 

42 

43 This class defines the interfaces between the controller and the 

44 Data Physics hardware that runs their open API. It is run by the Acquisition 

45 process, and must define how to get data from the test hardware into the 

46 controller.""" 

47 

48 def __init__(self, dll_path: str, queue: mp.queues.Queue): 

49 """ 

50 Initializes the data physics hardware interface. 

51 

52 Parameters 

53 ---------- 

54 dll_path : str 

55 Path to the DpQuattro.dll file that defines 

56 queue : mp.queues.Queue 

57 Multiprocessing queue used to pass output data from the output task 

58 to the acquisition task because Quattro runs on a single processor 

59 

60 Returns 

61 ------- 

62 None. 

63 

64 """ 

65 self.quattro = DPQuattro(dll_path) 

66 self.buffer_size = 2**24 

67 self.input_channel_order = [] 

68 self.input_couplings = [QuattroCoupling.DC_DIFFERENTIAL] * 4 

69 self.input_ranges = [10.0] * 4 

70 self.input_sensitivities = [1.0] * 4 

71 self.output_channel_order = [] 

72 self.output_ranges = [10.0] * 2 

73 self.output_sensitivities = [1.0] * 2 

74 self.data_acquisition_parameters = None 

75 self.output_data_queue = queue 

76 self.read_data = None 

77 self.time_per_read = None 

78 

79 def set_up_data_acquisition_parameters_and_channels( 

80 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

81 ): 

82 """ 

83 Initialize the hardware and set up channels and sampling properties 

84 

85 The function must create channels on the hardware corresponding to 

86 the channels in the test. It must also set the sampling rates. 

87 

88 Parameters 

89 ---------- 

90 test_data : DataAcquisitionParameters : 

91 A container containing the data acquisition parameters for the 

92 controller set by the user. 

93 channel_data : List[Channel] : 

94 A list of ``Channel`` objects defining the channels in the test 

95 

96 Returns 

97 ------- 

98 None. 

99 

100 """ 

101 # Store data acquisition parameters for later 

102 self.data_acquisition_parameters = test_data 

103 self.time_per_read = test_data.samples_per_read / test_data.sample_rate 

104 

105 # End the measurement if necessary 

106 if self.quattro.status == QuattroStatus.RUNNING: 

107 self.quattro.stop() 

108 if ( 

109 self.quattro.status == QuattroStatus.STOPPED 

110 or self.quattro.status == QuattroStatus.INIT 

111 ): 

112 self.quattro.end() 

113 if self.quattro.status == QuattroStatus.DISCONNECTED: 

114 self.quattro.connect() 

115 

116 # Set up defaults that will be overwritten 

117 self.input_channel_order = [] 

118 self.input_couplings = [QuattroCoupling.DC_DIFFERENTIAL] * 4 

119 self.input_ranges = [10.0] * 4 

120 self.input_sensitivities = [1.0] * 4 

121 self.output_channel_order = [] 

122 self.output_ranges = [10.0] * 2 

123 self.output_sensitivities = [1.0] * 2 

124 

125 # Set the sample rate 

126 self.quattro.set_sample_rate(test_data.sample_rate) 

127 

128 # Set the buffer size ensuring that it is more that 4096 

129 # self.buffer_size = (BUFFER_SIZE_FACTOR+1)*max( 

130 # test_data.samples_per_write,test_data.samples_per_read) 

131 # if self.buffer_size < 8192: 

132 # self.buffer_size = 8192 

133 self.quattro.set_buffer_size(self.buffer_size) 

134 # print('Buffer Size: {:}'.format(self.buffer_size)) 

135 

136 # Set up channel parameters 

137 for channel in channel_data: 

138 # Figure out if the channel is an output channel or just acquisition 

139 is_output = not (channel.feedback_device is None) and not ( 

140 channel.feedback_device.strip() == "" 

141 ) 

142 

143 # Get the channel index from physical device 

144 channel_index = int(channel.physical_channel) - 1 

145 

146 # Track the channel order so we can rearrange measurements upon read 

147 self.input_channel_order.append(channel_index) 

148 

149 # Set the values in the arrays appropriately given the channel index 

150 if channel.coupling.lower() in ["ac differential", "ac diff", "ac"]: 

151 self.input_couplings[channel_index] = QuattroCoupling.AC_DIFFERENTIAL 

152 elif channel.coupling.lower() in ["dc differential", "dc diff", "dc"]: 

153 self.input_couplings[channel_index] = QuattroCoupling.DC_DIFFERENTIAL 

154 elif channel.coupling.lower() in [ 

155 "ac single ended", 

156 "ac single-ended", 

157 "ac single", 

158 ]: 

159 self.input_couplings[channel_index] = QuattroCoupling.AC_SINGLE_ENDED 

160 elif channel.coupling.lower() in [ 

161 "dc single ended", 

162 "dc single-ended", 

163 "dc single", 

164 ]: 

165 self.input_couplings[channel_index] = QuattroCoupling.DC_SINGLE_ENDED 

166 elif channel.coupling.lower() in ["iepe", "icp", "ac icp", "ccld"]: 

167 self.input_couplings[channel_index] = QuattroCoupling.AC_COUPLED_IEPE 

168 self.input_sensitivities[channel_index] = ( 

169 1.0 if is_output else float(channel.sensitivity) / 1000 

170 ) 

171 self.input_ranges[channel_index] = 10.0 if is_output else float(channel.maximum_value) 

172 

173 # Set up the output 

174 if is_output: 

175 channel_index = int(channel.feedback_channel) - 1 

176 self.output_channel_order.append(channel_index) 

177 self.output_ranges[channel_index] = float(channel.maximum_value) 

178 

179 # Now send the data to the quattro device 

180 self.quattro.setup_input_parameters( 

181 self.input_couplings, self.input_sensitivities, self.input_ranges 

182 ) 

183 self.quattro.setup_output_parameters(self.output_sensitivities, self.output_ranges) 

184 

185 def start(self): 

186 """Method to start acquiring data from the hardware""" 

187 self.read_data = [] 

188 self.quattro.initialize() 

189 self.quattro.start() 

190 

191 def read(self) -> np.ndarray: 

192 """Method to read a frame of data from the hardware that returns 

193 an appropriately sized np.ndarray""" 

194 while ( 

195 np.sum([data.shape[-1] for data in self.read_data]) 

196 < self.data_acquisition_parameters.samples_per_read 

197 ): 

198 # Check if we need to output anything 

199 self.get_and_write_output_data() 

200 # Check how many samples are available 

201 samples_available = self.quattro.get_available_input_data_samples() 

202 # print('{:} Samples Available to Read'.format(samples_available)) 

203 # Read that many data samples and put it to the "read_data" array 

204 # Make sure we rearrange the channels correctly per the rattlesnake 

205 # channel table using self.input_channel_order 

206 if samples_available > 0: 

207 self.read_data.append( 

208 self.quattro.read_input_data(samples_available)[self.input_channel_order] 

209 ) 

210 # Pause for a bit to allow more samples to accumulate 

211 time.sleep(self.time_per_read / SLEEP_FACTOR) 

212 # After we finish getting enough samples for a read, we can split the 

213 # read data into the number of samples requested, and put the remainder 

214 # as the start of the next self.read_data list. 

215 read_data = np.concatenate(self.read_data, axis=-1) 

216 self.read_data = [read_data[:, self.data_acquisition_parameters.samples_per_read :]] 

217 return read_data[:, : self.data_acquisition_parameters.samples_per_read] 

218 

219 def read_remaining(self) -> np.ndarray: 

220 """Method to read the rest of the data on the acquisition from the hardware 

221 that returns an appropriately sized np.ndarray""" 

222 # Check if we need to output anything 

223 self.get_and_write_output_data() 

224 # Check how many samples are available 

225 samples_available = 0 

226 # Wait until some arrive 

227 while samples_available == 0: 

228 samples_available = self.quattro.get_available_input_data_samples() 

229 # Read that many data samples and put it to the "read_data" array 

230 # Make sure we rearrange the channels correctly per the rattlesnake 

231 # channel table using self.input_channel_order 

232 self.read_data.append( 

233 self.quattro.read_input_data(samples_available)[self.input_channel_order] 

234 ) 

235 # Pause for a bit to allow more samples to accumulate 

236 time.sleep(self.time_per_read / SLEEP_FACTOR) 

237 # After we finish getting enough samples for a read, we can split the 

238 # read data into the number of samples requested, and put the remainder 

239 # as the start of the next self.read_data list. 

240 read_data = np.concatenate(self.read_data, axis=-1) 

241 self.read_data = [read_data[:, self.data_acquisition_parameters.samples_per_read :]] 

242 read_data = np.concatenate(self.read_data, axis=-1) 

243 return read_data 

244 

245 def stop(self): 

246 """Method to stop the acquisition""" 

247 self.quattro.stop() 

248 self.quattro.end() 

249 

250 def close(self): 

251 """Method to close down the hardware""" 

252 self.quattro.disconnect() 

253 

254 def get_acquisition_delay(self) -> int: 

255 """Get the number of samples between output and acquisition 

256 

257 This function is designed to handle buffering done in the output 

258 hardware, ensuring that all data written to the output is read by the 

259 acquisition. If a output hardware has a buffer, there may be a non- 

260 negligable delay between when output is written to the device and 

261 actually played out from the device.""" 

262 return BUFFER_SIZE_FACTOR * self.data_acquisition_parameters.samples_per_write 

263 

264 def get_and_write_output_data(self, block: bool = False): 

265 """ 

266 Checks to see if there is any data on the output queue that needs to be 

267 written to the hardware. 

268 

269 Parameters 

270 ---------- 

271 block : bool, optional 

272 If True, this function will wait until the data appears with a timeout 

273 of 10 seconds. Otherwise it will simply return if there is no 

274 data available. The default is False. 

275 

276 Raises 

277 ------ 

278 RuntimeError 

279 Raised if the timeout occurs while waiting for data while blocking 

280 

281 Returns 

282 ------- 

283 None. 

284 

285 """ 

286 samples_on_buffer = self.quattro.get_total_output_samples_on_buffer() 

287 # print( 

288 # f"{samples_on_buffer} Samples on Output Buffer, " 

289 # f"(<{self.data_acquisition_parameters.samples_per_write} to output more)" 

290 # ) 

291 if not block and samples_on_buffer >= self.data_acquisition_parameters.samples_per_write: 

292 # print('Too much data on buffer, not putting new data') 

293 return 

294 try: 

295 data = self.output_data_queue.get(block, timeout=10) 

296 # print('Got New Data from queue') 

297 except mp.queues.Empty as e: 

298 # print('Did not get new data from queue') 

299 if block: 

300 raise RuntimeError( 

301 "Did not receive output in a reasonable amount of time, check " 

302 "output process and output hardware for issues" 

303 ) from e 

304 # Otherwise just return because there's no data available 

305 return 

306 # If we did get output, we need to put it into a numpy array that we can 

307 # send to the daq 

308 outputs = np.zeros((2, self.data_acquisition_parameters.samples_per_write)) 

309 outputs[self.output_channel_order] = data 

310 # Send the outputs to the daq 

311 self.quattro.write_output_data(outputs) 

312 return 

313 

314 

315class DataPhysicsOutput(HardwareOutput): 

316 """Abstract class defining the interface between the controller and output 

317 

318 This class defines the interfaces between the controller and the 

319 output or source portion of the hardware. It is run by the Output 

320 process, and must define how to get write data to the hardware from the 

321 control system""" 

322 

323 def __init__(self, queue: mp.queues.Queue): 

324 """ 

325 Initializes the hardware by simply storing the data passing queue 

326 

327 Parameters 

328 ---------- 

329 queue : mp.queues.Queue 

330 Queue used to pass data from output to acquisition 

331 

332 Returns 

333 ------- 

334 None. 

335 

336 """ 

337 self.queue = queue 

338 

339 def set_up_data_output_parameters_and_channels( 

340 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

341 ): 

342 """ 

343 Initialize the hardware and set up sources and sampling properties 

344 

345 The function must create channels on the hardware corresponding to 

346 the sources in the test. It must also set the sampling rates. 

347 

348 Parameters 

349 ---------- 

350 test_data : DataAcquisitionParameters : 

351 A container containing the data acquisition parameters for the 

352 controller set by the user. 

353 channel_data : List[Channel] : 

354 A list of ``Channel`` objects defining the channels in the test 

355 

356 Returns 

357 ------- 

358 None. 

359 

360 """ 

361 

362 def start(self): 

363 """Method to start outputting data to the hardware""" 

364 

365 def write(self, data): 

366 """Method to write a np.ndarray with a frame of data to the hardware""" 

367 self.queue.put(data) 

368 

369 def stop(self): 

370 """Method to stop the output""" 

371 

372 def close(self): 

373 """Method to close down the hardware""" 

374 

375 def ready_for_new_output(self) -> bool: 

376 """Method that returns true if the hardware should accept a new signal 

377 

378 Returns ``True`` if the data-passing queue is empty.""" 

379 return self.queue.empty()