# Source code for peat.modules.woodward.wdw_svl

"""
Woodward proprietary Servlink serial protocol

SVL TCP

- ERROR RESPONSE: 0xE10000
- BAD/EMPTY RESPONSE: 0x0100020001

Authors

- Peter Shurtz
"""

import socket
import struct

from peat import log
from peat.protocols import pretty_hex_bytes, serial_txn

# Sequence bytes indexed by serial address.
# _svlser_raw_txn() sets an entry to 0x21 on reset, and _svlser_seq_txn()
# alternates it between 0x21 and 0x20 after every sequenced transaction.
svl_seq_bytes = {}

# Default delimiter placed between the mask/type byte and the data address
# in serial data commands (see _svlser_dat_fmt()).
SVL_SER_DEFAULT_DAT_DLM = b"\xf0"

# Defaults used by svl_socket_connect() and _svltcp_dat_fmt() when the caller
# does not supply a port, delimiter or receive buffer size.
SVL_TCP_DEFAULT_PORT = 666
SVL_TCP_DEFAULT_DAT_DLM = b"\x50"
SVL_TCP_DEFAULT_BUF_SIZE = 4096

# Per-IP connection state for Servlink/TCP, written by svl_socket_connect()
# and _svltcp_init() (e.g. the "sock", "buf_size" and "init" entries).
svl_socket_cxns = {}  # type: dict[str, dict]

# Data type specifiers for data command parameters.
#   "mask": byte ORed with the command item's own mask by the *_dat_fmt() helpers
#   "len":  data length that _svl_parse_data() requires for bool/int/float
#           values (0 for str/bytes, where no length check is applied)
svl_data_types = {
    str: {"mask": b"\x00", "len": 0},
    bytes: {"mask": b"\x00", "len": 0},
    bool: {"mask": b"\x01", "len": 3},
    float: {"mask": b"\x02", "len": 6},
    int: {"mask": b"\x04", "len": 6},
}

# System command bytes for _svlser_seq_txn()
# Only the "cmd" entry of an item is read by the *_sys_txn() helpers in this
# module; "type" and "len" are not consumed here.
svl_sys_cmds = {
    "Protocol": {"cmd": b"\x50", "type": str, "len": 0},  # System\\Protocol
    "System": {"cmd": b"\x51", "type": str, "len": 0},  # System\\System
    "Control": {"cmd": b"\x52", "type": str, "len": 0},  # System\\Control
    "Application": {"cmd": b"\x54", "type": str, "len": 0},  # System\\Application
    "Configuration": {"cmd": b"\x55", "type": str, "len": 0},  # Configuration
    "Product": {"cmd": b"\x57", "type": str, "len": 0},  # System\\Product
}

# Data command bytes for _svlser_seq_txn()
# NOTE(review): only the "read" entry is referenced by the *_dat_fmt() helpers
# in this module; "write" appears to be unused here.
svl_dat_cmds = {
    "write": {"cmd": b"\x25"},
    "read": {"cmd": b"\x28"},
}


def _svlser_sys_txn(address: str, sys_cmd_itm: dict) -> dict:
    """
    Run a Servlink serial system command and parse the reply.

    Args:
        address: The serial address
        sys_cmd_itm: Command item providing the command byte(s) under the
            ``"cmd"`` key (for example an entry of ``svl_sys_cmds``)

    Returns:
        The dict produced by ``_svlser_parse_rsp()`` for the raw reply
    """
    cmd_bytes = sys_cmd_itm["cmd"]
    response = _svlser_seq_txn(address, cmd_bytes)
    return _svlser_parse_rsp(response)


def _svlser_dat_txn(
    address: str,
    dat_cmd_itm: dict,
    type_bytes: bytes | None = None,
    delim: bytes | None = None,
) -> dict:
    """
    Run a Servlink serial data command and parse the reply.

    Args:
        address: The serial address
        dat_cmd_itm: Data item description; forwarded to ``_svlser_dat_fmt()``
        type_bytes: Optional type mask override, forwarded to ``_svlser_dat_fmt()``
        delim: Optional delimiter override, forwarded to ``_svlser_dat_fmt()``

    Returns:
        The dict produced by ``_svlser_parse_rsp()`` for the raw reply
    """
    payload = _svlser_dat_fmt(dat_cmd_itm, type_bytes, delim)
    response = _svlser_seq_txn(address, payload)
    return _svlser_parse_rsp(response)


