Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / nidaqmx_hardware_multitask.py: 10%

328 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3This file defines an interface to the NIDAQmx hardware, and is used to set up 

4and interact with read and write tasks on the hardware. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24 

25import time 

26from typing import List 

27 

28import nidaqmx as ni 

29import nidaqmx.constants as nic 

30import nidaqmx.stream_readers as ni_read 

31import nidaqmx.stream_writers as ni_write 

32import numpy as np 

33 

34from .abstract_hardware import HardwareAcquisition, HardwareOutput 

35from .utilities import Channel, DataAcquisitionParameters 

36 

37BUFFER_SIZE_FACTOR = 3 

38 

39 

40class NIDAQmxAcquisition(HardwareAcquisition): 

41 """Class defining the interface between the controller and NI hardware 

42 

43 This class defines the interfaces between the controller and National 

44 Instruments Hardware that runs the NI-DAQmx library. It is run by the 

45 Acquisition process, and must define how to get data from the test 

46 hardware into the controller.""" 

47 

48 def __init__(self, task_trigger, output_trigger_generator): 

49 """ 

50 Constructs the NIDAQmx Acquisition class and specifies values to null. 

51 """ 

52 self.tasks = None 

53 self.channel_task_map = None 

54 self.read_datas = None 

55 self.read_data = None 

56 self.readers = None 

57 self.acquisition_delay = None 

58 self.read_triggers = None 

59 self.task_trigger = task_trigger 

60 self.output_trigger_generator = output_trigger_generator 

61 self.has_printed_read_statement = False 

62 self.trigger_output_task = None 

63 self.test_data = None 

64 

65 def set_up_data_acquisition_parameters_and_channels( 

66 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

67 ): 

68 """ 

69 Initialize the hardware and set up channels and sampling properties 

70 

71 The function must create channels on the hardware corresponding to 

72 the channels in the test. It must also set the sampling rates. 

73 

74 Parameters 

75 ---------- 

76 test_data : DataAcquisitionParameters : 

77 A container containing the data acquisition parameters for the 

78 controller set by the user. 

79 channel_data : List[Channel] : 

80 A list of ``Channel`` objects defining the channels in the test 

81 

82 Returns 

83 ------- 

84 None. 

85 

86 """ 

87 self.create_response_channels(channel_data) 

88 self.set_parameters(test_data) 

89 self.test_data = test_data 

90 

91 def create_response_channels(self, channel_data: List[Channel]): 

92 """Method to set up response channels 

93 

94 This function takes channels from the supplied list of channels and 

95 creates analog inputs on the hardware. 

96 

97 Parameters 

98 ---------- 

99 channel_data : List[Channel] : 

100 A list of ``Channel`` objects defining the channels in the test 

101 

102 """ 

103 physical_devices = list(set(channel.physical_device for channel in channel_data)) 

104 device_tasks = {} 

105 extra_task_index = 1 

106 task_names = set([]) 

107 for device in physical_devices: 

108 if self.task_trigger == 0: 

109 chassis_number = "all" 

110 else: 

111 d = ni.system.device.Device(device) 

112 try: 

113 chassis_number = f"P{d.pxi_chassis_num}" 

114 except ni.DaqError: 

115 try: 

116 chassis_number = f"C{d.compact_daq_chassis_device.name}" 

117 except ni.DaqError: 

118 chassis_number = f"E{extra_task_index}" 

119 extra_task_index += 1 

120 device_tasks[device] = chassis_number 

121 task_names.add(chassis_number) 

122 task_names = list(task_names) 

123 print(f"Input Tasks: {task_names}") 

124 print(" P: PXI, C: CDAQ, E: Other, All: All devices on one task") 

125 self.tasks = [ni.Task() for name in task_names] 

126 self.read_triggers = [None for name in task_names] 

127 self.channel_task_map = [[] for name in task_names] 

128 index = 0 

129 for channel in channel_data: 

130 task_name = device_tasks[channel.physical_device] 

131 task_index = task_names.index(task_name) 

132 self.channel_task_map[task_index].append(index) 

