Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / lanxi_hardware_multiprocessing.py: 9%

589 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Implementation for the HBK LAN-XI Hardware 

4 

5Rattlesnake Vibration Control Software 

6Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

7(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

8Government retains certain rights in this software. 

9 

10This program is free software: you can redistribute it and/or modify 

11it under the terms of the GNU General Public License as published by 

12the Free Software Foundation, either version 3 of the License, or 

13(at your option) any later version. 

14 

15This program is distributed in the hope that it will be useful, 

16but WITHOUT ANY WARRANTY; without even the implied warranty of 

17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18GNU General Public License for more details. 

19 

20You should have received a copy of the GNU General Public License 

21along with this program. If not, see <https://www.gnu.org/licenses/>. 

22""" 

23 

24import multiprocessing as mp 

25import re 

26import socket 

27import time 

28from typing import List 

29 

30import numpy as np 

31import requests 

32 

33from .abstract_hardware import HardwareAcquisition, HardwareOutput 

34from .lanxi_stream import OpenapiHeader, OpenapiMessage 

35from .utilities import Channel, DataAcquisitionParameters 

36 

37OUTPUT_RATE = 131072 

38LANXI_STATE_TIMEOUT = 255.0 

39LANXI_STATE_REPORT = 10 

40VALID_FILTERS = ["DC", "0.7 Hz", "7.0 Hz", "22.4 Hz", "Intensity"] 

41VALID_RANGES = ["0.316", "1", "10", "31.6"] 

42HEADER_LENGTH = 28 

43BUFFER_SIZE = 2.5 

44 

45LANXI_STATE_SHUTDOWN = { 

46 "RecorderRecording": "/rest/rec/measurements/stop", 

47 "RecorderStreaming": "/rest/rec/finish", 

48 "RecorderOpened": "/rest/rec/close", 

49 "Idle": None, 

50} 

51 

52IPV4_PATTERN = r"^((25[0-5]|(2[0-4]|1[0-9]|[1-9]|)[0-9])(\.(?!$)|$)){4}$" 

53IPV6_PATTERN = r"\[\s*([0-9a-fA-F]{1,4}:){0,7}(:[0-9a-fA-F]{1,4})*%?\d*\s*\]" 

54 

55# TO DO List 

56# TODO Get responses each time a get or put is done so we know if it was successful 

57# TODO Shut down the data acquisition more quickly 

58 

59 

60class LanXIError(Exception): 

61 """Exception to signify an error using LAN-XI""" 

62 

63 

64def read_lanxi(socket_handle: socket.socket): 

65 """ 

66 Reads and interprets data from a Lan-XI 

67 

68 Parameters 

69 ---------- 

70 socket_handle : socket.socket 

71 Handle to the socket that the communication is happening over. 

72 

73 Returns 

74 ------- 

75 message_type : OpenapiStream.Header.EMessageType 

76 Enum determining what the data type is 

77 data : np.ndarray or dict 

78 The data read from the device. Will be a np.ndarray for a signal and 

79 a dictionary for a interpretation 

80 

81 """ 

82 # print('Reading data from {:}:{:}'.format(*socket_handle.getpeername())) 

83 data = socket_handle.recv(HEADER_LENGTH, socket.MSG_WAITALL) 

84 if len(data) == 0: 

85 raise LanXIError("Socket is not connected anymore") 

86 wstream = OpenapiHeader.from_bytes(data) 

87 content_length = wstream.message_length + HEADER_LENGTH 

88 # We use the header's content_length to collect the rest of a package 

89 while len(data) < content_length: 

90 packet = socket_handle.recv(content_length - len(data)) 

91 data += packet 

92 # Now we parse the package 

93 try: 

94 package = OpenapiMessage.from_bytes(data) 

95 except EOFError as e: 

96 print(f"Data Invalid {data}") 

97 raise e 

98 if package.header.message_type == OpenapiMessage.Header.EMessageType.e_interpretation: 

99 interpretation_dict = {} 

100 for interpretation in package.message.interpretations: 

101 interpretation_dict[interpretation.descriptor_type] = interpretation.value 

102 return (package.header.message_type, interpretation_dict) 

103 elif ( 

104 package.header.message_type == OpenapiMessage.Header.EMessageType.e_signal_data 

105 ): # If the data contains signal data 

106 array = [] 

107 for signal in package.message.signals: # For each signal in the package 

108 array.append(np.array([x.calc_value for x in signal.values]) / 2**23) 

109 return package.header.message_type, np.concatenate(array, axis=-1) 

110 # If 'quality data' message, then record information on data quality issues 

111 elif package.header.message_type == OpenapiMessage.Header.EMessageType.e_data_quality: 

112 ip, port = socket_handle.getpeername() 

113 for q in package.message.qualities: 

114 if q.validity_flags.overload: 

115 print(f"Overload Detected on {ip}:{port}") 

116 if q.validity_flags.invalid: 

117 print(f"Invalid Data Detected on {ip}:{port}") 

118 if q.validity_flags.overrun: 

119 print(f"Overrun Detected on {ip}:{port}") 

120 return None, None 

121 else: 

122 raise LanXIError(f"Unknown Message Type: {package.header.message_type}") 

123 

124 

125def lanxi_multisocket_reader( 

126 socket_handles: List[socket.socket], 

127 active_channels_list: List[int], 

128 data_queues: List[mp.queues.Queue], 

129): 

130 """ 

131 Reads data from all channels on multiple modules. 

132 

133 This function is designed to be run by a multiprocessing Process 

134 

135 Parameters 

136 ---------- 

137 socket_handles : List[socket.socket] 

138 A list of the sockets for the cards on this module. 

139 active_channels_list : List[int] 

140 A list of number of active channels on each module. 

141 data_queues : List[mp.queues.Queue] 

142 A set of queues to pass data back to the main process. 

143 

144 """ 

145 print( 

146 "Starting to record from:\n {:}".format( 

147 "\n ".join( 

148 ["{:}:{:}".format(*socket_handle.getpeername()) for socket_handle in socket_handles] 

149 ) 

150 ) 

151 ) 

152 try: 

153 while True: 

154 for socket_handle, active_channels, data_queue in zip( 

155 socket_handles, active_channels_list, data_queues 

156 ): 

157 socket_data = [] 

158 socket_data_types = [] 

159 while len(socket_data) < active_channels: 

160 message_type, data = read_lanxi(socket_handle) 

161 # print('Reading {:}:{:} Data Type {:}'.format( 

162 # *socket_handle.getpeername(),message_type)) 

163 if message_type is not None: 

164 socket_data.append(data) 

165 socket_data_types.append(message_type) 

166 # Make sure they are all the same type 

167 assert all([data_type == socket_data_types[0] for data_type in socket_data_types]) 

168 if socket_data_types[0] == OpenapiMessage.Header.EMessageType.e_interpretation: 

169 # print('{:}:{:} Putting Interpretation to Queue'.format( 

170 # *socket_handle.getpeername())) 

171 data_queue.put(("Interpretation", socket_data)) 

172 elif socket_data_types[0] == OpenapiMessage.Header.EMessageType.e_signal_data: 

173 # print('{:}:{:} Putting Signal to Queue'.format( 

174 # *socket_handle.getpeername())) 

175 data_queue.put(("Signal", socket_data)) 

176 else: 

177 raise ValueError( 

178 "Unknown Signal Type {:} in {:}:{:}".format( # pylint: disable=consider-using-f-string 

179 socket_data_types[0], *socket_handle.getpeername() 

180 ) 

181 ) 

182 except LanXIError: 

183 for socket_handle, data_queue in zip(socket_handles, data_queues): 

184 # The socket has closed, so gracefully close down 

185 ip, port = socket_handle.getpeername() 

186 print(f"Closing Socket {ip}:{port}") 

187 while True: 

188 try: 

189 print(f"Emptying Queue {ip}:{port}") 

190 data_queue.get(False) 

191 except mp.queues.Empty: 

192 print(f"Returning {ip}:{port}") 

193 break 

194 return 

195 

196 

197def create_harware_maps(acquisition_map, output_map, channel_list): 

198 """Creates mapping between the LAN-XI channels and the rattlesnake channel list 

199 

200 Parameters 

201 ---------- 

202 acquisition_map : dict 

203 A dictionary that will be populated with the acquisition map information. The dictionary 

204 will be nested, with the first key being the physical device and the second key being the 

205 physical channel. The value will be a tuple containing the channel index and the Channel 

206 itself. 

207 output_map : dict 

208 A dictionary that will be populated with the acquisition map information. The dictionary 

209 will be nested, with the first key being the feedback device and the second key being the 

210 feedback channel. The value will be a tuple containing the channel index and the Channel 

211 itself. 

212 channel_list : list of Channel objects 

213 A list of channels in the rattlesnake test 

214 """ 

215 for i, channel in enumerate(channel_list): 

216 if channel.physical_device not in acquisition_map: 

217 acquisition_map[channel.physical_device] = {} 

218 acquisition_map[channel.physical_device][int(channel.physical_channel)] = ( 

219 i, 

220 channel, 

221 ) 

222 for i, channel in enumerate( 

223 [channel for channel in channel_list if channel.feedback_device is not None] 

224 ): 

225 if channel.feedback_device not in output_map: 

226 output_map[channel.feedback_device] = {} 

227 output_map[channel.feedback_device][int(channel.feedback_channel)] = i, channel 

228 

229 

230def wait_for_ptp_state(host: str, state: str): 

231 """Waits until hardware is at a current state 

232 

233 Parameters 

234 ---------- 

235 host : str 

236 The address of the host to wait for 

237 state : str 

238 The name of the state to wait until. 

239 

240 Returns 

241 ------- 

242 bool 

243 True if the state has changed, False if the hardware timed out. 

244 

245 """ 

246 start_time = time.time() 

247 current_state = "" 

248 iteration = 0 

249 while True: 

250 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60) 

251 state_data = response.json() 

252 current_state = state_data["ptpStatus"] 

253 if current_state == state: 

254 result = True 

255 break 

256 if time.time() - start_time > LANXI_STATE_TIMEOUT: 

257 result = False 

258 break 

259 time.sleep(1) 

260 iteration += 1 

261 if iteration % LANXI_STATE_REPORT == 0: 

262 print(f"Host {host} at {current_state} state, waiting for {state}") 

263 if not result: 

264 raise LanXIError( 

265 f"Wait for PTP State {state} timed out on host {host}. Last retrieved " 

266 f"state: {current_state}" 

267 ) 

268 return result 

269 

270 

271def wait_for_recorder_state(host: str, state: str): 

272 """Waits until hardware is at a current state 

273 

274 Parameters 

275 ---------- 

276 host : str 

277 The address of the host to wait for 

278 state : str 

279 The name of the state to wait until. 

280 

281 Returns 

282 ------- 

283 bool 

284 True if the state has changed, False if the hardware timed out. 

285 

286 """ 

287 start_time = time.time() 

288 current_state = "" 

289 iteration = 0 

290 while True: 

291 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60) 

292 state_data = response.json() 

293 current_state = state_data["moduleState"] 

294 if current_state == state: 

295 result = True 

296 break 

297 if current_state == "PostFailed": 

298 result = False 

299 break 

300 if time.time() - start_time > LANXI_STATE_TIMEOUT: 

301 result = False 

302 break 

303 time.sleep(1) 

304 iteration += 1 

305 if iteration % LANXI_STATE_REPORT == 0: 

306 print(f"Host {host} at {current_state} state, waiting for {state}") 

307 if not result: 

308 raise LanXIError( 

309 f"Wait for Recorder State {state} timed out on host {host}. Last retrieved " 

310 f"state: {current_state}" 

311 ) 

312 return result 

313 

314 

315def wait_for_input_state(host: str, state: str): 

316 """Waits until hardware is at a current state 

317 

318 Parameters 

319 ---------- 

320 host : str 

321 The address of the host to wait for 

322 state : str 

323 The name of the state to wait until. 

324 

325 Returns 

326 ------- 

327 bool 

328 True if the state has changed, False if the hardware timed out. 

329 

330 """ 

331 start_time = time.time() 

332 current_state = "" 

333 iteration = 0 

334 while True: 

335 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60) 

336 state_data = response.json() 

337 current_state = state_data["inputStatus"] 

338 if current_state == state: 

339 result = True 

340 break 

341 if time.time() - start_time > LANXI_STATE_TIMEOUT: 

342 result = False 

343 break 

344 time.sleep(1) 

345 iteration += 1 

346 if iteration % LANXI_STATE_REPORT == 0: 

347 print(f"Host {host} at {current_state} state, waiting for {state}") 

348 if not result: 

349 raise LanXIError( 

350 f"Wait for Input State {state} timed out on host {host}. Last retrieved state: " 

351 f"{current_state}" 

352 ) 

353 return result 

354 

355 

356def close_recorder(host): 

357 """Closes the host based on its current state""" 

358 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60) 

359 state_data = response.json() 

360 current_state = state_data["moduleState"] 

361 if current_state == "RecorderRecording": 

362 print(f"Stopping Measurement on {host}") 

363 requests.put("http://" + host + "/rest/rec/measurements/stop", timeout=60) 

364 wait_for_recorder_state(host, "RecorderStreaming") 

365 close_recorder(host) 

366 elif current_state == "RecorderConfiguring": 

367 response = requests.get("http://" + host + "/rest/rec/channels/input/default", timeout=60) 

368 channel_settings = response.json() 

369 response = requests.put( 

370 "http://" + host + "/rest/rec/channels/input", json=channel_settings, timeout=60 

371 ) 

372 wait_for_recorder_state(host, "RecorderStreaming") 

373 close_recorder(host) 

374 elif current_state == "RecorderStreaming": 

375 print(f"Finishing Streaming on {host}") 

376 requests.put("http://" + host + "/rest/rec/finish", timeout=60) 

377 wait_for_recorder_state(host, "RecorderOpened") 

378 close_recorder(host) 

379 elif current_state == "RecorderOpened": 

380 print(f"Closing Recorder on {host}") 

381 requests.put("http://" + host + "/rest/rec/close", timeout=60) 

382 wait_for_recorder_state(host, "Idle") 

383 close_recorder(host) 

384 elif current_state == "Idle": 

385 print(f"Recorder {host} Idle") 

386 else: 

387 raise LanXIError(f"Unknown State {current_state} on {host}") 

388 return 

389 

390 

391class LanXIAcquisition(HardwareAcquisition): 

392 """Class defining the interface between LAN-XI acquisition and the controller 

393 

394 This class defines the interfaces between the controller and the 

395 data acquisition portion of the hardware. It is run by the Acquisition 

396 process, and must define how to get data from the test hardware into the 

397 controller.""" 

398 

399 def __init__(self, maximum_processes): 

400 """ 

401 Constructs the LAN-XI Acquisition class and specifies values to null. 

402 """ 

403 self.acquisition_map = {} 

404 self.output_map = {} 

405 self.sockets = {} 

406 self.processes = {} 

407 self.process_data_queues = {} 

408 self.interpretations = None 

409 self.master_address = None 

410 self.slave_addresses = set([]) 

411 self.samples_per_read = None 

412 self.last_acquisition_time = None 

413 self.maximum_processes = maximum_processes 

414 self.modules_per_process = None 

415 self.total_processes = None 

416 self.acquisition_delay = None 

417 

418 def set_up_data_acquisition_parameters_and_channels( 

419 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

420 ): 

421 """ 

422 Initialize the hardware and set up channels and sampling properties 

423 

424 The function must create channels on the hardware corresponding to 

425 the channels in the test. It must also set the sampling rates. 

426 

427 Parameters 

428 ---------- 

429 test_data : DataAcquisitionParameters : 

430 A container containing the data acquisition parameters for the 

431 controller set by the user. 

432 channel_data : List[Channel] : 

433 A list of ``Channel`` objects defining the channels in the test 

434 Returns 

435 ------- 

436 None. 

437 

438 """ 

439 # Now create a hardware map that will help us do bookkeeping 

440 create_harware_maps(self.acquisition_map, self.output_map, channel_data) 

441 # Go through the channel table and get the hardware and channel 

442 # information 

443 host_addresses = [channel.physical_device for channel in channel_data] 

444 host_addresses += [ 

445 channel.feedback_device 

446 for channel in channel_data 

447 if ( 

448 not (channel.feedback_device is None) 

449 and not (channel.feedback_device.strip() == "") 

450 ) 

451 ] 

452 self.master_address = host_addresses[0] 

453 self.slave_addresses = set( 

454 [address for address in host_addresses if not address == self.master_address] 

455 ) 

456 self.samples_per_read = test_data.samples_per_read 

457 modules_per_process_floor = len(self.acquisition_map) // self.maximum_processes 

458 modules_per_process_remainder = len(self.acquisition_map) % self.maximum_processes 

459 if modules_per_process_remainder == 0: 

460 self.modules_per_process = modules_per_process_floor 

461 else: 

462 self.modules_per_process = modules_per_process_floor + 1 

463 self.total_processes = (len(self.acquisition_map) // self.modules_per_process) + ( 

464 0 if len(self.acquisition_map) % self.modules_per_process == 0 else 1 

465 ) 

466 self.acquisition_delay = ( 

467 (BUFFER_SIZE + 2) * test_data.samples_per_write / test_data.output_oversample 

468 ) 

469 

470 def start(self): 

471 """Method to start acquiring data from the hardware""" 

472 self.sockets = {} 

473 self.processes = {} 

474 self.process_data_queues = {} 

475 # Apply the trigger for multi-frame acquisition 

476 if len(set(self.acquisition_map) | set(self.output_map)) > 1: 

477 requests.put("http://" + self.master_address + "/rest/rec/apply", timeout=60) 

478 # Collect the sockets 

479 for acquisition_device in self.acquisition_map: 

480 response = requests.get( 

481 "http://" + acquisition_device + "/rest/rec/destination/socket", timeout=60 

482 ) 

483 port = response.json()["tcpPort"] 

484 # Connect to the socket 

485 is_ipv4 = re.search(IPV4_PATTERN, acquisition_device) is not None 

486 is_ipv6 = re.search(IPV6_PATTERN, acquisition_device) is not None 

487 if is_ipv4: 

488 self.sockets[acquisition_device] = socket.socket( 

489 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP 

490 ) 

491 elif is_ipv6: 

492 self.sockets[acquisition_device] = socket.socket( 

493 socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP 

494 ) 

495 else: # This will crash but is fixed in overhaul version so... 

496 self.sockets[acquisition_device] = socket.socket( 

497 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP 

498 ) 

499 self.sockets[acquisition_device].connect((acquisition_device, port)) 

500 for slave_address in self.slave_addresses: 

501 if slave_address in self.acquisition_map: 

502 requests.post("http://" + slave_address + "/rest/rec/measurements", timeout=60) 

503 if self.master_address in self.acquisition_map: 

504 requests.post("http://" + self.master_address + "/rest/rec/measurements", timeout=60) 

505 print("Started Measurements") 

506 # Wait for the module state to be recorder streaming 

507 for slave_address in self.slave_addresses: 

508 if slave_address in self.acquisition_map: 

509 wait_for_recorder_state(slave_address, "RecorderRecording") 

510 if self.master_address in self.acquisition_map: 

511 wait_for_recorder_state(self.master_address, "RecorderRecording") 

512 

513 # Here we need to start the processes 

514 # Split it up into multiple processes 

515 socket_handles = [] 

516 active_channels_list = [] 

517 data_queues = [] 

518 for acquisition_device, channel_dict in self.acquisition_map.items(): 

519 self.process_data_queues[acquisition_device] = mp.Queue() 

520 active_channels = len(channel_dict) 

521 

522 socket_handles.append(self.sockets[acquisition_device]) 

523 active_channels_list.append(active_channels) 

524 data_queues.append(self.process_data_queues[acquisition_device]) 

525 

526 if len(socket_handles) % self.modules_per_process == 0: 

527 self.processes[acquisition_device] = mp.Process( 

528 target=lanxi_multisocket_reader, 

529 args=(socket_handles, active_channels_list, data_queues), 

530 ) 

531 self.processes[acquisition_device].start() 

532 socket_handles = [] 

533 active_channels_list = [] 

534 data_queues = [] 

535 if len(socket_handles) > 0: 

536 self.processes[acquisition_device] = mp.Process( 

537 target=lanxi_multisocket_reader, 

538 args=(socket_handles, active_channels_list, data_queues), 

539 ) 

540 self.processes[acquisition_device].start() 

541 socket_handles = [] 

542 active_channels_list = [] 

543 data_queues = [] 

544 

545 def get_acquisition_delay(self) -> int: 

546 """ 

547 Get the number of samples between output and acquisition. 

548 

549 This function returns the number of samples that need to be read to 

550 ensure that the last output is read by the acquisition. If there is 

551 buffering in the output, this delay should be adjusted accordingly. 

552 

553 Returns 

554 ------- 

555 int 

556 Number of samples between when a dataset is written to the output 

557 and when it has finished playing. 

558 

559 """ 

560 return self.acquisition_delay 

561 

562 def read(self): 

563 """Method to read a frame of data from the hardware""" 

564 samples = 0 

565 full_read_data = [] 

566 while samples < self.samples_per_read: 

567 read_data = [] 

568 acquisition_indices = [] 

569 for acquisition_device, _ in self.process_data_queues.items(): 

570 # We are going to loop until we get a signal which should be every time except the 

571 # first, which will pass the interpretation. 

572 while True: 

573 # Get the data from the queues 

574 # print('Reading from queue') 

575 data_type, data = self.process_data_queues[acquisition_device].get() 

576 if data_type == "Interpretation": 

577 if self.interpretations is None: 

578 self.interpretations = {} 

579 self.interpretations[acquisition_device] = data # Store the interpretation 

580 elif data_type == "Signal": 

581 for signal, channel_number, interpretation in zip( 

582 data, 

583 sorted(self.acquisition_map[acquisition_device]), 

584 self.interpretations[acquisition_device], 

585 ): 

586 acquisition_index, _ = self.acquisition_map[acquisition_device][ 

587 channel_number 

588 ] 

589 array = ( 

590 signal 

591 * interpretation[ 

592 OpenapiMessage.Interpretation.EDescriptorType.scale_factor 

593 ] # This is the scale factor 

594 + interpretation[ 

595 OpenapiMessage.Interpretation.EDescriptorType.offset 

596 ] # This is the offset 

597 ) 

598 read_data.append(array) 

599 acquisition_indices.append(acquisition_index) 

600 break # Exit the loop because we found the signal 

601 # Check if all the data are the same length 

602 index_map = np.empty(len(acquisition_indices), dtype=int) 

603 index_map[acquisition_indices] = np.arange(len(acquisition_indices)) 

604 read_data = np.array(read_data)[index_map] 

605 full_read_data.append(read_data) 

606 samples += read_data.shape[-1] 

607 full_read_data = np.concatenate(full_read_data, axis=-1) 

608 current_time = time.time() 

609 if self.last_acquisition_time is not None: 

610 dtime = current_time - self.last_acquisition_time 

611 print(f"Took {dtime:0.4f}s to read {full_read_data.shape[-1]} samples") 

612 self.last_acquisition_time = current_time 

613 return full_read_data 

614 

615 def read_remaining(self): 

616 """Method to read the rest of the data on the acquisition from the hardware""" 

617 return np.zeros( 

618 ( 

619 sum( 

620 [ 

621 1 

622 for acquisition_device, acquisition_dict in self.acquisition_map.items() 

623 for channel_number in acquisition_dict 

624 ] 

625 ), 

626 1, 

627 ) 

628 ) 

629 

630 def stop(self): 

631 """Method to stop the acquisition""" 

632 if self.master_address in self.acquisition_map: 

633 requests.put( 

634 "http://" + self.master_address + "/rest/rec/measurements/stop", timeout=60 

635 ) 

636 for slave_address in self.slave_addresses: 

637 if slave_address in self.acquisition_map: 

638 requests.put("http://" + slave_address + "/rest/rec/measurements/stop", timeout=60) 

639 # Wait for the module state to be recorder streaming 

640 for slave_address in self.slave_addresses: 

641 if slave_address in self.acquisition_map: 

642 wait_for_recorder_state(slave_address, "RecorderStreaming") 

643 if self.master_address in self.acquisition_map: 

644 wait_for_recorder_state(self.master_address, "RecorderStreaming") 

645 # Join the processes 

646 for acquisition_device, process in self.processes.items(): 

647 print(f"Recovering process {acquisition_device}") 

648 process.join(timeout=5) 

649 if process.is_alive(): 

650 process.terminate() 

651 process.join() 

652 print(f"Process {acquisition_device} recovered") 

653 print("All processes recovered, ready for next acquire.") 

654 self.processes = {} 

655 self.process_data_queues = {} 

656 self.interpretations = None 

657 self.last_acquisition_time = None 

658 

659 def close(self): 

660 """Method to close down the hardware""" 

661 if len(self.processes) > 0: # This means we are still running! 

662 self.stop() 

663 

664 def _get_states(self): 

665 for host in list(self.slave_addresses) + [self.master_address]: 

666 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60) 

667 state_data = response.json() 

668 print( 

669 f"Host {host}: Recorder State {state_data['moduleState']}, Input State " 

670 f"{state_data['inputStatus']}, PTP State {state_data['ptpStatus']}, Recording Mode" 

671 ) 

672 

673 def _reboot_all(self): 

674 for host in list(self.slave_addresses) + [self.master_address]: 

675 requests.put("http://" + host + "/rest/rec/reboot", timeout=60) 

676 

677 

678class LanXIOutput(HardwareOutput): 

679 """Abstract class defining the interface between the controller and output 

680 

681 This class defines the interfaces between the controller and the 

682 output or source portion of the hardware. It is run by the Output 

683 process, and must define how to get write data to the hardware from the 

684 control system""" 

685 

686 def __init__(self, maximum_processes): 

687 """Method to start up the hardware""" 

688 self.sockets = {} 

689 self.acquisition_map = {} 

690 self.output_map = {} 

691 self.master_address = None 

692 self.slave_addresses = set([]) 

693 self.oversample_factor = None 

694 self.output_rate = None 

695 self.bandwidth_string = None 

696 self.transfer_size = 4096 * 4 

697 self.sample_rate = None 

698 self.write_size = None 

699 self.empty_time = 0.0 

700 self.generator_sample_rate = None 

701 self.buffer_size = 5 

702 self.ready_signal_factor = BUFFER_SIZE 

703 self.maximum_processes = maximum_processes 

704 

705 def set_up_data_output_parameters_and_channels( 

706 self, test_data: DataAcquisitionParameters, channel_data: List[Channel] 

707 ): 

708 # Create a hardware map that will help us do bookkeeping 

709 create_harware_maps(self.acquisition_map, self.output_map, channel_data) 

710 self.write_size = test_data.samples_per_write 

711 self.sample_rate = test_data.sample_rate 

712 # Go through the channel table and get the hardware and channel 

713 # information 

714 host_addresses = [channel.physical_device for channel in channel_data] 

715 host_addresses += [ 

716 channel.feedback_device 

717 for channel in channel_data 

718 if ( 

719 not (channel.feedback_device is None) 

720 and not (channel.feedback_device.strip() == "") 

721 ) 

722 ] 

723 self.master_address = host_addresses[0] 

724 self.slave_addresses = set( 

725 [address for address in host_addresses if not address == self.master_address] 

726 ) 

727 print("\nInitial States:") 

728 self._get_states() 

729 

730 # time.sleep(10) 

731 

732 # Close all devices to start from scratch 

733 print("Resetting Data Acquisition System") 

734 self.close(reboot=False) 

735 

736 # time.sleep(10) 

737 

738 # If there are any slave addresses, need to perform PTP sync 

739 if len(self.slave_addresses) > 0: 

740 print("PTP Mode") 

741 master_json = { 

742 "synchronization": { 

743 "mode": "ptp", 

744 "domain": 42, 

745 "preferredMaster": True, 

746 } 

747 } 

748 requests.put( 

749 "http://" + self.master_address + "/rest/rec/syncmode", json=master_json, timeout=60 

750 ) 

751 slave_json = { 

752 "synchronization": { 

753 "mode": "ptp", 

754 "domain": 42, 

755 "preferredMaster": False, 

756 } 

757 } 

758 for slave_address in self.slave_addresses: 

759 requests.put( 

760 "http://" + slave_address + "/rest/rec/syncmode", json=slave_json, timeout=60 

761 ) 

762 print("Waiting for PTP Sync...") 

763 # Wait until PTP locks 

764 for slave_address in self.slave_addresses: 

765 wait_for_ptp_state(slave_address, "Locked") 

766 wait_for_ptp_state(self.master_address, "Locked") 

767 print("PTP Synced!") 

768 single_module = False 

769 else: 

770 print("Single Module Mode") 

771 master_json = {"synchronization": {"mode": "stand-alone"}} 

772 requests.put( 

773 "http://" + self.master_address + "/rest/rec/syncmode", json=master_json, timeout=60 

774 ) 

775 single_module = True 

776 print("\nStates after synchronization") 

777 self._get_states() 

778 

779 # Now we open the recorders 

780 open_json = { 

781 # May need to investigate this further, but for now we won't use TEDS 

782 "performTransducerDetection": False, 

783 "singleModule": single_module, 

784 } 

785 for slave_address in self.slave_addresses: 

786 requests.put("http://" + slave_address + "/rest/rec/open", json=open_json, timeout=60) 

787 requests.put("http://" + self.master_address + "/rest/rec/open", json=open_json, timeout=60) 

788 print("\nStates after Open") 

789 self._get_states() 

790 

791 # Now get the sample rate 

792 for i, address in enumerate(self.acquisition_map): 

793 response = requests.get("http://" + address + "/rest/rec/module/info", timeout=60) 

794 module_info = response.json() 

795 if i == 0: 

796 supported_sample_rates = module_info["supportedSampleRates"] 

797 else: 

798 supported_sample_rates = [ 

799 v for v in supported_sample_rates if v in module_info["supportedSampleRates"] 

800 ] 

801 print(f"Supported Sample Rates {supported_sample_rates}") 

802 bandwidth = test_data.sample_rate / 2.56 

803 if bandwidth > 1000: 

804 self.bandwidth_string = f"{bandwidth / 1000:0.1f} kHz" 

805 else: 

806 self.bandwidth_string = str(round(bandwidth)) + " Hz" 

807 print(f"Sample Rate: {test_data.sample_rate} Hz (Bandwidth {self.bandwidth_string})") 

808 if test_data.sample_rate not in supported_sample_rates: 

809 raise LanXIError( 

810 f"Invalid Sample Rate {test_data.sample_rate}, must be one of " 

811 f"{supported_sample_rates}" 

812 ) 

813 

814 # Get the Generator Sample Rate 

815 self.generator_sample_rate = round(np.log2(OUTPUT_RATE / test_data.sample_rate)) 

816 if self.generator_sample_rate > 3: 

817 self.generator_sample_rate = 3 

818 elif self.generator_sample_rate < 0: 

819 self.generator_sample_rate = 0 

820 self.output_rate = OUTPUT_RATE // 2 ** (self.generator_sample_rate) 

821 # Now prep the generators 

822 self.set_generators() 

823 

824 # Now we need to set up the recording configuration 

825 for slave_address in self.slave_addresses: 

826 if slave_address in self.acquisition_map: 

827 requests.put("http://" + slave_address + "/rest/rec/create", timeout=60) 

828 else: 

829 print( 

830 f"Skipping Creating Slave Address Recorder {slave_address}, not in acquisition" 

831 ) 

832 if self.master_address in self.acquisition_map: 

833 requests.put("http://" + self.master_address + "/rest/rec/create", timeout=60) 

834 else: 

835 print( 

836 f"Skipping Creating Master Address Recorder " 

837 f"{self.master_address}, not in acquisition" 

838 ) 

839 for slave_address in self.slave_addresses: 

840 if slave_address in self.acquisition_map: 

841 wait_for_recorder_state(slave_address, "RecorderConfiguring") 

842 if self.master_address in self.acquisition_map: 

843 wait_for_recorder_state(self.master_address, "RecorderConfiguring") 

844 print("\nStates after Recorder Create") 

845 self._get_states() 

846 

847 # State is now in Recorder Configuring 

848 print("Recorder in Configuring State") 

849 # Now we have to go through and create the channels 

850 for acquisition_device, device_dictionary in self.acquisition_map.items(): 

851 response = requests.get( 

852 "http://" + acquisition_device + "/rest/rec/channels/input/default", timeout=60 

853 ) 

854 channel_settings = response.json() 

855 # Go through and disable all channels 

856 for channel_json in channel_settings["channels"]: 

857 channel_json["enabled"] = False 

858 for channel_number, (_, channel) in device_dictionary.items(): 

859 _, channel_json = [ 

860 (i, channel_json) 

861 for i, channel_json in enumerate(channel_settings["channels"]) 

862 if channel_json["channel"] == channel_number 

863 ][0] 

864 channel_json["bandwidth"] = self.bandwidth_string 

865 channel_json["ccld"] = False if channel.excitation_source is None else True 

866 channel_json["transducer"]["requiresCcld"] = channel_json["ccld"] 

867 if channel_json["ccld"]: 

868 print(f"Device {acquisition_device} channel {channel_number} has CCLD enabled") 

869 channel_json["destinations"] = ["socket"] 

870 channel_json["enabled"] = True 

871 channel_coupling = "DC" if channel.coupling is None else channel.coupling 

872 if channel_coupling not in VALID_FILTERS: 

873 raise LanXIError(f"For LAN-XI, Coupling must be sent to one of {VALID_FILTERS}") 

874 channel_json["filter"] = channel_coupling 

875 if channel.maximum_value not in VALID_RANGES: 

876 raise LanXIError(f"For LAN-XI, Maximum Value must be one of {VALID_RANGES}") 

877 channel_json["range"] = channel.maximum_value + " Vpeak" 

878 channel_json["transducer"]["sensitivity"] = float(channel.sensitivity) / 1000 

879 # The metadata doesn't really matter here, so we just use an arbitrary number 

880 # Otherwise it shold be something like this: 

881 # ('' if channel.serial_number is None else channel.serial_number) 

882 # +('' if channel.triax_dof is None else channel.triax_dof) 

883 channel_json["transducer"]["serialNumber"] = 9999 

884 channel_json["transducer"]["type"]["model"] = ( 

885 "" if channel.make is None else channel.make 

886 ) + ("" if channel.model is None else " " + channel.model) 

887 channel_json["transducer"]["unit"] = channel.unit 

888 response = requests.put( 

889 "http://" + acquisition_device + "/rest/rec/channels/input", 

890 json=channel_settings, 

891 timeout=60, 

892 ) 

893 print( 

894 f"Setting inputs to {acquisition_device} Channels, {response.status_code} " 

895 f"{response.text}" 

896 ) 

897 print("\nStates after Channel Input") 

898 self._get_states() 

899 

900 # Now check for synchronization 

901 if len(self.slave_addresses) > 0: 

902 for slave_address in self.slave_addresses: 

903 if slave_address in self.acquisition_map: 

904 wait_for_input_state(slave_address, "Settled") 

905 if self.master_address in self.acquisition_map: 

906 wait_for_input_state(self.master_address, "Settled") 

907 print("Recorder Settled, Synchronizing...") 

908 

909 for slave_address in self.slave_addresses: 

910 if slave_address in self.acquisition_map: 

911 requests.put("http://" + slave_address + "/rest/rec/synchronize", timeout=60) 

912 if self.master_address in self.acquisition_map: 

913 requests.put("http://" + self.master_address + "/rest/rec/synchronize", timeout=60) 

914 for slave_address in self.slave_addresses: 

915 if slave_address in self.acquisition_map: 

916 wait_for_input_state(slave_address, "Synchronized") 

917 if self.master_address in self.acquisition_map: 

918 wait_for_input_state(self.master_address, "Synchronized") 

919 print("Recorder Synchronized, Starting Streaming...") 

920 

921 for slave_address in self.slave_addresses: 

922 if slave_address in self.acquisition_map: 

923 requests.put("http://" + slave_address + "/rest/rec/startstreaming", timeout=60) 

924 if self.master_address in self.acquisition_map: 

925 requests.put( 

926 "http://" + self.master_address + "/rest/rec/startstreaming", timeout=60 

927 ) 

928 

929 # Wait for the module state to be recorder streaming 

930 for slave_address in self.slave_addresses: 

931 if slave_address in self.acquisition_map: 

932 wait_for_recorder_state(slave_address, "RecorderStreaming") 

933 if self.master_address in self.acquisition_map: 

934 wait_for_recorder_state(self.master_address, "RecorderStreaming") 

935 print("Recorder Streaming") 

936 self._get_states() 

937 

938 print("\n\nData Acquisition Ready for Acquire") 

939 

940 def start(self): 

941 """Method to start outputting data to the hardware""" 

942 self.empty_time += time.time() 

943 # Now start the generators 

944 master_json = None 

945 for generator_device, generator_channel_dict in self.output_map.items(): 

946 json = { 

947 "outputs": [{"number": channel_number} for channel_number in generator_channel_dict] 

948 } 

949 if generator_device == self.master_address: 

950 master_json = ( 

951 json # Pull this out because the master should be assigned last I think. 

952 ) 

953 continue 

954 requests.put( 

955 "http://" + generator_device + "/rest/rec/generator/start", json=json, timeout=60 

956 ) 

957 if master_json is not None: 

958 requests.put( 

959 "http://" + self.master_address + "/rest/rec/generator/start", 

960 json=master_json, 

961 timeout=60, 

962 ) 

963 print("States after Generator Started") 

964 self._get_states() 

965 

966 def write(self, data): 

967 """Method to write a frame of data to the hardware""" 

968 for output_device, socket_dict in self.sockets.items(): 

969 for channel_number, socket_handle in socket_dict.items(): 

970 output_index, _ = self.output_map[output_device][channel_number] 

971 this_data = (data[output_index] / 10 * 8372224).astype("int32").tobytes() 

972 while len(this_data) > 0: 

973 sent_bytes = socket_handle.send(this_data[: self.transfer_size]) 

974 this_data = this_data[sent_bytes:] 

975 self.empty_time += self.write_size / self.output_rate 

976 

977 def stop(self): 

978 """Method to stop the output""" 

979 master_json = None 

980 for generator_device, generator_channel_dict in self.output_map.items(): 

981 json = { 

982 "outputs": [{"number": channel_number} for channel_number in generator_channel_dict] 

983 } 

984 if generator_device == self.master_address: 

985 master_json = ( 

986 json # Pull this out because the master should be assigned last I think. 

987 ) 

988 continue 

989 requests.put( 

990 "http://" + generator_device + "/rest/rec/generator/stop", json=json, timeout=60 

991 ) 

992 if master_json is not None: 

993 requests.put( 

994 "http://" + self.master_address + "/rest/rec/generator/stop", 

995 json=master_json, 

996 timeout=60, 

997 ) 

998 self.empty_time = 0.0 

999 self.set_generators() 

1000 

1001 def set_generators(self): 

1002 """Sets the generator states""" 

1003 if len(self.output_map) == 0: 

1004 return 

1005 master_json = None 

1006 for generator_device, generator_channel_dict in self.output_map.items(): 

1007 json = { 

1008 "outputs": [{"number": channel_number} for channel_number in generator_channel_dict] 

1009 } 

1010 if generator_device == self.master_address: 

1011 master_json = ( 

1012 json # Pull this out because the master should be assigned last I think. 

1013 ) 

1014 continue 

1015 requests.put( 

1016 "http://" + generator_device + "/rest/rec/generator/prepare", json=json, timeout=60 

1017 ) 

1018 if master_json is not None: 

1019 requests.put( 

1020 "http://" + self.master_address + "/rest/rec/generator/prepare", 

1021 json=master_json, 

1022 timeout=60, 

1023 ) 

1024 print("\nStates after Generator Prepare") 

1025 self._get_states() 

1026 

1027 # Configure the generator channels 

1028 master_json = None 

1029 for generator_device, generator_channel_dict in self.output_map.items(): 

1030 json = { 

1031 "bufferSize": self.buffer_size * self.write_size, # TODO: Re-evaluate this number 

1032 "outputs": [ 

1033 { 

1034 "number": channel_number, 

1035 "floating": False, 

1036 "gain": 1.0, 

1037 "inputs": [ 

1038 { 

1039 "number": 1, 

1040 "signalType": "stream", 

1041 "gain": 1.0, 

1042 "offset": 0.0, 

1043 "samplingRate": self.generator_sample_rate, 

1044 }, 

1045 {"number": 2, "signalType": "none"}, 

1046 ], 

1047 } 

1048 for channel_number in generator_channel_dict 

1049 ], 

1050 } 

1051 if generator_device == self.master_address: 

1052 # Pull this out because the master should be assigned last I think. 

1053 master_json = json 

1054 continue 

1055 requests.put( 

1056 "http://" + generator_device + "/rest/rec/generator/output", json=json, timeout=60 

1057 ) 

1058 if master_json is not None: 

1059 requests.put( 

1060 "http://" + self.master_address + "/rest/rec/generator/output", 

1061 json=master_json, 

1062 timeout=60, 

1063 ) 

1064 print("\nStates after Generator Output") 

1065 self._get_states() 

1066 

1067 # Now pull the socket information for the outputs 

1068 for generator_device, generator_dict in self.output_map.items(): 

1069 response = requests.get( 

1070 "http://" + generator_device + "/rest/rec/generator/output", timeout=60 

1071 ) 

1072 output_data = response.json() 

1073 for channel_number in generator_dict: 

1074 output = [out for out in output_data["outputs"] if out["number"] == channel_number][ 

1075 0 

1076 ] 

1077 if generator_device not in self.sockets: 

1078 self.sockets[generator_device] = {} 

1079 

1080 is_ipv4 = re.search(IPV4_PATTERN, generator_device) is not None 

1081 is_ipv6 = re.search(IPV6_PATTERN, generator_device) is not None 

1082 if is_ipv4: 

1083 self.sockets[generator_device][output["number"]] = socket.socket( 

1084 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP 

1085 ) 

1086 elif is_ipv6: 

1087 self.sockets[generator_device][output["number"]] = socket.socket( 

1088 socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP 

1089 ) 

1090 else: # This will crash but is fixed in overhaul version so... 

1091 self.sockets[generator_device][output["number"]] = socket.socket( 

1092 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP 

1093 ) 

1094 self.sockets[generator_device][output["number"]].connect( 

1095 (generator_device, output["inputs"][0]["port"]) 

1096 ) 

1097 print( 

1098 f"Output Connected to Device {generator_device} Channel {output['number']} " 

1099 f"on Port {output['inputs'][0]['port']}" 

1100 ) 

1101 self.oversample_factor = round( 

1102 OUTPUT_RATE / (2 ** output["inputs"][0]["samplingRate"]) / self.sample_rate 

1103 ) 

1104 print(f"Output overampling factor: {self.oversample_factor}x") 

1105 

1106 def close(self, reboot=False): 

1107 """Method to close down the hardware""" 

1108 for _, socket_dict in self.sockets.items(): 

1109 for _, socket_handle in socket_dict.items(): 

1110 socket_handle.close() 

1111 if reboot: 

1112 self._reboot_all() 

1113 else: 

1114 self._close_recorders( 

1115 [self.master_address] + [address for address in self.slave_addresses] 

1116 ) 

1117 # self._reboot_all() 

1118 # self._close_recorder(self.master_address) 

1119 # for slave_address in self.slave_addresses: 

1120 # self._close_recorder(slave_address) 

1121 

1122 def ready_for_new_output(self): 

1123 """Method that returns true if the hardware should accept a new signal""" 

1124 # print('Time until output buffer empty {:}, time per write {:}'.format( 

1125 # self.empty_time - time.time(),self.write_size / self.output_rate)) 

1126 if (self.empty_time - time.time()) < ( 

1127 self.write_size / self.output_rate 

1128 ) * self.ready_signal_factor: # TODO: Might need to increase buffer 

1129 # print('Need new output') 

1130 return True 

1131 else: 

1132 # print('No output needed') 

1133 return False 

1134 

1135 def _close_recorders(self, hosts): 

1136 with mp.Pool( 

1137 len(hosts) 

1138 ) as pool: # Not sure if this can be len(hosts) or if it should be self.maximum_processes 

1139 pool.map(close_recorder, hosts) 

1140 # host_states = {} 

1141 # while True: 

1142 # for host in hosts: 

1143 # # print('Getting state from host {:}'.format(host)) 

1144 # response = requests.get('http://'+host+'/rest/rec/onchange') 

1145 # state_data = response.json() 

1146 # current_state = state_data['moduleState'] 

1147 # # print('Got state from host {:}'.format(host)) 

1148 # if host in host_states and host_states[host] == current_state: 

1149 # continue 

1150 # host_states[host] = current_state 

1151 # try: 

1152 # operation = LANXI_STATE_SHUTDOWN[current_state] 

1153 # except KeyError: 

1154 # print('Unknown State {:} for host {:}. Rebooting'.format(current_state,host)) 

1155 # requests.put('http://'+host+'/rest/rec/reboot') 

1156 # continue 

1157 # if not operation is None: 

1158 # print('Host {:} at {:} state: {:}'.format(host,current_state,operation)) 

1159 # requests.put('http://'+host+operation) 

1160 # # Check if all hosts are idle 

1161 # if all([v == 'Idle' for k,v in host_states.items()]): 

1162 # print('All hosts are idle') 

1163 # break 

1164 # time.sleep(0.2) 

1165 

1166 def _get_states(self): 

1167 for host in list(self.slave_addresses) + [self.master_address]: 

1168 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60) 

1169 state_data = response.json() 

1170 print( 

1171 f"Host {host}: Recorder State {state_data['moduleState']}, Input State " 

1172 f"{state_data['inputStatus']}, PTP State {state_data['ptpStatus']}, Recording Mode" 

1173 ) 

1174 

1175 def _reboot_all(self): 

1176 for host in list(self.slave_addresses) + [self.master_address]: 

1177 requests.put("http://" + host + "/rest/rec/reboot", timeout=60)