def _svlser_seq_txn(address: str, wr_bytes: bytes) -> bytearray | None:
    """
    Send a sequenced Servlink serial message and advance the sequence byte.

    Args:
        address: The serial address
        wr_bytes: The data to write (data payload)

    Returns:
        The raw (unparsed) response returned by ``_svlser_raw_txn()``
    """
    framed = _svlser_seq_fmt(address, wr_bytes)
    response = _svlser_raw_txn(address, framed, False)

    # Alternate the stored sequence byte: 0x21 becomes 0x20, anything else
    # (including "not set yet") becomes 0x21.
    current = svl_seq_bytes.get(address)
    svl_seq_bytes[address] = b"\x20" if current == b"\x21" else b"\x21"

    return response


def _svlser_raw_txn(address: str, wr_bytes: bytes, reset: bool = False) -> bytearray | None:
    """
    Frame a payload as a Servlink serial message and send it as-is.

    No sequence byte is added here; the caller supplies it as part of
    ``wr_bytes`` when one is needed.

    Args:
        address: The serial address
        wr_bytes: The data to write (data payload)
        reset: When true, the stored sequence byte for ``address`` is set
            back to 0x21 before sending

    Returns:
        The value returned by ``serial_txn()`` for this message
    """
    if reset:
        svl_seq_bytes[address] = b"\x21"

    dead_time = b"\x00\x00\x00"  # 3-char dead time, sent before and after
    frame = dead_time + _svlser_raw_fmt(wr_bytes) + dead_time

    return serial_txn(frame, address)


def _svlser_seq_fmt(address: str, cmd_bytes: bytes) -> bytes:
    """
    Prefix a command with the current Servlink sequence byte for ``address``.

    If no sequence byte is stored yet, an init message is sent first with a
    sequence reset, which stores one.

    Args:
        address: The serial address
        cmd_bytes: The command bytes to prefix

    Returns:
        The stored sequence byte followed by ``cmd_bytes``
    """
    if address in svl_seq_bytes:
        return svl_seq_bytes[address] + cmd_bytes

    log.warning(f"Servlink sequence for {address} not initialized. Initializing...")
    # TODO: should this be SVLTCP_INIT_MSG? using SVLSER_INIT_MSG for now
    _svlser_raw_txn(address, b"\x60\x0f\xff", True)

    return svl_seq_bytes[address] + cmd_bytes


def _svlser_raw_fmt(wr_bytes: bytes) -> bytes:
    """
    Wrap content bytes in the Servlink serial frame.

    Frame layout: 0x01, escaped content, 0x03, escaped CRC.

    Args:
        wr_bytes: The unescaped content bytes

    Returns:
        The framed message
    """
    # The content gets the 0x10/0x20 escaping; the leading 0x01 (slave
    # address?) and the 0x03 delimiter (?) never do.
    escaped_content = _svlser_ins_1020(wr_bytes)

    # The CRC is computed over the *unescaped* content and is then escaped
    # separately from the content.
    escaped_crc = _svlser_ins_1020(_svlser_crc(wr_bytes))

    return b"\x01" + escaped_content + b"\x03" + escaped_crc


def _svlser_dat_fmt(
    dat_cmd_itm: dict, type_bytes: bytes | None = None, delim: bytes | None = None
) -> bytes:
    """
    Build the payload of a Servlink serial data command.

    Layout: read command byte, (mask | type) byte, delimiter, data address.

    Args:
        dat_cmd_itm: Data item description. Its ``"mask"`` and ``"addr"``
            entries are always read; ``"type"`` is read only when
            ``type_bytes`` is not given.
        type_bytes: Type mask to use instead of the one looked up in
            ``svl_data_types``
        delim: Delimiter to use instead of ``SVL_SER_DEFAULT_DAT_DLM``

    Returns:
        The data command payload
    """
    # bytes objects have no bitwise operators, so combine the mask and type
    # as ints and convert the result back to a single byte.
    mask_val = int.from_bytes(dat_cmd_itm["mask"], byteorder="big")

    if type_bytes is None:
        type_entry = svl_data_types.get(dat_cmd_itm["type"])
        type_bytes = b"\x00" if type_entry is None else type_entry["mask"]
    type_val = int.from_bytes(type_bytes, byteorder="big")

    selector = (mask_val | type_val).to_bytes(1, byteorder="big")
    separator = SVL_SER_DEFAULT_DAT_DLM if delim is None else delim

    return svl_dat_cmds["read"]["cmd"] + selector + separator + dat_cmd_itm["addr"]


