1# -*- coding: utf-8 -*-
2"""
3Implementation for the HBK LAN-XI Hardware
4
5Rattlesnake Vibration Control Software
6Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC
7(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
8Government retains certain rights in this software.
9
10This program is free software: you can redistribute it and/or modify
11it under the terms of the GNU General Public License as published by
12the Free Software Foundation, either version 3 of the License, or
13(at your option) any later version.
14
15This program is distributed in the hope that it will be useful,
16but WITHOUT ANY WARRANTY; without even the implied warranty of
17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18GNU General Public License for more details.
19
20You should have received a copy of the GNU General Public License
21along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""
23
24import multiprocessing as mp
25import re
26import socket
27import time
28from typing import List
29
30import numpy as np
31import requests
32
33from .abstract_hardware import HardwareAcquisition, HardwareOutput
34from .lanxi_stream import OpenapiHeader, OpenapiMessage
35from .utilities import Channel, DataAcquisitionParameters
36
37OUTPUT_RATE = 131072
38LANXI_STATE_TIMEOUT = 255.0
39LANXI_STATE_REPORT = 10
40VALID_FILTERS = ["DC", "0.7 Hz", "7.0 Hz", "22.4 Hz", "Intensity"]
41VALID_RANGES = ["0.316", "1", "10", "31.6"]
42HEADER_LENGTH = 28
43BUFFER_SIZE = 2.5
44
45LANXI_STATE_SHUTDOWN = {
46 "RecorderRecording": "/rest/rec/measurements/stop",
47 "RecorderStreaming": "/rest/rec/finish",
48 "RecorderOpened": "/rest/rec/close",
49 "Idle": None,
50}
51
52IPV4_PATTERN = r"^((25[0-5]|(2[0-4]|1[0-9]|[1-9]|)[0-9])(\.(?!$)|$)){4}$"
53IPV6_PATTERN = r"\[\s*([0-9a-fA-F]{1,4}:){0,7}(:[0-9a-fA-F]{1,4})*%?\d*\s*\]"
54
55# TO DO List
56# TODO Get responses each time a get or put is done so we know if it was successful
57# TODO Shut down the data acquisition more quickly
58
59
60class LanXIError(Exception):
61 """Exception to signify an error using LAN-XI"""
62
63
64def read_lanxi(socket_handle: socket.socket):
65 """
66 Reads and interprets data from a Lan-XI
67
68 Parameters
69 ----------
70 socket_handle : socket.socket
71 Handle to the socket that the communication is happening over.
72
73 Returns
74 -------
75 message_type : OpenapiStream.Header.EMessageType
76 Enum determining what the data type is
77 data : np.ndarray or dict
78 The data read from the device. Will be a np.ndarray for a signal and
79 a dictionary for a interpretation
80
81 """
82 # print('Reading data from {:}:{:}'.format(*socket_handle.getpeername()))
83 data = socket_handle.recv(HEADER_LENGTH, socket.MSG_WAITALL)
84 if len(data) == 0:
85 raise LanXIError("Socket is not connected anymore")
86 wstream = OpenapiHeader.from_bytes(data)
87 content_length = wstream.message_length + HEADER_LENGTH
88 # We use the header's content_length to collect the rest of a package
89 while len(data) < content_length:
90 packet = socket_handle.recv(content_length - len(data))
91 data += packet
92 # Now we parse the package
93 try:
94 package = OpenapiMessage.from_bytes(data)
95 except EOFError as e:
96 print(f"Data Invalid {data}")
97 raise e
98 if package.header.message_type == OpenapiMessage.Header.EMessageType.e_interpretation:
99 interpretation_dict = {}
100 for interpretation in package.message.interpretations:
101 interpretation_dict[interpretation.descriptor_type] = interpretation.value
102 return (package.header.message_type, interpretation_dict)
103 elif (
104 package.header.message_type == OpenapiMessage.Header.EMessageType.e_signal_data
105 ): # If the data contains signal data
106 array = []
107 for signal in package.message.signals: # For each signal in the package
108 array.append(np.array([x.calc_value for x in signal.values]) / 2**23)
109 return package.header.message_type, np.concatenate(array, axis=-1)
110 # If 'quality data' message, then record information on data quality issues
111 elif package.header.message_type == OpenapiMessage.Header.EMessageType.e_data_quality:
112 ip, port = socket_handle.getpeername()
113 for q in package.message.qualities:
114 if q.validity_flags.overload:
115 print(f"Overload Detected on {ip}:{port}")
116 if q.validity_flags.invalid:
117 print(f"Invalid Data Detected on {ip}:{port}")
118 if q.validity_flags.overrun:
119 print(f"Overrun Detected on {ip}:{port}")
120 return None, None
121 else:
122 raise LanXIError(f"Unknown Message Type: {package.header.message_type}")
123
124
125def lanxi_multisocket_reader(
126 socket_handles: List[socket.socket],
127 active_channels_list: List[int],
128 data_queues: List[mp.queues.Queue],
129):
130 """
131 Reads data from all channels on multiple modules.
132
133 This function is designed to be run by a multiprocessing Process
134
135 Parameters
136 ----------
137 socket_handles : List[socket.socket]
138 A list of the sockets for the cards on this module.
139 active_channels_list : List[int]
140 A list of number of active channels on each module.
141 data_queues : List[mp.queues.Queue]
142 A set of queues to pass data back to the main process.
143
144 """
145 print(
146 "Starting to record from:\n {:}".format(
147 "\n ".join(
148 ["{:}:{:}".format(*socket_handle.getpeername()) for socket_handle in socket_handles]
149 )
150 )
151 )
152 try:
153 while True:
154 for socket_handle, active_channels, data_queue in zip(
155 socket_handles, active_channels_list, data_queues
156 ):
157 socket_data = []
158 socket_data_types = []
159 while len(socket_data) < active_channels:
160 message_type, data = read_lanxi(socket_handle)
161 # print('Reading {:}:{:} Data Type {:}'.format(
162 # *socket_handle.getpeername(),message_type))
163 if message_type is not None:
164 socket_data.append(data)
165 socket_data_types.append(message_type)
166 # Make sure they are all the same type
167 assert all([data_type == socket_data_types[0] for data_type in socket_data_types])
168 if socket_data_types[0] == OpenapiMessage.Header.EMessageType.e_interpretation:
169 # print('{:}:{:} Putting Interpretation to Queue'.format(
170 # *socket_handle.getpeername()))
171 data_queue.put(("Interpretation", socket_data))
172 elif socket_data_types[0] == OpenapiMessage.Header.EMessageType.e_signal_data:
173 # print('{:}:{:} Putting Signal to Queue'.format(
174 # *socket_handle.getpeername()))
175 data_queue.put(("Signal", socket_data))
176 else:
177 raise ValueError(
178 "Unknown Signal Type {:} in {:}:{:}".format( # pylint: disable=consider-using-f-string
179 socket_data_types[0], *socket_handle.getpeername()
180 )
181 )
182 except LanXIError:
183 for socket_handle, data_queue in zip(socket_handles, data_queues):
184 # The socket has closed, so gracefully close down
185 ip, port = socket_handle.getpeername()
186 print(f"Closing Socket {ip}:{port}")
187 while True:
188 try:
189 print(f"Emptying Queue {ip}:{port}")
190 data_queue.get(False)
191 except mp.queues.Empty:
192 print(f"Returning {ip}:{port}")
193 break
194 return
195
196
197def create_harware_maps(acquisition_map, output_map, channel_list):
198 """Creates mapping between the LAN-XI channels and the rattlesnake channel list
199
200 Parameters
201 ----------
202 acquisition_map : dict
203 A dictionary that will be populated with the acquisition map information. The dictionary
204 will be nested, with the first key being the physical device and the second key being the
205 physical channel. The value will be a tuple containing the channel index and the Channel
206 itself.
207 output_map : dict
208 A dictionary that will be populated with the acquisition map information. The dictionary
209 will be nested, with the first key being the feedback device and the second key being the
210 feedback channel. The value will be a tuple containing the channel index and the Channel
211 itself.
212 channel_list : list of Channel objects
213 A list of channels in the rattlesnake test
214 """
215 for i, channel in enumerate(channel_list):
216 if channel.physical_device not in acquisition_map:
217 acquisition_map[channel.physical_device] = {}
218 acquisition_map[channel.physical_device][int(channel.physical_channel)] = (
219 i,
220 channel,
221 )
222 for i, channel in enumerate(
223 [channel for channel in channel_list if channel.feedback_device is not None]
224 ):
225 if channel.feedback_device not in output_map:
226 output_map[channel.feedback_device] = {}
227 output_map[channel.feedback_device][int(channel.feedback_channel)] = i, channel
228
229
230def wait_for_ptp_state(host: str, state: str):
231 """Waits until hardware is at a current state
232
233 Parameters
234 ----------
235 host : str
236 The address of the host to wait for
237 state : str
238 The name of the state to wait until.
239
240 Returns
241 -------
242 bool
243 True if the state has changed, False if the hardware timed out.
244
245 """
246 start_time = time.time()
247 current_state = ""
248 iteration = 0
249 while True:
250 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60)
251 state_data = response.json()
252 current_state = state_data["ptpStatus"]
253 if current_state == state:
254 result = True
255 break
256 if time.time() - start_time > LANXI_STATE_TIMEOUT:
257 result = False
258 break
259 time.sleep(1)
260 iteration += 1
261 if iteration % LANXI_STATE_REPORT == 0:
262 print(f"Host {host} at {current_state} state, waiting for {state}")
263 if not result:
264 raise LanXIError(
265 f"Wait for PTP State {state} timed out on host {host}. Last retrieved "
266 f"state: {current_state}"
267 )
268 return result
269
270
271def wait_for_recorder_state(host: str, state: str):
272 """Waits until hardware is at a current state
273
274 Parameters
275 ----------
276 host : str
277 The address of the host to wait for
278 state : str
279 The name of the state to wait until.
280
281 Returns
282 -------
283 bool
284 True if the state has changed, False if the hardware timed out.
285
286 """
287 start_time = time.time()
288 current_state = ""
289 iteration = 0
290 while True:
291 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60)
292 state_data = response.json()
293 current_state = state_data["moduleState"]
294 if current_state == state:
295 result = True
296 break
297 if current_state == "PostFailed":
298 result = False
299 break
300 if time.time() - start_time > LANXI_STATE_TIMEOUT:
301 result = False
302 break
303 time.sleep(1)
304 iteration += 1
305 if iteration % LANXI_STATE_REPORT == 0:
306 print(f"Host {host} at {current_state} state, waiting for {state}")
307 if not result:
308 raise LanXIError(
309 f"Wait for Recorder State {state} timed out on host {host}. Last retrieved "
310 f"state: {current_state}"
311 )
312 return result
313
314
315def wait_for_input_state(host: str, state: str):
316 """Waits until hardware is at a current state
317
318 Parameters
319 ----------
320 host : str
321 The address of the host to wait for
322 state : str
323 The name of the state to wait until.
324
325 Returns
326 -------
327 bool
328 True if the state has changed, False if the hardware timed out.
329
330 """
331 start_time = time.time()
332 current_state = ""
333 iteration = 0
334 while True:
335 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60)
336 state_data = response.json()
337 current_state = state_data["inputStatus"]
338 if current_state == state:
339 result = True
340 break
341 if time.time() - start_time > LANXI_STATE_TIMEOUT:
342 result = False
343 break
344 time.sleep(1)
345 iteration += 1
346 if iteration % LANXI_STATE_REPORT == 0:
347 print(f"Host {host} at {current_state} state, waiting for {state}")
348 if not result:
349 raise LanXIError(
350 f"Wait for Input State {state} timed out on host {host}. Last retrieved state: "
351 f"{current_state}"
352 )
353 return result
354
355
356def close_recorder(host):
357 """Closes the host based on its current state"""
358 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60)
359 state_data = response.json()
360 current_state = state_data["moduleState"]
361 if current_state == "RecorderRecording":
362 print(f"Stopping Measurement on {host}")
363 requests.put("http://" + host + "/rest/rec/measurements/stop", timeout=60)
364 wait_for_recorder_state(host, "RecorderStreaming")
365 close_recorder(host)
366 elif current_state == "RecorderConfiguring":
367 response = requests.get("http://" + host + "/rest/rec/channels/input/default", timeout=60)
368 channel_settings = response.json()
369 response = requests.put(
370 "http://" + host + "/rest/rec/channels/input", json=channel_settings, timeout=60
371 )
372 wait_for_recorder_state(host, "RecorderStreaming")
373 close_recorder(host)
374 elif current_state == "RecorderStreaming":
375 print(f"Finishing Streaming on {host}")
376 requests.put("http://" + host + "/rest/rec/finish", timeout=60)
377 wait_for_recorder_state(host, "RecorderOpened")
378 close_recorder(host)
379 elif current_state == "RecorderOpened":
380 print(f"Closing Recorder on {host}")
381 requests.put("http://" + host + "/rest/rec/close", timeout=60)
382 wait_for_recorder_state(host, "Idle")
383 close_recorder(host)
384 elif current_state == "Idle":
385 print(f"Recorder {host} Idle")
386 else:
387 raise LanXIError(f"Unknown State {current_state} on {host}")
388 return
389
390
391class LanXIAcquisition(HardwareAcquisition):
392 """Class defining the interface between LAN-XI acquisition and the controller
393
394 This class defines the interfaces between the controller and the
395 data acquisition portion of the hardware. It is run by the Acquisition
396 process, and must define how to get data from the test hardware into the
397 controller."""
398
399 def __init__(self, maximum_processes):
400 """
401 Constructs the LAN-XI Acquisition class and specifies values to null.
402 """
403 self.acquisition_map = {}
404 self.output_map = {}
405 self.sockets = {}
406 self.processes = {}
407 self.process_data_queues = {}
408 self.interpretations = None
409 self.master_address = None
410 self.slave_addresses = set([])
411 self.samples_per_read = None
412 self.last_acquisition_time = None
413 self.maximum_processes = maximum_processes
414 self.modules_per_process = None
415 self.total_processes = None
416 self.acquisition_delay = None
417
418 def set_up_data_acquisition_parameters_and_channels(
419 self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
420 ):
421 """
422 Initialize the hardware and set up channels and sampling properties
423
424 The function must create channels on the hardware corresponding to
425 the channels in the test. It must also set the sampling rates.
426
427 Parameters
428 ----------
429 test_data : DataAcquisitionParameters :
430 A container containing the data acquisition parameters for the
431 controller set by the user.
432 channel_data : List[Channel] :
433 A list of ``Channel`` objects defining the channels in the test
434 Returns
435 -------
436 None.
437
438 """
439 # Now create a hardware map that will help us do bookkeeping
440 create_harware_maps(self.acquisition_map, self.output_map, channel_data)
441 # Go through the channel table and get the hardware and channel
442 # information
443 host_addresses = [channel.physical_device for channel in channel_data]
444 host_addresses += [
445 channel.feedback_device
446 for channel in channel_data
447 if (
448 not (channel.feedback_device is None)
449 and not (channel.feedback_device.strip() == "")
450 )
451 ]
452 self.master_address = host_addresses[0]
453 self.slave_addresses = set(
454 [address for address in host_addresses if not address == self.master_address]
455 )
456 self.samples_per_read = test_data.samples_per_read
457 modules_per_process_floor = len(self.acquisition_map) // self.maximum_processes
458 modules_per_process_remainder = len(self.acquisition_map) % self.maximum_processes
459 if modules_per_process_remainder == 0:
460 self.modules_per_process = modules_per_process_floor
461 else:
462 self.modules_per_process = modules_per_process_floor + 1
463 self.total_processes = (len(self.acquisition_map) // self.modules_per_process) + (
464 0 if len(self.acquisition_map) % self.modules_per_process == 0 else 1
465 )
466 self.acquisition_delay = (
467 (BUFFER_SIZE + 2) * test_data.samples_per_write / test_data.output_oversample
468 )
469
470 def start(self):
471 """Method to start acquiring data from the hardware"""
472 self.sockets = {}
473 self.processes = {}
474 self.process_data_queues = {}
475 # Apply the trigger for multi-frame acquisition
476 if len(set(self.acquisition_map) | set(self.output_map)) > 1:
477 requests.put("http://" + self.master_address + "/rest/rec/apply", timeout=60)
478 # Collect the sockets
479 for acquisition_device in self.acquisition_map:
480 response = requests.get(
481 "http://" + acquisition_device + "/rest/rec/destination/socket", timeout=60
482 )
483 port = response.json()["tcpPort"]
484 # Connect to the socket
485 is_ipv4 = re.search(IPV4_PATTERN, acquisition_device) is not None
486 is_ipv6 = re.search(IPV6_PATTERN, acquisition_device) is not None
487 if is_ipv4:
488 self.sockets[acquisition_device] = socket.socket(
489 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP
490 )
491 elif is_ipv6:
492 self.sockets[acquisition_device] = socket.socket(
493 socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP
494 )
495 else: # This will crash but is fixed in overhaul version so...
496 self.sockets[acquisition_device] = socket.socket(
497 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP
498 )
499 self.sockets[acquisition_device].connect((acquisition_device, port))
500 for slave_address in self.slave_addresses:
501 if slave_address in self.acquisition_map:
502 requests.post("http://" + slave_address + "/rest/rec/measurements", timeout=60)
503 if self.master_address in self.acquisition_map:
504 requests.post("http://" + self.master_address + "/rest/rec/measurements", timeout=60)
505 print("Started Measurements")
506 # Wait for the module state to be recorder streaming
507 for slave_address in self.slave_addresses:
508 if slave_address in self.acquisition_map:
509 wait_for_recorder_state(slave_address, "RecorderRecording")
510 if self.master_address in self.acquisition_map:
511 wait_for_recorder_state(self.master_address, "RecorderRecording")
512
513 # Here we need to start the processes
514 # Split it up into multiple processes
515 socket_handles = []
516 active_channels_list = []
517 data_queues = []
518 for acquisition_device, channel_dict in self.acquisition_map.items():
519 self.process_data_queues[acquisition_device] = mp.Queue()
520 active_channels = len(channel_dict)
521
522 socket_handles.append(self.sockets[acquisition_device])
523 active_channels_list.append(active_channels)
524 data_queues.append(self.process_data_queues[acquisition_device])
525
526 if len(socket_handles) % self.modules_per_process == 0:
527 self.processes[acquisition_device] = mp.Process(
528 target=lanxi_multisocket_reader,
529 args=(socket_handles, active_channels_list, data_queues),
530 )
531 self.processes[acquisition_device].start()
532 socket_handles = []
533 active_channels_list = []
534 data_queues = []
535 if len(socket_handles) > 0:
536 self.processes[acquisition_device] = mp.Process(
537 target=lanxi_multisocket_reader,
538 args=(socket_handles, active_channels_list, data_queues),
539 )
540 self.processes[acquisition_device].start()
541 socket_handles = []
542 active_channels_list = []
543 data_queues = []
544
545 def get_acquisition_delay(self) -> int:
546 """
547 Get the number of samples between output and acquisition.
548
549 This function returns the number of samples that need to be read to
550 ensure that the last output is read by the acquisition. If there is
551 buffering in the output, this delay should be adjusted accordingly.
552
553 Returns
554 -------
555 int
556 Number of samples between when a dataset is written to the output
557 and when it has finished playing.
558
559 """
560 return self.acquisition_delay
561
562 def read(self):
563 """Method to read a frame of data from the hardware"""
564 samples = 0
565 full_read_data = []
566 while samples < self.samples_per_read:
567 read_data = []
568 acquisition_indices = []
569 for acquisition_device, _ in self.process_data_queues.items():
570 # We are going to loop until we get a signal which should be every time except the
571 # first, which will pass the interpretation.
572 while True:
573 # Get the data from the queues
574 # print('Reading from queue')
575 data_type, data = self.process_data_queues[acquisition_device].get()
576 if data_type == "Interpretation":
577 if self.interpretations is None:
578 self.interpretations = {}
579 self.interpretations[acquisition_device] = data # Store the interpretation
580 elif data_type == "Signal":
581 for signal, channel_number, interpretation in zip(
582 data,
583 sorted(self.acquisition_map[acquisition_device]),
584 self.interpretations[acquisition_device],
585 ):
586 acquisition_index, _ = self.acquisition_map[acquisition_device][
587 channel_number
588 ]
589 array = (
590 signal
591 * interpretation[
592 OpenapiMessage.Interpretation.EDescriptorType.scale_factor
593 ] # This is the scale factor
594 + interpretation[
595 OpenapiMessage.Interpretation.EDescriptorType.offset
596 ] # This is the offset
597 )
598 read_data.append(array)
599 acquisition_indices.append(acquisition_index)
600 break # Exit the loop because we found the signal
601 # Check if all the data are the same length
602 index_map = np.empty(len(acquisition_indices), dtype=int)
603 index_map[acquisition_indices] = np.arange(len(acquisition_indices))
604 read_data = np.array(read_data)[index_map]
605 full_read_data.append(read_data)
606 samples += read_data.shape[-1]
607 full_read_data = np.concatenate(full_read_data, axis=-1)
608 current_time = time.time()
609 if self.last_acquisition_time is not None:
610 dtime = current_time - self.last_acquisition_time
611 print(f"Took {dtime:0.4f}s to read {full_read_data.shape[-1]} samples")
612 self.last_acquisition_time = current_time
613 return full_read_data
614
615 def read_remaining(self):
616 """Method to read the rest of the data on the acquisition from the hardware"""
617 return np.zeros(
618 (
619 sum(
620 [
621 1
622 for acquisition_device, acquisition_dict in self.acquisition_map.items()
623 for channel_number in acquisition_dict
624 ]
625 ),
626 1,
627 )
628 )
629
630 def stop(self):
631 """Method to stop the acquisition"""
632 if self.master_address in self.acquisition_map:
633 requests.put(
634 "http://" + self.master_address + "/rest/rec/measurements/stop", timeout=60
635 )
636 for slave_address in self.slave_addresses:
637 if slave_address in self.acquisition_map:
638 requests.put("http://" + slave_address + "/rest/rec/measurements/stop", timeout=60)
639 # Wait for the module state to be recorder streaming
640 for slave_address in self.slave_addresses:
641 if slave_address in self.acquisition_map:
642 wait_for_recorder_state(slave_address, "RecorderStreaming")
643 if self.master_address in self.acquisition_map:
644 wait_for_recorder_state(self.master_address, "RecorderStreaming")
645 # Join the processes
646 for acquisition_device, process in self.processes.items():
647 print(f"Recovering process {acquisition_device}")
648 process.join(timeout=5)
649 if process.is_alive():
650 process.terminate()
651 process.join()
652 print(f"Process {acquisition_device} recovered")
653 print("All processes recovered, ready for next acquire.")
654 self.processes = {}
655 self.process_data_queues = {}
656 self.interpretations = None
657 self.last_acquisition_time = None
658
659 def close(self):
660 """Method to close down the hardware"""
661 if len(self.processes) > 0: # This means we are still running!
662 self.stop()
663
664 def _get_states(self):
665 for host in list(self.slave_addresses) + [self.master_address]:
666 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60)
667 state_data = response.json()
668 print(
669 f"Host {host}: Recorder State {state_data['moduleState']}, Input State "
670 f"{state_data['inputStatus']}, PTP State {state_data['ptpStatus']}, Recording Mode"
671 )
672
673 def _reboot_all(self):
674 for host in list(self.slave_addresses) + [self.master_address]:
675 requests.put("http://" + host + "/rest/rec/reboot", timeout=60)
676
677
678class LanXIOutput(HardwareOutput):
679 """Abstract class defining the interface between the controller and output
680
681 This class defines the interfaces between the controller and the
682 output or source portion of the hardware. It is run by the Output
683 process, and must define how to get write data to the hardware from the
684 control system"""
685
686 def __init__(self, maximum_processes):
687 """Method to start up the hardware"""
688 self.sockets = {}
689 self.acquisition_map = {}
690 self.output_map = {}
691 self.master_address = None
692 self.slave_addresses = set([])
693 self.oversample_factor = None
694 self.output_rate = None
695 self.bandwidth_string = None
696 self.transfer_size = 4096 * 4
697 self.sample_rate = None
698 self.write_size = None
699 self.empty_time = 0.0
700 self.generator_sample_rate = None
701 self.buffer_size = 5
702 self.ready_signal_factor = BUFFER_SIZE
703 self.maximum_processes = maximum_processes
704
705 def set_up_data_output_parameters_and_channels(
706 self, test_data: DataAcquisitionParameters, channel_data: List[Channel]
707 ):
708 # Create a hardware map that will help us do bookkeeping
709 create_harware_maps(self.acquisition_map, self.output_map, channel_data)
710 self.write_size = test_data.samples_per_write
711 self.sample_rate = test_data.sample_rate
712 # Go through the channel table and get the hardware and channel
713 # information
714 host_addresses = [channel.physical_device for channel in channel_data]
715 host_addresses += [
716 channel.feedback_device
717 for channel in channel_data
718 if (
719 not (channel.feedback_device is None)
720 and not (channel.feedback_device.strip() == "")
721 )
722 ]
723 self.master_address = host_addresses[0]
724 self.slave_addresses = set(
725 [address for address in host_addresses if not address == self.master_address]
726 )
727 print("\nInitial States:")
728 self._get_states()
729
730 # time.sleep(10)
731
732 # Close all devices to start from scratch
733 print("Resetting Data Acquisition System")
734 self.close(reboot=False)
735
736 # time.sleep(10)
737
738 # If there are any slave addresses, need to perform PTP sync
739 if len(self.slave_addresses) > 0:
740 print("PTP Mode")
741 master_json = {
742 "synchronization": {
743 "mode": "ptp",
744 "domain": 42,
745 "preferredMaster": True,
746 }
747 }
748 requests.put(
749 "http://" + self.master_address + "/rest/rec/syncmode", json=master_json, timeout=60
750 )
751 slave_json = {
752 "synchronization": {
753 "mode": "ptp",
754 "domain": 42,
755 "preferredMaster": False,
756 }
757 }
758 for slave_address in self.slave_addresses:
759 requests.put(
760 "http://" + slave_address + "/rest/rec/syncmode", json=slave_json, timeout=60
761 )
762 print("Waiting for PTP Sync...")
763 # Wait until PTP locks
764 for slave_address in self.slave_addresses:
765 wait_for_ptp_state(slave_address, "Locked")
766 wait_for_ptp_state(self.master_address, "Locked")
767 print("PTP Synced!")
768 single_module = False
769 else:
770 print("Single Module Mode")
771 master_json = {"synchronization": {"mode": "stand-alone"}}
772 requests.put(
773 "http://" + self.master_address + "/rest/rec/syncmode", json=master_json, timeout=60
774 )
775 single_module = True
776 print("\nStates after synchronization")
777 self._get_states()
778
779 # Now we open the recorders
780 open_json = {
781 # May need to investigate this further, but for now we won't use TEDS
782 "performTransducerDetection": False,
783 "singleModule": single_module,
784 }
785 for slave_address in self.slave_addresses:
786 requests.put("http://" + slave_address + "/rest/rec/open", json=open_json, timeout=60)
787 requests.put("http://" + self.master_address + "/rest/rec/open", json=open_json, timeout=60)
788 print("\nStates after Open")
789 self._get_states()
790
791 # Now get the sample rate
792 for i, address in enumerate(self.acquisition_map):
793 response = requests.get("http://" + address + "/rest/rec/module/info", timeout=60)
794 module_info = response.json()
795 if i == 0:
796 supported_sample_rates = module_info["supportedSampleRates"]
797 else:
798 supported_sample_rates = [
799 v for v in supported_sample_rates if v in module_info["supportedSampleRates"]
800 ]
801 print(f"Supported Sample Rates {supported_sample_rates}")
802 bandwidth = test_data.sample_rate / 2.56
803 if bandwidth > 1000:
804 self.bandwidth_string = f"{bandwidth / 1000:0.1f} kHz"
805 else:
806 self.bandwidth_string = str(round(bandwidth)) + " Hz"
807 print(f"Sample Rate: {test_data.sample_rate} Hz (Bandwidth {self.bandwidth_string})")
808 if test_data.sample_rate not in supported_sample_rates:
809 raise LanXIError(
810 f"Invalid Sample Rate {test_data.sample_rate}, must be one of "
811 f"{supported_sample_rates}"
812 )
813
814 # Get the Generator Sample Rate
815 self.generator_sample_rate = round(np.log2(OUTPUT_RATE / test_data.sample_rate))
816 if self.generator_sample_rate > 3:
817 self.generator_sample_rate = 3
818 elif self.generator_sample_rate < 0:
819 self.generator_sample_rate = 0
820 self.output_rate = OUTPUT_RATE // 2 ** (self.generator_sample_rate)
821 # Now prep the generators
822 self.set_generators()
823
824 # Now we need to set up the recording configuration
825 for slave_address in self.slave_addresses:
826 if slave_address in self.acquisition_map:
827 requests.put("http://" + slave_address + "/rest/rec/create", timeout=60)
828 else:
829 print(
830 f"Skipping Creating Slave Address Recorder {slave_address}, not in acquisition"
831 )
832 if self.master_address in self.acquisition_map:
833 requests.put("http://" + self.master_address + "/rest/rec/create", timeout=60)
834 else:
835 print(
836 f"Skipping Creating Master Address Recorder "
837 f"{self.master_address}, not in acquisition"
838 )
839 for slave_address in self.slave_addresses:
840 if slave_address in self.acquisition_map:
841 wait_for_recorder_state(slave_address, "RecorderConfiguring")
842 if self.master_address in self.acquisition_map:
843 wait_for_recorder_state(self.master_address, "RecorderConfiguring")
844 print("\nStates after Recorder Create")
845 self._get_states()
846
847 # State is now in Recorder Configuring
848 print("Recorder in Configuring State")
849 # Now we have to go through and create the channels
850 for acquisition_device, device_dictionary in self.acquisition_map.items():
851 response = requests.get(
852 "http://" + acquisition_device + "/rest/rec/channels/input/default", timeout=60
853 )
854 channel_settings = response.json()
855 # Go through and disable all channels
856 for channel_json in channel_settings["channels"]:
857 channel_json["enabled"] = False
858 for channel_number, (_, channel) in device_dictionary.items():
859 _, channel_json = [
860 (i, channel_json)
861 for i, channel_json in enumerate(channel_settings["channels"])
862 if channel_json["channel"] == channel_number
863 ][0]
864 channel_json["bandwidth"] = self.bandwidth_string
865 channel_json["ccld"] = False if channel.excitation_source is None else True
866 channel_json["transducer"]["requiresCcld"] = channel_json["ccld"]
867 if channel_json["ccld"]:
868 print(f"Device {acquisition_device} channel {channel_number} has CCLD enabled")
869 channel_json["destinations"] = ["socket"]
870 channel_json["enabled"] = True
871 channel_coupling = "DC" if channel.coupling is None else channel.coupling
872 if channel_coupling not in VALID_FILTERS:
873 raise LanXIError(f"For LAN-XI, Coupling must be sent to one of {VALID_FILTERS}")
874 channel_json["filter"] = channel_coupling
875 if channel.maximum_value not in VALID_RANGES:
876 raise LanXIError(f"For LAN-XI, Maximum Value must be one of {VALID_RANGES}")
877 channel_json["range"] = channel.maximum_value + " Vpeak"
878 channel_json["transducer"]["sensitivity"] = float(channel.sensitivity) / 1000
879 # The metadata doesn't really matter here, so we just use an arbitrary number
880 # Otherwise it shold be something like this:
881 # ('' if channel.serial_number is None else channel.serial_number)
882 # +('' if channel.triax_dof is None else channel.triax_dof)
883 channel_json["transducer"]["serialNumber"] = 9999
884 channel_json["transducer"]["type"]["model"] = (
885 "" if channel.make is None else channel.make
886 ) + ("" if channel.model is None else " " + channel.model)
887 channel_json["transducer"]["unit"] = channel.unit
888 response = requests.put(
889 "http://" + acquisition_device + "/rest/rec/channels/input",
890 json=channel_settings,
891 timeout=60,
892 )
893 print(
894 f"Setting inputs to {acquisition_device} Channels, {response.status_code} "
895 f"{response.text}"
896 )
897 print("\nStates after Channel Input")
898 self._get_states()
899
900 # Now check for synchronization
901 if len(self.slave_addresses) > 0:
902 for slave_address in self.slave_addresses:
903 if slave_address in self.acquisition_map:
904 wait_for_input_state(slave_address, "Settled")
905 if self.master_address in self.acquisition_map:
906 wait_for_input_state(self.master_address, "Settled")
907 print("Recorder Settled, Synchronizing...")
908
909 for slave_address in self.slave_addresses:
910 if slave_address in self.acquisition_map:
911 requests.put("http://" + slave_address + "/rest/rec/synchronize", timeout=60)
912 if self.master_address in self.acquisition_map:
913 requests.put("http://" + self.master_address + "/rest/rec/synchronize", timeout=60)
914 for slave_address in self.slave_addresses:
915 if slave_address in self.acquisition_map:
916 wait_for_input_state(slave_address, "Synchronized")
917 if self.master_address in self.acquisition_map:
918 wait_for_input_state(self.master_address, "Synchronized")
919 print("Recorder Synchronized, Starting Streaming...")
920
921 for slave_address in self.slave_addresses:
922 if slave_address in self.acquisition_map:
923 requests.put("http://" + slave_address + "/rest/rec/startstreaming", timeout=60)
924 if self.master_address in self.acquisition_map:
925 requests.put(
926 "http://" + self.master_address + "/rest/rec/startstreaming", timeout=60
927 )
928
929 # Wait for the module state to be recorder streaming
930 for slave_address in self.slave_addresses:
931 if slave_address in self.acquisition_map:
932 wait_for_recorder_state(slave_address, "RecorderStreaming")
933 if self.master_address in self.acquisition_map:
934 wait_for_recorder_state(self.master_address, "RecorderStreaming")
935 print("Recorder Streaming")
936 self._get_states()
937
938 print("\n\nData Acquisition Ready for Acquire")
939
940 def start(self):
941 """Method to start outputting data to the hardware"""
942 self.empty_time += time.time()
943 # Now start the generators
944 master_json = None
945 for generator_device, generator_channel_dict in self.output_map.items():
946 json = {
947 "outputs": [{"number": channel_number} for channel_number in generator_channel_dict]
948 }
949 if generator_device == self.master_address:
950 master_json = (
951 json # Pull this out because the master should be assigned last I think.
952 )
953 continue
954 requests.put(
955 "http://" + generator_device + "/rest/rec/generator/start", json=json, timeout=60
956 )
957 if master_json is not None:
958 requests.put(
959 "http://" + self.master_address + "/rest/rec/generator/start",
960 json=master_json,
961 timeout=60,
962 )
963 print("States after Generator Started")
964 self._get_states()
965
966 def write(self, data):
967 """Method to write a frame of data to the hardware"""
968 for output_device, socket_dict in self.sockets.items():
969 for channel_number, socket_handle in socket_dict.items():
970 output_index, _ = self.output_map[output_device][channel_number]
971 this_data = (data[output_index] / 10 * 8372224).astype("int32").tobytes()
972 while len(this_data) > 0:
973 sent_bytes = socket_handle.send(this_data[: self.transfer_size])
974 this_data = this_data[sent_bytes:]
975 self.empty_time += self.write_size / self.output_rate
976
977 def stop(self):
978 """Method to stop the output"""
979 master_json = None
980 for generator_device, generator_channel_dict in self.output_map.items():
981 json = {
982 "outputs": [{"number": channel_number} for channel_number in generator_channel_dict]
983 }
984 if generator_device == self.master_address:
985 master_json = (
986 json # Pull this out because the master should be assigned last I think.
987 )
988 continue
989 requests.put(
990 "http://" + generator_device + "/rest/rec/generator/stop", json=json, timeout=60
991 )
992 if master_json is not None:
993 requests.put(
994 "http://" + self.master_address + "/rest/rec/generator/stop",
995 json=master_json,
996 timeout=60,
997 )
998 self.empty_time = 0.0
999 self.set_generators()
1000
1001 def set_generators(self):
1002 """Sets the generator states"""
1003 if len(self.output_map) == 0:
1004 return
1005 master_json = None
1006 for generator_device, generator_channel_dict in self.output_map.items():
1007 json = {
1008 "outputs": [{"number": channel_number} for channel_number in generator_channel_dict]
1009 }
1010 if generator_device == self.master_address:
1011 master_json = (
1012 json # Pull this out because the master should be assigned last I think.
1013 )
1014 continue
1015 requests.put(
1016 "http://" + generator_device + "/rest/rec/generator/prepare", json=json, timeout=60
1017 )
1018 if master_json is not None:
1019 requests.put(
1020 "http://" + self.master_address + "/rest/rec/generator/prepare",
1021 json=master_json,
1022 timeout=60,
1023 )
1024 print("\nStates after Generator Prepare")
1025 self._get_states()
1026
1027 # Configure the generator channels
1028 master_json = None
1029 for generator_device, generator_channel_dict in self.output_map.items():
1030 json = {
1031 "bufferSize": self.buffer_size * self.write_size, # TODO: Re-evaluate this number
1032 "outputs": [
1033 {
1034 "number": channel_number,
1035 "floating": False,
1036 "gain": 1.0,
1037 "inputs": [
1038 {
1039 "number": 1,
1040 "signalType": "stream",
1041 "gain": 1.0,
1042 "offset": 0.0,
1043 "samplingRate": self.generator_sample_rate,
1044 },
1045 {"number": 2, "signalType": "none"},
1046 ],
1047 }
1048 for channel_number in generator_channel_dict
1049 ],
1050 }
1051 if generator_device == self.master_address:
1052 # Pull this out because the master should be assigned last I think.
1053 master_json = json
1054 continue
1055 requests.put(
1056 "http://" + generator_device + "/rest/rec/generator/output", json=json, timeout=60
1057 )
1058 if master_json is not None:
1059 requests.put(
1060 "http://" + self.master_address + "/rest/rec/generator/output",
1061 json=master_json,
1062 timeout=60,
1063 )
1064 print("\nStates after Generator Output")
1065 self._get_states()
1066
1067 # Now pull the socket information for the outputs
1068 for generator_device, generator_dict in self.output_map.items():
1069 response = requests.get(
1070 "http://" + generator_device + "/rest/rec/generator/output", timeout=60
1071 )
1072 output_data = response.json()
1073 for channel_number in generator_dict:
1074 output = [out for out in output_data["outputs"] if out["number"] == channel_number][
1075 0
1076 ]
1077 if generator_device not in self.sockets:
1078 self.sockets[generator_device] = {}
1079
1080 is_ipv4 = re.search(IPV4_PATTERN, generator_device) is not None
1081 is_ipv6 = re.search(IPV6_PATTERN, generator_device) is not None
1082 if is_ipv4:
1083 self.sockets[generator_device][output["number"]] = socket.socket(
1084 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP
1085 )
1086 elif is_ipv6:
1087 self.sockets[generator_device][output["number"]] = socket.socket(
1088 socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP
1089 )
1090 else: # This will crash but is fixed in overhaul version so...
1091 self.sockets[generator_device][output["number"]] = socket.socket(
1092 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP
1093 )
1094 self.sockets[generator_device][output["number"]].connect(
1095 (generator_device, output["inputs"][0]["port"])
1096 )
1097 print(
1098 f"Output Connected to Device {generator_device} Channel {output['number']} "
1099 f"on Port {output['inputs'][0]['port']}"
1100 )
1101 self.oversample_factor = round(
1102 OUTPUT_RATE / (2 ** output["inputs"][0]["samplingRate"]) / self.sample_rate
1103 )
1104 print(f"Output overampling factor: {self.oversample_factor}x")
1105
1106 def close(self, reboot=False):
1107 """Method to close down the hardware"""
1108 for _, socket_dict in self.sockets.items():
1109 for _, socket_handle in socket_dict.items():
1110 socket_handle.close()
1111 if reboot:
1112 self._reboot_all()
1113 else:
1114 self._close_recorders(
1115 [self.master_address] + [address for address in self.slave_addresses]
1116 )
1117 # self._reboot_all()
1118 # self._close_recorder(self.master_address)
1119 # for slave_address in self.slave_addresses:
1120 # self._close_recorder(slave_address)
1121
1122 def ready_for_new_output(self):
1123 """Method that returns true if the hardware should accept a new signal"""
1124 # print('Time until output buffer empty {:}, time per write {:}'.format(
1125 # self.empty_time - time.time(),self.write_size / self.output_rate))
1126 if (self.empty_time - time.time()) < (
1127 self.write_size / self.output_rate
1128 ) * self.ready_signal_factor: # TODO: Might need to increase buffer
1129 # print('Need new output')
1130 return True
1131 else:
1132 # print('No output needed')
1133 return False
1134
1135 def _close_recorders(self, hosts):
1136 with mp.Pool(
1137 len(hosts)
1138 ) as pool: # Not sure if this can be len(hosts) or if it should be self.maximum_processes
1139 pool.map(close_recorder, hosts)
1140 # host_states = {}
1141 # while True:
1142 # for host in hosts:
1143 # # print('Getting state from host {:}'.format(host))
1144 # response = requests.get('http://'+host+'/rest/rec/onchange')
1145 # state_data = response.json()
1146 # current_state = state_data['moduleState']
1147 # # print('Got state from host {:}'.format(host))
1148 # if host in host_states and host_states[host] == current_state:
1149 # continue
1150 # host_states[host] = current_state
1151 # try:
1152 # operation = LANXI_STATE_SHUTDOWN[current_state]
1153 # except KeyError:
1154 # print('Unknown State {:} for host {:}. Rebooting'.format(current_state,host))
1155 # requests.put('http://'+host+'/rest/rec/reboot')
1156 # continue
1157 # if not operation is None:
1158 # print('Host {:} at {:} state: {:}'.format(host,current_state,operation))
1159 # requests.put('http://'+host+operation)
1160 # # Check if all hosts are idle
1161 # if all([v == 'Idle' for k,v in host_states.items()]):
1162 # print('All hosts are idle')
1163 # break
1164 # time.sleep(0.2)
1165
1166 def _get_states(self):
1167 for host in list(self.slave_addresses) + [self.master_address]:
1168 response = requests.get("http://" + host + "/rest/rec/onchange", timeout=60)
1169 state_data = response.json()
1170 print(
1171 f"Host {host}: Recorder State {state_data['moduleState']}, Input State "
1172 f"{state_data['inputStatus']}, PTP State {state_data['ptpStatus']}, Recording Mode"
1173 )
1174
1175 def _reboot_all(self):
1176 for host in list(self.slave_addresses) + [self.master_address]:
1177 requests.put("http://" + host + "/rest/rec/reboot", timeout=60)