133 if self.task_trigger != 0: 

134 if self.read_triggers[task_index] is None: 

135 try: 

136 chassis_device = ni.system.device.Device( 

137 channel.physical_device 

138 ).compact_daq_chassis_device 

139 pfi_terminals = [ 

140 trigger for trigger in chassis_device.terminals if "/PFI0" in trigger 

141 ] 

142 print(f"PFI Terminals on CDAQ Device:\n{pfi_terminals}") 

143 self.read_triggers[task_index] = pfi_terminals[0] 

144 except ni.DaqError: 

145 self.read_triggers[task_index] = ( 

146 "/" + channel.physical_device.strip() + "/PFI0" 

147 ) 

148 index += 1 

149 self._create_channel(channel, task_index) 

150 print(f"Input Mapping: {self.channel_task_map}") 

151 

152 def set_parameters(self, test_data: DataAcquisitionParameters): 

153 """Method to set up sampling rate and other test parameters 

154 

155 This function sets the clock configuration on the NIDAQmx hardware. 

156 

157 Parameters 

158 ---------- 

159 test_data : DataAcquisitionParameters : 

160 A container containing the data acquisition parameters for the 

161 controller set by the user. 

162 

163 """ 

164 self.readers = [] 

165 self.read_datas = [] 

166 self.acquisition_delay = BUFFER_SIZE_FACTOR * test_data.samples_per_write 

167 self.read_data = np.zeros((len(test_data.channel_list), test_data.samples_per_read)) 

168 for i, (task, trigger) in enumerate(zip(self.tasks, self.read_triggers)): 

169 task.timing.cfg_samp_clk_timing( 

170 test_data.sample_rate, 

171 sample_mode=nic.AcquisitionType.CONTINUOUS, 

172 samps_per_chan=test_data.samples_per_read, 

173 ) 

174 task.in_stream.wait_mode = nic.WaitMode.POLL 

175 if trigger is not None: 

176 task.triggers.start_trigger.dig_edge_src = trigger 

177 task.triggers.start_trigger.dig_edge_edge = ni.constants.Edge.RISING 

178 task.triggers.start_trigger.trig_type = ni.constants.TriggerType.DIGITAL_EDGE 

179 print(f"Acquisition Task {i} Trigger {trigger}") 

180 self.readers.append(ni_read.AnalogMultiChannelReader(task.in_stream)) 

181 self.read_datas.append(np.zeros((len(task.ai_channels), test_data.samples_per_read))) 

182 

183 print(f"Acquisition Task {i} Actual Sample Rate: {task.timing.samp_clk_rate}") 

184 

185 def start(self): 

186 """Start acquiring data""" 

187 for task in self.tasks: 

188 task.start() 

189 if self.task_trigger != 0: 

190 print("Input is Running, waiting for PFI Trigger") 

191 self.has_printed_read_statement = False 

192 # Now we're going to output the signal 

193 if self.task_trigger == 2: 

194 print("Creating Triggering Task") 

195 self.trigger_output_task = ni.Task() 

196 self.trigger_output_task.ao_channels.add_ao_voltage_chan( 

197 self.output_trigger_generator, min_val=-3.5, max_val=3.5 

198 ) 

199 self.trigger_output_task.timing.cfg_samp_clk_timing( 

200 self.test_data.sample_rate, 

201 sample_mode=nic.AcquisitionType.CONTINUOUS, 

202 samps_per_chan=self.test_data.samples_per_write, 

203 ) 

204 self.trigger_output_task.out_stream.regen_mode = nic.RegenerationMode.ALLOW_REGENERATION 

205 writer = ni_write.AnalogMultiChannelWriter( 

206 self.trigger_output_task.out_stream, auto_start=False 

207 ) 

208 writer.write_many_sample(3 * np.ones((1, 100))) 

209 print("Starting Triggering Task") 

210 self.trigger_output_task.start() 

211 writer.write_many_sample(np.zeros((1, 100))) 

212 

213 def get_acquisition_delay(self) -> int: 

