import bisect
import time
from typing import Final
import serial
from serial.tools import list_ports
from peat import config, consts, log, state, utils
# Serial port connections if open, or None if not, keyed by address strings
serial_cxns = {}
# List of standardized baud rates
std_b: Final[list[int]] = [
50,
75,
110,
134,
150,
200,
300,
600,
1200,
1800,
2400,
4800,
9600,
19200,
38400,
57600,
115200,
]
[docs]
def find_serial_ports(filter_list: list[str] | None = None) -> list[str]:
"""
Find host serial ports enumerated by the operating system.
Filter by specified list, if any.
"""
log.info(f"Searching for active serial ports... (filter_list: {filter_list})")
try:
listed_ports = [port.device for port in list_ports.comports()]
except Exception as ex:
log.warning(f"Failed to scan for serial ports: {ex}")
return []
if not listed_ports:
log.warning("No serial ports on host")
return []
if filter_list is None:
filter_list = listed_ports
listed_ports.sort()
active_port_set = set()
for serial_port in list(set(filter_list) & set(listed_ports)):
try:
log.info(f"Trying serial port {serial_port}")
# Don't bother with parameters, just see if it opens
ser = serial.Serial(port=serial_port)
if ser.isOpen():
active_port_set.add(serial_port)
log.info(f"Found active serial port: {serial_port}")
ser.close()
except serial.SerialException as ex:
handle_scan_serial_exception(serial_port, ex)
except OSError:
log.warning(f"Hardware error accessing port {serial_port}, aborting")
return []
except Exception as ex:
log.warning(f"Unknown error accessing port {serial_port}: {ex}")
return sorted(active_port_set)
[docs]
def open_serial_port(address: str, baudrate: int, timeout: float) -> bool:
"""
Open the specified serial port.
Since the code where the connection should be closed may not be
able to communicate with the code where it needs to be opened,
just be robust about closing and re-opening the connection.
Args:
address: The address string of the port to close
baudrate: The baud rate
timeout: The timeout in seconds
Returns:
:obj:`True` if the port is opened successfully, :obj:`False` otherwise
"""
log.info(f"Attempting to open serial port {address} at baudrate {baudrate}")
close_serial_port(address)
try:
serial_cxns[address] = serial.Serial(
port=address,
baudrate=baudrate,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=timeout,
write_timeout=timeout,
)
if serial_cxns[address].isOpen():
log.info(f"Opened serial port {address} at {baudrate} baud")
return True
else:
log.debug(f"Couldn't open serial port {address} at baud {baudrate}")
except serial.SerialException as ex:
raise ex
except Exception as ex:
log.debug(f"Error opening serial port {address} at baud {baudrate}: {ex}")
return False
[docs]
def close_serial_port(address: str) -> bool:
"""
Close the specified serial port.
Handle closing an un-opened port gracefully.
Args:
address: The address string of the port to close
Returns:
:obj:`True` if the port is closed successfully, :obj:`False` otherwise
"""
if address not in serial_cxns or serial_cxns[address] is None:
return True
if not serial_cxns[address].isOpen():
serial_cxns[address] = None
return True
try:
serial_cxns[address].close()
serial_cxns[address] = None
return True
except Exception as ex:
log.error(f"Error closing serial port {address}: {ex}")
return False
[docs]
def port_nums_to_addresses(port_values: list[str]) -> list[str]:
"""
Converts a list of mixed port value strings to a list of unique serial
port address strings.
.. code-block:: python
>>> import platform
>>> from peat.protocols.serial import port_nums_to_addresses
>>> platform.system() # doctest: +SKIP
'Linux'
>>> port_nums_to_addresses(["0-1", "0", "1-1"]) # doctest: +SKIP
['/dev/ttyS0', '/dev/ttyS1', '/dev/ttyUSB0', '/dev/ttyUSB1']
>>> port_nums_to_addresses(["/dev/ttyS0", "/dev/ttyUSB0"]) # doctest: +SKIP
['/dev/ttyS0', '/dev/ttyUSB0']
Args:
port_values: Mixed port number or dash-separated range strings to convert
Returns:
Sorted :class:`list` of unique platform-specific serial port address strings
"""
if not isinstance(port_values, list):
log.critical(f"Expected a list of serial port values, got '{type(port_values).__name__}'")
state.error = True
return []
port_addr_set = set()
try:
for value in port_values:
v = value.strip().split("-")
if len(v) == 1 and isint(v[0]):
# single number - convert to port string(s) and add to set
port_addr_set.update(platform_port_fmt(int(v[0])))
elif len(v) == 1 and ("/" in v[0] or "COM" in v[0]):
# platform-specific port string, e.g. /dev/ttyS0 or COM0
port_addr_set.add(v[0].strip())
elif len(v) == 2 and isint(v[0]) and isint(v[1]) and int(v[0]) <= int(v[1]):
# ascending range - convert each number in it and add to set
for i in range(int(v[0]), int(v[1]) + 1):
port_addr_set.update(platform_port_fmt(i))
else:
log.warning(f"Unable to parse port value: {value}")
except OSError as err:
log.error(f"Failed to convert serial port strings: {err}")
return []
sorted_ports = sorted(port_addr_set)
log.info(
f"Converted {len(port_values)} strings to {len(sorted_ports)} "
f"platform-specific serial ports"
)
log.trace(
f"Converted {len(port_values)} serial port strings"
f"\nInput: {port_values}\nOutput: {sorted_ports}"
)
return sorted_ports
[docs]
def isint(s: str) -> bool:
"""
If a string is a valid :class:`int`.
"""
try:
int(s)
return True
except ValueError:
return False
[docs]
def parse_baudrates(baudrate_values: list[str]) -> list[int]:
"""
Converts a list of mixed baud rate value strings to a list of
standardized baud rates with duplicates removed.
.. code-block:: python
>>> from peat.protocols.serial import parse_baudrates
>>> parse_baudrates(["9600"])
[9600]
>>> parse_baudrates(["9600-115200"])
[9600, 19200, 38400, 57600, 115200]
>>> parse_baudrates(["9600-115200", "57600"])
[9600, 19200, 38400, 57600, 115200]
Args:
baudrate_values: Mixed baud rate number or dash-separated
range strings to convert
Returns:
A :class:`list` of unique standardized baud rate integers sorted in
reverse order (highest to lowest)
"""
# TODO: generalize and deduplicate the core logic between parse_baudrates
# and port_nums_to_addresses, they're doing basically the same thing
# with slightly different calls.
if not isinstance(baudrate_values, list):
log.critical(
f"Expected a list of baud rate values, got '{type(baudrate_values).__name__}'"
)
state.error = True
return []
baudrate_set = set()
for value in baudrate_values:
v = value.strip().split("-")
if len(v) == 1 and isint(v[0]):
# single number - standardize and add to set
baudrate_set.add(std_b[std_b_idx(int(v[0]))])
elif len(v) == 2 and isint(v[0]) and isint(v[1]) and int(v[0]) <= int(v[1]):
# ascending range - add lowest and highest rates and any in between
for i in range(std_b_idx(int(v[0])), std_b_idx(int(v[1])) + 1):
baudrate_set.add(std_b[i])
else:
log.warning(f"Unable to parse baud rate: {value}")
return sorted(baudrate_set)
[docs]
def std_b_idx(baudrate: int) -> int:
"""
Convert an arbitrary integer to the appropriate ``std_b`` index.
"""
if baudrate >= std_b[-1]:
return len(std_b) - 1
return bisect.bisect_right(std_b, baudrate) - 1
[docs]
def serial_txn(wr_bytes: bytes, address: str) -> bytearray | None:
"""
Performs a serial transaction (writes and then reads).
Args:
wr_bytes: The :class:`bytes` to write
address: The serial port string (``/dev/ttyS1``, ``COM1``, etc)
Returns:
The :class:`bytearray` that was read, if any
"""
if serial_write(wr_bytes, address) >= 0:
time.sleep(0.5)
return serial_read(address)
return None
[docs]
def serial_write(wr_bytes: bytes, address: str) -> int:
"""
Writes bytes to an open serial port.
Args:
wr_bytes: The :class:`bytes` to write
address: The serial port string (``/dev/ttyS1``, ``COM1``, etc)
Returns:
The number of :class:`bytes` written, or ``-1`` if there was an error
"""
if address not in serial_cxns or not serial_cxns[address].isOpen():
log.error(f"Error writing to serial port {address}: port not open")
return -1
try:
serial_cxns[address].reset_input_buffer()
serial_cxns[address].reset_output_buffer()
br = serial_cxns[address].baudrate
if config.DEBUG >= 3:
log.trace3(f"Writing to {address} @ {br} baud: {pretty_hex_bytes(wr_bytes)}")
num_written = serial_cxns[address].write(wr_bytes)
if config.DEBUG >= 3:
log.trace3(f"Wrote {num_written} bytes to {address} @ {br} baud")
num_expected = len(wr_bytes)
if num_written != num_expected:
log.warning(
f"Incomplete write to {address} @ {br} baud, expected "
f"{num_expected} bytes, wrote {num_written} bytes"
)
serial_cxns[address].flush()
return num_written
except Exception as ex:
log.error(f"Error writing to serial port {address}: {ex}")
return -1
[docs]
def serial_read(address: str) -> bytearray | None:
"""
Reads bytes from an open serial port.
Args:
address: The serial port string (``/dev/ttyS1``, ``COM1``, etc)
Returns:
The :class:`bytearray` that was read, including an empty array if
nothing was read but there were no errors, or :obj:`None` if there were errors
"""
if address not in serial_cxns or not serial_cxns[address].isOpen():
log.error(f"Error reading from serial port {address}: port not open")
return None
try:
rd_bytes = bytearray()
while serial_cxns[address].in_waiting > 0:
rd_bytes.extend(serial_cxns[address].read())
time.sleep(0.01)
if config.DEBUG >= 2:
br = serial_cxns[address].baudrate
log.trace2(
f"Read {len(rd_bytes)} bytes from {address} @ {br} baud: "
f"{pretty_hex_bytes(rd_bytes)}"
)
return rd_bytes
except Exception as ex:
log.error(f"Error reading from serial port {address}: {ex}")
[docs]
def pretty_hex_bytes(b: bytearray | bytes) -> str:
if not b:
return "0x None"
return "0x " + " ".join([f"{x:02X}" for x in bytes(b)])
[docs]
def handle_scan_serial_exception(port: str, ex: Exception) -> bool:
"""
Handle pyserial's ``serial.SerialException`` instances.
Returns:
:obj:`True` if the exception is regular, :obj:`False` if it's related to
the port being in use by another program or the port not
existing on the host.
"""
ex_str = str(ex).lower()
is_regular = False
# PermissionError(13, 'Access is denied.', None, 5)
if "access is denied" in ex_str or "permissionerror(13" in ex_str:
# TODO: Linux?
msg = (
"the port is unable to be opened due to a permission issue. "
"Make sure no other programs are currently using the serial "
"port, such as PuTTY or SEL Quickset. The Process Explorer tool "
"(part of Microsoft SysInternals) can be used to see what "
"processes are using a serial port."
)
# If we're admin, then mark port as active and note about assumption
# we're making. Otherwise, it's an error.
if utils.are_we_superuser():
log.warning(
f"Note: {msg} -- since PEAT is running as Administrator "
f"and the port is in use, it is being marked as active."
)
is_regular = True
else:
log.error(f"Failed to scan serial port {port}: {msg}")
# Windows: "FileNotFoundError(2, 'The system cannot find the file specified.', None, 2)"
# Linux: "[Errno 2] No such file or directory: '/dev/ttyUSB4'"
elif "filenotfounderror" in ex_str or "no such file or directory" in ex_str:
log.warning(
f"Failed to scan serial port {port}: the port either isn't connected or doesn't exist"
)
else:
log.warning(f"Unknown SerialException on {port}: {ex}")
is_regular = True
return is_regular
__all__ = [
"close_serial_port",
"find_serial_ports",
"handle_scan_serial_exception",
"open_serial_port",
"parse_baudrates",
"port_nums_to_addresses",
"pretty_hex_bytes",
"serial_read",
"serial_txn",
"serial_write",
]