Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / data_physics_dp900_interface.py: 0%

368 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Hardware definition that allows for the Data Physics DP900 series hardware to 

4be run with Rattlesnake. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24 

25import ctypes 

26import datetime 

27from ctypes import c_bool, c_char_p, c_double, c_float, c_int 

28from enum import Enum 

29 

30import numpy as np 

31from numpy.ctypeslib import ndpointer 

32 

# Set to True to trace every call made into the Data Physics library.
DEBUG = False

if DEBUG:
    __log_file__ = "DataPhysics_Log.txt"
    _PRINT_MESSAGE = True
    _WRITE_MESSAGE = True
    if _WRITE_MESSAGE:
        # Opened once at import time and shared by everything in this module.
        log_file = open(__log_file__, "w", encoding="utf-8")

    def debug_fn(message):
        """Reports the message via print or log file

        Parameters
        ----------
        message : str
            The message to be reported
        """
        line = f"{datetime.datetime.now()} -- {message}"
        if _PRINT_MESSAGE:
            print(line)
        # DP900.__del__ may already have closed the shared file; writing to a
        # closed file raises ValueError, so skip the write in that case.
        if _WRITE_MESSAGE and not log_file.closed:
            log_file.write(f"{line}\n")
            # Flush every message so the trace survives a hard crash inside
            # the native library, which is when the log is needed most.
            log_file.flush()

55 

56 

class DP900Status(Enum):
    """State of the DP900 connection as tracked by the DP900 wrapper class."""

    # No successful connect() yet, or disconnect() has completed
    DISCONNECTED = -1
    # Connected with no test initialized
    IDLE = 0
    # init() succeeded; the test can be started or ended
    INIT = 1
    # start() succeeded
    RUNNING = 2
    # stop() succeeded; the test can be restarted or ended
    STOPPED = 3

65 

66 

class DP900Coupling(Enum):
    """Input coupling options; each value is the integer code sent to the library."""

    AC_DIFFERENTIAL = 0
    DC_DIFFERENTIAL = 1
    AC_SINGLE_ENDED = 2
    DC_SINGLE_ENDED = 3
    AC_COUPLED_IEPE = 4

75 

76 