214 """ 

215 Get the number of samples between output and acquisition. 

216 

217 This function returns the number of samples that need to be read to 

218 ensure that the last output is read by the acquisition. If there is 

219 buffering in the output, this delay should be adjusted accordingly. 

220 

221 Returns 

222 ------- 

223 int 

224 Number of samples between when a dataset is written to the output 

225 and when it has finished playing. 

226 

227 """ 

228 return self.acquisition_delay 

229 

230 def read(self): 

231 """Method to read a frame of data from the controller 

232 

233 Returns 

234 ------- 

235 read_data : 

236 2D Data read from the controller with shape ``n_channels`` x 

237 ``n_samples`` 

238 """ 

239 for reader, read_data, channel_mapping in zip( 

240 self.readers, self.read_datas, self.channel_task_map 

241 ): 

242 reader.read_many_sample( 

243 read_data, 

244 number_of_samples_per_channel=read_data.shape[-1], 

245 timeout=nic.WAIT_INFINITELY, 

246 ) 

247 self.read_data[channel_mapping] = read_data 

248 if not self.has_printed_read_statement: 

249 print("Input Read Data") 

250 self.has_printed_read_statement = True 

251 return self.read_data 

252 

253 def read_remaining(self): 

254 """Method to read the rest of the data on the acquisition 

255 

256 Returns 

257 ------- 

258 read_data : 

259 2D Data read from the controller with shape ``n_channels`` x 

260 ``n_samples`` 

261 """ 

262 remaining_data = [] 

263 for task, reader, channel_mapping in zip(self.tasks, self.readers, self.channel_task_map): 

264 read_data = np.zeros((len(task.ai_channels), task.in_stream.avail_samp_per_chan)) 

265 reader.read_many_sample( 

266 read_data, 

267 number_of_samples_per_channel=read_data.shape[-1], 

268 timeout=nic.WAIT_INFINITELY, 

269 ) 

270 remaining_data.append(read_data) 

271 max_samples = max([data.shape[-1] for data in remaining_data]) 

272 read_data = np.zeros((self.read_data.shape[0], max_samples)) 

273 for data, channel_mapping in zip(remaining_data, self.channel_task_map): 

274 read_data[channel_mapping, : data.shape[-1]] = data 

275 if not self.has_printed_read_statement: 

276 print("Input Read Data") 

277 self.has_printed_read_statement = True 

278 return read_data 

279 

280 def stop(self): 

281 """Method to stop the acquisition""" 

282 print("Stopping Input Tasks") 

283 for task in self.tasks: 

284 task.stop() 

285 print("Input Tasks Stopped") 

286 if self.task_trigger == 2: 

287 print("Stopping Triggering Task") 

288 self.trigger_output_task.stop() 

289 print("Closing Triggering Task") 

290 self.trigger_output_task.close() 

291 

292 def close(self): 

293 """Method to close down the hardware""" 

294 print("Closing Input Tasks") 

295 if self.tasks is not None: 

296 for task in self.tasks: 

297 task.close() 

298 print("Input Tasks Closed") 

299 

300 def _create_channel(self, channel_data: Channel, task_index: int): 

301 """Helper function to construct a channel on the hardware. 

302 

303 Parameters 

304 ---------- 

305 channel_data: Channel : 

306 Channel object specifying the channel parameters. 

307 task_index: int : 

308 Index of the task to which the channel should be created 

309 

310 Returns 

311 ------- 

312 channel : 

313 A reference to the NIDAQmx channel created by the function 

314 """ 

315 physical_channel = channel_data.physical_device + "/" + channel_data.physical_channel 

316 # Parse the channel structure to make sure datatypes are correct 

317 # Sensitivity 

318 try: 

319 sensitivity = float(channel_data.sensitivity) 

320 except (TypeError, ValueError) as e: 

321 raise ValueError(f"{channel_data.sensitivity} not a valid sensitivity") from e 

322 # Minimum Value 

323 try: 

324 minimum_value = float(channel_data.minimum_value) 

325 except (TypeError, ValueError) as e: 

326 raise ValueError(f"{channel_data.minimum_value} not a valid minimum value") from e 