def _svlser_parse_rsp(rd_bytes: bytes | None) -> dict:
    """Parse the Servlink response to transform it to the specified type.
    Multiple data fields are not yet implemented.

    Args:
        rd_bytes: The data to parse (raw response bytes)
        rfmt: The expected response format
        rlen: The expected response length. Use 0 to ignore length.

    Returns:
        The parsed response in the specified format
    """
    ser_rsp = {"hdr": None, "rbytes": None, "crc": None}

    if rd_bytes is None:
        return ser_rsp

    # Remove formatting bytes and get CRC
    if len(rd_bytes) >= 7 and rd_bytes[-4] == 3 and (rd_bytes[-3] == 16 or rd_bytes[-2] == 16):
        # Sometimes the CRC has a 0x10 inserted before and a 0x20 ORed with one
        # of the bytes of the CRC.
        #   0 - echoes byte 0 of command
        #   1 - echoes byte 1 of command
        #  -4 - 0x03 delimiter
        #  -3 - 0x10 or CRC first byte
        #  -2 - 0x10 or CRC first byte ORed with 0x20
        #  -1 - CRC second byte or CRC second byte ORed with 0x20
        calc_bytes = rd_bytes[1:-4]
        rd_crc = _svlser_rmv_1020(rd_bytes[-3:])
        log.trace3(f"0x1020 crc: {pretty_hex_bytes(rd_crc)}")
    elif len(rd_bytes) >= 6:
        # Normal response.
        #   0 - echoes byte 0 of command
        #   1 - echoes byte 1 of command
        #  -3 - 0x03 delimiter
        #  -2 - CRC first byte
        #  -1 - CRC second byte
        calc_bytes = rd_bytes[1:-3]
        rd_crc = rd_bytes[-2:]
    else:
        # Not enough bytes left - no data?
        log.trace3(f"No data found for {pretty_hex_bytes(rd_bytes)}")
        return ser_rsp

    ser_rsp["hdr"] = rd_bytes[0:2]
    ser_rsp["crc"] = rd_crc

    # Sometimes the data has a 0x10 inserted before and a 0x20 ORed with one or
    # more bytes of the data. For example, instead of looking like:
    #       0x 01 21 00 00 01 03 D8 CB
    # it looks like:
    #       0x 01 21 00 00 10 21 03 D8 CB
    # where the real data is "00 00 01", not "00 00 10 21" (the CRC agrees with
    # this as well).
    #
    # Remove 0x1020"s.
    calc_bytes = _svlser_rmv_1020(calc_bytes)  # for checking the CRC
    data_bytes = calc_bytes[1:]  # for everything else
    ser_rsp["rbytes"] = data_bytes

    # Check CRC
    calc_crc = _svlser_crc(calc_bytes)
    if calc_crc != rd_crc:
        log.debug(
            f"Unexpected CRC, expected {pretty_hex_bytes(rd_crc)}, "
            f"got {pretty_hex_bytes(calc_crc)} ({pretty_hex_bytes(calc_bytes)})"
        )

    return ser_rsp


