1# -*- coding: utf-8 -*-
2"""
3Hardware definition that allows for the Data Physics DP900 Device to be run
4with Rattlesnake.
5
6Rattlesnake Vibration Control Software
7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
9Government retains certain rights in this software.
10
11This program is free software: you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation, either version 3 of the License, or
14(at your option) any later version.
15
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License for more details.
20
21You should have received a copy of the GNU General Public License
22along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""
24
25import multiprocessing as mp
26import time
27from typing import List
28
29import numpy as np
30
31from .abstract_hardware import HardwareAcquisition, HardwareOutput
32from .utilities import Channel, DataAcquisitionParameters, flush_queue
33
34BUFFER_SIZE_FACTOR = 3
35SLEEP_FACTOR = 10
36
37
38class DataPhysicsDP900Acquisition(HardwareAcquisition):
39 """Class defining the interface between the controller and Data Physics
40 DP900 hardware
41
42 This class defines the interfaces between the controller and the
43 Data Physics hardware that runs their open API. It is run by the Acquisition
44 process, and must define how to get data from the test hardware into the
45 controller."""
46
47 def __init__(self, dll_path: str, queue: mp.queues.Queue):
48 """
49 Initializes the data physics hardware interface.
50
51 Parameters
52 ----------
53 dll_path : str
54 Path to the Dp900Matlab.dll file that defines
55 queue : mp.queues.Queue
56 Multiprocessing queue used to pass output data from the output task
57 to the acquisition task because DP900 runs on a single processor
58
59 Returns
60 -------
61 None.
62
63 """
64 from .data_physics_dp900_interface import DP900, DP900Coupling, DP900Status
65
66 self.DP900Coupling = DP900Coupling
67 self.DP900Status = DP900Status
68 self.output_active = False
69 self.dp900 = DP900(dll_path)
70 self.buffer_size = 2**24
71 self.input_bnc_indices = []
72 self.output_bnc_indices = []
73 self.input_channel_table_indices = []
74 self.output_channel_table_indices = []
75 self.channel_sorting = None
76 self.output_sorting = None
77 self.data_acquisition_parameters = None
78 self.output_data_queue = queue
79 self.time_per_read = None
80 self.last_write_time = None
81
82 def set_up_data_acquisition_parameters_and_channels(
83 self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
84 ):
85 """
86 Initialize the hardware and set up channels and sampling properties
87
88 The function must create channels on the hardware corresponding to
89 the channels in the test. It must also set the sampling rates.
90
91 Parameters
92 ----------
93 test_data : DataAcquisitionParameters :
94 A container containing the data acquisition parameters for the
95 controller set by the user.
96 channel_data : List[Channel] :
97 A list of ``Channel`` objects defining the channels in the test
98
99 Returns
100 -------
101 None.
102
103 """
104 # Store data acquisition parameters for later
105 self.data_acquisition_parameters = test_data
106 self.time_per_read = test_data.samples_per_read / test_data.sample_rate
107
108 # End the measurement if necessary
109 if self.dp900.status == self.DP900Status.RUNNING:
110 self.dp900.stop()
111 if (
112 self.dp900.status == self.DP900Status.STOPPED
113 or self.dp900.status == self.DP900Status.INIT
114 ):
115 self.dp900.end()
116 if self.dp900.status == self.DP900Status.DISCONNECTED:
117 self.dp900.connect("")
118
119 # Get the system information
120 system_list = self.dp900.get_system_list(online=True)
121
122 # Get the channel information
123 input_bncs = self.dp900.get_input_channel_bncs()
124 output_bncs = self.dp900.get_output_channel_bncs()
125
126 # Set up defaults that will be overwritten
127 self.output_active = False
128 self.input_bnc_indices = []
129 self.output_bnc_indices = []
130 self.input_channel_table_indices = []
131 self.output_channel_table_indices = []
132
133 input_couplings = []
134 input_ranges = []
135 input_sensitivities = []
136 output_ranges = []
137 output_sensitivities = []
138 all_bncs = []
139 systems = []
140
141 # Set up channel parameters
142 for ct_index, channel in enumerate(channel_data):
143 system = channel.physical_device
144 if system not in system_list:
145 raise ValueError(
146 f"System {system} is not a valid system. Must be one of {system_list}"
147 )
148 if system not in systems:
149 systems.append(system)
150 if len(systems) > 1:
151 raise ValueError(
152 "Multi-chassis tests are not currently supported in Rattlesnake"
153 )
154 # Figure out if the channel is an output channel or just acquisition
155 is_output = not (channel.feedback_device is None) and not (
156 channel.feedback_device.strip() == ""
157 )
158
159 # Get the channel index from the bnc number
160 if is_output:
161 self.output_active = True
162 all_bncs.append(int(channel.physical_channel))
163 bnc_index = np.flatnonzero(output_bncs == int(channel.physical_channel))
164 if len(bnc_index) > 1:
165 raise ValueError(
166 f"More than one matching channel for BNC {channel.physical_channel} "
167 "(how did this happen?)"
168 )
169 if len(bnc_index) < 1:
170 raise ValueError(
171 f"BNC {channel.physical_channel} was not found in the list of output "
172 f"BNCs {output_bncs}. Please run DP900Config to correctly set input "
173 f"and output channels."
174 )
175 bnc_index = bnc_index[0]
176 self.output_bnc_indices.append(bnc_index)
177 self.output_channel_table_indices.append(ct_index)
178 output_ranges.append(float(channel.maximum_value))
179 output_sensitivities.append(1)
180 else:
181 all_bncs.append(int(channel.physical_channel))
182 bnc_index = np.flatnonzero(input_bncs == int(channel.physical_channel))
183 if len(bnc_index) > 1:
184 raise ValueError(
185 f"More than one matching channel for BNC {channel.physical_channel} "
186 f"(how did this happen?)"
187 )
188 if len(bnc_index) < 1:
189 raise ValueError(
190 f"BNC {channel.physical_channel} was not found in the list of input "
191 f"BNCs {input_bncs}. Please run DP900Config to correctly set input "
192 f"and output channels."
193 )
194 bnc_index = bnc_index[0]
195 self.input_bnc_indices.append(bnc_index)
196 self.input_channel_table_indices.append(ct_index)
197 input_ranges.append(float(channel.maximum_value))
198 input_sensitivities.append(float(channel.sensitivity))
199 # Set the values in the arrays appropriately given the channel index
200 if channel.coupling.lower() in ["ac differential", "ac diff", "ac"]:
201 input_couplings.append(self.DP900Coupling.AC_DIFFERENTIAL)
202 elif channel.coupling.lower() in ["dc differential", "dc diff", "dc"]:
203 input_couplings.append(self.DP900Coupling.DC_DIFFERENTIAL)
204 elif channel.coupling.lower() in [
205 "ac single ended",
206 "ac single-ended",
207 "ac single",
208 ]:
209 input_couplings.append(self.DP900Coupling.AC_SINGLE_ENDED)
210 elif channel.coupling.lower() in [
211 "dc single ended",
212 "dc single-ended",
213 "dc single",
214 ]:
215 input_couplings.append(self.DP900Coupling.DC_SINGLE_ENDED)
216 elif channel.coupling.lower() in ["iepe", "icp", "ac icp", "ccld"]:
217 input_couplings.append(self.DP900Coupling.AC_COUPLED_IEPE)
218
219 self.dp900.set_system_list(systems)
220
221 # Set the sample rate
222 self.dp900.set_sample_rate(test_data.sample_rate)
223
224 # Set the buffer size
225 self.dp900.set_buffer_size(self.buffer_size)
226 # print('Buffer Size: {:}'.format(self.buffer_size))
227
228 # Now we need to re-order the items to pass to the parameter setup
229 # functions
230 input_sorting = np.argsort(self.input_bnc_indices)
231 input_channels = np.array(self.input_bnc_indices)[input_sorting] + 1
232 input_couplings = np.array(input_couplings)[input_sorting]
233 input_ranges = np.array(input_ranges)[input_sorting]
234 input_sensitivities = np.array(input_sensitivities)[input_sorting]
235 self.dp900.setup_input_parameters(
236 input_couplings, input_channels, input_sensitivities, input_ranges
237 )
238
239 if self.output_active:
240 self.output_sorting = np.argsort(self.output_bnc_indices)
241 output_channels = np.array(self.output_bnc_indices)[self.output_sorting] + 1
242 output_ranges = np.array(output_ranges)[self.output_sorting]
243 output_sensitivities = np.array(output_sensitivities)[self.output_sorting]
244
245 # Now send the data to the dp900 device
246 self.dp900.setup_output_parameters(output_sensitivities, output_ranges, output_channels)
247
248 # Since the outputs are at the end, we want to adjust the sorting to
249 # put the outputs at the end
250 # all_bncs = np.array(all_bncs)
251 # all_bncs[self.output_channel_table_indices] += 100000000
252
253 # We should then be able to use this channel sorting to unsort the
254 # read data
255 self.channel_sorting = np.argsort(all_bncs)
256
257 self.dp900.set_save_recording(False)
258
259 def start(self):
260 """Method to start acquiring data from the hardware"""
261 self.dp900.init()
262 self.dp900.start()
263
264 def read(self) -> np.ndarray:
265 """Method to read a frame of data from the hardware that returns
266 an appropriately sized np.ndarray"""
267 while (
268 self.dp900.get_available_input_data_samples()
269 < self.data_acquisition_parameters.samples_per_read
270 ):
271 # Check if we need to output anything
272 if self.output_active:
273 self.get_and_write_output_data()
274 # Pause for a bit to allow more samples to accumulate
275 time.sleep(self.time_per_read / SLEEP_FACTOR)
276 # Read the data now that we have enough samples
277 read_data = self.dp900.read_input_data(self.data_acquisition_parameters.samples_per_read)
278 # Now we need to sort the data correctly to give it back to the channel table
279 read_data[self.channel_sorting] = read_data.copy()
280 return read_data
281
282 def read_remaining(self) -> np.ndarray:
283 """Method to read the rest of the data on the acquisition from the hardware
284 that returns an appropriately sized np.ndarray"""
285 # Check if we need to output anything
286 if self.output_active:
287 self.get_and_write_output_data()
288 # Check how many samples are available
289 samples_available = 0
290 # Wait until some arrive
291 while samples_available == 0:
292 samples_available = self.dp900.get_available_input_data_samples()
293 # Pause for a bit to allow more samples to accumulate
294 time.sleep(self.time_per_read / SLEEP_FACTOR)
295 # Read that many data samples and put it to the "read_data" array
296 # Make sure we rearrange the channels correctly per the rattlesnake
297 # channel table using self.input_channel_order
298 read_data = self.dp900.read_input_data(samples_available)
299 read_data[self.channel_sorting] = read_data.copy()
300 return read_data
301
302 def stop(self):
303 """Method to stop the acquisition"""
304 self.dp900.stop()
305 self.dp900.end()
306 flush_queue(self.output_data_queue)
307
308 def close(self):
309 """Method to close down the hardware"""
310 self.dp900.disconnect()
311 flush_queue(self.output_data_queue)
312
313 def get_acquisition_delay(self) -> int:
314 """Get the number of samples between output and acquisition
315
316 This function is designed to handle buffering done in the output
317 hardware, ensuring that all data written to the output is read by the
318 acquisition. If a output hardware has a buffer, there may be a non-
319 negligable delay between when output is written to the device and
320 actually played out from the device."""
321 return BUFFER_SIZE_FACTOR * self.data_acquisition_parameters.samples_per_write
322
323 def get_and_write_output_data(self, block: bool = False):
324 """
325 Checks to see if there is any data on the output queue that needs to be
326 written to the hardware.
327
328 Parameters
329 ----------
330 block : bool, optional
331 If True, this function will wait until the data appears with a timeout
332 of 10 seconds. Otherwise it will simply return if there is no
333 data available. The default is False.
334
335 Raises
336 ------
337 RuntimeError
338 Raised if the timeout occurs while waiting for data while blocking
339
340 Returns
341 -------
342 None.
343
344 """
345 samples_on_buffer = self.dp900.get_total_output_samples_on_buffer()
346 write_threshold = 3 * self.data_acquisition_parameters.samples_per_write
347 # print('{:} Samples on Output Buffer, (<{:} to output more)'.format(
348 # samples_on_buffer,write_threshold))
349 # TODO: Uncomment this
350 if not block and samples_on_buffer >= write_threshold:
351 # print('Too much data on buffer, not putting new data')
352 return
353 try:
354 data = self.output_data_queue.get(block, timeout=10)
355 # print('Got New Data from queue')
356 except mp.queues.Empty as e:
357 # print('Did not get new data from queue')
358 if block:
359 raise RuntimeError(
360 "Did not receive output in a reasonable amount of time, check output "
361 "process and output hardware for issues"
362 ) from e
363 # Otherwise just return because there's no data available
364 return
365 # If we did get output, we need to put it into a numpy array that we can
366 # send to the daq
367 this_write_time = time.time()
368 outputs = data[self.output_sorting]
369 # if self.last_write_time is not None:
370 # print('Time since last write: {:}'.format(this_write_time - self.last_write_time))
371 self.last_write_time = this_write_time
372 # Send the outputs to the daq
373 self.dp900.write_output_data(outputs)
374 return
375
376
377class DataPhysicsDP900Output(HardwareOutput):
378 """Abstract class defining the interface between the controller and output
379
380 This class defines the interfaces between the controller and the
381 output or source portion of the hardware. It is run by the Output
382 process, and must define how to get write data to the hardware from the
383 control system"""
384
385 def __init__(self, queue: mp.queues.Queue):
386 """
387 Initializes the hardware by simply storing the data passing queue
388
389 Parameters
390 ----------
391 queue : mp.queues.Queue
392 Queue used to pass data from output to acquisition
393
394 Returns
395 -------
396 None.
397
398 """
399 self.queue = queue
400
401 def set_up_data_output_parameters_and_channels(
402 self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
403 ):
404 """
405 Initialize the hardware and set up sources and sampling properties
406
407 The function must create channels on the hardware corresponding to
408 the sources in the test. It must also set the sampling rates.
409
410 Parameters
411 ----------
412 test_data : DataAcquisitionParameters :
413 A container containing the data acquisition parameters for the
414 controller set by the user.
415 channel_data : List[Channel] :
416 A list of ``Channel`` objects defining the channels in the test
417
418 Returns
419 -------
420 None.
421
422 """
423
424 def start(self):
425 """Method to start outputting data to the hardware"""
426 # TODO: Remove this
427 # self.last_check_time = time.time()
428
429 def write(self, data):
430 """Method to write a np.ndarray with a frame of data to the hardware"""
431 self.queue.put(data)
432
433 def stop(self):
434 """Method to stop the output"""
435 flush_queue(self.queue)
436
437 def close(self):
438 """Method to close down the hardware"""
439 flush_queue(self.queue)
440
441 def ready_for_new_output(self) -> bool:
442 """Method that returns true if the hardware should accept a new signal
443
444 Returns ``True`` if the data-passing queue is empty."""
445 return self.queue.empty()