1# -*- coding: utf-8 -*-
2"""
3This file defines an interface to the NIDAQmx hardware, and is used to set up
4and interact with read and write tasks on the hardware.
5
6Rattlesnake Vibration Control Software
7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
9Government retains certain rights in this software.
10
11This program is free software: you can redistribute it and/or modify
12it under the terms of the GNU General Public License as published by
13the Free Software Foundation, either version 3 of the License, or
14(at your option) any later version.
15
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License for more details.
20
21You should have received a copy of the GNU General Public License
22along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""
24
25import time
26from typing import List
27
28import nidaqmx as ni
29import nidaqmx.constants as nic
30import nidaqmx.stream_readers as ni_read
31import nidaqmx.stream_writers as ni_write
32import numpy as np
33
34from .abstract_hardware import HardwareAcquisition, HardwareOutput
35from .utilities import Channel, DataAcquisitionParameters
36
# Multiplier applied to ``samples_per_write`` in this module: it sets the
# acquisition delay reported by NIDAQmxAcquisition and the output buffer size
# (in samples per channel) configured by NIDAQmxOutput.
BUFFER_SIZE_FACTOR = 3
38
39
class NIDAQmxAcquisition(HardwareAcquisition):
    """Class defining the interface between the controller and NI hardware

    This class defines the interfaces between the controller and National
    Instruments Hardware that runs the NI-DAQmx library.  It is run by the
    Acquisition process, and must define how to get data from the test
    hardware into the controller."""

    def __init__(self, task_trigger, output_trigger_generator):
        """
        Constructs the NIDAQmx Acquisition class and initializes values to null.

        Parameters
        ----------
        task_trigger :
            Triggering mode.  ``0`` places every device on one task with no
            start trigger.  Any other value groups devices into one task per
            chassis and start-triggers each task from a PFI0 terminal.  ``2``
            additionally makes this class generate the trigger pulse itself
            on ``output_trigger_generator`` when ``start`` is called.
        output_trigger_generator :
            Physical analog output channel used to generate the trigger pulse.
            Only used when ``task_trigger`` is ``2``.
        """
        self.tasks = None
        self.channel_task_map = None
        self.read_datas = None
        self.read_data = None
        self.readers = None
        self.acquisition_delay = None
        self.read_triggers = None
        self.task_trigger = task_trigger
        self.output_trigger_generator = output_trigger_generator
        self.has_printed_read_statement = False
        self.trigger_output_task = None
        self.test_data = None

    def set_up_data_acquisition_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """
        Initialize the hardware and set up channels and sampling properties

        The function must create channels on the hardware corresponding to
        the channels in the test.  It must also set the sampling rates.

        Parameters
        ----------
        test_data : DataAcquisitionParameters :
            A container containing the data acquisition parameters for the
            controller set by the user.
        channel_data : List[Channel] :
            A list of ``Channel`` objects defining the channels in the test

        Returns
        -------
        None.

        """
        self.create_response_channels(channel_data)
        self.set_parameters(test_data)
        # Kept so that ``start`` can configure the optional trigger task
        self.test_data = test_data

    def create_response_channels(self, channel_data: List[Channel]):
        """Method to set up response channels

        This function takes channels from the supplied list of channels and
        creates analog inputs on the hardware.  Devices are grouped into
        tasks: a single task named ``all`` when ``task_trigger`` is 0,
        otherwise one task per PXI chassis (``P``), per CompactDAQ chassis
        (``C``), or per remaining device (``E``).

        Parameters
        ----------
        channel_data : List[Channel] :
            A list of ``Channel`` objects defining the channels in the test

        """
        # dict.fromkeys removes duplicates while keeping first-seen order, so
        # the task numbering is repeatable from run to run (a set is not).
        physical_devices = list(
            dict.fromkeys(channel.physical_device for channel in channel_data)
        )
        device_tasks = {}
        extra_task_index = 1
        for device in physical_devices:
            if self.task_trigger == 0:
                chassis_number = "all"
            else:
                d = ni.system.device.Device(device)
                try:
                    chassis_number = f"P{d.pxi_chassis_num}"
                except ni.DaqError:
                    try:
                        chassis_number = f"C{d.compact_daq_chassis_device.name}"
                    except ni.DaqError:
                        # Neither PXI nor CompactDAQ: give the device its own task
                        chassis_number = f"E{extra_task_index}"
                        extra_task_index += 1
            device_tasks[device] = chassis_number
        task_names = list(dict.fromkeys(device_tasks.values()))
        print(f"Input Tasks: {task_names}")
        print("  P: PXI, C: CDAQ, E: Other, All: All devices on one task")
        self.tasks = [ni.Task() for _ in task_names]
        self.read_triggers = [None for _ in task_names]
        self.channel_task_map = [[] for _ in task_names]
        for index, channel in enumerate(channel_data):
            task_index = task_names.index(device_tasks[channel.physical_device])
            self.channel_task_map[task_index].append(index)
            # The first channel seen on each task decides that task's trigger
            if self.task_trigger != 0 and self.read_triggers[task_index] is None:
                try:
                    chassis_device = ni.system.device.Device(
                        channel.physical_device
                    ).compact_daq_chassis_device
                    pfi_terminals = [
                        trigger for trigger in chassis_device.terminals if "/PFI0" in trigger
                    ]
                    print(f"PFI Terminals on CDAQ Device:\n{pfi_terminals}")
                    self.read_triggers[task_index] = pfi_terminals[0]
                except ni.DaqError:
                    # Not in a CompactDAQ chassis: use the device's own PFI0
                    self.read_triggers[task_index] = (
                        "/" + channel.physical_device.strip() + "/PFI0"
                    )
            self._create_channel(channel, task_index)
        print(f"Input Mapping: {self.channel_task_map}")

    def set_parameters(self, test_data: DataAcquisitionParameters):
        """Method to set up sampling rate and other test parameters

        This function sets the clock configuration on the NIDAQmx hardware,
        configures the start trigger of each task that has one, and allocates
        the readers and the buffers that ``read`` fills.

        Parameters
        ----------
        test_data : DataAcquisitionParameters :
            A container containing the data acquisition parameters for the
            controller set by the user.

        """
        self.readers = []
        self.read_datas = []
        self.acquisition_delay = BUFFER_SIZE_FACTOR * test_data.samples_per_write
        # Combined buffer holding every channel of the test in channel order
        self.read_data = np.zeros((len(test_data.channel_list), test_data.samples_per_read))
        for i, (task, trigger) in enumerate(zip(self.tasks, self.read_triggers)):
            task.timing.cfg_samp_clk_timing(
                test_data.sample_rate,
                sample_mode=nic.AcquisitionType.CONTINUOUS,
                samps_per_chan=test_data.samples_per_read,
            )
            task.in_stream.wait_mode = nic.WaitMode.POLL
            if trigger is not None:
                task.triggers.start_trigger.dig_edge_src = trigger
                task.triggers.start_trigger.dig_edge_edge = ni.constants.Edge.RISING
                task.triggers.start_trigger.trig_type = ni.constants.TriggerType.DIGITAL_EDGE
                print(f"Acquisition Task {i} Trigger {trigger}")
            self.readers.append(ni_read.AnalogMultiChannelReader(task.in_stream))
            # Per-task buffer, copied into the combined buffer by ``read``
            self.read_datas.append(np.zeros((len(task.ai_channels), test_data.samples_per_read)))

            print(f"Acquisition Task {i} Actual Sample Rate: {task.timing.samp_clk_rate}")

    def start(self):
        """Start acquiring data

        Starts every input task.  When ``task_trigger`` is 2, a separate
        analog output task is also created on ``output_trigger_generator``
        and used to emit a 3 V pulse followed by zeros."""
        for task in self.tasks:
            task.start()
        if self.task_trigger != 0:
            print("Input is Running, waiting for PFI Trigger")
        self.has_printed_read_statement = False
        # Now we're going to output the signal
        if self.task_trigger == 2:
            print("Creating Triggering Task")
            self.trigger_output_task = ni.Task()
            self.trigger_output_task.ao_channels.add_ao_voltage_chan(
                self.output_trigger_generator, min_val=-3.5, max_val=3.5
            )
            self.trigger_output_task.timing.cfg_samp_clk_timing(
                self.test_data.sample_rate,
                sample_mode=nic.AcquisitionType.CONTINUOUS,
                samps_per_chan=self.test_data.samples_per_write,
            )
            self.trigger_output_task.out_stream.regen_mode = nic.RegenerationMode.ALLOW_REGENERATION
            writer = ni_write.AnalogMultiChannelWriter(
                self.trigger_output_task.out_stream, auto_start=False
            )
            # High level first, then zeros once the task is running
            writer.write_many_sample(3 * np.ones((1, 100)))
            print("Starting Triggering Task")
            self.trigger_output_task.start()
            writer.write_many_sample(np.zeros((1, 100)))

    def get_acquisition_delay(self) -> int:
        """
        Get the number of samples between output and acquisition.

        This function returns the number of samples that need to be read to
        ensure that the last output is read by the acquisition.  If there is
        buffering in the output, this delay should be adjusted accordingly.

        Returns
        -------
        int
            Number of samples between when a dataset is written to the output
            and when it has finished playing.

        """
        return self.acquisition_delay

    def read(self):
        """Method to read a frame of data from the controller

        Returns
        -------
        read_data :
            2D Data read from the controller with shape ``n_channels`` x
            ``n_samples``.  The same array object is reused and overwritten
            on every call.
        """
        for reader, read_data, channel_mapping in zip(
            self.readers, self.read_datas, self.channel_task_map
        ):
            reader.read_many_sample(
                read_data,
                number_of_samples_per_channel=read_data.shape[-1],
                timeout=nic.WAIT_INFINITELY,
            )
            # Scatter this task's rows back into overall channel order
            self.read_data[channel_mapping] = read_data
        if not self.has_printed_read_statement:
            print("Input Read Data")
            self.has_printed_read_statement = True
        return self.read_data

    def read_remaining(self):
        """Method to read the rest of the data on the acquisition

        Each task may have a different number of samples available; the
        result is sized for the largest and shorter tasks are left
        zero-padded at the end.

        Returns
        -------
        read_data :
            2D Data read from the controller with shape ``n_channels`` x
            ``n_samples``
        """
        remaining_data = []
        for task, reader in zip(self.tasks, self.readers):
            task_data = np.zeros((len(task.ai_channels), task.in_stream.avail_samp_per_chan))
            reader.read_many_sample(
                task_data,
                number_of_samples_per_channel=task_data.shape[-1],
                timeout=nic.WAIT_INFINITELY,
            )
            remaining_data.append(task_data)
        # default=0 keeps this well defined when there are no tasks at all
        max_samples = max((data.shape[-1] for data in remaining_data), default=0)
        read_data = np.zeros((self.read_data.shape[0], max_samples))
        for data, channel_mapping in zip(remaining_data, self.channel_task_map):
            read_data[channel_mapping, : data.shape[-1]] = data
        if not self.has_printed_read_statement:
            print("Input Read Data")
            self.has_printed_read_statement = True
        return read_data

    def stop(self):
        """Method to stop the acquisition

        Stops the input tasks and, if a trigger task was created by
        ``start``, stops and closes it."""
        print("Stopping Input Tasks")
        for task in self.tasks:
            task.stop()
        print("Input Tasks Stopped")
        # The trigger task only exists once ``start`` has run in mode 2
        if self.task_trigger == 2 and self.trigger_output_task is not None:
            print("Stopping Triggering Task")
            self.trigger_output_task.stop()
            print("Closing Triggering Task")
            self.trigger_output_task.close()
            # Drop the reference so the task is not stopped or closed twice
            self.trigger_output_task = None

    def close(self):
        """Method to close down the hardware"""
        print("Closing Input Tasks")
        if self.tasks is not None:
            for task in self.tasks:
                task.close()
        # Release a trigger task left over if ``stop`` was never called
        if self.trigger_output_task is not None:
            self.trigger_output_task.close()
            self.trigger_output_task = None
        print("Input Tasks Closed")

    @staticmethod
    def _first_enum_member(enum_class, *names):
        """Return the first attribute of ``enum_class`` found among ``names``.

        The sensitivity-unit members are looked up under two spellings
        (``M_VOLTS_...`` and ``MILLIVOLTS_...``).  If none of the names
        exists, the ``AttributeError`` for the last name propagates.
        """
        for name in names[:-1]:
            try:
                return getattr(enum_class, name)
            except AttributeError:
                continue
        return getattr(enum_class, names[-1])

    def _create_channel(self, channel_data: Channel, task_index: int):
        """Helper function to construct a channel on the hardware.

        Parameters
        ----------
        channel_data: Channel :
            Channel object specifying the channel parameters.
        task_index: int :
            Index of the task to which the channel should be created

        Returns
        -------
        channel :
            A reference to the NIDAQmx channel created by the function

        Raises
        ------
        ValueError
            If the sensitivity, limits, excitation, channel type, unit, or
            excitation source of the channel is not valid.
        """
        physical_channel = channel_data.physical_device + "/" + channel_data.physical_channel
        # Parse the channel structure to make sure datatypes are correct
        # Sensitivity
        try:
            sensitivity = float(channel_data.sensitivity)
        except (TypeError, ValueError) as e:
            raise ValueError(f"{channel_data.sensitivity} not a valid sensitivity") from e
        # Minimum Value
        try:
            minimum_value = float(channel_data.minimum_value)
        except (TypeError, ValueError) as e:
            raise ValueError(f"{channel_data.minimum_value} not a valid minimum value") from e
        # Maximum Value
        try:
            maximum_value = float(channel_data.maximum_value)
        except (TypeError, ValueError) as e:
            raise ValueError(f"{channel_data.maximum_value} not a valid maximum value") from e
        # Channel Type and Units
        channel_type_name = channel_data.channel_type.lower()
        if channel_type_name in ["accelerometer", "acceleration", "accel"]:
            channel_type = nic.UsageTypeAI.ACCELERATION_ACCELEROMETER_CURRENT_INPUT
            if channel_data.unit.lower() in ["g", "gs"]:
                unit = nic.AccelUnits.G
            else:
                raise ValueError(f"Accelerometer units must be in G, not {channel_data.unit}")
        elif channel_type_name == "force":
            channel_type = nic.UsageTypeAI.FORCE_IEPE_SENSOR
            if channel_data.unit.lower() in ["lb", "pound", "pounds", "lbf", "lbs", "lbfs"]:
                unit = nic.ForceUnits.POUNDS
            elif channel_data.unit.lower() in ["n", "newton", "newtons", "ns"]:
                unit = nic.ForceUnits.NEWTONS
            else:
                raise ValueError(f"Unrecognized Force Unit {channel_data.unit}")
        elif channel_type_name in ["voltage", "volt"]:
            channel_type = nic.UsageTypeAI.VOLTAGE
            unit = None
        else:
            # Report the value the user supplied; no NI channel type exists here
            raise ValueError(
                f"{channel_data.channel_type} not a valid channel type.  "
                'Must be one of ["acceleration","accelerometer","accel","force","voltage","volt"]'
            )
        # Excitation Source
        excitation_source_name = channel_data.excitation_source.lower()
        if excitation_source_name == "internal":
            excitation_source = nic.ExcitationSource.INTERNAL
            try:
                excitation = float(channel_data.excitation)
            except (TypeError, ValueError) as e:
                raise ValueError(f"{channel_data.excitation} not a valid excitation") from e
        elif excitation_source_name == "none":
            excitation_source = nic.ExcitationSource.NONE
            excitation = 0
        else:
            raise ValueError(
                f"{channel_data.excitation_source} not a valid excitation source.  "
                'Must be one of ["internal","none"]'
            )
        # Now go and create the channel
        if channel_type != nic.UsageTypeAI.VOLTAGE:
            if sensitivity == 0:
                raise ValueError(
                    f"{channel_data.sensitivity} not a valid sensitivity"
                )
            # Limits are scaled by 1000 / sensitivity for sensor channels
            min_val = minimum_value * 1000 / sensitivity
            max_val = maximum_value * 1000 / sensitivity
        else:
            min_val = minimum_value
            max_val = maximum_value
        ai_channels = self.tasks[task_index].ai_channels
        if channel_type == nic.UsageTypeAI.ACCELERATION_ACCELEROMETER_CURRENT_INPUT:
            sensitivity_units = self._first_enum_member(
                nic.AccelSensitivityUnits, "M_VOLTS_PER_G", "MILLIVOLTS_PER_G"
            )
            channel = ai_channels.add_ai_accel_chan(
                physical_channel,
                min_val=min_val,
                max_val=max_val,
                units=unit,
                sensitivity=sensitivity,
                sensitivity_units=sensitivity_units,
                current_excit_source=excitation_source,
                current_excit_val=excitation,
            )
        elif channel_type == nic.UsageTypeAI.FORCE_IEPE_SENSOR:
            per_unit = "NEWTON" if unit == nic.ForceUnits.NEWTONS else "POUND"
            sensitivity_units = self._first_enum_member(
                nic.ForceIEPESensorSensitivityUnits,
                f"M_VOLTS_PER_{per_unit}",
                f"MILLIVOLTS_PER_{per_unit}",
            )
            channel = ai_channels.add_ai_force_iepe_chan(
                physical_channel,
                min_val=min_val,
                max_val=max_val,
                units=unit,
                sensitivity=sensitivity,
                sensitivity_units=sensitivity_units,
                current_excit_source=excitation_source,
                current_excit_val=excitation,
            )
        elif channel_type == nic.UsageTypeAI.VOLTAGE:
            channel = ai_channels.add_ai_voltage_chan(
                physical_channel,
                min_val=min_val,
                max_val=max_val,
                units=nic.VoltageUnits.VOLTS,
            )
        else:
            # Defensive: every type accepted above is handled by a branch
            raise ValueError(f"Channel Type Not Implemented: {channel_type}")
        return channel
452
453
class NIDAQmxOutput(HardwareOutput):
    """Class defining the interface between the controller and NI hardware

    This class defines the interfaces between the controller and National
    Instruments Hardware that runs the NI-DAQmx library.  It is run by the
    Output process, and must define how to get data from the controller to the
    output hardware."""

    def __init__(self, task_trigger, output_trigger_generator):
        """
        Constructs the NIDAQmx Output class and initializes values to null.

        Parameters
        ----------
        task_trigger :
            Triggering mode.  ``0`` start-triggers the output tasks from an
            ``ai/StartTrigger`` terminal; any other value start-triggers them
            from a ``PFI0`` terminal.
        output_trigger_generator :
            Stored on the instance; not otherwise used by this class.
        """
        self.tasks = None
        self.channel_task_map = None
        self.writers = None
        self.write_triggers = None
        self.signal_samples = None
        self.sample_rate = None
        self.buffer_size_factor = BUFFER_SIZE_FACTOR
        self.task_trigger = task_trigger
        self.output_trigger_generator = output_trigger_generator
        self.has_printed_write_statement = False

    def set_up_data_output_parameters_and_channels(
        self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
    ):
        """
        Initialize the hardware and set up sources and sampling properties

        The function must create channels on the hardware corresponding to
        the sources in the test.  It must also set the sampling rates.

        Parameters
        ----------
        test_data : DataAcquisitionParameters :
            A container containing the data acquisition parameters for the
            controller set by the user.
        channel_data : List[Channel] :
            A list of ``Channel`` objects defining the channels in the test

        Returns
        -------
        None.

        """
        self.create_sources(channel_data)
        self.set_parameters(test_data)

    def create_sources(self, channel_data: List[Channel]):
        """Method to set up excitation sources

        This function takes channels from the supplied list of channels and
        creates analog outputs on the hardware.  Only channels with a
        non-blank ``feedback_device`` are outputs.  Output devices are grouped
        into one task per (chassis, product type) pair.

        Parameters
        ----------
        channel_data : List[Channel] :
            A list of ``Channel`` objects defining the channels in the test
        """
        # Resolve each output device once, in first-seen order.  "E" numbers
        # are assigned per device so that all outputs on one device share a
        # task, as the acquisition side does for its input devices.
        output_channels = []
        device_task_names = {}
        extra_task_index = 1
        for channel in channel_data:
            device_name = channel.feedback_device
            if device_name is None or device_name.strip() == "":
                continue
            output_channels.append(channel)
            if device_name in device_task_names:
                continue
            device = ni.system.device.Device(device_name)
            try:
                chassis_number = f"P{device.pxi_chassis_num}"
            except ni.DaqError:
                try:
                    chassis_number = f"C{device.compact_daq_chassis_device.name}"
                except ni.DaqError:
                    chassis_number = f"E{extra_task_index}"
                    extra_task_index += 1
            device_task_names[device_name] = (chassis_number, device.product_type)
        # Unique names in first-seen order, so task 0 is repeatable between runs
        task_names = list(dict.fromkeys(device_task_names.values()))
        print(f"Output Tasks: {task_names}")
        print("  P: PXI, C: CDAQ, E: Other, All: All devices on one task")

        self.tasks = [ni.Task() for _ in task_names]
        self.write_triggers = [None for _ in task_names]
        self.channel_task_map = [[] for _ in task_names]
        # ``index`` counts output channels only, matching the rows of the
        # array passed to ``write``
        for index, channel in enumerate(output_channels):
            task_index = task_names.index(device_task_names[channel.feedback_device])
            self.channel_task_map[task_index].append(index)
            # The first channel seen on each task decides that task's trigger
            if self.write_triggers[task_index] is None:
                terminal_name = "ai/StartTrigger" if self.task_trigger == 0 else "PFI0"
                try:
                    chassis_device = ni.system.device.Device(
                        channel.feedback_device
                    ).compact_daq_chassis_device
                    self.write_triggers[task_index] = [
                        trigger
                        for trigger in chassis_device.terminals
                        if terminal_name in trigger
                    ][0]
                except ni.DaqError:
                    # Not in a CompactDAQ chassis: build the terminal name
                    if self.task_trigger == 0:
                        # Follow the start trigger of the first channel's device
                        self.write_triggers[task_index] = (
                            "/" + channel_data[0].physical_device.strip() + "/ai/StartTrigger"
                        )
                    else:
                        self.write_triggers[task_index] = (
                            "/" + channel.feedback_device.strip() + "/PFI0"
                        )
            self._create_channel(channel, task_index)
        print(f"Output Mapping: {self.channel_task_map}")

    def set_parameters(self, test_data: DataAcquisitionParameters):
        """Method to set up sampling rate and other test parameters

        This function sets the clock configuration on the NIDAQmx hardware,
        configures the start trigger and buffer size of every output task,
        and creates a writer for each task.

        Parameters
        ----------
        test_data : DataAcquisitionParameters :
            A container containing the data acquisition parameters for the
            controller set by the user.
        """
        self.signal_samples = test_data.samples_per_write
        self.sample_rate = test_data.sample_rate
        self.writers = []
        for i, (task, trigger) in enumerate(zip(self.tasks, self.write_triggers)):
            task.timing.cfg_samp_clk_timing(
                test_data.sample_rate,
                sample_mode=nic.AcquisitionType.CONTINUOUS,
                samps_per_chan=test_data.samples_per_write,
            )
            task.out_stream.regen_mode = nic.RegenerationMode.DONT_ALLOW_REGENERATION
            task.triggers.start_trigger.dig_edge_src = trigger
            task.triggers.start_trigger.dig_edge_edge = ni.constants.Edge.RISING
            task.triggers.start_trigger.trig_type = ni.constants.TriggerType.DIGITAL_EDGE
            print(f"Output Task {i} Trigger {trigger}")
            task.out_stream.output_buf_size = self.buffer_size_factor * test_data.samples_per_write
            self.writers.append(
                ni_write.AnalogMultiChannelWriter(task.out_stream, auto_start=False)
            )
            print(f"Output Task {i} Actual Sample Rate: {task.timing.samp_clk_rate}")

    def start(self):
        """Method to start the output tasks"""
        for task in self.tasks:
            task.start()
        if self.task_trigger != 0:
            print("Output is Running, waiting for PFI Trigger")

    def write(self, data):
        """Method to write a frame of data

        Parameters
        ----------
        data : np.ndarray
            2D Data to be written to the controller with shape ``n_sources`` x
            ``n_samples``

        """
        for i, writer in enumerate(self.writers):
            # Each task only receives the rows belonging to its own channels
            writer.write_many_sample(data[self.channel_task_map[i]], timeout=nic.WAIT_INFINITELY)
        if not self.has_printed_write_statement:
            print("Output Wrote Data")
            self.has_printed_write_statement = True

    def stop(self):
        """Method to stop the output

        Writes one frame of zeros to every task, waits for the samples queued
        ahead of those zeros to play, and then stops the tasks."""
        print("Stopping Output Tasks")
        # Need to output everything in the buffer and then some zeros and we'll
        # shut down during the zeros portion
        for i, writer in enumerate(self.writers):
            writer.write_many_sample(
                np.zeros((len(self.channel_task_map[i]), self.signal_samples)),
                timeout=nic.WAIT_INFINITELY,
            )
        if self.tasks:
            # Now figure out how many samples are remaining
            out_stream = self.tasks[0].out_stream
            samples_remaining = (
                out_stream.curr_write_pos
                - out_stream.total_samp_per_chan_generated
                - self.signal_samples
            )  # Subtract off the zeros
            # Generation keeps running while the counters are read, so the
            # difference can go negative; time.sleep rejects negative values.
            time_remaining = max(0.0, samples_remaining / self.sample_rate)
            time.sleep(time_remaining)
        for task in self.tasks:
            task.stop()
        self.has_printed_write_statement = False
        print("Output Tasks Stopped")

    def close(self):
        """Method to close down the hardware"""
        print("CLosing Output Tasks")
        if self.tasks is not None:
            for task in self.tasks:
                task.close()
        print("Output Tasks Closed")

    def ready_for_new_output(self):
        """Returns true if the system is ready for new outputs

        The check uses the first output task only: it compares the number of
        samples written but not yet generated against
        ``(buffer_size_factor - 1) * samples_per_write``.

        Returns
        -------
        bool :
            True if the hardware is accepting the next data to write."""
        out_stream = self.tasks[0].out_stream
        samples_queued = out_stream.curr_write_pos - out_stream.total_samp_per_chan_generated
        return samples_queued < (self.buffer_size_factor - 1) * self.signal_samples

    def _create_channel(self, channel_data: Channel, device_index):
        """
        Helper function to construct a channel on the hardware.

        Parameters
        ----------
        channel_data: Channel :
            Channel object specifying the channel parameters.
        device_index :
            Index into ``self.tasks`` of the task that receives the channel.

        Returns
        -------
        channel :
            A reference to the NIDAQmx channel created by the function

        Raises
        ------
        ValueError
            If the minimum or maximum value cannot be converted to a float.
        """
        # Minimum Value
        try:
            minimum_value = float(channel_data.minimum_value)
        except (TypeError, ValueError) as e:
            raise ValueError(f"{channel_data.minimum_value} not a valid minimum value") from e
        # Maximum Value
        try:
            maximum_value = float(channel_data.maximum_value)
        except (TypeError, ValueError) as e:
            raise ValueError(f"{channel_data.maximum_value} not a valid maximum value") from e
        physical_channel = channel_data.feedback_device + "/" + channel_data.feedback_channel
        channel = self.tasks[device_index].ao_channels.add_ao_voltage_chan(
            physical_channel, min_val=minimum_value, max_val=maximum_value
        )
        return channel