327 # Maximum Value 

328 try: 

329 maximum_value = float(channel_data.maximum_value) 

330 except (TypeError, ValueError) as e: 

331 raise ValueError(f"{channel_data.maximum_value} not a valid maximum value") from e 

332 # Channel Type and Units 

333 if channel_data.channel_type.lower() in [ 

334 "accelerometer", 

335 "acceleration", 

336 "accel", 

337 ]: 

338 channel_type = nic.UsageTypeAI.ACCELERATION_ACCELEROMETER_CURRENT_INPUT 

339 if channel_data.unit.lower() in ["g", "gs"]: 

340 unit = nic.AccelUnits.G 

341 else: 

342 raise ValueError(f"Accelerometer units must be in G, not {channel_data.unit}") 

343 elif channel_data.channel_type.lower() == "force": 

344 channel_type = nic.UsageTypeAI.FORCE_IEPE_SENSOR 

345 if channel_data.unit.lower() in [ 

346 "lb", 

347 "pound", 

348 "pounds", 

349 "lbf", 

350 "lbs", 

351 "lbfs", 

352 ]: 

353 unit = nic.ForceUnits.POUNDS 

354 elif channel_data.unit.lower() in ["n", "newton", "newtons", "ns"]: 

355 unit = nic.ForceUnits.NEWTONS 

356 else: 

357 raise ValueError(f"Unrecognized Force Unit {channel_data.unit}") 

358 elif channel_data.channel_type.lower() in ["voltage", "volt"]: 

359 channel_type = nic.UsageTypeAI.VOLTAGE 

360 unit = None 

361 else: 

362 raise ValueError( 

363 f"{channel_type} not a valid channel type. " 

364 'Must be one of ["acceleration","accelerometer","accel","force","voltage","volt"]' 

365 ) 

366 # Excitation Source 

367 if channel_data.excitation_source.lower() == "internal": 

368 excitation_source = nic.ExcitationSource.INTERNAL 

369 try: 

370 excitation = float(channel_data.excitation) 

371 except (TypeError, ValueError) as e: 

372 raise ValueError(f"{channel_data.excitation} not a valid excitation") from e 

373 elif channel_data.excitation_source.lower() == "none": 

374 excitation_source = nic.ExcitationSource.NONE 

375 excitation = 0 

376 else: 

377 raise ValueError( 

378 f"{channel_data.excitation_source} not a valid excitation source. " 

379 'Must be one of ["internal","none"]' 

380 ) 

381 # Now go and create the channel 

382 if channel_type != nic.UsageTypeAI.VOLTAGE: 

383 min_val = minimum_value * 1000 / sensitivity 

384 max_val = maximum_value * 1000 / sensitivity 

385 else: 

386 min_val = minimum_value 

387 max_val = maximum_value 

388 if channel_type == nic.UsageTypeAI.ACCELERATION_ACCELEROMETER_CURRENT_INPUT: 

389 try: 

390 channel = self.tasks[task_index].ai_channels.add_ai_accel_chan( 

391 physical_channel, 

392 min_val=min_val, 

393 max_val=max_val, 

394 units=unit, 

395 sensitivity=sensitivity, 

396 sensitivity_units=nic.AccelSensitivityUnits.M_VOLTS_PER_G, 

397 current_excit_source=excitation_source, 

398 current_excit_val=excitation, 

399 ) 

400 except AttributeError: 

401 channel = self.tasks[task_index].ai_channels.add_ai_accel_chan( 

402 physical_channel, 

403 min_val=min_val, 

404 max_val=max_val, 

405 units=unit, 

406 sensitivity=sensitivity, 

407 sensitivity_units=nic.AccelSensitivityUnits.MILLIVOLTS_PER_G, 

408 current_excit_source=excitation_source, 

409 current_excit_val=excitation, 

410 ) 

411 elif channel_type == nic.UsageTypeAI.FORCE_IEPE_SENSOR: 

412 try: 

