Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / data_physics_interface.py: 18%

235 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Hardware definition that allows for the Data Physics Quattro Device to be run 

4with Rattlesnake. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24 

25import ctypes 

26from enum import Enum 

27 

28import numpy as np 

29from numpy.ctypeslib import ndpointer 

30 

31DEBUG = False 

32 

33if DEBUG: 

34 __log_file__ = "DataPhysics_Log.txt" 

35 

36 

37class QuattroStatus(Enum): 

38 """Valid Quattro statuses""" 

39 

40 DISCONNECTED = -1 

41 IDLE = 0 

42 INIT = 1 

43 RUNNING = 2 

44 STOPPED = 3 

45 

46 

47class QuattroCoupling(Enum): 

48 """Valid Quattro Couplings""" 

49 

50 AC_DIFFERENTIAL = 0 

51 DC_DIFFERENTIAL = 1 

52 AC_SINGLE_ENDED = 2 

53 DC_SINGLE_ENDED = 3 

54 AC_COUPLED_IEPE = 4 

55 

56 

57class DPQuattro: 

58 """An interface to the data physics C API for the quattro hardware""" 

59 

60 def __init__(self, library_path: str): 

61 """ 

62 Connects to the library 

63 

64 Parameters 

65 ---------- 

66 library_path : str 

67 Path to the DpQuattro.dll file that is used to run the Quattro device 

68 

69 Returns 

70 ------- 

71 None. 

72 

73 """ 

74 if DEBUG: 

75 self.log_file = open(__log_file__, "w", encoding="utf-8") 

76 self.input_channel_parameters = None 

77 self.output_channel_parameters = None 

78 self._api = ctypes.WinDLL(library_path) 

79 self._valid_sample_rates = np.array( 

80 [ 

81 10.24, 

82 12.5, 

83 12.8, 

84 13.1072, 

85 16, 

86 16.384, 

87 20, 

88 20.48, 

89 25, 

90 25.6, 

91 32, 

92 32.768, 

93 40, 

94 40.96, 

95 50, 

96 51.2, 

97 64, 

98 65.536, 

99 80, 

100 81.92, 

101 100, 

102 102.4, 

103 128, 

104 160, 

105 163.84, 

106 200, 

107 204.8, 

108 256, 

109 320, 

110 327.68, 

111 400, 

112 409.6, 

113 512, 

114 640, 

115 800, 

116 819.2, 

117 1024, 

118 1280, 

119 1600, 

120 1638.4, 

121 2048, 

122 2560, 

123 3200, 

124 4096, 

125 5120, 

126 6400, 

127 8192, 

128 10240, 

129 12800, 

130 20480, 

131 25600, 

132 40960, 

133 51200, 

134 102400, 

135 204800, 

136 ] 

137 ) 

138 self._valid_input_ranges = np.array([0.1, 1.0, 10.0, 20.0]) 

139 self._valid_output_ranges = np.array([2.0, 10.0]) 

140 self._num_inputs = 0 

141 self._num_outputs = 0 

142 

143 # Set up prototypes for the various function calls 

144 # Comments are from the quattro API header file 

145 # DPCOMM_API int IsHwConnected(); 

146 self._api.IsHwConnected.restype = ctypes.c_int 

147 # DPCOMM_API int Connect(); 

148 self._api.Connect.restype = ctypes.c_int 

149 # DPCOMM_API int Disconnect(); 

150 self._api.Disconnect.restype = ctypes.c_int 

151 # DPCOMM_API int SetSampleRate(double sampleRate); 

152 self._api.SetSampleRate.argtypes = (ctypes.c_double,) 

153 # DPCOMM_API int IsLicensed(); 

154 self._api.IsLicensed.restype = ctypes.c_int 

155 # DPCOMM_API int Init(); 

156 self._api.Init.restype = ctypes.c_int 

157 # DPCOMM_API int SetInpParams( 

158 # int* coupling, float* sensitivity, float* range, int numInps); 

159 self._api.SetInpParams.argtypes = ( 

160 ndpointer(ctypes.c_int), 

161 ndpointer(ctypes.c_float), 

162 ndpointer(ctypes.c_float), 

163 ctypes.c_int, 

164 ) 

165 # DPCOMM_API int SetOutParams(float* sensitivity, float* range, int numOuts); 