def _svlser_ins_1020(in_bytes: bytes) -> bytearray:
    """
    Various parts of commands need to have a 0x10 inserted before and a 0x20
    ORed with one or more specific byte(s). Do that here.
    Use int's for bitwise
    operations.
    It seems like this is done whenever the following bytes are encountered in
    certain locations (NOT the initial 0x10 or the 0x03 delimiter):
        0x01, 0x03, 0x0D, 0x10, 0x11, 0x13
    """
    # Trigger on 0x01 -> 1, 0x03 -> 3, 0x0D -> 13, 0x10 -> 16, 0x11 -> 17, 0x13 -> 19
    trigger_bytes = [1, 3, 13, 16, 17, 19]

    out_bytes = bytearray()
    for b in in_bytes:
        if b in trigger_bytes:
            out_bytes += b"\x10"
            out_bytes += (b | 32).to_bytes(1, byteorder="big")  # 0x20 -> 32
        else:
            out_bytes += b.to_bytes(1, byteorder="big")

    if in_bytes != out_bytes:
        pretty_in = pretty_hex_bytes(in_bytes)
        pretty_out = pretty_hex_bytes(out_bytes)
        log.trace3(f"0x1020 ins: {pretty_in} -> {pretty_out}")

    return out_bytes


def _svlser_rmv_1020(data_bytes: bytes) -> bytes:
    """
    Various parts of received data have a 0x10 inserted before and a 0x20
    ORed with one or more specific byte(s). Remove those here.
    Use int's forbitwise operations.
    Right now since we don't know the actual criteria for 0x1020's, we'll
    assume that any real 0x10's would have been escaped this way, so any 0x10's
    we see with following byte >= 0x20 indicate a 0x1020 to be removed.
    """
    len_pre = len(data_bytes)
    pretty_pre = pretty_hex_bytes(data_bytes)
    i = 0
    while i < (len(data_bytes) - 1):
        if data_bytes[i] == 16 and data_bytes[i + 1] >= 32:  # 0x10 -> 16, 0x20 -> 32
            # Remove the 0x10 byte at index i
            del data_bytes[i]
            # The 0x20-ORed byte should now be at index i, un-OR it
            data_bytes[i] -= 32  # 0x20 -> 32
        i += 1

    if len(data_bytes) < len_pre:
        pretty_post = pretty_hex_bytes(data_bytes)
        log.trace3(f"0x1020 rmv: {pretty_pre} -> {pretty_post}")

    return data_bytes


def _svlser_crc(data_bytes: bytes) -> bytes:
    """
    Calculate Servlink CRC (little-endian CRC-16-MODBUS of the bytes
    between 0x01 and 0x03).
    Use int's so we can use bitwise operations.

    Args:
        data_bytes: The data bytes from which to calculate the CRC

    Returns:
        The two-byte CRC of data_bytes
    """
    crc = 65535  # 0xFFFF" -> 65535
    for i in range(len(data_bytes)):
        crc ^= data_bytes[i]
        for _j in range(8):
            if (crc & 1) != 0:  # 0x0001" -> 1
                crc >>= 1
                crc ^= 40961  # 0xA001" -> 40961
            else:
                crc >>= 1
    # byteorder="big" is the endianness of the Python int->byte conversion,
    # NOT the endianness of the CRC calculation!
    return crc.to_bytes(2, byteorder="big")


def _svltcp_sys_txn(ip: str, sys_cmd_itm: dict) -> dict:
    """
    Run a Servlink/TCP system command and parse the reply.

    Args:
        ip: IP address of the device
        sys_cmd_itm: Command item providing the command byte(s) under the
            ``"cmd"`` key (for example an entry of ``svl_sys_cmds``)

    Returns:
        The dict produced by ``_svltcp_parse_rsp()`` for the raw reply
    """
    cmd_bytes = sys_cmd_itm["cmd"]
    response = _svltcp_raw_txn(ip, cmd_bytes)
    return _svltcp_parse_rsp(response)


def _svltcp_dat_txn(
    ip: str,
    dat_cmd_itm: dict,
    type_bytes: bytes | None = None,
    delim: bytes | None = None,
) -> dict:
    """
    Run a Servlink/TCP data command and parse the reply.

    Args:
        ip: IP address of the device
        dat_cmd_itm: Data item description; forwarded to ``_svltcp_dat_fmt()``
        type_bytes: Optional type mask override, forwarded to ``_svltcp_dat_fmt()``
        delim: Optional delimiter override, forwarded to ``_svltcp_dat_fmt()``

    Returns:
        The dict produced by ``_svltcp_parse_rsp()`` for the raw reply
    """
    payload = _svltcp_dat_fmt(dat_cmd_itm, type_bytes, delim)
    response = _svltcp_raw_txn(ip, payload)
    return _svltcp_parse_rsp(response)