413 channel = self.tasks[task_index].ai_channels.add_ai_force_iepe_chan( 

414 physical_channel, 

415 min_val=min_val, 

416 max_val=max_val, 

417 units=unit, 

418 sensitivity=sensitivity, 

419 sensitivity_units=( 

420 nic.ForceIEPESensorSensitivityUnits.M_VOLTS_PER_NEWTON 

421 if unit == nic.ForceUnits.NEWTONS 

422 else nic.ForceIEPESensorSensitivityUnits.M_VOLTS_PER_POUND 

423 ), 

424 current_excit_source=excitation_source, 

425 current_excit_val=excitation, 

426 ) 

427 except AttributeError: 

428 channel = self.tasks[task_index].ai_channels.add_ai_force_iepe_chan( 

429 physical_channel, 

430 min_val=min_val, 

431 max_val=max_val, 

432 units=unit, 

433 sensitivity=sensitivity, 

434 sensitivity_units=( 

435 nic.ForceIEPESensorSensitivityUnits.MILLIVOLTS_PER_NEWTON 

436 if unit == nic.ForceUnits.NEWTONS 

437 else nic.ForceIEPESensorSensitivityUnits.MILLIVOLTS_PER_POUND 

438 ), 

439 current_excit_source=excitation_source, 

440 current_excit_val=excitation, 

441 ) 

442 elif channel_type == nic.UsageTypeAI.VOLTAGE: 

443 channel = self.tasks[task_index].ai_channels.add_ai_voltage_chan( 

444 physical_channel, 

445 min_val=min_val, 

446 max_val=max_val, 

447 units=nic.VoltageUnits.VOLTS, 

448 ) 

449 else: 

450 raise ValueError(f"Channel Type Not Implemented: {channel_type}") 

451 return channel 

452 

453 

454class NIDAQmxOutput(HardwareOutput): 

455 """Class defining the interface between the controller and NI hardware 

456 

457 This class defines the interfaces between the controller and National 

458 Instruments Hardware that runs the NI-DAQmx library. It is run by the 

459 Output process, and must define how to get data from the controller to the 

460 output hardware.""" 

461 

462 def __init__(self, task_trigger, output_trigger_generator): 

463 """ 

464 Constructs the NIDAQmx Output class and initializes values to null. 

465 """ 

466 self.tasks = None 

467 self.channel_task_map = None 

468 self.writers = None 

469 self.write_triggers = None 

470 self.signal_samples = None 

471 self.sample_rate = None 

472 self.buffer_size_factor = BUFFER_SIZE_FACTOR 

473 self.task_trigger = task_trigger 

474 self.output_trigger_generator = output_trigger_generator 

475 self.has_printed_write_statement = False 

476 

477 def set_up_data_output_parameters_and_channels( 

478 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

479 ): 

480 """ 

481 Initialize the hardware and set up sources and sampling properties 

482 

483 The function must create channels on the hardware corresponding to 

484 the sources in the test. It must also set the sampling rates. 

485 

486 Parameters 

487 ---------- 

488 test_data : DataAcquisitionParameters : 

489 A container containing the data acquisition parameters for the 

490 controller set by the user. 

491 channel_data : List[Channel] : 

492 A list of ``Channel`` objects defining the channels in the test 

493 

494 Returns 

495 ------- 

496 None. 

497 

498 """ 

499 self.create_sources(channel_data) 

500 self.set_parameters(test_data) 

501 

502 def create_sources(self, channel_data: List[Channel]): 

503 """Method to set up excitation sources 

504 

505 This function takes channels from the supplied list of channels and 

506 creates analog outputs on the hardware. 

507 

508 Parameters 

509 ---------- 

510 channel_data : List[Channel] : 

511 A list of ``Channel`` objects defining the channels in the test 

512 """ 

513 # Get pairs of product_types and physical device names 

514 task_names = set() 

515 extra_task_index = 1 

516 for channel in channel_data: 

517 if not (channel.feedback_device is None) and not ( 

518 channel.feedback_device.strip() == "" 

519 ): 

520 device_name = channel.feedback_device 

521 device = ni.system.device.Device(device_name) 

522 try: 

523 chassis_number = f"P{device.pxi_chassis_num}" 