166 self._api.SetOutParams.argtypes = ( 

167 ndpointer(ctypes.c_float), 

168 ndpointer(ctypes.c_float), 

169 ctypes.c_int, 

170 ) 

171 # DPCOMM_API int SetTacParams( 

172 # int* coupling, float* holdOffTime, float* hysteresis, float* preScaler, float* PPR, 

173 # float* smoothing, float* speedRatio, float* trigLevel, int* trigSlope, int numTacs); 

174 # DPCOMM_API int Start(); 

175 self._api.Start.restype = ctypes.c_int 

176 # DPCOMM_API int Stop(); 

177 self._api.Stop.restype = ctypes.c_int 

178 # DPCOMM_API int End(); 

179 self._api.End.restype = ctypes.c_int 

180 # DPCOMM_API int SerialNumber(); 

181 self._api.SerialNumber.restype = ctypes.c_int 

182 # DPCOMM_API int GetData(float* outputBuf, int dataType, int length); 

183 self._api.GetData.argtypes = ( 

184 ndpointer(ctypes.c_float), 

185 ctypes.c_int, 

186 ctypes.c_int, 

187 ) 

188 self._api.GetData.restype = ctypes.c_int 

189 # DPCOMM_API int GetAvailableDataLength(); 

190 self._api.GetAvailableDataLength.restype = ctypes.c_int 

191 # # DPCOMM_API int GetAvailableOutData(); 

192 # self._api.GetAvailableOutData.restype = ctypes.c_int 

193 # DPCOMM_API int GetTotalSamplesInOutputBuffer(); 

194 self._api.GetTotalSamplesInOutputBuffer.restype = ctypes.c_int 

195 # DPCOMM_API int PutOutData(float* outputBuf, int length); 

196 self._api.PutOutData.argtypes = (ndpointer(ctypes.c_float), ctypes.c_int) 

197 self._api.PutOutData.restype = ctypes.c_int 

198 # DPCOMM_API char* GetErrorList(); 

199 self._api.GetErrorList.restype = ctypes.c_char_p 

200 # DPCOMM_API int SetCBufSize(int buffSz); 

201 self._api.SetCBufSize.argtypes = (ctypes.c_int,) 

202 # DPCOMM_API int GetCBufSize(); 

203 self._api.GetCBufSize.restype = ctypes.c_int 

204 

205 if self.is_hardware_connected(): 

206 self.status = QuattroStatus.IDLE 

207 else: 

208 self.status = QuattroStatus.DISCONNECTED 

209 

210 def connect(self): 

211 """Connects to the hardware""" 

212 if not self.is_hardware_connected(): 

213 if DEBUG: 

214 self.log_file.write("Calling Connect\n") 

215 success = self._api.Connect() 

216 else: 

217 raise RuntimeError("Hardware is already connected") 

218 if not success: 

219 self.raise_error() 

220 else: 

221 self.status = QuattroStatus.IDLE 

222 

223 def disconnect(self): 

224 """Disconnects from the hardware""" 

225 if self.is_hardware_connected(): 

226 if DEBUG: 

227 self.log_file.write("Calling Disconnect\n") 

228 success = self._api.Disconnect() 

229 else: 

230 raise RuntimeError("Hardware is not connected") 

231 if not success: 

232 self.raise_error() 

233 else: 

234 self.status = QuattroStatus.DISCONNECTED 

235 

236 def is_hardware_connected(self): 

237 """Check if the hardware is connected or not""" 

238 if DEBUG: 

239 self.log_file.write("Calling IsHwConnected\n") 

240 return bool(self._api.IsHwConnected()) 

241 

242 def get_raw_error_list(self): 

243 """Gets the raw bytes of the error list from the hardware""" 

244 if DEBUG: 

245 self.log_file.write("Calling GetErrorList\n") 

246 return self._api.GetErrorList() 

247 

248 def get_error_list(self): 

249 """Gets the decoded error list from the hardware""" 

250 if DEBUG: 

251 self.log_file.write("Calling GetErrorList\n") 

252 data = self._api.GetErrorList() 

253 return data.decode() 

254 

255 def set_sample_rate(self, sample_rate): 

