1# -*- coding: utf-8 -*-
2"""
3Hardware definition that allows for the Data Physics Quattro Device to be run
4with Rattlesnake.
5
6Rattlesnake Vibration Control Software
7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
9Government retains certain rights in this software.
10
11This program is free software: you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation, either version 3 of the License, or
14(at your option) any later version.
15
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License for more details.
20
21You should have received a copy of the GNU General Public License
22along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""
24
25import ctypes
26from enum import Enum
27
28import numpy as np
29from numpy.ctypeslib import ndpointer
30
# Set to True to make DPQuattro record every API call it makes in a log file.
DEBUG = False

# Path of the debug log, opened for writing by DPQuattro.__init__ when DEBUG is
# set.  It is defined unconditionally so that switching DEBUG on after this
# module has been imported does not fail with a NameError on the path.
__log_file__ = "DataPhysics_Log.txt"
35
36
class QuattroStatus(Enum):
    """States that the ``DPQuattro`` interface tracks for the device."""

    # Assigned when IsHwConnected reports no hardware, or after Disconnect succeeds
    DISCONNECTED = -1
    # Assigned when hardware is connected, and again after End succeeds
    IDLE = 0
    # Assigned after Init succeeds
    INIT = 1
    # Assigned after Start succeeds
    RUNNING = 2
    # Assigned after Stop succeeds
    STOPPED = 3
45
46
class QuattroCoupling(Enum):
    """Input coupling options for the Quattro.

    The integer value of each member is the code that
    ``DPQuattro.setup_input_parameters`` places in the coupling array it hands
    to the API.
    """

    AC_DIFFERENTIAL = 0
    DC_DIFFERENTIAL = 1
    AC_SINGLE_ENDED = 2
    DC_SINGLE_ENDED = 3
    AC_COUPLED_IEPE = 4
55
56
class DPQuattro:
    """An interface to the Data Physics C API for the Quattro hardware.

    The class loads the vendor DLL with ``ctypes``, declares the prototypes of
    the functions it uses and tracks the device state in ``self.status`` (a
    ``QuattroStatus``).  An API call is treated as successful when it returns
    a truthy value; on a falsy value ``raise_error`` is invoked, which
    currently only writes to the debug log.
    """

    # Sample rates accepted by ``set_sample_rate``
    _VALID_SAMPLE_RATES = (
        10.24, 12.5, 12.8, 13.1072, 16, 16.384, 20, 20.48, 25, 25.6, 32,
        32.768, 40, 40.96, 50, 51.2, 64, 65.536, 80, 81.92, 100, 102.4, 128,
        160, 163.84, 200, 204.8, 256, 320, 327.68, 400, 409.6, 512, 640, 800,
        819.2, 1024, 1280, 1600, 1638.4, 2048, 2560, 3200, 4096, 5120, 6400,
        8192, 10240, 12800, 20480, 25600, 40960, 51200, 102400, 204800,
    )
    # Ranges accepted for acquisition channels by ``setup_input_parameters``
    _VALID_INPUT_RANGES = (0.1, 1.0, 10.0, 20.0)
    # Ranges accepted for drive channels by ``setup_output_parameters``
    _VALID_OUTPUT_RANGES = (2.0, 10.0)

    def __init__(self, library_path: str):
        """
        Connects to the library

        Parameters
        ----------
        library_path : str
            Path to the DpQuattro.dll file that is used to run the Quattro device

        Returns
        -------
        None.

        """
        if DEBUG:
            # Kept open for the lifetime of the object; closed in __del__
            self.log_file = open(__log_file__, "w", encoding="utf-8")
        self.input_channel_parameters = None
        self.output_channel_parameters = None
        self._api = ctypes.WinDLL(library_path)
        self._valid_sample_rates = np.array(self._VALID_SAMPLE_RATES)
        self._valid_input_ranges = np.array(self._VALID_INPUT_RANGES)
        self._valid_output_ranges = np.array(self._VALID_OUTPUT_RANGES)
        self._num_inputs = 0
        self._num_outputs = 0

        self._declare_prototypes()

        if self.is_hardware_connected():
            self.status = QuattroStatus.IDLE
        else:
            self.status = QuattroStatus.DISCONNECTED

    def _declare_prototypes(self):
        """Declares argument and return types of the DLL functions used by this class

        The signatures in the comments are taken from the Quattro API header file.
        """
        api = self._api
        float_array = ndpointer(ctypes.c_float)
        int_array = ndpointer(ctypes.c_int)

        # Functions of the form ``DPCOMM_API int Name();``
        for name in (
            "IsHwConnected",
            "Connect",
            "Disconnect",
            "IsLicensed",
            "Init",
            "Start",
            "Stop",
            "End",
            "SerialNumber",
            "GetAvailableDataLength",
            "GetTotalSamplesInOutputBuffer",
            "GetCBufSize",
        ):
            getattr(api, name).restype = ctypes.c_int

        # Functions that take arguments; all of them return an int
        argument_types = {
            # DPCOMM_API int SetSampleRate(double sampleRate);
            "SetSampleRate": (ctypes.c_double,),
            # DPCOMM_API int SetInpParams(
            #     int* coupling, float* sensitivity, float* range, int numInps);
            "SetInpParams": (int_array, float_array, float_array, ctypes.c_int),
            # DPCOMM_API int SetOutParams(float* sensitivity, float* range, int numOuts);
            "SetOutParams": (float_array, float_array, ctypes.c_int),
            # DPCOMM_API int GetData(float* outputBuf, int dataType, int length);
            "GetData": (float_array, ctypes.c_int, ctypes.c_int),
            # DPCOMM_API int PutOutData(float* outputBuf, int length);
            "PutOutData": (float_array, ctypes.c_int),
            # DPCOMM_API int SetCBufSize(int buffSz);
            "SetCBufSize": (ctypes.c_int,),
        }
        for name, argtypes in argument_types.items():
            function = getattr(api, name)
            function.argtypes = argtypes
            function.restype = ctypes.c_int

        # DPCOMM_API char* GetErrorList();
        api.GetErrorList.restype = ctypes.c_char_p

        # Declared in the header but not used by this class:
        # DPCOMM_API int SetTacParams(
        #     int* coupling, float* holdOffTime, float* hysteresis, float* preScaler,
        #     float* PPR, float* smoothing, float* speedRatio, float* trigLevel,
        #     int* trigSlope, int numTacs);
        # DPCOMM_API int GetAvailableOutData();

    def _log(self, message):
        """Writes one line to the debug log; does nothing unless DEBUG is enabled"""
        if not DEBUG:
            return
        # The file only exists if DEBUG was already enabled at construction time
        log_file = getattr(self, "log_file", None)
        if log_file is not None:
            log_file.write(f"{message}\n")

    @staticmethod
    def _validate_ranges(requested_ranges, valid_ranges):
        """Maps each requested range onto the matching valid range

        Parameters
        ----------
        requested_ranges : iterable of float
            The ranges requested by the caller
        valid_ranges : np.ndarray
            The ranges that the hardware accepts

        Returns
        -------
        np.ndarray
            A float32 array holding the valid range matched to each request

        Raises
        ------
        ValueError
            If a requested range is not close to exactly one valid range
        """
        validated = []
        for rng in requested_ranges:
            matches = valid_ranges[np.isclose(valid_ranges, rng)]
            if len(matches) == 0:
                raise ValueError(
                    f"Range {rng} is not valid. Valid ranges are "
                    f"{', '.join([f'{v:0.1f}' for v in valid_ranges])}"
                )
            if len(matches) > 1:
                raise ValueError(
                    f"Multiple Ranges are close to the specified range ({rng}, {matches}). "
                    f"This shouldn't happen!"
                )
            validated.append(matches[0])
        return np.array(validated, dtype=np.float32)

    def connect(self):
        """Connects to the hardware

        Raises
        ------
        RuntimeError
            If the hardware is already connected
        """
        if self.is_hardware_connected():
            raise RuntimeError("Hardware is already connected")
        self._log("Calling Connect")
        if self._api.Connect():
            self.status = QuattroStatus.IDLE
        else:
            self.raise_error()

    def disconnect(self):
        """Disconnects from the hardware

        Raises
        ------
        RuntimeError
            If the hardware is not connected
        """
        if not self.is_hardware_connected():
            raise RuntimeError("Hardware is not connected")
        self._log("Calling Disconnect")
        if self._api.Disconnect():
            self.status = QuattroStatus.DISCONNECTED
        else:
            self.raise_error()

    def is_hardware_connected(self) -> bool:
        """Check if the hardware is connected or not"""
        self._log("Calling IsHwConnected")
        return bool(self._api.IsHwConnected())

    def get_raw_error_list(self):
        """Gets the raw bytes of the error list from the hardware

        Returns
        -------
        bytes or None
            The value returned by the API; ``ctypes`` converts a NULL pointer
            to None
        """
        self._log("Calling GetErrorList")
        return self._api.GetErrorList()

    def get_error_list(self) -> str:
        """Gets the decoded error list from the hardware

        Returns
        -------
        str
            The error list as text, or an empty string if the API returned a
            NULL pointer.  Bytes that cannot be decoded are replaced rather
            than raising while an error is being reported.
        """
        self._log("Calling GetErrorList")
        data = self._api.GetErrorList()
        if data is None:
            return ""
        return data.decode(errors="replace")

    def set_sample_rate(self, sample_rate):
        """Sets the sample rate of the hardware

        Parameters
        ----------
        sample_rate : float
            The desired sample rate of the data acquisition system

        Raises
        ------
        ValueError
            If the sample rate is not valid
        """
        matches = self._valid_sample_rates[np.isclose(self._valid_sample_rates, sample_rate)]
        if len(matches) == 0:
            # Use the shortest exact representation so that every listed value is
            # one that is actually accepted (e.g. 13.1072 rather than 13.11)
            raise ValueError(
                f"Sample Rate {sample_rate} is not valid. Valid sample rates are "
                f"{', '.join([f'{v:g}' for v in self._valid_sample_rates])}"
            )
        if len(matches) > 1:
            raise ValueError(
                f"Multiple Sample Rates are close to the specified rate ({sample_rate}, "
                f"{matches}). This shouldn't happen!"
            )
        self._log("Calling SetSampleRate")
        success = self._api.SetSampleRate(ctypes.c_double(float(matches[0])))
        if not success:
            self.raise_error()

    def is_licensed(self) -> bool:
        """Checks the licensing of the hardware

        Returns
        -------
        bool
            Returns True if the hardware is licensed
        """
        self._log("Calling IsLicensed")
        return bool(self._api.IsLicensed())

    def initialize(self):
        """Initializes the data acquisition system

        Raises
        ------
        RuntimeError
            if the hardware is not currently in the idle state
        """
        if self.status != QuattroStatus.IDLE:
            raise RuntimeError(
                f"Hardware status must be IDLE to initialize. "
                f"Current status is {self.status.name}."
            )
        self._log("Calling Init")
        if self._api.Init():
            self.status = QuattroStatus.INIT
        else:
            self.raise_error()

    def setup_input_parameters(self, coupling_array, sensitivity_array, range_array):
        """Sets up the acquisition channels for the data acquisition system

        Parameters
        ----------
        coupling_array : iterable of QuattroCoupling
            The coupling of each channel; the ``value`` of each entry is passed
            to the API
        sensitivity_array : np.ndarray
            An array of sensitivity values for the data acquisition system
        range_array : np.ndarray
            An array of ranges for the data acquisition system

        Raises
        ------
        ValueError
            if any invalid values are passed or the arrays are not the same size
        """
        if len(coupling_array) != len(sensitivity_array):
            raise ValueError("Coupling array must have same size as Sensitivity Array")
        if len(range_array) != len(sensitivity_array):
            raise ValueError("Range array must have same size as Sensitivity Array")
        couplings = np.array([int(coupling.value) for coupling in coupling_array], dtype=np.int32)
        sensitivities = np.array([float(val) for val in sensitivity_array], dtype=np.float32)
        ranges = self._validate_ranges(range_array, self._valid_input_ranges)
        # Only record the channel count once every argument has been validated, so
        # that a rejected call does not change the shape used by read_input_data
        self._num_inputs = len(couplings)
        self._log("Calling SetInpParams")
        success = self._api.SetInpParams(
            couplings,
            sensitivities,
            ranges,
            ctypes.c_int(self._num_inputs),
        )
        if not success:
            self.raise_error()

    def setup_output_parameters(self, sensitivity_array, range_array):
        """Sets up the drive channels on the data acquisition system

        Parameters
        ----------
        sensitivity_array : np.ndarray
            An array of sensitivities to apply to the output channels
        range_array : np.ndarray
            An array of ranges to use for the output channels

        Raises
        ------
        ValueError
            if invalid values are passed or arrays are not the same size
        """
        if len(range_array) != len(sensitivity_array):
            raise ValueError("Range array must have same size as Sensitivity Array")
        sensitivities = np.array([float(val) for val in sensitivity_array], dtype=np.float32)
        ranges = self._validate_ranges(range_array, self._valid_output_ranges)
        # Only record the channel count once every argument has been validated, so
        # that a rejected call does not change the row count checked by
        # write_output_data
        self._num_outputs = len(sensitivities)
        self._log("Calling SetOutParams")
        success = self._api.SetOutParams(sensitivities, ranges, ctypes.c_int(self._num_outputs))
        if not success:
            self.raise_error()

    def raise_error(self):
        """Writes the error list of the hardware to the log file

        Called whenever an API function returns a falsy value.
        """
        # NOTE(review): despite its name this method does not raise; the
        # ``raise RuntimeError(self.get_error_list())`` that used to follow is
        # disabled, so a failed API call is only visible in the debug log.
        # Confirm whether that is still intended before re-enabling it.
        if DEBUG:
            # Guarded here (not only in _log) so that GetErrorList is not called
            # on the hardware when logging is disabled
            self._log(f"DP Error: {self.get_error_list()}")

    def start(self):
        """Starts the data acquisition system

        Raises
        ------
        RuntimeError
            if the status is not either initialized or stopped
        """
        if self.status not in (QuattroStatus.INIT, QuattroStatus.STOPPED):
            raise RuntimeError(
                f"Current hardware status is {self.status.name}. Hardware must be "
                f"initialized or stopped prior to starting a measurement"
            )
        self._log("Calling Start")
        if self._api.Start():
            self.status = QuattroStatus.RUNNING
        else:
            self.raise_error()

    def stop(self):
        """Stops the data acquisition system

        Raises
        ------
        RuntimeError
            If the data acquisition system is not currently in the running state
        """
        if self.status != QuattroStatus.RUNNING:
            raise RuntimeError(
                f"Current hardware status is {self.status.name}. Hardware must be running prior "
                f"to stopping a measurement"
            )
        self._log("Calling Stop")
        if self._api.Stop():
            self.status = QuattroStatus.STOPPED
        else:
            self.raise_error()

    def end(self):
        """Shuts down the data acquisition system

        The call is only forwarded to the hardware when the status is STOPPED
        or INIT; in any other state this method silently does nothing.
        """
        if self.status not in (QuattroStatus.STOPPED, QuattroStatus.INIT):
            return
        self._log("Calling End")
        if self._api.End():
            self.status = QuattroStatus.IDLE
        else:
            self.raise_error()

    @property
    def serial_number(self):
        """Gets the serial number of the hardware

        The debug log carries a warning that the value reported by the API is
        wrong.
        """
        self._log("Warning, this gives out the wrong value!")
        self._log("Calling SerialNumber")
        return self._api.SerialNumber()

    def set_buffer_size(self, buffer_size):
        """Sets the buffer size of the hardware

        Parameters
        ----------
        buffer_size : int
            The value passed to the API's SetCBufSize function
        """
        self._log("Calling SetCBufSize")
        success = self._api.SetCBufSize(ctypes.c_int(buffer_size))
        if not success:
            self.raise_error()

    def get_buffer_size(self):
        """Gets the buffer size of the hardware"""
        self._log("Calling GetCBufSize")
        return self._api.GetCBufSize()

    def get_available_input_data_samples(self):
        """Gets the number of samples available on the data acquisition system"""
        self._log("Calling GetAvailableDataLength")
        samples = self._api.GetAvailableDataLength()
        self._log(f"{samples} Samples Available")
        return samples

    def get_total_output_samples_on_buffer(self):
        """Gets the total number of samples on the output buffer"""
        self._log("Calling GetTotalSamplesInOutputBuffer")
        samples = self._api.GetTotalSamplesInOutputBuffer()
        self._log(f"{samples} Output Samples Available")
        return samples

    def read_input_data(self, num_samples, newest_data=False):
        """Reads data from the acquisition channels

        Parameters
        ----------
        num_samples : int
            The number of samples to read
        newest_data : bool, optional
            Determines whether to read the newest acquired data (True) or the oldest (False), by
            default False

        Returns
        -------
        np.ndarray
            The read data in a num_channels x num_samples array
        """
        read_array = np.zeros(self._num_inputs * num_samples, dtype=np.float32)
        # The API's dataType argument: 0 requests the newest data, 1 the oldest
        read_type = ctypes.c_int(0 if newest_data else 1)
        self._log(f"Calling GetData with length {num_samples}")
        success = self._api.GetData(read_array, read_type, ctypes.c_int(num_samples))
        if not success:
            self.raise_error()
        return read_array.reshape((self._num_inputs, num_samples))

    def write_output_data(self, output_data):
        """Puts output data to the hardware output buffer

        Parameters
        ----------
        output_data : np.ndarray
            The signals to be written to the hardware in a num_outputs x num_samples array.
            Anything that ``np.asarray`` can convert to such an array is accepted.

        Raises
        ------
        ValueError
            If the output data is not shaped correctly
        """
        output_data = np.asarray(output_data)
        if output_data.ndim != 2:
            raise ValueError("`output_data` should have 2 dimensions (num_outputs x num_samples)")
        if output_data.shape[0] != self._num_outputs:
            raise ValueError(
                f"`output_data` must have number of rows equal to the number of "
                f"outputs ({self._num_outputs})"
            )
        num_samples = output_data.shape[-1]
        # One float32 copy in row-major order (all samples of the first output, then
        # the second, ...); the copy keeps the caller's array away from the C API
        this_output_data = np.array(output_data, dtype=np.float32, order="C").reshape(-1)
        self._log(f"Calling PutOutData with length {num_samples}")
        # The return value is intentionally not checked here
        _ = self._api.PutOutData(this_output_data, ctypes.c_int(num_samples))

    def __del__(self):
        """Closes the log file automatically when the interface is deleted or garbage collected"""
        # The attribute is missing if DEBUG was off at construction or if
        # __init__ failed before the file was opened
        log_file = getattr(self, "log_file", None)
        if log_file is not None:
            log_file.close()