524 except ni.DaqError: 

525 try: 

526 chassis_number = f"C{device.compact_daq_chassis_device.name}" 

527 except ni.DaqError: 

528 chassis_number = f"E{extra_task_index}" 

529 extra_task_index += 1 

530 product_name = device.product_type 

531 task_names.add((chassis_number, product_name)) 

532 task_names = list(task_names) 

533 print(f"Output Tasks: {task_names}") 

534 print(" P: PXI, C: CDAQ, E: Other, All: All devices on one task") 

535 # Check if it's a CDAQ device 

536 

537 self.tasks = [ni.Task() for name in task_names] 

538 self.write_triggers = [None for name in task_names] 

539 self.channel_task_map = [[] for name in task_names] 

540 index = 0 

541 extra_task_index = 1 

542 for channel in channel_data: 

543 if not (channel.feedback_device is None) and not ( 

544 channel.feedback_device.strip() == "" 

545 ): 

546 device_name = channel.feedback_device 

547 device = ni.system.device.Device(device_name) 

548 try: 

549 chassis_number = f"P{device.pxi_chassis_num}" 

550 except ni.DaqError: 

551 try: 

552 chassis_number = f"C{device.compact_daq_chassis_device.name}" 

553 except ni.DaqError: 

554 chassis_number = f"E{extra_task_index}" 

555 extra_task_index += 1 

556 product_name = device.product_type 

557 task_index = task_names.index((chassis_number, product_name)) 

558 self.channel_task_map[task_index].append(index) 

559 index += 1 

560 if self.write_triggers[task_index] is None: 

561 if self.task_trigger == 0: 

562 try: 

563 chassis_device = ni.system.device.Device( 

564 channel.feedback_device 

565 ).compact_daq_chassis_device 

566 self.write_triggers[task_index] = [ 

567 trigger 

568 for trigger in chassis_device.terminals 

569 if "ai/StartTrigger" in trigger 

570 ][0] 

571 except ni.DaqError: 

572 self.write_triggers[task_index] = ( 

573 "/" + channel_data[0].physical_device.strip() + "/ai/StartTrigger" 

574 ) 

575 else: 

576 try: 

577 chassis_device = ni.system.device.Device( 

578 channel.feedback_device 

579 ).compact_daq_chassis_device 

580 self.write_triggers[task_index] = [ 

581 trigger for trigger in chassis_device.terminals if "PFI0" in trigger 

582 ][0] 

583 except ni.DaqError: 

584 self.write_triggers[task_index] = ( 

585 "/" + channel.feedback_device.strip() + "/PFI0" 

586 ) 

587 self._create_channel(channel, task_index) 

588 print(f"Output Mapping: {self.channel_task_map}") 

589 

590 def set_parameters(self, test_data: DataAcquisitionParameters): 

591 """Method to set up sampling rate and other test parameters 

592 

593 This function sets the clock configuration on the NIDAQmx hardware. 

594 

595 Parameters 

596 ---------- 

597 test_data : DataAcquisitionParameters : 

598 A container containing the data acquisition parameters for the 

599 controller set by the user. 

600 """ 

601 self.signal_samples = test_data.samples_per_write 

602 self.sample_rate = test_data.sample_rate 

603 self.writers = [] 

604 for i, (task, trigger) in enumerate(zip(self.tasks, self.write_triggers)): 

605 task.timing.cfg_samp_clk_timing( 

606 test_data.sample_rate, 

607 sample_mode=nic.AcquisitionType.CONTINUOUS, 

608 samps_per_chan=test_data.samples_per_write, 

609 ) 

610 task.out_stream.regen_mode = nic.RegenerationMode.DONT_ALLOW_REGENERATION 

611 # task.out_stream.relative_to = nic.WriteRelativeTo.CURRENT_WRITE_POSITION 

612 task.triggers.start_trigger.dig_edge_src = trigger 

613 task.triggers.start_trigger.dig_edge_edge = ni.constants.Edge.RISING 

614 task.triggers.start_trigger.trig_type = ni.constants.TriggerType.DIGITAL_EDGE 