def _svltcp_raw_txn(ip: str, wr_bytes: bytes) -> bytes | None:
    """
    Frame a payload for Servlink/TCP and exchange it over the socket for ``ip``.

    Args:
        ip: IP address of the device
        wr_bytes: The unframed content bytes

    Returns:
        The value returned by ``svl_socket_txn()`` for the framed message
    """
    frame = _svltcp_raw_fmt(wr_bytes)
    return svl_socket_txn(ip, frame)


def _svltcp_init(ip: str, wr_bytes: bytes) -> bytes:
    """
    Send an init message over Servlink/TCP and remember the reply.

    A connection is opened first if there is none. ``wr_bytes`` is sent
    exactly as given (no framing is applied here), and the reply is stored
    under the ``"init"`` key of the connection entry for ``ip``.

    Args:
        ip: IP address of the device
        wr_bytes: The complete init message to send

    Returns:
        The stored reply
    """
    if not svl_socket_connected(ip):
        svl_socket_connect(ip)

    reply = svl_socket_txn(ip, wr_bytes)
    cxn = svl_socket_cxns[ip]
    cxn["init"] = reply

    return cxn["init"]


def svl_socket_txn(ip: str, wr_bytes: bytes) -> bytes | None:
    """
    Write a message to the Servlink/TCP socket for ``ip`` and read one reply.

    A connection with default settings is opened first if there is none.

    Args:
        ip: IP address of the device; key into ``svl_socket_cxns``
        wr_bytes: The complete, already formatted message to send

    Returns:
        The bytes returned by a single ``recv`` call, or ``None`` when the
        connection could not be established or the write or read failed
    """
    if not svl_socket_connected(ip):
        svl_socket_connect(ip)

    # svl_socket_connect() removes the connection entry when it fails, so
    # check again instead of indexing svl_socket_cxns[ip] blindly.
    if not svl_socket_connected(ip):
        log.error(f"No Servlink/TCP connection to {ip}, unable to send")
        return None

    cxn = svl_socket_cxns[ip]
    sock = cxn["sock"]

    try:
        log.trace3(f"WR {ip}: {pretty_hex_bytes(wr_bytes)}")
        # sendall() keeps writing until the whole message is out; send() may
        # stop after transmitting only part of it.
        sock.sendall(wr_bytes)
    except Exception as err:
        log.error(f"Error writing to Servlink/TCP on {ip}: {err}")
        # Nothing was (fully) sent, so waiting for a reply would only block
        # until the timeout or pick up unrelated data.
        return None

    rd_bytes = None
    try:
        rd_bytes = sock.recv(cxn.get("buf_size", SVL_TCP_DEFAULT_BUF_SIZE))
        log.trace3(f"RD {ip}: {pretty_hex_bytes(rd_bytes)}")
    except Exception as err:
        log.error(f"Error reading from Servlink/TCP on {ip}: {err}")

    return rd_bytes


def svl_socket_connected(ip: str) -> bool:
    """
    Tell whether a socket object is currently stored for ``ip``.

    Args:
        ip: IP address of the device; key into ``svl_socket_cxns``

    Returns:
        True if the connection entry for ``ip`` exists and has a ``"sock"``
        value that is not None
    """
    if ip not in svl_socket_cxns:
        return False
    return svl_socket_cxns[ip].get("sock") is not None


def svl_socket_initialized(ip: str) -> bool:
    """
    Tell whether an init reply is currently stored for ``ip``.

    Args:
        ip: IP address of the device; key into ``svl_socket_cxns``

    Returns:
        True if the connection entry for ``ip`` exists and has an ``"init"``
        value that is not None
    """
    if ip not in svl_socket_cxns:
        return False
    return svl_socket_cxns[ip].get("init") is not None


