1# -*- coding: utf-8 -*-
2"""
3Hardware definition that allows for the Data Physics DP900 series hardware to
4be run with Rattlesnake.
5
6Rattlesnake Vibration Control Software
7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
9Government retains certain rights in this software.
10
11This program is free software: you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation, either version 3 of the License, or
14(at your option) any later version.
15
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License for more details.
20
21You should have received a copy of the GNU General Public License
22along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""
24
25import ctypes
26import datetime
27from ctypes import c_bool, c_char_p, c_double, c_float, c_int
28from enum import Enum
29
30import numpy as np
31from numpy.ctypeslib import ndpointer
32
# Master switch for call tracing. Every call site in this module guards its
# use of ``debug_fn`` with ``if DEBUG:``, so nothing below exists when False.
DEBUG = False

if DEBUG:
    __log_file__ = "DataPhysics_Log.txt"
    _PRINT_MESSAGE = True
    _WRITE_MESSAGE = True
    if _WRITE_MESSAGE:
        # Kept open for the life of the module rather than in a ``with``
        # block: debug_fn appends to it on every traced API call.
        log_file = open(__log_file__, "w", encoding="utf-8")

    def debug_fn(message):
        """Reports the message via print or log file

        Each message is prefixed with the current local time. File output is
        flushed immediately so the trace survives a hard crash of the driver,
        and is skipped once the log file has been closed (``DP900.__del__``
        closes the shared handle) instead of raising on a closed file.

        Parameters
        ----------
        message : str
            The message to be reported
        """
        now = datetime.datetime.now()
        line = f"{now} -- {message}"
        if _PRINT_MESSAGE:
            print(line)
        if _WRITE_MESSAGE and not log_file.closed:
            log_file.write(f"{line}\n")
            log_file.flush()
55
56
class DP900Status(Enum):
    """Life-cycle states that the DP900 wrapper tracks for the hardware."""

    DISCONNECTED = -1  # initial state, and the state after disconnect()
    IDLE = 0  # after connect(), end() or emergency_end()
    INIT = 1  # after init()
    RUNNING = 2  # after start()
    STOPPED = 3  # after stop()
65
66
class DP900Coupling(Enum):
    """Input coupling codes accepted by the DP900.

    The integer value of each member is what gets handed to the API when the
    input channels are configured.
    """

    AC_DIFFERENTIAL = 0  # AC coupled, differential
    DC_DIFFERENTIAL = 1  # DC coupled, differential
    AC_SINGLE_ENDED = 2  # AC coupled, single ended
    DC_SINGLE_ENDED = 3  # DC coupled, single ended
    AC_COUPLED_IEPE = 4  # AC coupled, IEPE
75
76
class DP900:
    """An interface to the Data Physics C API for the DP900 hardware.

    The wrapper loads the vendor DLL through ``ctypes``, declares the argument
    and return types of every API function it uses, and mirrors the test life
    cycle in the ``status`` attribute::

        DISCONNECTED --connect()--> IDLE --init()--> INIT --start()--> RUNNING
        RUNNING --stop()--> STOPPED --start()--> RUNNING
        STOPPED or INIT --end()--> IDLE --disconnect()--> DISCONNECTED

    Most API calls report success by returning 1; any other value makes the
    wrapper raise a ``RuntimeError`` carrying the API's error list.
    """

    def __init__(self, library_path: str):
        """
        Connects to the library

        Parameters
        ----------
        library_path : str
            Path to the Dp900Matlab.dll file that is used to run the DP900 device

        """
        self._api = ctypes.WinDLL(library_path)
        self._valid_input_ranges = np.array([0.1, 0.3, 1.0, 3.0, 10.0, 30.0])
        self._valid_output_ranges = np.array([2.0, 10.0])
        self._num_inputs = 0
        self._num_outputs = 0
        self._declare_prototypes()
        # Loading the DLL does not connect to the hardware; connect() does.
        self.status = DP900Status.DISCONNECTED

    def _declare_prototypes(self):
        """Declare argument and return types for every API function used."""
        int_array = ndpointer(c_int)
        float_array = ndpointer(c_float)
        # NOTE: ``int IsHwConnected()`` is deliberately left undeclared; its
        # use was disabled in this wrapper.
        prototypes = (
            # DP900_API int Connect(char* testName);
            ("Connect", [c_char_p], c_int),
            # DP900_API int Disconnect();
            ("Disconnect", [], c_int),
            # DP900_API int SetSampleRate(double sampleRate);
            ("SetSampleRate", [c_double], c_int),
            # DP900_API int Init();
            ("Init", [], c_int),
            # DP900_API int SetInpParams(
            #     int* coupling, int* chNum, float* sensitivity, float* range, int numInps);
            ("SetInpParams", [int_array, int_array, float_array, float_array, c_int], c_int),
            # DP900_API int SetOutParams(
            #     float* sensitivity, float* range, int* chNums, int numOuts);
            ("SetOutParams", [float_array, float_array, int_array, c_int], c_int),
            # DP900_API int Start();
            ("Start", [], c_int),
            # DP900_API int Stop();
            ("Stop", [], c_int),
            # DP900_API int End();
            ("End", [], c_int),
            # DP900_API int EmergencyEnd();
            ("EmergencyEnd", [], c_int),
            # DP900_API char* GetSystemList(bool online);
            ("GetSystemList", [c_bool], c_char_p),
            # DP900_API char* GetTestList();
            ("GetTestList", [], c_char_p),
            # DP900_API int SetSystemList(char* sysList);
            ("SetSystemList", [c_char_p], c_int),
            # DP900_API int SaveTest(char* testName);
            ("SaveTest", [c_char_p], c_int),
            # DP900_API int DeleteTest(char* testName);
            ("DeleteTest", [c_char_p], c_int),
            # DP900_API int GetData(float* outputBuf, int dataType, int length);
            ("GetData", [float_array, c_int, c_int], c_int),
            # DP900_API int GetAvailableDataLength();
            ("GetAvailableDataLength", [], c_int),
            # DP900_API int GetSpaceInOutBuffer();
            ("GetSpaceInOutBuffer", [], c_int),
            # DP900_API int GetTotalSamplesInOutputBuffer();
            ("GetTotalSamplesInOutputBuffer", [], c_int),
            # DP900_API int PutOutData(float* outputBuf, int length);
            ("PutOutData", [float_array, c_int], c_int),
            # DP900_API char* GetErrorList();
            ("GetErrorList", [], c_char_p),
            # DP900_API int SetCBufSize(int buffSz);
            ("SetCBufSize", [c_int], c_int),
            # DP900_API int SetSaveRecording(int bEnabled);
            ("SetSaveRecording", [c_int], c_int),
            # DP900_API int GetNumInpsAvailable();
            ("GetNumInpsAvailable", [], c_int),
            # DP900_API int GetNumInpsSelected();
            ("GetNumInpsSelected", [], c_int),
            # DP900_API int GetNumOutsAvailable();
            ("GetNumOutsAvailable", [], c_int),
            # DP900_API int GetNumOutsSelected();
            ("GetNumOutsSelected", [], c_int),
            # DP900_API int GetCBufSize();
            ("GetCBufSize", [], c_int),
            # DP900_API int GetInputChannelBNCs(int* bncs);
            ("GetInputChannelBNCs", [int_array], c_int),
            # DP900_API int GetOutputChannelBNCs(int* bncs);
            ("GetOutputChannelBNCs", [int_array], c_int),
        )
        for name, argtypes, restype in prototypes:
            function = getattr(self._api, name)
            function.argtypes = argtypes
            function.restype = restype

    def _check_success(self, success):
        """Raise the API's pending error unless ``success`` is 1."""
        if success != 1:
            self.raise_error()

    @staticmethod
    def _validate_ranges(requested, valid_ranges):
        """Snap each requested range onto the matching entry of ``valid_ranges``.

        Parameters
        ----------
        requested : iterable of float
            The ranges asked for by the caller.
        valid_ranges : np.ndarray
            The ranges the hardware supports.

        Returns
        -------
        np.ndarray
            float32 array holding, for each request, the supported range it is
            numerically close to (``np.isclose``).

        Raises
        ------
        ValueError
            If a requested range matches none, or more than one, supported
            range.
        """
        validated = []
        for rng in requested:
            matches = valid_ranges[np.isclose(valid_ranges, rng)]
            if len(matches) == 0:
                choices = ", ".join([f"{v:0.1f}" for v in valid_ranges])
                raise ValueError(f"Range {rng} is not valid. Valid ranges are {choices}")
            if len(matches) > 1:
                raise ValueError(
                    f"Multiple Ranges are close to the specified range ({rng}, {matches}). "
                    f"This shouldn't happen!"
                )
            validated.append(matches[0])
        return np.array(validated, dtype=np.float32)

    @property
    def num_outputs(self):
        """Gets the number of output channels set by setup_output_parameters()"""
        return self._num_outputs

    @property
    def num_inputs(self):
        """Gets the number of acquisition channels set by setup_input_parameters()"""
        return self._num_inputs

    def raise_error(self):
        """
        Collects the data physics error and raises it in Python.

        Raises
        ------
        RuntimeError
            Always; the message is ``"DP Error: "`` followed by the decoded
            error list reported by the API.

        """
        error = self.get_error_list()
        if DEBUG:
            debug_fn(f"DP Error: {error}")
        raise RuntimeError(f"DP Error: {error}")

    def connect(self, test_name):
        """
        Launches background processes if not running, and connects the 900 API software to them.

        On success the status becomes IDLE.

        Parameters
        ----------
        test_name : str
            Name of the test to open; it is passed to the API as UTF-8 bytes.
            Pass an empty string to start from a new test.
            Unless recalling a Saved test setup (with save_test()), pass an empty string.
            Commas are not allowed in the string.

        Raises
        ------
        RuntimeError
            If the API reports that the connection failed.

        """
        if DEBUG:
            debug_fn(f"Calling Connect with\n test_name = {test_name}")
        self._check_success(self._api.Connect(test_name.encode("utf-8")))
        self.status = DP900Status.IDLE

    def disconnect(self):
        """
        Disconnects from the API.

        On success the status becomes DISCONNECTED.

        Raises
        ------
        RuntimeError
            If the API reports that the disconnection failed.

        """
        if DEBUG:
            debug_fn("Calling Disconnect")
        self._check_success(self._api.Disconnect())
        self.status = DP900Status.DISCONNECTED

    def set_sample_rate(self, sample_rate):
        """
        Sets the sample rate to be used for the next test. The sample rate can
        not be changed while a test is running; it must be set before the
        init() command.

        The sample rate can be arbitrarily set to any value between 256Hz and 216kHz.

        Parameters
        ----------
        sample_rate : float
            The sample rate for the test.

        Raises
        ------
        RuntimeError
            If the API rejects the sample rate.
        """
        if DEBUG:
            debug_fn(f"Calling SetSampleRate with\n sample_rate = {sample_rate}")
        self._check_success(self._api.SetSampleRate(sample_rate))

    def init(self):
        """
        Initializes the test

        Only allowed while the status is IDLE; on success the status becomes
        INIT.

        Raises
        ------
        RuntimeError
            If the status is not IDLE, or if the API reports a failure.
        """
        if self.status != DP900Status.IDLE:
            raise RuntimeError(
                f"Hardware status must be IDLE to initialize. "
                f"Current status is {self.status.name}."
            )
        if DEBUG:
            debug_fn("Calling Init")
        self._check_success(self._api.Init())
        self.status = DP900Status.INIT

    def setup_input_parameters(self, coupling_array, channel_array, sensitivity_array, range_array):
        """
        Tells the API how many channels will be used; as well as the hardware
        settings for those parameters.

        The number of inputs reported by ``num_inputs`` is only updated once
        the API has accepted the configuration, so a rejected or invalid setup
        leaves the previous channel count in place.

        Parameters
        ----------
        coupling_array : array of DP900Coupling
            The hardware coupling for each channel; the ``value`` of each
            entry is passed to the API.
        channel_array : array of integers
            Array of integers where the nth value represents
            the nth input channel number. If channels are to be skipped
            (eg. enabling channels 1, 2, 5, 6) the array will contain
            non-consecutive numbers (eg [1, 2, 5, 6])
        sensitivity_array : array of float
            The nth value represents the sensitivity of the nth input channel.
            After voltages are read from the hardware, each channel is scaled
            by its sensitivity before being recorded or passed to the API.
        range_array : array of float
            The nth value represents the voltage range of the nth input
            channel. Can be 0.1, 0.3, 1.0, 3.0, 10.0, 30.0

        Raises
        ------
        ValueError
            If the arrays are not all the same length or a range is not valid.
        RuntimeError
            If the API rejects the configuration.
        """
        num_channels = len(sensitivity_array)
        if len(coupling_array) != num_channels:
            raise ValueError("Coupling array must have same size as Sensitivity Array")
        if len(range_array) != num_channels:
            raise ValueError("Range array must have same size as Sensitivity Array")
        if len(channel_array) != num_channels:
            raise ValueError("Channel array must have same size as Sensitivity Array")
        # Convert everything to the exact dtypes declared for SetInpParams.
        couplings = np.array([int(coupling.value) for coupling in coupling_array], dtype=np.int32)
        sensitivities = np.array([float(val) for val in sensitivity_array], dtype=np.float32)
        channels = np.array([int(channel) for channel in channel_array], dtype=np.int32)
        ranges = self._validate_ranges(range_array, self._valid_input_ranges)
        if DEBUG:
            debug_fn(
                f"Calling SetInpParams with\n "
                f"coupling_array = {couplings.tolist()}\n "
                f"channel_array = {channels.tolist()}\n "
                f"sensitivity_array = {sensitivities.tolist()}\n "
                f"range_array = {ranges.tolist()}\n "
                f"num_inputs = {num_channels}"
            )
        success = self._api.SetInpParams(
            couplings,
            channels,
            sensitivities,
            ranges,
            ctypes.c_int(num_channels),
        )
        self._check_success(success)
        # Commit only after the API accepted the setup: read_input_data()
        # sizes its buffer from this count.
        self._num_inputs = num_channels

    def setup_output_parameters(self, sensitivity_array, range_array, channel_array):
        """
        Configures the settings for the output channels to use during test.

        This must be run before a test is Initialized (Init()).

        The number of outputs reported by ``num_outputs`` is only updated once
        the API has accepted the configuration.

        Parameters
        ----------
        sensitivity_array : array of float
            A floating-point array, where the Nth element representing the
            sensitivity (scaling) for the Nth output channel, in EU/V
            (EU=Engineering Unit). A value of 1 essentially applies no scaling
            (outputs are voltages)
        range_array : array of float
            A floating point array containing the output range for each output
            channel. Can be 2.0 or 10.0
        channel_array : array of integers
            Array of integers where the nth value represents
            the nth output channel number. If channels are to be skipped
            (eg. enabling channels 1, 2, 5, 6) the array will contain
            non-consecutive numbers (eg [1, 2, 5, 6])

        Raises
        ------
        ValueError
            If input arrays are not the same size or if ranges are not valid.
        RuntimeError
            If the API rejects the configuration.

        """
        num_channels = len(sensitivity_array)
        if len(range_array) != num_channels:
            raise ValueError("Range array must have same size as Sensitivity Array")
        if len(channel_array) != num_channels:
            raise ValueError("Channel number array must have same size as Sensitivity Array")
        # Convert everything to the exact dtypes declared for SetOutParams.
        sensitivities = np.array([float(val) for val in sensitivity_array], dtype=np.float32)
        channels = np.array([int(val) for val in channel_array], dtype=np.int32)
        ranges = self._validate_ranges(range_array, self._valid_output_ranges)
        if DEBUG:
            debug_fn(
                f"Calling SetOutParams with \n "
                f"sensitivity_array = {sensitivities.tolist()}\n "
                f"range_array = {ranges.tolist()}\n "
                f"channel_array = {channels.tolist()}\n "
                f"num_outputs = {num_channels}"
            )
        success = self._api.SetOutParams(
            sensitivities,
            ranges,
            channels,
            ctypes.c_int(num_channels),
        )
        self._check_success(success)
        # Commit only after the API accepted the setup: read_input_data() and
        # write_output_data() both rely on this count.
        self._num_outputs = num_channels

    def start(self):
        """
        Starts data acquisition (and output through output channel). The test
        must be initialized (init()) or stopped (stop()) before running
        start(). On success the status becomes RUNNING.

        Raises
        ------
        RuntimeError
            If the status is neither INIT nor STOPPED, or if the API reports a
            failure.
        """
        if self.status not in (DP900Status.INIT, DP900Status.STOPPED):
            raise RuntimeError(
                f"Current hardware status is {self.status.name}. Hardware must be initialized or "
                f"stopped prior to starting a measurement"
            )
        if DEBUG:
            debug_fn("Calling Start")
        self._check_success(self._api.Start())
        self.status = DP900Status.RUNNING

    def stop(self):
        """
        Stops data acquisition (and output through output channel). The test
        must be started (start()) before running stop(). On success the status
        becomes STOPPED.

        Raises
        ------
        RuntimeError
            If the status is not RUNNING, or if the API reports a failure.
        """
        if self.status != DP900Status.RUNNING:
            raise RuntimeError(
                f"Current hardware status is {self.status.name}. Hardware must be "
                "running prior to stopping a measurement"
            )
        if DEBUG:
            debug_fn("Calling Stop")
        self._check_success(self._api.Stop())
        self.status = DP900Status.STOPPED

    def end(self):
        """
        Ends the current test. The test must be stopped (stop()) or merely
        initialized (init()) before running end(). On success the status
        becomes IDLE.

        Unlike start() and stop(), calling this in any other state is a silent
        no-op: the API is not called and the status is left untouched.

        Raises
        ------
        RuntimeError
            If the API reports a failure.
        """
        if self.status not in (DP900Status.STOPPED, DP900Status.INIT):
            return
        if DEBUG:
            debug_fn("Calling End")
        self._check_success(self._api.End())
        self.status = DP900Status.IDLE

    def emergency_end(self):
        """
        If a test is Initialized or Started; but the client (your) code running
        the API crashes or goes into a bad state; the client code can be
        relaunched and execute this function to Stop/End a test that is in
        progress.

        Stops or Ends a test that is currently running, but the API is not
        connected to. This is intended to be used if your code crashes or goes
        into a bad state while the API is running a test.

        The call is made regardless of the current status; on success the
        status becomes IDLE.

        Raises
        ------
        RuntimeError
            If the API reports a failure.
        """
        if DEBUG:
            debug_fn("Calling EmergencyEnd")
        self._check_success(self._api.EmergencyEnd())
        self.status = DP900Status.IDLE

    def get_system_list(self, online):
        """
        Gets a list of hardware units available for selection
        in the test.

        Note: this command also returns Abacus0 blocks (Data Server), which
        can not be selected for use in a test.

        Parameters
        ----------
        online : bool
            If True, the returned list will only contain systems that are all
            online (detected by the 900 API).

        Returns
        -------
        list
            A list of systems that are selectable for use in
            the 900 software. The API's comma separated reply is split on
            commas, so an empty reply yields ``[""]``.

        Raises
        ------
        RuntimeError
            If the API returns no string at all (NULL).

        """
        if DEBUG:
            debug_fn(f"Calling GetSystemList with\n online = {online}")
        systems = self._api.GetSystemList(online)
        if systems is None:
            # ctypes maps a NULL char* to None; report the API error instead
            # of failing with an AttributeError on ``None.decode``.
            self.raise_error()
        return systems.decode("utf-8").split(",")

    def get_test_list(self):
        """
        Gets a comma separated list of tests available for opening with the
        connect() command.

        Note: This will return a list of all tests in the Manage Tests screen
        of DP900. If DP900 contains tests which were not created with the API,
        they should not be opened with the API.

        Returns
        -------
        str
            A comma separated list of tests available for opening with the
            connect() command

        Raises
        ------
        RuntimeError
            If the API returns no string at all (NULL).

        """
        if DEBUG:
            debug_fn("Calling GetTestList")
        tests = self._api.GetTestList()
        if tests is None:
            # ctypes maps a NULL char* to None; see get_system_list().
            self.raise_error()
        return tests.decode("utf-8")

    def set_system_list(self, sys_list):
        """
        Sets the hardware units available to be used in the test.

        Parameters
        ----------
        sys_list : str or array of strings
            A list of the names of systems to include in the test (eg.
            ["dp912-98012","dp901-94019"]). A single name may be given as a
            plain string.

        Raises
        ------
        RuntimeError
            If the API reports a failure.
        """
        if isinstance(sys_list, str):
            sys_list = [sys_list]
        # The API takes one comma separated string.
        sys_list = ",".join(sys_list)
        if DEBUG:
            debug_fn(f"Calling SetSystemList with\n system_list = {sys_list}")
        self._check_success(self._api.SetSystemList(sys_list.encode("utf-8")))

    def save_test(self, test_name):
        """
        Saves the test setup for recalling with the connect() command.

        The test will be saved in the 900 Series database, and can be recalled
        with the connect() call.

        Parameters
        ----------
        test_name : str
            String to use as the name for the test to be saved as.

        Raises
        ------
        RuntimeError
            If the API reports a failure.

        """
        if DEBUG:
            debug_fn(f"Calling SaveTest with\n test_name = {test_name}")
        self._check_success(self._api.SaveTest(test_name.encode("utf-8")))

    def delete_test(self, test_name):
        """
        Deletes a test setup from the archived database.

        Parameters
        ----------
        test_name : str
            String containing the name of the test to be deleted

        Raises
        ------
        RuntimeError
            If the API reports a failure.
        """
        if DEBUG:
            debug_fn(f"Calling DeleteTest with\n test_name = {test_name}")
        self._check_success(self._api.DeleteTest(test_name.encode("utf-8")))

    def read_input_data(self, num_samples, newest_data=False):
        """
        Reads data from the data acquisition software

        Parameters
        ----------
        num_samples : int
            The number of samples to read
        newest_data : bool, optional
            If True, gets the newest data in the buffer, otherwise gets the
            oldest data in the buffer. The default is False.

        Returns
        -------
        np.ndarray
            A float32 array of data with shape
            (num_inputs + num_outputs) x num_samples

        """
        num_channels = self._num_inputs + self._num_outputs
        read_array = np.zeros(num_channels * num_samples, dtype=np.float32)
        # Data type code handed to GetData: 0 for the newest data, 1 for the
        # oldest.
        read_type = ctypes.c_int(0 if newest_data else 1)
        if DEBUG:
            debug_fn(f"Calling GetData with\n length = {num_samples}")
        # NOTE(review): the return value is deliberately ignored; the success
        # check was disabled here, presumably because GetData does not follow
        # the 1-on-success convention. Confirm against the API documentation.
        self._api.GetData(read_array, read_type, ctypes.c_int(num_samples))
        return read_array.reshape((num_channels, num_samples))

    def get_available_input_data_samples(self):
        """
        Gets the number of available samples in the input channel circular
        buffers.

        Returns
        -------
        samples : int
            The number of samples that are available to be read out of the input
            channel buffer.

        """
        if DEBUG:
            debug_fn("Calling GetAvailableDataLength")
        samples = self._api.GetAvailableDataLength()
        if DEBUG:
            debug_fn(f"{samples} Samples Available")
        return samples

    def get_space_in_out_buffer(self):
        """
        Gets the amount of free space in the output buffer (how much data can
        be sent to the output buffer before it is full).

        Returns
        -------
        int
            The number of samples that can be sent to the output buffer
            before it is full

        """
        if DEBUG:
            debug_fn("Calling GetSpaceInOutBuffer")
        samples = self._api.GetSpaceInOutBuffer()
        if DEBUG:
            debug_fn(f" {samples} Samples Available in Buffer")
        return samples

    def get_total_output_samples_on_buffer(self):
        """
        Gets the number of samples in the output channel circular buffers.

        Returns
        -------
        samples : int
            The number of samples in line to be output through the output
            channels.

        """
        if DEBUG:
            debug_fn("Calling GetTotalSamplesInOutputBuffer")
        samples = self._api.GetTotalSamplesInOutputBuffer()
        if DEBUG:
            debug_fn(f" {samples} Output Samples Available")
        return samples

    def write_output_data(self, output_data):
        """
        The function will fill the output buffer with data specified, which will
        result in it eventually getting output from the system.

        Parameters
        ----------
        output_data : array of float
            2D data array (or nested sequence) with shape
            num_outputs x num_samples containing the data to be output from
            the signal generator.

        Raises
        ------
        ValueError
            If ``output_data`` is not two-dimensional or its number of rows
            differs from the number of configured outputs.

        """
        output_data = np.asarray(output_data)
        if output_data.ndim != 2:
            raise ValueError("`output_data` should have 2 dimensions (num_outputs x num_samples)")
        if output_data.shape[0] != self._num_outputs:
            raise ValueError(
                f"`output_data` must have number of rows equal to the number of "
                f"outputs ({self._num_outputs})"
            )
        num_samples = output_data.shape[-1]
        # Always hand the API a private, C-ordered float32 copy flattened row
        # by row, whatever dtype or memory layout the caller used.
        this_output_data = np.array(output_data, dtype=np.float32, order="C").ravel()
        if DEBUG:
            debug_fn(f"Calling PutOutData with\n length {num_samples}")
        # NOTE(review): the return value is deliberately ignored; the success
        # check was disabled here, presumably because PutOutData does not
        # follow the 1-on-success convention. Confirm against the API docs.
        self._api.PutOutData(this_output_data, ctypes.c_int(num_samples))

    def get_raw_error_list(self):
        """Gets the raw bytes from the error list (None if the API returns NULL)"""
        if DEBUG:
            debug_fn("Calling GetErrorList")
        return self._api.GetErrorList()

    def get_error_list(self):
        """Gets the decoded error list

        This is used while an error is being reported, so it never fails on
        the reply itself: a NULL reply becomes an empty string and bytes that
        are not valid UTF-8 are replaced rather than raising.

        Returns
        -------
        str
            The error text reported by the API.
        """
        if DEBUG:
            debug_fn("Calling GetErrorList")
        data = self._api.GetErrorList()
        if data is None:
            return ""
        return data.decode("utf-8", errors="replace")

    def set_buffer_size(self, buff_sz):
        """
        Sets the number of samples, per channel, in the input and output
        channel circular buffers.

        Parameters
        ----------
        buff_sz : int
            The desired size of the circular buffers, in samples. Must be greater
            than 4096. The upper limit of buff_sz is limited by available
            memory in the PC.

        Raises
        ------
        RuntimeError
            If the API rejects the buffer size.
        """
        if DEBUG:
            debug_fn("Calling SetCBufSize")
        self._check_success(self._api.SetCBufSize(buff_sz))

    def set_save_recording(self, enabled):
        """
        Sets the API to archive (or not archive) the complete time history
        being sent through the API. If enabled, this happens in the background
        as the test is running; and can be accessed by later launching the 900
        Series software.

        Parameters
        ----------
        enabled : bool
            If True, enables the recording. If False, disables it.

        Raises
        ------
        RuntimeError
            If the API reports a failure.
        """
        if DEBUG:
            debug_fn("Calling SetSaveRecording")
        self._check_success(self._api.SetSaveRecording(enabled))

    def get_num_inps_available(self):
        """
        Returns the number of available input channels on the currently
        selected hardware.

        Returns
        -------
        int
            The number of input channels available for usage, with the
            currently selected hardware units.

        """
        if DEBUG:
            debug_fn("Calling GetNumInpsAvailable")
        inps_available = self._api.GetNumInpsAvailable()
        if DEBUG:
            debug_fn(f" {inps_available} Inputs Available")
        return inps_available

    def get_num_inps_selected(self):
        """
        Returns the number of selected input channels

        Returns
        -------
        int
            The number of input channels selected in the current setup.

        """
        if DEBUG:
            debug_fn("Calling GetNumInpsSelected")
        inps_selected = self._api.GetNumInpsSelected()
        if DEBUG:
            debug_fn(f" {inps_selected} Inputs Selected")
        return inps_selected

    def get_num_outs_available(self):
        """
        Returns the number of available output channels on the currently
        selected hardware.

        Returns
        -------
        int
            The number of output channels available for use, with the currently
            selected hardware units

        """
        if DEBUG:
            debug_fn("Calling GetNumOutsAvailable")
        outs_available = self._api.GetNumOutsAvailable()
        if DEBUG:
            debug_fn(f" {outs_available} Outputs Available")
        return outs_available

    def get_num_outs_selected(self):
        """
        Returns the number of output channels selected in the current test

        Returns
        -------
        int
            The number of output channels selected in the current setup

        """
        if DEBUG:
            debug_fn("Calling GetNumOutsSelected")
        outs_selected = self._api.GetNumOutsSelected()
        if DEBUG:
            debug_fn(f" {outs_selected} Outputs Selected")
        return outs_selected

    def get_cbuf_size(self):
        """
        Gets the number of samples, per channel, in the input and output
        channel circular buffers.

        Returns
        -------
        int
            The circular buffer size.

        """
        if DEBUG:
            debug_fn("Calling GetCBufSize")
        cbuf_size = self._api.GetCBufSize()
        if DEBUG:
            debug_fn(f" CBuf Size {cbuf_size}")
        return cbuf_size

    def get_input_channel_bncs(self):
        """
        Gets the BNC numbers of available input channels on the currently selected hardware.

        Returns
        -------
        bncs : array of int
            BNC numbers corresponding to input channels; one int32 entry per
            available input channel.

        Raises
        ------
        RuntimeError
            If the API reports a failure.

        """
        # The API fills a caller-supplied buffer, sized here from the number
        # of available inputs.
        num_inputs = self.get_num_inps_available()
        bncs = np.zeros((num_inputs,), dtype=np.int32)
        if DEBUG:
            debug_fn("Calling GetInputChannelBNCs")
        self._check_success(self._api.GetInputChannelBNCs(bncs))
        if DEBUG:
            debug_fn(f" Input BNCs: {bncs.tolist()}")
        return bncs

    def get_output_channel_bncs(self):
        """
        Gets the BNC numbers of available output channels on the currently selected hardware.

        Returns
        -------
        bncs : array of int
            BNC numbers corresponding to output channels; one int32 entry per
            available output channel.

        Raises
        ------
        RuntimeError
            If the API reports a failure.

        """
        # The API fills a caller-supplied buffer, sized here from the number
        # of available outputs.
        num_outputs = self.get_num_outs_available()
        bncs = np.zeros((num_outputs,), dtype=np.int32)
        if DEBUG:
            debug_fn("Calling GetOutputChannelBNCs")
        self._check_success(self._api.GetOutputChannelBNCs(bncs))
        if DEBUG:
            debug_fn(f" Output BNCs: {bncs.tolist()}")
        return bncs

    def __del__(self):
        """Closes the module-level debug log file when debugging is enabled."""
        if not DEBUG:
            return
        # The log file is a module global that only exists when file logging
        # is switched on, and it may already have been closed by another
        # instance; a finalizer must not raise in either case.
        handle = globals().get("log_file")
        if handle is not None and not handle.closed:
            handle.close()
1000
1001
# Example usage:
# wrapper = DP900("path_to_your_dll.dll")
# print(wrapper.status)