615 print(f"Output Task {i} Trigger {trigger}") 

616 task.out_stream.output_buf_size = self.buffer_size_factor * test_data.samples_per_write 

617 self.writers.append( 

618 ni_write.AnalogMultiChannelWriter(task.out_stream, auto_start=False) 

619 ) 

620 print(f"Output Task {i} Actual Sample Rate: {task.timing.samp_clk_rate}") 

621 

622 def start(self): 

623 """Method to start acquiring data""" 

624 for task in self.tasks: 

625 task.start() 

626 if self.task_trigger != 0: 

627 print("Output is Running, waiting for PFI Trigger") 

628 

629 def write(self, data): 

630 """Method to write a frame of data 

631 

632 Parameters 

633 ---------- 

634 data : np.ndarray 

635 2D Data to be written to the controller with shape ``n_sources`` x 

636 ``n_samples`` 

637 

638 """ 

639 for i, writer in enumerate(self.writers): 

640 writer.write_many_sample(data[self.channel_task_map[i]], timeout=nic.WAIT_INFINITELY) 

641 if not self.has_printed_write_statement: 

642 print("Output Wrote Data") 

643 self.has_printed_write_statement = True 

644 

645 def stop(self): 

646 """Method to stop the output""" 

647 print("Stopping Output Tasks") 

648 # Need to output everything in the buffer and then some zeros and we'll 

649 # shut down during the zeros portion 

650 for i, writer in enumerate(self.writers): 

651 writer.write_many_sample( 

652 np.zeros((len(self.channel_task_map[i]), self.signal_samples)), 

653 timeout=nic.WAIT_INFINITELY, 

654 ) 

655 # Now figure out how many samples are remaining 

656 samples_remaining = ( 

657 self.tasks[0].out_stream.curr_write_pos 

658 - self.tasks[0].out_stream.total_samp_per_chan_generated 

659 - self.signal_samples 

660 ) # Subtract off the zeros 

661 time_remaining = samples_remaining / self.sample_rate 

662 time.sleep(time_remaining) 

663 for task in self.tasks: 

664 task.stop() 

665 self.has_printed_write_statement = False 

666 print("Output Tasks Stopped") 

667 

668 def close(self): 

669 """Method to close down the hardware""" 

670 print("CLosing Output Tasks") 

671 if self.tasks is not None: 

672 for task in self.tasks: 

673 task.close() 

674 print("Output Tasks Closed") 

675 

676 def ready_for_new_output(self): 

677 """Returns true if the system is ready for new outputs 

678 

679 Returns 

680 ------- 

681 bool : 

682 True if the hardware is accepting the next data to write.""" 

683 return ( 

684 self.tasks[0].out_stream.curr_write_pos 

685 - self.tasks[0].out_stream.total_samp_per_chan_generated 

686 < (self.buffer_size_factor - 1) * self.signal_samples 

687 ) 

688 

689 def _create_channel(self, channel_data: Channel, device_index): 

690 """ 

691 Helper function to construct a channel on the hardware. 

692 

693 Parameters 

694 ---------- 

695 channel_data: Channel : 

696 Channel object specifying the channel parameters. 

697 

698 Returns 

699 ------- 

700 channel : 

701 A reference to the NIDAQmx channel created by the function 

702 """ 

703 # Minimum Value 

704 try: 

705 minimum_value = float(channel_data.minimum_value) 

706 except (TypeError, ValueError) as e: 

707 raise ValueError(f"{channel_data.minimum_value} not a valid minimum value") from e 

708 # Maximum Value 

709 try: 

710 maximum_value = float(channel_data.maximum_value) 

711 except (TypeError, ValueError) as e: 

712 raise ValueError(f"{channel_data.maximum_value} not a valid maximum value") from e 

713 physical_channel = channel_data.feedback_device + "/" + channel_data.feedback_channel 

714 channel = self.tasks[device_index].ao_channels.add_ao_voltage_chan( 

715 physical_channel, min_val=minimum_value, max_val=maximum_value 

716 ) 

717 return channel