256 """Sets the sample rate of the hardware 

257 

258 Parameters 

259 ---------- 

260 sample_rate : float 

261 The desired sample rate of the data acquisiiton system 

262 

263 Raises 

264 ------ 

265 ValueError 

266 If the sample rate is not valid 

267 """ 

268 close_rates = np.isclose(self._valid_sample_rates, sample_rate) 

269 close_sample_rates = self._valid_sample_rates[close_rates] 

270 if len(close_sample_rates) == 0: 

271 raise ValueError( 

272 f"Sample Rate {sample_rate} is not valid. Valid sample rates are " 

273 f"{', '.join([f'{v:0.2f}' for v in self._valid_sample_rates])}" 

274 ) 

275 elif len(close_sample_rates) > 1: 

276 raise ValueError( 

277 f"Multiple Sample Rates are close to the specified rate ({sample_rate}, " 

278 f"{close_sample_rates}). This shouldn't happen!" 

279 ) 

280 if DEBUG: 

281 self.log_file.write("Calling SetSampleRate\n") 

282 success = self._api.SetSampleRate(ctypes.c_double(close_sample_rates[0])) 

283 if not success: 

284 self.raise_error() 

285 

286 def is_licensed(self): 

287 """Checks the licensing of the hardware 

288 

289 Returns 

290 ------- 

291 bool 

292 Returns True if the hardware is licensed 

293 """ 

294 if DEBUG: 

295 self.log_file.write("Calling IsLicensed\n") 

296 return bool(self._api.IsLicensed()) 

297 

298 def initialize(self): 

299 """Initializes the data acquisition system 

300 

301 Raises 

302 ------ 

303 RuntimeError 

304 if the hardware is not currently in the idle state 

305 """ 

306 if self.status == QuattroStatus.IDLE: 

307 if DEBUG: 

308 self.log_file.write("Calling Init\n") 

309 success = self._api.Init() 

310 if not success: 

311 self.raise_error() 

312 else: 

313 self.status = QuattroStatus.INIT 

314 else: 

315 raise RuntimeError( 

316 f"Hardware status must be IDLE to initialize. " 

317 f"Current status is {self.status.name}." 

318 ) 

319 

320 def setup_input_parameters(self, coupling_array, sensitivity_array, range_array): 

321 """Sets up the acquisition channels for the data acquisition system 

322 

323 Parameters 

324 ---------- 

325 coupling_array : np.ndarray 

326 An array of coupling values for the data acquisition system 

327 sensitivity_array : np.ndarray 

328 An array of sensitivity values for the data acquisition system 

329 range_array : np.ndarray 

330 An array of ranges for the data acquisition system 

331 

332 Raises 

333 ------ 

334 ValueError 

335 if any invalid values are passed or the arrays are not the same size 

336 """ 

337 # Set up the channel arrays 

338 if len(coupling_array) != len(sensitivity_array): 

339 raise ValueError("Coupling array must have same size as Sensitivity Array") 

340 if len(range_array) != len(sensitivity_array): 

341 raise ValueError("Range array must have same size as Sensitivity Array") 

342 self._num_inputs = len(coupling_array) 

343 coupling_array = np.array( 

344 [int(coupling.value) for coupling in coupling_array], dtype=np.int32 

345 ) 

346 sensitivity_array = np.array([float(val) for val in sensitivity_array], dtype=np.float32) 

347 validated_range_array = [] 

348 for rng in range_array: 

349 close_ranges = self._valid_input_ranges[np.isclose(self._valid_input_ranges, rng)] 

350 if len(close_ranges) == 0: 

351 raise ValueError( 

352 f"Range {rng} is not valid. Valid sample rates are " 

353 f"{', '.join([f'{v:0.1f}' for v in self._valid_input_ranges])}" 

354 ) 

355 elif len(close_ranges) > 1: 

356 raise ValueError( 

357 f"Multiple Ranges are close to the specified rate ({rng}, {close_ranges}). " 

358 f"This shouldn't happen!" 

359 ) 

360 validated_range_array.append(close_ranges[0]) 

361 validated_range_array = np.array(validated_range_array, dtype=np.float32) 

362 # Call the API function 

363 if DEBUG: 

364 self.log_file.write("Calling SetInpParams\n") 