def svl_socket_connect(
    ip: str,
    port: int | None = None,
    timeout: float | None = None,
    delim: bytes | None = None,
    bufsize: int | None = None,
) -> socket.socket | None:
    """
    Open a Servlink/TCP connection to ``ip`` and register it.

    The connection entry in ``svl_socket_cxns`` is created with the
    ``"dat_dlm"`` and ``"buf_size"`` settings and, once created, the socket
    under ``"sock"``. If connecting fails the entry is removed again.

    Args:
        ip: IP address of the device
        port: TCP port; defaults to ``SVL_TCP_DEFAULT_PORT``
        timeout: Socket timeout in seconds; defaults to 5
        delim: Data delimiter to record; defaults to ``SVL_TCP_DEFAULT_DAT_DLM``
        bufsize: Receive buffer size; defaults to ``SVL_TCP_DEFAULT_BUF_SIZE``

    Returns:
        The connected socket, or ``None`` if already connected or if the
        connection attempt failed
    """
    # Check whether already connected
    if svl_socket_connected(ip):
        log.error(f"Already connected to Servlink/TCP on {ip}")
        return None

    # Defaults
    if port is None:
        port = SVL_TCP_DEFAULT_PORT
    if timeout is None:
        timeout = 5

    # Build the entry in one go. Assigning a fresh dict per setting would
    # throw away the settings stored before it.
    svl_socket_cxns[ip] = {
        "dat_dlm": SVL_TCP_DEFAULT_DAT_DLM if delim is None else delim,
        "buf_size": SVL_TCP_DEFAULT_BUF_SIZE if bufsize is None else bufsize,
    }

    # Connect
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Register the socket before connecting so that the cleanup below
        # closes it if settimeout()/connect() fails.
        svl_socket_cxns[ip]["sock"] = sock
        sock.settimeout(timeout)
        sock.connect((ip, port))
        return sock
    except Exception as err:
        log.error(f"Error connecting to Servlink/TCP on {ip}: {err}")
        svl_socket_disconnect(ip)

    return None


def svl_socket_disconnect(ip: str) -> None:
    """
    Close the Servlink/TCP socket for ``ip`` and forget the connection entry.

    Does nothing when no entry exists. A failure while closing is logged and
    the entry is removed regardless.

    Args:
        ip: IP address of the device; key into ``svl_socket_cxns``
    """
    if ip not in svl_socket_cxns:
        return

    sock = svl_socket_cxns[ip].get("sock")
    if sock is not None:
        try:
            sock.close()
        except Exception as err:
            # Log, but give up - nothing else to do here
            log.error(f"Error disconnecting from Servlink/TCP on {ip}: {err}")

    del svl_socket_cxns[ip]


def _svltcp_dat_fmt(
    dat_cmd_itm: dict, type_bytes: bytes | None = None, delim: bytes | None = None
) -> bytes:
    """
    Build the payload of a Servlink/TCP data command.

    Layout: read command byte, (mask | type) byte, delimiter, data address.

    Args:
        dat_cmd_itm: Data item description. Its ``"mask"`` and ``"addr"``
            entries are always read; ``"type"`` is read only when
            ``type_bytes`` is not given.
        type_bytes: Type mask to use instead of the one looked up in
            ``svl_data_types``
        delim: Delimiter to use instead of ``SVL_TCP_DEFAULT_DAT_DLM``

    Returns:
        The data command payload
    """
    # Start with command byte
    svl_bytes = svl_dat_cmds["read"]["cmd"]
    # Add byte composed of mask and type. Python does not support bitwise
    # operations on bytes, so convert to ints and back.
    mask_int = int.from_bytes(dat_cmd_itm["mask"], byteorder="big")
    if type_bytes is None:
        if dat_cmd_itm["type"] in svl_data_types:
            type_bytes = svl_data_types[dat_cmd_itm["type"]]["mask"]
        else:
            type_bytes = b"\x00"
    type_int = int.from_bytes(type_bytes, byteorder="big")
    svl_bytes += (mask_int | type_int).to_bytes(1, byteorder="big")
    # Add delimiter
    if delim is None:
        svl_bytes += SVL_TCP_DEFAULT_DAT_DLM
    else:
        svl_bytes += delim
    # Add data address
    svl_bytes += dat_cmd_itm["addr"]

    return svl_bytes


