Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / data_physics_dp900_hardware.py: 0%

169 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Hardware definition that allows for the Data Physics DP900 Device to be run 

4with Rattlesnake. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24 

25import multiprocessing as mp 

26import time 

27from typing import List 

28 

29import numpy as np 

30 

31from .abstract_hardware import HardwareAcquisition, HardwareOutput 

32from .utilities import Channel, DataAcquisitionParameters, flush_queue 

33 

# Multiplier applied to ``samples_per_write`` by ``get_acquisition_delay`` to
# compute the number of samples between output and acquisition.
34BUFFER_SIZE_FACTOR = 3 

# Divisor applied to the duration of one read when pausing between polls of
# the hardware: each pause lasts ``time_per_read / SLEEP_FACTOR`` seconds.
35SLEEP_FACTOR = 10 

36 

37 

class DataPhysicsDP900Acquisition(HardwareAcquisition):
    """Class defining the interface between the controller and Data Physics
    DP900 hardware

    This class defines the interfaces between the controller and the
    Data Physics hardware that runs their open API.  It is run by the
    Acquisition process, and must define how to get data from the test
    hardware into the controller.  Output data handed over through the queue
    given to the constructor is also written to the device from here, see
    ``get_and_write_output_data``."""

    # Coupling strings accepted in the channel table (compared after stripping
    # and lower-casing) mapped to the name of the ``DP900Coupling`` member they
    # select.  Member names are stored instead of the members themselves
    # because the enum is only imported when an instance is constructed.
    _COUPLING_ALIASES = {
        "ac differential": "AC_DIFFERENTIAL",
        "ac diff": "AC_DIFFERENTIAL",
        "ac": "AC_DIFFERENTIAL",
        "dc differential": "DC_DIFFERENTIAL",
        "dc diff": "DC_DIFFERENTIAL",
        "dc": "DC_DIFFERENTIAL",
        "ac single ended": "AC_SINGLE_ENDED",
        "ac single-ended": "AC_SINGLE_ENDED",
        "ac single": "AC_SINGLE_ENDED",
        "dc single ended": "DC_SINGLE_ENDED",
        "dc single-ended": "DC_SINGLE_ENDED",
        "dc single": "DC_SINGLE_ENDED",
        "iepe": "AC_COUPLED_IEPE",
        "icp": "AC_COUPLED_IEPE",
        "ac icp": "AC_COUPLED_IEPE",
        "ccld": "AC_COUPLED_IEPE",
    }

    def __init__(self, dll_path: str, queue: mp.queues.Queue):
        """
        Initializes the data physics hardware interface.

        Parameters
        ----------
        dll_path : str
            Path to the Dp900Matlab.dll file; it is handed to the ``DP900``
            interface object.
        queue : mp.queues.Queue
            Multiprocessing queue used to pass output data from the output task
            to the acquisition task because DP900 runs on a single processor

        Returns
        -------
        None.

        """
        # Imported here rather than at module level so that importing this
        # module does not import the DP900 interface module.
        from .data_physics_dp900_interface import DP900, DP900Coupling, DP900Status

        self.DP900Coupling = DP900Coupling
        self.DP900Status = DP900Status
        self.output_active = False
        self.dp900 = DP900(dll_path)
        self.buffer_size = 2**24
        self.input_bnc_indices = []
        self.output_bnc_indices = []
        self.input_channel_table_indices = []
        self.output_channel_table_indices = []
        self.channel_sorting = None
        self.output_sorting = None
        self.data_acquisition_parameters = None
        self.output_data_queue = queue
        self.time_per_read = None
        self.last_write_time = None

    @staticmethod
    def _find_bnc_index(available_bncs, physical_channel, direction: str):
        """Return the position of ``physical_channel`` within ``available_bncs``.

        ``direction`` ("input" or "output") is only used in the error message.
        Raises ``ValueError`` unless exactly one entry matches.
        """
        matches = np.flatnonzero(np.asarray(available_bncs) == int(physical_channel))
        if len(matches) > 1:
            raise ValueError(
                f"More than one matching channel for BNC {physical_channel} "
                "(how did this happen?)"
            )
        if len(matches) < 1:
            raise ValueError(
                f"BNC {physical_channel} was not found in the list of {direction} "
                f"BNCs {available_bncs}. Please run DP900Config to correctly set input "
                "and output channels."
            )
        return matches[0]

    def _resolve_coupling(self, coupling):
        """Translate a channel-table coupling string into a ``DP900Coupling`` member.

        Raises ``ValueError`` for a missing or unrecognized string so that the
        per-input parameter lists can never end up with different lengths.
        """
        key = "" if coupling is None else str(coupling).strip().lower()
        member_name = self._COUPLING_ALIASES.get(key)
        if member_name is None:
            raise ValueError(
                f"Coupling {coupling!r} is not a valid coupling. Must be one of "
                f"{sorted(self._COUPLING_ALIASES)}"
            )
        return getattr(self.DP900Coupling, member_name)

    def set_up_data_acquisition_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """
        Initialize the hardware and set up channels and sampling properties

        The function must create channels on the hardware corresponding to
        the channels in the test.  It must also set the sampling rates.

        Parameters
        ----------
        test_data : DataAcquisitionParameters :
            A container containing the data acquisition parameters for the
            controller set by the user.
        channel_data : List[Channel] :
            A list of ``Channel`` objects defining the channels in the test

        Raises
        ------
        ValueError
            If a channel names a system that is not in the online system list,
            if more than one system is used, if a BNC number does not match
            exactly one configured input/output BNC, or if an input channel
            has an unrecognized coupling.

        Returns
        -------
        None.

        """
        # Store data acquisition parameters for later
        self.data_acquisition_parameters = test_data
        self.time_per_read = test_data.samples_per_read / test_data.sample_rate

        # End the measurement if necessary: stop a running device, end a
        # stopped/initialized one, and connect if currently disconnected.
        if self.dp900.status == self.DP900Status.RUNNING:
            self.dp900.stop()
        if self.dp900.status in (self.DP900Status.STOPPED, self.DP900Status.INIT):
            self.dp900.end()
        if self.dp900.status == self.DP900Status.DISCONNECTED:
            self.dp900.connect("")

        # Get the system and channel information from the device
        system_list = self.dp900.get_system_list(online=True)
        input_bncs = self.dp900.get_input_channel_bncs()
        output_bncs = self.dp900.get_output_channel_bncs()

        # Reset everything derived from a previous channel table, including
        # the output ordering, which is only recomputed when outputs exist.
        self.output_active = False
        self.input_bnc_indices = []
        self.output_bnc_indices = []
        self.input_channel_table_indices = []
        self.output_channel_table_indices = []
        self.output_sorting = None

        input_couplings = []
        input_ranges = []
        input_sensitivities = []
        output_ranges = []
        output_sensitivities = []
        all_bncs = []
        systems = []

        # Set up channel parameters
        for ct_index, channel in enumerate(channel_data):
            system = channel.physical_device
            if system not in system_list:
                raise ValueError(
                    f"System {system} is not a valid system.  Must be one of {system_list}"
                )
            if system not in systems:
                systems.append(system)
            if len(systems) > 1:
                raise ValueError(
                    "Multi-chassis tests are not currently supported in Rattlesnake"
                )

            # A channel is an output channel when it names a non-blank
            # feedback device; otherwise it is acquisition only.
            feedback_device = channel.feedback_device
            is_output = feedback_device is not None and feedback_device.strip() != ""

            all_bncs.append(int(channel.physical_channel))
            if is_output:
                self.output_active = True
                bnc_index = self._find_bnc_index(
                    output_bncs, channel.physical_channel, "output"
                )
                self.output_bnc_indices.append(bnc_index)
                self.output_channel_table_indices.append(ct_index)
                output_ranges.append(float(channel.maximum_value))
                output_sensitivities.append(1)
            else:
                bnc_index = self._find_bnc_index(
                    input_bncs, channel.physical_channel, "input"
                )
                coupling = self._resolve_coupling(channel.coupling)
                self.input_bnc_indices.append(bnc_index)
                self.input_channel_table_indices.append(ct_index)
                input_ranges.append(float(channel.maximum_value))
                input_sensitivities.append(float(channel.sensitivity))
                input_couplings.append(coupling)

        self.dp900.set_system_list(systems)

        # Set the sample rate
        self.dp900.set_sample_rate(test_data.sample_rate)

        # Set the buffer size
        self.dp900.set_buffer_size(self.buffer_size)

        # Re-order the per-channel settings by BNC index before passing them
        # to the parameter setup functions; channel numbers are sent 1-based.
        input_sorting = np.argsort(self.input_bnc_indices)
        input_channels = np.array(self.input_bnc_indices)[input_sorting] + 1
        input_couplings = np.array(input_couplings)[input_sorting]
        input_ranges = np.array(input_ranges)[input_sorting]
        input_sensitivities = np.array(input_sensitivities)[input_sorting]
        self.dp900.setup_input_parameters(
            input_couplings, input_channels, input_sensitivities, input_ranges
        )

        if self.output_active:
            self.output_sorting = np.argsort(self.output_bnc_indices)
            output_channels = np.array(self.output_bnc_indices)[self.output_sorting] + 1
            output_ranges = np.array(output_ranges)[self.output_sorting]
            output_sensitivities = np.array(output_sensitivities)[self.output_sorting]

            # Now send the data to the dp900 device
            self.dp900.setup_output_parameters(output_sensitivities, output_ranges, output_channels)

        # This ordering is used to put the rows of the read data back into
        # channel-table order.
        # NOTE(review): this presumes the device returns rows ordered by BNC
        # number across inputs and outputs alike -- confirm against the DP900
        # interface.
        self.channel_sorting = np.argsort(all_bncs)

        self.dp900.set_save_recording(False)

    def start(self):
        """Method to start acquiring data from the hardware"""
        self.dp900.init()
        self.dp900.start()

    def read(self) -> np.ndarray:
        """Method to read a frame of data from the hardware that returns
        an appropriately sized np.ndarray

        Polls until ``samples_per_read`` samples are available, servicing the
        output queue on every poll when outputs are active."""
        samples_per_read = self.data_acquisition_parameters.samples_per_read
        while self.dp900.get_available_input_data_samples() < samples_per_read:
            # Check if we need to output anything
            if self.output_active:
                self.get_and_write_output_data()
            # Pause for a bit to allow more samples to accumulate
            time.sleep(self.time_per_read / SLEEP_FACTOR)
        # Read the data now that we have enough samples
        read_data = self.dp900.read_input_data(samples_per_read)
        # Scatter the rows back into channel-table order.  The copy is needed
        # because source and destination are the same array.
        read_data[self.channel_sorting] = read_data.copy()
        return read_data

    def read_remaining(self) -> np.ndarray:
        """Method to read the rest of the data on the acquisition from the hardware
        that returns an appropriately sized np.ndarray

        Blocks until the hardware reports at least one available sample."""
        # Check if we need to output anything
        if self.output_active:
            self.get_and_write_output_data()
        # Wait until some samples arrive, pausing only while there are none
        samples_available = self.dp900.get_available_input_data_samples()
        while samples_available == 0:
            time.sleep(self.time_per_read / SLEEP_FACTOR)
            samples_available = self.dp900.get_available_input_data_samples()
        # Read that many samples and rearrange the rows into channel-table
        # order using self.channel_sorting
        read_data = self.dp900.read_input_data(samples_available)
        read_data[self.channel_sorting] = read_data.copy()
        return read_data

    def stop(self):
        """Method to stop the acquisition"""
        self.dp900.stop()
        self.dp900.end()
        flush_queue(self.output_data_queue)

    def close(self):
        """Method to close down the hardware"""
        self.dp900.disconnect()
        flush_queue(self.output_data_queue)

    def get_acquisition_delay(self) -> int:
        """Get the number of samples between output and acquisition

        This function is designed to handle buffering done in the output
        hardware, ensuring that all data written to the output is read by the
        acquisition.  If an output hardware has a buffer, there may be a
        non-negligible delay between when output is written to the device and
        actually played out from the device.

        Returns ``BUFFER_SIZE_FACTOR`` times ``samples_per_write``."""
        return BUFFER_SIZE_FACTOR * self.data_acquisition_parameters.samples_per_write

    def get_and_write_output_data(self, block: bool = False):
        """
        Checks to see if there is any data on the output queue that needs to be
        written to the hardware.

        Parameters
        ----------
        block : bool, optional
            If True, this function will wait until the data appears with a timeout
            of 10 seconds.  Otherwise it will simply return if there is no
            data available, or if the output buffer already holds at least
            ``BUFFER_SIZE_FACTOR`` writes worth of samples.  The default is
            False.

        Raises
        ------
        RuntimeError
            Raised if the timeout occurs while waiting for data while blocking

        Returns
        -------
        None.

        """
        samples_on_buffer = self.dp900.get_total_output_samples_on_buffer()
        # Same factor as get_acquisition_delay so the two cannot drift apart
        write_threshold = BUFFER_SIZE_FACTOR * self.data_acquisition_parameters.samples_per_write
        if not block and samples_on_buffer >= write_threshold:
            # Enough data is already queued on the device; leave the new data
            # on the queue for a later call.
            return
        try:
            data = self.output_data_queue.get(block, timeout=10)
        except mp.queues.Empty as e:
            if block:
                raise RuntimeError(
                    "Did not receive output in a reasonable amount of time, check output "
                    "process and output hardware for issues"
                ) from e
            # Otherwise just return because there's no data available
            return
        # Reorder the rows to the order the output channels were set up in
        # and send them to the daq, remembering when the write happened.
        this_write_time = time.time()
        outputs = data[self.output_sorting]
        self.last_write_time = this_write_time
        self.dp900.write_output_data(outputs)