365 success = self._api.SetInpParams( 

366 coupling_array, 

367 sensitivity_array, 

368 validated_range_array, 

369 ctypes.c_int(self._num_inputs), 

370 ) 

371 if not success: 

372 self.raise_error() 

373 

374 def setup_output_parameters(self, sensitivity_array, range_array): 

375 """Sets up the drive channels on the data acquisition system 

376 

377 Parameters 

378 ---------- 

379 sensitivity_array : np.ndarray 

380 An array of sensitivities to apply to the output channels 

381 range_array : np.ndarray 

382 An array of ranges to use for the output channels 

383 

384 Raises 

385 ------ 

386 ValueError 

387 if invalid values are passed or arrays are not the same size 

388 """ 

389 if len(range_array) != len(sensitivity_array): 

390 raise ValueError("Range array must have same size as Sensitivity Array") 

391 self._num_outputs = len(sensitivity_array) 

392 sensitivity_array = np.array([float(val) for val in sensitivity_array], dtype=np.float32) 

393 validated_range_array = [] 

394 for rng in range_array: 

395 close_ranges = self._valid_output_ranges[np.isclose(self._valid_output_ranges, rng)] 

396 if len(close_ranges) == 0: 

397 raise ValueError( 

398 f"Range {rng} is not valid. Valid sample rates are " 

399 f"{', '.join([f'{v:0.1f}' for v in self._valid_output_ranges])}" 

400 ) 

401 elif len(close_ranges) > 1: 

402 raise ValueError( 

403 f"Multiple Ranges are close to the specified rate ({rng}, {close_ranges}). " 

404 f"This shouldn't happen!" 

405 ) 

406 validated_range_array.append(close_ranges[0]) 

407 validated_range_array = np.array(validated_range_array, dtype=np.float32) 

408 # Call the API function 

409 if DEBUG: 

410 self.log_file.write("Calling SetOutParams\n") 

411 success = self._api.SetOutParams( 

412 sensitivity_array, validated_range_array, ctypes.c_int(self._num_outputs) 

413 ) 

414 if not success: 

415 self.raise_error() 

416 

417 def raise_error(self): 

418 """Raises an error any writes the error list to the log file""" 

419 if DEBUG: 

420 self.log_file.write(f"DP Error: {self.get_error_list()}\n") 

421 # raise RuntimeError(self.get_error_list()) 

422 

423 def start(self): 

424 """Starts the data acquisiiton system 

425 

426 Raises 

427 ------ 

428 RuntimeError 

429 if the status is not either initialized or stopped 

430 """ 

431 if self.status in [QuattroStatus.INIT, QuattroStatus.STOPPED]: 

432 if DEBUG: 

433 self.log_file.write("Calling Start\n") 

434 success = self._api.Start() 

435 if not success: 

436 self.raise_error() 

437 else: 

438 self.status = QuattroStatus.RUNNING 

439 else: 

440 raise RuntimeError( 

441 f"Current hardware status is {self.status.name}. Hardware must be " 

442 f"initialized or stopped prior to starting a measurement" 

443 ) 

444 

445 def stop(self): 

446 """Stops the data acquisition system 

447 

448 Raises 

449 ------ 

450 RuntimeError 

451 If the data acquisition system is not currently in the running state 

452 """ 

453 if self.status == QuattroStatus.RUNNING: 

454 if DEBUG: 

455 self.log_file.write("Calling Stop\n") 

456 success = self._api.Stop() 

457 if not success: 

458 self.raise_error() 

459 else: 

460 self.status = QuattroStatus.STOPPED 

461 else: 

462 raise RuntimeError( 

463 f"Current hardware status is {self.status.name}. Hardware must be running prior " 

464 f"to stopping a measurement" 

465 ) 

466 

467 def end(self): 

468 """Shuts down the data acquisition system""" 

469 if self.status in [QuattroStatus.STOPPED, QuattroStatus.INIT]: 

470 if DEBUG: 

471 self.log_file.write("Calling End\n") 

472 success = self._api.End() 

473 if not success: 

474 self.raise_error() 

475 else: 

476 self.status = QuattroStatus.IDLE 

477 

478 @property 

479 def serial_number(self): 

480 """Gets the serial number of the hardware""" 

481 if DEBUG: 

482 self.log_file.write("Warning, this gives out the wrong value!\n") 