def _svltcp_raw_fmt(wr_bytes: bytes) -> bytes:
    """
    Wrap content bytes in the Servlink/TCP frame.

    Frame layout: 0x01, two-byte big-endian content length, content.

    Args:
        wr_bytes: The content bytes

    Returns:
        The framed message
    """
    # Start with initial byte (slave address?)
    svl_bytes = b"\x01"
    # Add two-byte count of content bytes.
    svl_bytes += len(wr_bytes).to_bytes(2, byteorder="big")
    # Add content bytes.
    svl_bytes += wr_bytes

    return svl_bytes


def _svltcp_parse_rsp(rd_bytes: bytes | None) -> dict:
    """
    Split a raw Servlink/TCP response into header, length and data.

    Args:
        rd_bytes: The raw response bytes, or ``None``

    Returns:
        A dict with the keys ``"hdr"`` (first byte, as an int), ``"len"``
        (bytes 1-2) and ``"rbytes"`` (everything from byte 3 on). A value is
        ``None`` when the response is too short to contain that part.
    """
    tcp_rsp = {"hdr": None, "len": None, "rbytes": None}

    if rd_bytes is None:
        return tcp_rsp

    if len(rd_bytes) > 0:
        tcp_rsp["hdr"] = rd_bytes[0]
    if len(rd_bytes) > 2:
        tcp_rsp["len"] = rd_bytes[1:3]
    if len(rd_bytes) > 3:
        tcp_rsp["rbytes"] = rd_bytes[3:]

    return tcp_rsp


def _svl_parse_data(data_bytes: bytes, rfmt: type) -> str | int | bool | float | bytes:
    """
    Convert the data bytes of a Servlink response to the requested type.

    Args:
        data_bytes: The data bytes to convert
        rfmt: The requested type (``str``, ``int``, ``bool``, ``float`` or
            ``bytes``)

    Returns:
        The converted value. When the data cannot be converted (wrong
        length, unsupported ``rfmt``, parse error) or ``rfmt`` is ``bytes``,
        the result of ``pretty_hex_bytes(data_bytes)`` is returned instead.
    """
    # A format without an entry in svl_data_types has no expected length.
    # It must reach the "unexpected format" branch below rather than fail
    # here with a KeyError.
    rlen = svl_data_types.get(rfmt, {}).get("len", 0)

    # Parse the data
    try:
        if rfmt is str:
            # Check for pre/post 0x00's. The emptiness check keeps an empty
            # payload from raising IndexError; it decodes to "".
            if data_bytes and data_bytes[0] == 0 and data_bytes[-1] == 0:
                return data_bytes[1:-1].decode("utf-8", "replace").strip()
            return data_bytes.decode("utf-8", "replace").strip()
        elif rfmt is int and len(data_bytes) == rlen:
            return int.from_bytes(data_bytes, "big")
        elif rfmt is bool and len(data_bytes) == rlen:
            return int.from_bytes(data_bytes, "big") != 0
        elif rfmt is float and len(data_bytes) == rlen and len(data_bytes) > 4:
            # floats are 4 bytes; they are taken from the end of the data
            return struct.unpack(">f", data_bytes[(rlen - 4) :])[0]
        elif rfmt is not bytes:
            # Working as designed, but log to debug. Fall through for bytes.
            log.debug(f"Unexpected format, expected {rfmt} for {pretty_hex_bytes(data_bytes)}")
    except Exception as ex:
        log.error(f"Error parsing response for {pretty_hex_bytes(data_bytes)}: {ex}")

    # If we didn't return some other type, fall back to the
    # pretty_hex_bytes() rendering of the raw data
    return pretty_hex_bytes(data_bytes)


__all__ = [
    "_svl_parse_data",
    "_svlser_dat_txn",
    "_svlser_raw_txn",
    "_svlser_sys_txn",
    "_svltcp_dat_txn",
    "_svltcp_init",
    "_svltcp_raw_txn",
    "_svltcp_sys_txn",
    "svl_dat_cmds",
    "svl_data_types",
    "svl_socket_connect",
    "svl_socket_disconnect",
    "svl_socket_initialized",
    "svl_sys_cmds",
]