1# -*- coding: utf-8 -*-
2"""
3Hardware definition that allows for the Data Physics Quattro Device to be run
4with Rattlesnake.
5
6Rattlesnake Vibration Control Software
7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
9Government retains certain rights in this software.
10
11This program is free software: you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation, either version 3 of the License, or
14(at your option) any later version.
15
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License for more details.
20
21You should have received a copy of the GNU General Public License
22along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""
24
25import multiprocessing as mp
26import time
27from typing import List
28
29import numpy as np
30
31from .abstract_hardware import HardwareAcquisition, HardwareOutput
32from .data_physics_interface import DPQuattro, QuattroCoupling, QuattroStatus
33from .utilities import Channel, DataAcquisitionParameters
34
# Multiplier applied to ``samples_per_write`` when reporting the acquisition
# delay (see ``DataPhysicsAcquisition.get_acquisition_delay``).
BUFFER_SIZE_FACTOR = 3
# Divisor applied to the duration of one read frame
# (``samples_per_read / sample_rate``) to obtain the pause between successive
# polls of the hardware in the acquisition read methods.
SLEEP_FACTOR = 10
37
38
class DataPhysicsAcquisition(HardwareAcquisition):
    """Class defining the interface between the controller and Data Physics
    hardware

    This class defines the interfaces between the controller and the
    Data Physics hardware that runs their open API.  It is run by the
    Acquisition process, and must define how to get data from the test
    hardware into the controller.  Output frames are also written to the
    device from this class: they are received through a multiprocessing queue
    and pushed to the hardware from within the read methods."""

    def __init__(self, dll_path: str, queue: mp.queues.Queue):
        """
        Initializes the data physics hardware interface.

        Parameters
        ----------
        dll_path : str
            Path to the DpQuattro.dll file, which is handed to the
            ``DPQuattro`` interface object.
        queue : mp.queues.Queue
            Multiprocessing queue used to pass output data from the output task
            to the acquisition task because Quattro runs on a single processor

        Returns
        -------
        None.

        """
        self.quattro = DPQuattro(dll_path)
        self.buffer_size = 2**24
        # Defaults for the 4 input and 2 output channels; these are reset and
        # overwritten in set_up_data_acquisition_parameters_and_channels
        self.input_channel_order = []
        self.input_couplings = [QuattroCoupling.DC_DIFFERENTIAL] * 4
        self.input_ranges = [10.0] * 4
        self.input_sensitivities = [1.0] * 4
        self.output_channel_order = []
        self.output_ranges = [10.0] * 2
        self.output_sensitivities = [1.0] * 2
        self.data_acquisition_parameters = None
        self.output_data_queue = queue
        # List of arrays read from the device but not yet returned by read()
        self.read_data = None
        # Duration of one read frame in seconds, set during channel setup
        self.time_per_read = None

    def set_up_data_acquisition_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """
        Initialize the hardware and set up channels and sampling properties

        The function must create channels on the hardware corresponding to
        the channels in the test.  It must also set the sampling rates.

        Parameters
        ----------
        test_data : DataAcquisitionParameters :
            A container containing the data acquisition parameters for the
            controller set by the user.
        channel_data : List[Channel] :
            A list of ``Channel`` objects defining the channels in the test

        Returns
        -------
        None.

        """
        # Store data acquisition parameters for later
        self.data_acquisition_parameters = test_data
        self.time_per_read = test_data.samples_per_read / test_data.sample_rate

        # End the measurement if necessary.  The checks are sequential on
        # purpose: each call is expected to move the device to the status
        # tested by the next check.
        if self.quattro.status == QuattroStatus.RUNNING:
            self.quattro.stop()
        if self.quattro.status in (QuattroStatus.STOPPED, QuattroStatus.INIT):
            self.quattro.end()
        if self.quattro.status == QuattroStatus.DISCONNECTED:
            self.quattro.connect()

        # Set up defaults that will be overwritten
        self.input_channel_order = []
        self.input_couplings = [QuattroCoupling.DC_DIFFERENTIAL] * 4
        self.input_ranges = [10.0] * 4
        self.input_sensitivities = [1.0] * 4
        self.output_channel_order = []
        self.output_ranges = [10.0] * 2
        self.output_sensitivities = [1.0] * 2

        # Set the sample rate
        self.quattro.set_sample_rate(test_data.sample_rate)

        # The buffer size is the fixed value chosen in __init__; it is not
        # derived from the frame sizes of the test.
        self.quattro.set_buffer_size(self.buffer_size)

        # Set up channel parameters
        for channel in channel_data:
            # Figure out if the channel is an output channel or just acquisition
            is_output = (
                channel.feedback_device is not None and channel.feedback_device.strip() != ""
            )

            # Get the channel index from physical device (table is 1-based)
            channel_index = int(channel.physical_channel) - 1

            # Track the channel order so we can rearrange measurements upon read
            self.input_channel_order.append(channel_index)

            # Set the values in the arrays appropriately given the channel
            # index.  An unrecognized coupling string leaves the current entry
            # (DC differential by default) untouched.
            coupling = channel.coupling.lower()
            if coupling in ("ac differential", "ac diff", "ac"):
                self.input_couplings[channel_index] = QuattroCoupling.AC_DIFFERENTIAL
            elif coupling in ("dc differential", "dc diff", "dc"):
                self.input_couplings[channel_index] = QuattroCoupling.DC_DIFFERENTIAL
            elif coupling in ("ac single ended", "ac single-ended", "ac single"):
                self.input_couplings[channel_index] = QuattroCoupling.AC_SINGLE_ENDED
            elif coupling in ("dc single ended", "dc single-ended", "dc single"):
                self.input_couplings[channel_index] = QuattroCoupling.DC_SINGLE_ENDED
            elif coupling in ("iepe", "icp", "ac icp", "ccld"):
                self.input_couplings[channel_index] = QuattroCoupling.AC_COUPLED_IEPE

            # Channels that measure an output keep unit sensitivity and a 10.0
            # range; other channels use the channel table values.
            # NOTE(review): the division by 1000 presumably converts mV/EU to
            # V/EU -- confirm against the channel table convention.
            self.input_sensitivities[channel_index] = (
                1.0 if is_output else float(channel.sensitivity) / 1000
            )
            self.input_ranges[channel_index] = 10.0 if is_output else float(channel.maximum_value)

            # Set up the output
            if is_output:
                output_index = int(channel.feedback_channel) - 1
                self.output_channel_order.append(output_index)
                self.output_ranges[output_index] = float(channel.maximum_value)

        # Now send the data to the quattro device
        self.quattro.setup_input_parameters(
            self.input_couplings, self.input_sensitivities, self.input_ranges
        )
        self.quattro.setup_output_parameters(self.output_sensitivities, self.output_ranges)

    def start(self):
        """Method to start acquiring data from the hardware

        Clears any samples buffered from a previous run before initializing
        and starting the device."""
        self.read_data = []
        self.quattro.initialize()
        self.quattro.start()

    def read(self) -> np.ndarray:
        """Method to read a frame of data from the hardware that returns
        an appropriately sized np.ndarray

        Polls the device until at least ``samples_per_read`` samples have been
        buffered, writing any pending output data on every poll.

        Returns
        -------
        np.ndarray
            The first ``samples_per_read`` buffered samples along the last
            axis, with rows ordered per the channel table.  Surplus samples are
            kept for the next call.
        """
        samples_per_read = self.data_acquisition_parameters.samples_per_read
        while np.sum([data.shape[-1] for data in self.read_data]) < samples_per_read:
            # Check if we need to output anything
            self.get_and_write_output_data()
            # Check how many samples are available
            samples_available = self.quattro.get_available_input_data_samples()
            # Read that many data samples and put it to the "read_data" array
            # Make sure we rearrange the channels correctly per the rattlesnake
            # channel table using self.input_channel_order
            if samples_available > 0:
                self.read_data.append(
                    self.quattro.read_input_data(samples_available)[self.input_channel_order]
                )
            # Pause for a bit to allow more samples to accumulate
            time.sleep(self.time_per_read / SLEEP_FACTOR)
        # After we finish getting enough samples for a read, we can split the
        # read data into the number of samples requested, and put the remainder
        # as the start of the next self.read_data list.
        read_data = np.concatenate(self.read_data, axis=-1)
        self.read_data = [read_data[:, samples_per_read:]]
        return read_data[:, :samples_per_read]

    def read_remaining(self) -> np.ndarray:
        """Method to read the rest of the data on the acquisition from the hardware
        that returns an appropriately sized np.ndarray

        Waits until the device reports at least one available sample, reads
        everything that is available, and returns it together with any samples
        left over from previous calls to ``read``.

        Returns
        -------
        np.ndarray
            All buffered samples, concatenated along the last axis.  The
            internal buffer is emptied so no sample is returned twice.
        """
        # Check if we need to output anything
        self.get_and_write_output_data()
        # Wait until some samples arrive, pausing between polls so that the
        # wait does not spin the processor
        samples_available = self.quattro.get_available_input_data_samples()
        while samples_available == 0:
            time.sleep(self.time_per_read / SLEEP_FACTOR)
            samples_available = self.quattro.get_available_input_data_samples()
        # Read that many data samples and put it to the "read_data" array
        # Make sure we rearrange the channels correctly per the rattlesnake
        # channel table using self.input_channel_order
        self.read_data.append(
            self.quattro.read_input_data(samples_available)[self.input_channel_order]
        )
        # Everything buffered is "remaining" data: hand all of it back instead
        # of discarding the first samples_per_read samples as a frame split
        # would, and clear the buffer.
        remaining_data = np.concatenate(self.read_data, axis=-1)
        self.read_data = []
        return remaining_data

    def stop(self):
        """Method to stop the acquisition"""
        self.quattro.stop()
        self.quattro.end()

    def close(self):
        """Method to close down the hardware"""
        self.quattro.disconnect()

    def get_acquisition_delay(self) -> int:
        """Get the number of samples between output and acquisition

        This function is designed to handle buffering done in the output
        hardware, ensuring that all data written to the output is read by the
        acquisition.  If a output hardware has a buffer, there may be a non-
        negligible delay between when output is written to the device and
        actually played out from the device.

        Returns
        -------
        int
            ``BUFFER_SIZE_FACTOR`` times the number of samples per write.
        """
        return BUFFER_SIZE_FACTOR * self.data_acquisition_parameters.samples_per_write

    def get_and_write_output_data(self, block: bool = False):
        """
        Checks to see if there is any data on the output queue that needs to be
        written to the hardware.

        When not blocking, nothing is fetched if the device output buffer
        already holds at least ``samples_per_write`` samples.

        Parameters
        ----------
        block : bool, optional
            If True, this function will wait until the data appears with a timeout
            of 10 seconds.  Otherwise it will simply return if there is no
            data available.  The default is False.

        Raises
        ------
        RuntimeError
            Raised if the timeout occurs while waiting for data while blocking

        Returns
        -------
        None.

        """
        samples_per_write = self.data_acquisition_parameters.samples_per_write
        samples_on_buffer = self.quattro.get_total_output_samples_on_buffer()
        if not block and samples_on_buffer >= samples_per_write:
            # Enough output is already queued on the device
            return
        try:
            # The timeout only applies when block is True
            data = self.output_data_queue.get(block, timeout=10)
        except mp.queues.Empty as e:
            if block:
                raise RuntimeError(
                    "Did not receive output in a reasonable amount of time, check "
                    "output process and output hardware for issues"
                ) from e
            # Otherwise just return because there's no data available
            return
        # If we did get output, we need to put it into a numpy array that we can
        # send to the daq.  Both device outputs are always written; rows not
        # listed in self.output_channel_order stay zero.
        outputs = np.zeros((2, samples_per_write))
        outputs[self.output_channel_order] = data
        # Send the outputs to the daq
        self.quattro.write_output_data(outputs)
313
314
class DataPhysicsOutput(HardwareOutput):
    """Output-side hardware interface for the Data Physics Quattro

    This class is run by the Output process.  It does not communicate with
    the device itself: every frame passed to ``write`` is placed on the
    data-passing queue supplied at construction.  The remaining interface
    methods are intentionally empty."""

    def __init__(self, queue: mp.queues.Queue):
        """
        Stores the queue on which output frames are placed.

        Parameters
        ----------
        queue : mp.queues.Queue
            Queue used to pass data from output to acquisition

        Returns
        -------
        None.

        """
        self.queue = queue

    def set_up_data_output_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """
        Accepts the test configuration; no action is taken by this class.

        Parameters
        ----------
        test_data : DataAcquisitionParameters :
            A container containing the data acquisition parameters for the
            controller set by the user.  Unused here.
        channel_data : List[Channel] :
            A list of ``Channel`` objects defining the channels in the test.
            Unused here.

        Returns
        -------
        None.

        """
        return None

    def start(self):
        """Method to start the output; nothing is done by this class"""
        return None

    def write(self, data):
        """Method to place a frame of output data on the data-passing queue

        Parameters
        ----------
        data :
            Frame of output data; it is put on the queue unchanged.
        """
        data_queue = self.queue
        data_queue.put(data)

    def stop(self):
        """Method to stop the output; nothing is done by this class"""
        return None

    def close(self):
        """Method to close down the output; nothing is done by this class"""
        return None

    def ready_for_new_output(self) -> bool:
        """Method that reports whether a new signal should be accepted

        Returns
        -------
        bool
            ``True`` if the data-passing queue is empty.
        """
        queue_is_empty = self.queue.empty()
        return queue_is_empty