483 if DEBUG: 

484 self.log_file.write("Calling SerialNumber\n") 

485 return self._api.SerialNumber() 

486 

487 def set_buffer_size(self, buffer_size): 

488 """Sets the buffer size of the hardware""" 

489 if DEBUG: 

490 self.log_file.write("Calling SetCBufSize\n") 

491 success = self._api.SetCBufSize(ctypes.c_int(buffer_size)) 

492 if not success: 

493 self.raise_error() 

494 

495 def get_buffer_size(self): 

496 """Gets the buffer size of the hardware""" 

497 if DEBUG: 

498 self.log_file.write("Calling GetCBufSize\n") 

499 return self._api.GetCBufSize() 

500 

501 def get_available_input_data_samples(self): 

502 """Gets the number of samples available on the data acquisiiton system""" 

503 if DEBUG: 

504 self.log_file.write("Calling GetAvailableDataLength\n") 

505 samples = self._api.GetAvailableDataLength() 

506 if DEBUG: 

507 self.log_file.write(f"{samples} Samples Available\n") 

508 return samples 

509 

510 # def get_output_samples_on_buffer(self): 

511 # self.log_file.write('Calling GetAvailableOutData\n') 

512 # samples = self._api.GetAvailableOutData() 

513 # self.log_file.write('{:} Output Samples Available\n'.format(samples)) 

514 # return samples 

515 

516 def get_total_output_samples_on_buffer(self): 

517 """Gets the total number of samples on the output buffer""" 

518 if DEBUG: 

519 self.log_file.write("Calling GetTotalSamplesInOutputBuffer\n") 

520 samples = self._api.GetTotalSamplesInOutputBuffer() 

521 if DEBUG: 

522 self.log_file.write(f"{samples} Output Samples Available\n") 

523 return samples 

524 

525 def read_input_data(self, num_samples, newest_data=False): 

526 """Reads data from the acquisition channels 

527 

528 Parameters 

529 ---------- 

530 num_samples : int 

531 The number of samples to read 

532 newest_data : bool, optional 

533 Determines whether to read the newest acquired data (True) or the oldest (False), by 

534 default False 

535 

536 Returns 

537 ------- 

538 np.ndarray 

539 The read data in a num_channels x num_samples array 

540 """ 

541 read_array = np.zeros(self._num_inputs * num_samples, dtype=np.float32) 

542 read_type = ctypes.c_int(0 if newest_data else 1) 

543 if DEBUG: 

544 self.log_file.write(f"Calling GetData with length {ctypes.c_int(num_samples)}\n") 

545 success = self._api.GetData(read_array, read_type, ctypes.c_int(num_samples)) 

546 if not success: 

547 self.raise_error() 

548 return read_array.reshape((self._num_inputs, num_samples)) 

549 

550 def write_output_data(self, output_data): 

551 """Puts output data to the hardware output buffer 

552 

553 Parameters 

554 ---------- 

555 output_data : np.ndarray 

556 The signals to be written to the hardware in a num_outputs x num_samples array 

557 

558 Raises 

559 ------ 

560 ValueError 

561 If the output data is not shaped correctly 

562 """ 

563 if output_data.ndim != 2: 

564 raise ValueError("`output_data` should have 2 dimensions (num_outputs x num_samples)") 

565 if output_data.shape[0] != self._num_outputs: 

566 raise ValueError( 

567 f"`output_data` must have number of rows equal to the number of " 

568 f"outputs ({self._num_outputs})" 

569 ) 

570 num_samples = output_data.shape[-1] 

571 this_output_data = np.zeros(np.prod(output_data.shape), dtype=np.float32) 

572 this_output_data[:] = output_data.flatten().astype(np.float32) 

573 # self.log_file.write(this_output_data.shape, num_samples, self._num_outputs) 

574 if DEBUG: 

575 self.log_file.write(f"Calling PutOutData with length {ctypes.c_int(num_samples)}\n") 

576 _ = self._api.PutOutData(this_output_data, ctypes.c_int(num_samples)) 

577 # if not success: 

578 # self.raise_error() 

579 

580 def __del__(self): 

581 """Closes the hardware automatically when the interface is deleted or garbage collected""" 

582 if DEBUG: 

583 self.log_file.close()