375 

376 

class DataPhysicsDP900Output(HardwareOutput):
    """Output-side interface between the controller and the DP900 hardware

    This class is run by the Output process.  It never touches the device
    itself: frames handed to ``write`` are placed on the queue given to the
    constructor, which is intended to carry them to the acquisition side."""

    def __init__(self, queue: mp.queues.Queue):
        """
        Store the queue used to hand output frames over.

        Parameters
        ----------
        queue : mp.queues.Queue
            Queue used to pass data from output to acquisition

        Returns
        -------
        None.

        """
        self.queue = queue

    def set_up_data_output_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """
        Accept the test configuration; nothing is configured here.

        Parameters
        ----------
        test_data : DataAcquisitionParameters :
            A container containing the data acquisition parameters for the
            controller set by the user.  Unused.
        channel_data : List[Channel] :
            A list of ``Channel`` objects defining the channels in the test.
            Unused.

        Returns
        -------
        None.

        """
        return None

    def start(self):
        """Method to start outputting data to the hardware; does nothing here"""
        return None

    def write(self, data):
        """Method to queue a np.ndarray with a frame of data for the hardware"""
        outgoing = self.queue
        outgoing.put(data)

    def stop(self):
        """Method to stop the output by flushing the data-passing queue"""
        outgoing = self.queue
        flush_queue(outgoing)

    def close(self):
        """Method to close down the hardware by flushing the data-passing queue"""
        outgoing = self.queue
        flush_queue(outgoing)

    def ready_for_new_output(self) -> bool:
        """Method that returns true if the hardware should accept a new signal

        Returns ``True`` if the data-passing queue is empty."""
        queue_is_empty = self.queue.empty()
        return queue_is_empty