class DP900:
    """An interface to the data physics C API for the DP900 hardware

    The wrapper tracks the connection state in ``status`` (a ``DP900Status``)
    and the number of configured channels in ``num_inputs`` / ``num_outputs``.
    Calls for which the library reports failure (a return value other than 1)
    raise ``RuntimeError`` carrying the library's error list.
    """

    # Array argument types shared by the prototypes below
    _FLOAT_ARRAY = ndpointer(c_float)
    _INT_ARRAY = ndpointer(c_int)

    # (exported name, argtypes, restype) for every library function used by
    # this class.  ``IsHwConnected`` is intentionally not bound; the original
    # binding was disabled.
    _PROTOTYPES = (
        # DP900_API int Connect(char* testName);
        ("Connect", [c_char_p], c_int),
        # DP900_API int Disconnect();
        ("Disconnect", [], c_int),
        # DP900_API int SetSampleRate(double sampleRate);
        ("SetSampleRate", [c_double], c_int),
        # DP900_API int Init();
        ("Init", [], c_int),
        # DP900_API int SetInpParams(
        #     int* coupling, int* chNum, float* sensitivity, float* range, int numInps);
        ("SetInpParams", [_INT_ARRAY, _INT_ARRAY, _FLOAT_ARRAY, _FLOAT_ARRAY, c_int], c_int),
        # DP900_API int SetOutParams(
        #     float* sensitivity, float* range, int* chNums, int numOuts);
        ("SetOutParams", [_FLOAT_ARRAY, _FLOAT_ARRAY, _INT_ARRAY, c_int], c_int),
        # DP900_API int Start();
        ("Start", [], c_int),
        # DP900_API int Stop();
        ("Stop", [], c_int),
        # DP900_API int End();
        ("End", [], c_int),
        # DP900_API int EmergencyEnd();
        ("EmergencyEnd", [], c_int),
        # DP900_API char* GetSystemList(bool online);
        ("GetSystemList", [c_bool], c_char_p),
        # DP900_API char* GetTestList();
        ("GetTestList", [], c_char_p),
        # DP900_API int SetSystemList(char* sysList);
        ("SetSystemList", [c_char_p], c_int),
        # DP900_API int SaveTest(char* testName);
        ("SaveTest", [c_char_p], c_int),
        # DP900_API int DeleteTest(char* testName);
        ("DeleteTest", [c_char_p], c_int),
        # DP900_API int GetData(float* outputBuf, int dataType, int length);
        ("GetData", [_FLOAT_ARRAY, c_int, c_int], c_int),
        # DP900_API int GetAvailableDataLength();
        ("GetAvailableDataLength", [], c_int),
        # DP900_API int GetSpaceInOutBuffer();
        ("GetSpaceInOutBuffer", [], c_int),
        # DP900_API int GetTotalSamplesInOutputBuffer();
        ("GetTotalSamplesInOutputBuffer", [], c_int),
        # DP900_API int PutOutData(float* outputBuf, int length);
        ("PutOutData", [_FLOAT_ARRAY, c_int], c_int),
        # DP900_API char* GetErrorList();
        ("GetErrorList", [], c_char_p),
        # DP900_API int SetCBufSize(int buffSz);
        ("SetCBufSize", [c_int], c_int),
        # DP900_API int SetSaveRecording(int bEnabled);
        ("SetSaveRecording", [c_int], c_int),
        # DP900_API int GetNumInpsAvailable();
        ("GetNumInpsAvailable", [], c_int),
        # DP900_API int GetNumInpsSelected();
        ("GetNumInpsSelected", [], c_int),
        # DP900_API int GetNumOutsAvailable();
        ("GetNumOutsAvailable", [], c_int),
        # DP900_API int GetNumOutsSelected();
        ("GetNumOutsSelected", [], c_int),
        # DP900_API int GetCBufSize();
        ("GetCBufSize", [], c_int),
        # DP900_API int GetInputChannelBNCs(int* bncs);
        ("GetInputChannelBNCs", [_INT_ARRAY], c_int),
        # DP900_API int GetOutputChannelBNCs(int* bncs);
        ("GetOutputChannelBNCs", [_INT_ARRAY], c_int),
    )

    def __init__(self, library_path: str):
        """
        Connects to the library

        Parameters
        ----------
        library_path : str
            Path to the Dp900Matlab.dll file that is used to run the DP900 device

        """
        self._api = ctypes.WinDLL(library_path)
        self._valid_input_ranges = np.array([0.1, 0.3, 1.0, 3.0, 10.0, 30.0])
        self._valid_output_ranges = np.array([2.0, 10.0])
        self._num_inputs = 0
        self._num_outputs = 0

        # Declare the argument and return types of each function so ctypes
        # converts values correctly instead of assuming int everywhere.
        for name, argtypes, restype in self._PROTOTYPES:
            function = getattr(self._api, name)
            function.argtypes = list(argtypes)
            function.restype = restype

        # Loading the library does not connect to the hardware; connect() does.
        self.status = DP900Status.DISCONNECTED

    @property
    def num_outputs(self):
        """Gets the number of output channels"""
        return self._num_outputs

    @property
    def num_inputs(self):
        """Gets the number of acquisition channels"""
        return self._num_inputs

    @staticmethod
    def _decode_c_string(raw):
        """Decodes a ``char*`` result, treating a NULL pointer (None) as empty."""
        return "" if raw is None else raw.decode("utf-8")

    @staticmethod
    def _validate_ranges(requested, valid):
        """Snaps each requested range onto the matching entry of ``valid``.

        Parameters
        ----------
        requested : iterable of float
            The ranges asked for by the caller.
        valid : np.ndarray
            The ranges supported by the hardware.

        Returns
        -------
        np.ndarray
            float32 array holding the matching valid range for each request.

        Raises
        ------
        ValueError
            If a requested range is not close to exactly one valid range.
        """
        validated = []
        for rng in requested:
            close_ranges = valid[np.isclose(valid, rng)]
            if len(close_ranges) == 0:
                raise ValueError(
                    f"Range {rng} is not valid. Valid ranges are "
                    f"{', '.join([f'{v:0.1f}' for v in valid])}"
                )
            if len(close_ranges) > 1:
                raise ValueError(
                    f"Multiple Ranges are close to the specified range ({rng}, {close_ranges}). "
                    f"This shouldn't happen!"
                )
            validated.append(close_ranges[0])
        return np.array(validated, dtype=np.float32)

    def _check_success(self, success):
        """Raises the library error unless ``success`` is the success code 1."""
        if success != 1:
            self.raise_error()

    def raise_error(self):
        """
        Collects the data physics error and raises it in Python.

        Raises
        ------
        RuntimeError
            Always; the message contains the library's error list.

        """
        error = self.get_error_list()
        if DEBUG:
            debug_fn(f"DP Error: {error}")
        raise RuntimeError(f"DP Error: {error}")

    def connect(self, test_name):
        """
        Launches background processes if not running, and connects the 900 API software to them.

        On success the status becomes IDLE.

        Parameters
        ----------
        test_name : str
            Name of the test to open.  Pass an empty string to start from a
            new test.  Unless recalling a Saved test setup (with save_test()),
            pass an empty string.  Commas are not allowed in the string.

        Raises
        ------
        RuntimeError
            If the library reports that the connection failed.

        """
        if DEBUG:
            debug_fn(f"Calling Connect with\n test_name = {test_name}")
        self._check_success(self._api.Connect(test_name.encode("utf-8")))
        self.status = DP900Status.IDLE

    def disconnect(self):
        """
        Disconnects from the API.

        On success the status becomes DISCONNECTED.

        Raises
        ------
        RuntimeError
            If the library reports that the disconnection failed.

        """
        if DEBUG:
            debug_fn("Calling Disconnect")
        self._check_success(self._api.Disconnect())
        self.status = DP900Status.DISCONNECTED

    def set_sample_rate(self, sample_rate):
        """
        Sets the sample rate to be used for the next test. The sample rate can
        not be changed while a test is running; it must be set before the
        init() command.

        The sample rate can be arbitrarily set to any value between 256Hz and 216kHz.

        Parameters
        ----------
        sample_rate : float
            The sample rate for the test.

        Raises
        ------
        RuntimeError
            If the library rejects the sample rate.
        """
        if DEBUG:
            debug_fn(f"Calling SetSampleRate with\n sample_rate = {sample_rate}")
        # float() lets NumPy scalars and other numeric types through ctypes.
        self._check_success(self._api.SetSampleRate(float(sample_rate)))

    def init(self):
        """
        Initializes the test

        On success the status becomes INIT.

        Raises
        ------
        RuntimeError
            If the status is not IDLE, or if the library reports a failure.
        """
        if self.status != DP900Status.IDLE:
            raise RuntimeError(
                f"Hardware status must be IDLE to initialize. "
                f"Current status is {self.status.name}."
            )
        if DEBUG:
            debug_fn("Calling Init")
        self._check_success(self._api.Init())
        self.status = DP900Status.INIT

    def setup_input_parameters(self, coupling_array, channel_array, sensitivity_array, range_array):
        """
        Tells the API how many channels will be used; as well as the hardware
        settings for those parameters.

        ``num_inputs`` is updated only after the library accepts the setup.

        Parameters
        ----------
        coupling_array : array of DP900Coupling
            The hardware coupling for each channel.  Plain integer coupling
            codes are accepted as well.
        channel_array : array of integers
            Array of integers where the nth value represents the nth input
            channel number. If channels are to be skipped
            (eg. enabling channels 1, 2, 5, 6) the array will contain
            non-consecutive numbers (eg [1, 2, 5, 6])
        sensitivity_array : array of float
            The nth value represents the sensitivity of the nth input channel.
            After voltages are read from the hardware, each channel is scaled
            by its sensitivity before being recorded or passed to the API.
        range_array : array of float
            The nth value represents the voltage range of the nth input
            channel. Can be 0.1, 0.3, 1.0, 3.0, 10.0, 30.0

        Raises
        ------
        ValueError
            If the arrays differ in length or a range is not valid.
        RuntimeError
            If the library rejects the setup.
        """
        num_inputs = len(sensitivity_array)
        if len(coupling_array) != num_inputs:
            raise ValueError("Coupling array must have same size as Sensitivity Array")
        if len(range_array) != num_inputs:
            raise ValueError("Range array must have same size as Sensitivity Array")
        if len(channel_array) != num_inputs:
            raise ValueError("Channel array must have same size as Sensitivity Array")

        # Enum members contribute their .value; bare integers pass through.
        couplings = np.array(
            [int(getattr(coupling, "value", coupling)) for coupling in coupling_array],
            dtype=np.int32,
        )
        sensitivities = np.array([float(val) for val in sensitivity_array], dtype=np.float32)
        channels = np.array([int(channel) for channel in channel_array], dtype=np.int32)
        ranges = self._validate_ranges(range_array, self._valid_input_ranges)

        if DEBUG:
            debug_fn(
                f"Calling SetInpParams with\n "
                f"coupling_array = {couplings.tolist()}\n "
                f"channel_array = {channels.tolist()}\n "
                f"sensitivity_array = {sensitivities.tolist()}\n "
                f"range_array = {ranges.tolist()}\n "
                f"num_inputs = {num_inputs}"
            )
        self._check_success(
            self._api.SetInpParams(couplings, channels, sensitivities, ranges, num_inputs)
        )
        # Commit the channel count only once the library has accepted it, so a
        # failed setup does not leave read_input_data() sizing buffers wrongly.
        self._num_inputs = num_inputs

    def setup_output_parameters(self, sensitivity_array, range_array, channel_array):
        """
        Configures the settings for the output channels to use during test.

        This must be run before a test is Initialized (Init()).
        ``num_outputs`` is updated only after the library accepts the setup.

        Parameters
        ----------
        sensitivity_array : array of float
            A floating-point array, where the Nth element representing the
            sensitivity (scaling) for the Nth output channel, in EU/V
            (EU=Engineering Unit). Each sample, when acquired, is divided by
            this value. A value of 1 essentially applies no scaling (outputs
            are voltages)
        range_array : array of float
            A floating point array containing the output range for each output
            channel. Can be 2.0 or 10.0
        channel_array : array of integers
            Array of integers where the nth value represents the nth output
            channel number. If channels are to be skipped
            (eg. enabling channels 1, 2, 5, 6) the array will contain
            non-consecutive numbers (eg [1, 2, 5, 6])

        Raises
        ------
        ValueError
            If input arrays are not the same size or if ranges are not valid.
        RuntimeError
            If the library rejects the setup.

        """
        num_outputs = len(sensitivity_array)
        if len(range_array) != num_outputs:
            raise ValueError("Range array must have same size as Sensitivity Array")
        if len(channel_array) != num_outputs:
            raise ValueError("Channel number array must have same size as Sensitivity Array")

        sensitivities = np.array([float(val) for val in sensitivity_array], dtype=np.float32)
        channels = np.array([int(val) for val in channel_array], dtype=np.int32)
        ranges = self._validate_ranges(range_array, self._valid_output_ranges)

        if DEBUG:
            debug_fn(
                f"Calling SetOutParams with \n "
                f"sensitivity_array = {sensitivities.tolist()}\n "
                f"range_array = {ranges.tolist()}\n "
                f"channel_array = {channels.tolist()}\n "
                f"num_outputs = {num_outputs}"
            )
        self._check_success(self._api.SetOutParams(sensitivities, ranges, channels, num_outputs))
        # See setup_input_parameters: commit the count only after success.
        self._num_outputs = num_outputs

    def start(self):
        """
        Starts data acquisition (and output through output channel). The test
        must be initialized (init()) before running start().

        On success the status becomes RUNNING.

        Raises
        ------
        RuntimeError
            If the status is neither INIT nor STOPPED, or if the library
            reports a failure.
        """
        if self.status not in (DP900Status.INIT, DP900Status.STOPPED):
            raise RuntimeError(
                f"Current hardware status is {self.status.name}. Hardware must be initialized or "
                f"stopped prior to starting a measurement"
            )
        if DEBUG:
            debug_fn("Calling Start")
        self._check_success(self._api.Start())
        self.status = DP900Status.RUNNING

    def stop(self):
        """
        Stops data acquisition (and output through output channel). The test
        must be started (start()) before running stop().

        On success the status becomes STOPPED.

        Raises
        ------
        RuntimeError
            If the status is not RUNNING, or if the library reports a failure.
        """
        if self.status != DP900Status.RUNNING:
            raise RuntimeError(
                f"Current hardware status is {self.status.name}. Hardware must be "
                "running prior to stopping a measurement"
            )
        if DEBUG:
            debug_fn("Calling Stop")
        self._check_success(self._api.Stop())
        self.status = DP900Status.STOPPED

    def end(self):
        """
        Ends the current test. The test must be stopped (stop()) or only
        initialized (init()) before running end().

        On success the status becomes IDLE.  Unlike start() and stop(), calling
        this in any other state does nothing rather than raising.

        Raises
        ------
        RuntimeError
            If the library reports a failure.
        """
        if self.status not in (DP900Status.STOPPED, DP900Status.INIT):
            return
        if DEBUG:
            debug_fn("Calling End")
        self._check_success(self._api.End())
        self.status = DP900Status.IDLE

    def emergency_end(self):
        """
        If a test is Initialized or Started; but the client (your) code running
        the API crashes or goes into a bad state; the client code can be
        relaunched and execute this function to Stop/End a test that is in
        progress.

        Stops or Ends a test that is currently running, but the API is not
        connected to. This is intended to be used if your code crashes or goes
        into a bad state while the API is running a test.

        On success the status becomes IDLE.

        Raises
        ------
        RuntimeError
            If the library reports a failure.
        """
        if DEBUG:
            debug_fn("Calling EmergencyEnd")
        self._check_success(self._api.EmergencyEnd())
        self.status = DP900Status.IDLE

    def get_system_list(self, online):
        """
        Gets a list of hardware units available for selection
        in the test.

        Note: this command also returns Abacus0 blocks (Data Server), which
        can not be selected for use in a test.

        Parameters
        ----------
        online : bool
            If True, the returned list will only contain systems that are all
            online (detected by the 900 API).

        Returns
        -------
        list
            A list of systems that are selectable for use in
            the 900 software, obtained by splitting the library's
            comma-separated answer.  An empty (or NULL) answer therefore
            yields ``['']``.

        """
        if DEBUG:
            debug_fn(f"Calling GetSystemList with\n online = {online}")
        return self._decode_c_string(self._api.GetSystemList(online)).split(",")

    def get_test_list(self):
        """
        Gets a comma separated list of tests available for opening with the
        connect() command.

        Note: This will return a list of all tests in the Manage Tests screen
        of DP900. If DP900 contains tests which were not created with the API,
        they should not be opened with the API.

        Returns
        -------
        str
            A comma separated list of tests available for opening with the
            connect() command (empty if the library returns a NULL pointer)

        """
        if DEBUG:
            debug_fn("Calling GetTestList")
        return self._decode_c_string(self._api.GetTestList())

    def set_system_list(self, sys_list):
        """
        Sets the hardware units available to be used in the test.

        Parameters
        ----------
        sys_list : str or array of strings
            A list of the names of systems to include in the test (eg.
            ["dp912-98012","dp901-94019"]).  A single name may be passed as a
            plain string.

        Raises
        ------
        RuntimeError
            If the library rejects the list.
        """
        if isinstance(sys_list, str):
            # A bare string would otherwise be joined character by character.
            sys_list = [sys_list]
        joined = ",".join(sys_list)
        if DEBUG:
            debug_fn(f"Calling SetSystemList with\n system_list = {joined}")
        self._check_success(self._api.SetSystemList(joined.encode("utf-8")))

    def save_test(self, test_name):
        """
        Saves the test setup for recalling with the connect() command.

        The test will be saved in the 900 Series database, and can be recalled
        with the connect() call.

        Parameters
        ----------
        test_name : str
            String to use as the name for the test to be saved as.

        Raises
        ------
        RuntimeError
            If the library reports a failure.

        """
        if DEBUG:
            debug_fn(f"Calling SaveTest with\n test_name = {test_name}")
        self._check_success(self._api.SaveTest(test_name.encode("utf-8")))

    def delete_test(self, test_name):
        """
        Deletes a test setup from the archived database.

        Parameters
        ----------
        test_name : str
            String containing the name of the test to be deleted

        Raises
        ------
        RuntimeError
            If the library reports a failure.
        """
        if DEBUG:
            debug_fn(f"Calling DeleteTest with\n test_name = {test_name}")
        self._check_success(self._api.DeleteTest(test_name.encode("utf-8")))

    def read_input_data(self, num_samples, newest_data=False):
        """
        Reads data from the data acquisition software

        Parameters
        ----------
        num_samples : int
            The number of samples to read
        newest_data : bool, optional
            If True, gets the newest data in the buffer, otherwise gets the
            oldest data in the buffer. The default is False.

        Returns
        -------
        np.ndarray
            An array of data with shape (num_inputs + num_outputs) x num_samples

        """
        num_samples = int(num_samples)
        num_channels = self._num_inputs + self._num_outputs
        read_array = np.zeros(num_channels * num_samples, dtype=np.float32)
        # The library's dataType argument: 0 is sent for newest, 1 for oldest.
        read_type = 0 if newest_data else 1
        if DEBUG:
            debug_fn(f"Calling GetData with\n length = {num_samples}")
        # The return value is deliberately not checked (the original check was
        # disabled).
        _ = self._api.GetData(read_array, read_type, num_samples)
        return read_array.reshape((num_channels, num_samples))

    def get_available_input_data_samples(self):
        """
        Gets the number of available samples in the input channel circular
        buffers.

        Returns
        -------
        samples : int
            The number of samples that are available to be read out of the input
            channel buffer.

        """
        if DEBUG:
            debug_fn("Calling GetAvailableDataLength")
        samples = self._api.GetAvailableDataLength()
        if DEBUG:
            debug_fn(f"{samples} Samples Available")
        return samples

    def get_space_in_out_buffer(self):
        """
        Gets the amount of free space in the output buffer (how much data can
        be sent to the output buffer before it is full).

        Returns
        -------
        int
            The number of samples that can be sent to the output buffer
            before it is full

        """
        if DEBUG:
            debug_fn("Calling GetSpaceInOutBuffer")
        samples = self._api.GetSpaceInOutBuffer()
        if DEBUG:
            debug_fn(f" {samples} Samples Available in Buffer")
        return samples

    def get_total_output_samples_on_buffer(self):
        """
        Gets the number of samples in the output channel circular buffers.

        Returns
        -------
        samples : int
            The number of samples in line to be output through the output
            channels.

        """
        if DEBUG:
            debug_fn("Calling GetTotalSamplesInOutputBuffer")
        samples = self._api.GetTotalSamplesInOutputBuffer()
        if DEBUG:
            debug_fn(f" {samples} Output Samples Available")
        return samples

    def write_output_data(self, output_data):
        """
        The function will fill the output buffer with data specified, which will
        result in it eventually getting output from the system.

        Parameters
        ----------
        output_data : array of float
            2D data array with shape num_outputs x num_samples containing the
            data to be output from the signal generator.

        Raises
        ------
        ValueError
            If ``output_data`` is not two-dimensional or its number of rows
            differs from the number of configured outputs.

        """
        output_data = np.asarray(output_data)
        if output_data.ndim != 2:
            raise ValueError("`output_data` should have 2 dimensions (num_outputs x num_samples)")
        if output_data.shape[0] != self._num_outputs:
            raise ValueError(
                f"`output_data` must have number of rows equal to the number of "
                f"outputs ({self._num_outputs})"
            )
        num_samples = output_data.shape[-1]
        # Always hand the library a private, C-contiguous float32 copy laid
        # out row by row; ndpointer(c_float) does not enforce contiguity.
        this_output_data = np.array(output_data, dtype=np.float32, order="C").ravel()
        if DEBUG:
            debug_fn(f"Calling PutOutData with\n length {num_samples}")
        # The return value is deliberately not checked (the original check was
        # disabled).
        _ = self._api.PutOutData(this_output_data, num_samples)

    def get_raw_error_list(self):
        """Gets the raw bytes from the error list"""
        if DEBUG:
            debug_fn("Calling GetErrorList")
        return self._api.GetErrorList()

    def get_error_list(self):
        """Gets the decoded error list (empty if the library returns NULL)"""
        return self._decode_c_string(self.get_raw_error_list())

    def set_buffer_size(self, buff_sz):
        """
        Sets the number of samples, per channel, in the input and output
        channel circular buffers.

        Parameters
        ----------
        buff_sz : int
            The desired size of the circular buffers, in samples. Must be greater
            than 4096. The upper limit of buff_sz is limited by available
            memory in the PC.

        Raises
        ------
        RuntimeError
            If the library rejects the size.
        """
        if DEBUG:
            debug_fn("Calling SetCBufSize")
        # int() lets NumPy integer scalars through ctypes.
        self._check_success(self._api.SetCBufSize(int(buff_sz)))

    def set_save_recording(self, enabled):
        """
        Sets the API to archive (or not archive) the complete time history
        being sent through the API. If enabled, this happens in the background
        as the test is running; and can be accessed by later launching the 900
        Series software.

        Parameters
        ----------
        enabled : bool
            If True, enables the recording. If False, disables it.

        Raises
        ------
        RuntimeError
            If the library reports a failure.
        """
        if DEBUG:
            debug_fn("Calling SetSaveRecording")
        # The C prototype takes an int flag, so send exactly 0 or 1.
        self._check_success(self._api.SetSaveRecording(int(bool(enabled))))

    def get_num_inps_available(self):
        """
        Returns the number of available input channels on the currently
        selected hardware.

        Returns
        -------
        int
            The number of input channels available for usage, with the
            currently selected hardware units.

        """
        if DEBUG:
            debug_fn("Calling GetNumInpsAvailable")
        inps_available = self._api.GetNumInpsAvailable()
        if DEBUG:
            debug_fn(f" {inps_available} Inputs Available")
        return inps_available

    def get_num_inps_selected(self):
        """
        Returns the number of selected input channels

        Returns
        -------
        int
            The number of input channels selected in the current setup.

        """
        if DEBUG:
            debug_fn("Calling GetNumInpsSelected")
        inps_selected = self._api.GetNumInpsSelected()
        if DEBUG:
            debug_fn(f" {inps_selected} Inputs Selected")
        return inps_selected

    def get_num_outs_available(self):
        """
        Returns the number of available output channels on the currently
        selected hardware.

        Returns
        -------
        int
            The number of output channels available for use, with the currently
            selected hardware units

        """
        if DEBUG:
            debug_fn("Calling GetNumOutsAvailable")
        outs_available = self._api.GetNumOutsAvailable()
        if DEBUG:
            debug_fn(f" {outs_available} Outputs Available")
        return outs_available

    def get_num_outs_selected(self):
        """
        Returns the number of output channels selected in the current test

        Returns
        -------
        int
            The number of output channels selected in the current setup

        """
        if DEBUG:
            debug_fn("Calling GetNumOutsSelected")
        outs_selected = self._api.GetNumOutsSelected()
        if DEBUG:
            debug_fn(f" {outs_selected} Outputs Selected")
        return outs_selected

    def get_cbuf_size(self):
        """
        Gets the number of samples, per channel, in the input and output
        channel circular buffers.

        Returns
        -------
        int
            The circular buffer size.

        """
        if DEBUG:
            debug_fn("Calling GetCBufSize")
        cbuf_size = self._api.GetCBufSize()
        if DEBUG:
            debug_fn(f" CBuf Size {cbuf_size}")
        return cbuf_size

    def get_input_channel_bncs(self):
        """
        Gets the BNC numbers of available input channels on the currently selected hardware.

        Returns
        -------
        bncs : array of int
            BNC numbers corresponding to input channels

        Raises
        ------
        RuntimeError
            If the library reports a failure.

        """
        # The library fills a caller-owned buffer with one entry per available input.
        bncs = np.zeros((self.get_num_inps_available(),), dtype=np.int32)
        if DEBUG:
            debug_fn("Calling GetInputChannelBNCs")
        self._check_success(self._api.GetInputChannelBNCs(bncs))
        if DEBUG:
            debug_fn(f" Input BNCs: {bncs.tolist()}")
        return bncs

    def get_output_channel_bncs(self):
        """
        Gets the BNC numbers of available output channels on the currently selected hardware.

        Returns
        -------
        bncs : array of int
            BNC numbers corresponding to output channels

        Raises
        ------
        RuntimeError
            If the library reports a failure.

        """
        # The library fills a caller-owned buffer with one entry per available output.
        bncs = np.zeros((self.get_num_outs_available(),), dtype=np.int32)
        if DEBUG:
            debug_fn("Calling GetOutputChannelBNCs")
        self._check_success(self._api.GetOutputChannelBNCs(bncs))
        if DEBUG:
            debug_fn(f" Output BNCs: {bncs.tolist()}")
        return bncs

    def __del__(self):
        # The debug log is a module-level file shared by every instance, so it
        # is flushed here rather than closed: closing it would break logging
        # for any other (or later) DP900 object.
        if DEBUG:
            try:
                log_file.flush()
            except (NameError, AttributeError, ValueError):
                # The log was never opened, was torn down during interpreter
                # shutdown, or is already closed; there is nothing to flush.
                pass

1000 

1001 

# Example usage (requires the Data Physics library and hardware):
# wrapper = DP900("path_to_your_dll.dll")
# wrapper.connect("")
# print(wrapper.get_system_list(True))