"""Woodward 2301e Speed Controller.
Services
- Servlink (Woodward proprietary protocol) on RS232 and RS422
- Modbus (optional) on RS232 and RS422
Authors
- Peter Shurtz
"""
import struct
from pathlib import Path
from typing import Any
from peat import DeviceData, DeviceModule, Interface, SerialMethod, datastore, log
from peat.protocols import open_serial_port, pretty_hex_bytes, serial_txn
from .wdw_2301e_svl import *
# TODO: remove '*' imports
from .wdw_svl import *
from .wdw_tc import *
# Track the Servlink sequence: True selects sequence byte 0x21, False 0x20
svl_seq_odd: bool = True

# "Magic" bytes for Servlink "hello" for _servlink_raw_txn()
# (the reply a device is expected to give to the hello message)
SVL_HELLO_ACK: bytes = bytes.fromhex("01 60 10 21 00 80 00 80 03 33 34")

# Data type specifiers for Servlink, keyed by the Python type they describe
# This is probably not 2301E-specific
# TODO: are these old?
svl_data_type: dict[type, bytes] = {
    bool: b"\x81",
    float: b"\x82",
    int: b"\x84",
    bytes: b"\xd4",
}
class WDW2301E(DeviceModule):
    """Woodward 2301e Speed Controller."""

    device_type = "Controller"
    vendor_id = "Woodward"
    vendor_name = "Woodward, Inc"
    brand = "WDW"
    model = "2301E"
    # TODO: should wset and tc parsing belong here or 3500XT?
    filename_patterns = ["*.wset"]

    # TODO: combine some common functions of 3500XT and 2301E?
    # Baud rates tried when the device options do not specify any
    woodward_fallback_baudrates = [9600]

    @classmethod
    def _verify_serial(cls, dev: DeviceData) -> bool:
        """
        Check if a device is a 2301E via the Woodward-proprietary Servlink
        protocol over a serial connection.
        """
        # NOTE: the docstring above is also used verbatim as the description
        # of the SerialMethod registered below the class; edit it with care.
        baudrates = dev.options["baudrates"] or cls.woodward_fallback_baudrates
        timeout = dev.options["servlink_serial"]["timeout"]

        for baudrate in baudrates:
            # Each step only runs if the previous one succeeded:
            # open the port, say "hello", then ask for the product name.
            if not open_serial_port(dev.serial_port, baudrate, timeout):
                continue
            if _servlink_hello_txn(dev.serial_port) != SVL_HELLO_ACK:
                continue
            product = _servlink_sys_txn(svl_sys_cmds["Product"], dev.serial_port)
            if "2301e" not in str(product).lower():
                continue

            cls.log.debug(f"Verified {dev.serial_port} (baudrate: {baudrate})")
            iface = Interface(
                connected=True,
                type="rs_232",  # TODO: detect serial interface type
                serial_port=dev.serial_port,
                baudrate=baudrate,
                parity="none",
                stop_bits=1,
                flow_control="none",
            )
            dev.store("interface", iface, lookup="serial_port")
            return True

        cls.log.warning(f"Failed to verify {dev.serial_port} (baudrates: {baudrates})")
        return False

    @classmethod
    def _pull(cls, dev: DeviceData) -> bool:
        """Pull system information and configuration over Servlink serial.

        The pulled data is merged into ``dev.extra`` and written out as
        ``pulled-config.json``.

        Args:
            dev: The device to pull from (its ``serial_port`` is used)

        Returns:
            True if data was pulled, False if nothing was returned
        """
        serial_info = cls._get_serial_data(dev.serial_port)
        if not serial_info:
            return False

        dev.extra.update(serial_info)
        dev.write_file(serial_info, "pulled-config.json")
        return True

    @classmethod
    def _get_serial_data(cls, address: str) -> dict[str, Any]:
        """Get the system information and configuration using Servlink serial.

        It is assumed that the serial connection has been opened and the
        Servlink sequence has been reset previously, e.g. by _verify_serial.

        Args:
            address: The serial address

        Returns:
            A dictionary with a "system" mapping (one entry per system
            command) and a "config" mapping (one entry per data parameter)
        """
        cls.log.info(f"Pulling Serial information from {address}...")
        serial_info: dict[str, Any] = {"system": {}, "config": {}}

        # Get the system information
        for name, sys_cmd in svl_sys_cmds.items():
            cls.log.debug(f"Pulling System\\{name}")
            value = _servlink_sys_txn(sys_cmd, address)
            serial_info["system"][name] = value
            cls.log.trace2(f'{name} = "{value}"')

        # Get the configuration
        # TODO: should this only be the keys that start with "CfgA\\"?
        for name, param in svl_dat_prms.items():
            cls.log.debug(f"Pulling {name}")
            value = _servlink_rw_txn(svl_dat_cmds["read"], param, address)
            serial_info["config"][name] = value
            cls.log.trace2(f'{name} = "{value}"')

        cls.log.info(f"Finished pulling Serial information from {address}")
        return serial_info

    @classmethod
    def _parse(cls, file: Path, dev: DeviceData | None = None) -> DeviceData | None:
        """Parse a settings file and attach the result to a device.

        The parsed data is merged into ``dev.extra`` and written out as
        ``parsed-config.json``.

        Args:
            file: Path of the file to parse
            dev: Device to update; when omitted, one is looked up in the
                datastore using the file's stem as the ID

        Returns:
            The updated device, or None if the file could not be parsed
        """
        to_parse = file.read_bytes()

        # TODO: detect 2301E wset file
        try:
            tc_file_text = to_parse.decode()  # NOTE(cegoes): this may be iso8859-1
            parsed_config = _parse_tc_file(tc_file_text)
        except Exception as ex:
            cls.log.exception(f"Critical parsing error: {ex}")
            return None

        # TODO: this is a temporary hack since we currently don't have examples of 2301E files
        if not parsed_config:
            cls.log.warning("No project parsed")
            return None

        if not dev:
            dev = datastore.get(file.stem, "id")

        dev.extra.update(parsed_config)
        cls.update_dev(dev)
        dev.write_file(parsed_config, "parsed-config.json")
        return dev
# Register Servlink-over-serial identification for this module. This is done
# after the class body because the entry refers to the class's own method;
# the method's docstring doubles as the human-readable description.
WDW2301E.serial_methods = [
    SerialMethod(
        identify_function=WDW2301E._verify_serial,
        name="wdw2301e_servlink_serial",
        description=str(WDW2301E._verify_serial.__doc__).strip(),
        type="direct",
        reliability=3,
    ),
]
# TODO: this is in separate file now?
# This is likely not 2301E-specific
def _parse_tc_file(project: str) -> dict:
"""Parse a *.tc device logic file.
Args:
project: The .tc file prepared by parse_project
Returns:
A dictionary containing the device logic
"""
if not isinstance(project, str):
log.error(f"Project type error: {type(project)!s}")
return {}
bad_lines = 0
parsed_logic = {}
lines = project.strip().split("\r\n")
file_ver_line = lines.pop(0)
fw_prefix_line = lines.pop(0)
fw_date_line = lines.pop(0)
header_line = lines.pop(0)
if (
"File Version" in file_ver_line
and "-" in fw_prefix_line
and "UTC" in fw_date_line
and len(header_line.split("\t")) == 9
and header_line.startswith("Mode")
):
parsed_logic["file_version"] = file_ver_line
parsed_logic["firmware_prefix"] = fw_prefix_line
parsed_logic["firmware_date"] = fw_date_line
else:
log.warning("Unknown tc logic format")
return {}
parsed_logic["logic"] = {}
for line in lines:
fields = line.split("\t")
if not len(fields) == 9:
log.debug(f"Malformed line: '{line}'")
bad_lines += 1
continue
# Build Mode, Category, Block Name, and Field Name organization,
# then populate with Type, Current, Initial, Low, and High
ref = parsed_logic["logic"]
# Walk through each layer of the organization
for n in range(4):
if fields[n] not in ref:
ref[fields[n]] = {} # create a new layer if necessary
ref = ref[fields[n]] # walk to the next layer
# Now populate the top layer
ref["Type"] = fields[4]
ref["Current"] = fields[5]
ref["Initial"] = fields[6]
ref["Low"] = fields[7]
ref["High"] = fields[8]
if bad_lines > 0:
log.warning(f"Skipped {bad_lines} malformed lines")
return parsed_logic
# Not sure if this is 2301E-specific
def _servlink_hello_txn(address: str):
    """Send the Servlink "hello" message and reset the Servlink sequence.

    Args:
        address: The serial address

    Returns:
        The raw serial response
    """
    return _servlink_raw_txn(SVLSER_INIT_MSG, address, reset=True)
# Not sure if this is 2301E-specific
def _servlink_sys_txn(sys_dict: dict, address: str):
    """Send a Servlink system command as a sequential transaction.

    Args:
        sys_dict: Command description with a "cmd" entry (the bytes to send)
            and a "type" entry (the desired response type)
        address: The serial address

    Returns:
        The parsed response
    """
    command = sys_dict["cmd"]
    response_type = sys_dict["type"]
    return _servlink_seq_txn(command, address, response_type)
# Not sure if this is 2301E-specific
def _servlink_rw_txn(rw_dict: dict, addr_dict: dict, address: str):  # only one addr for now
    """Send a Servlink read/write command for a single address specifier.

    Args:
        rw_dict: Command description with a "cmd" entry (the command byte)
        addr_dict: Address specifier with "type" and "addr" entries
        address: The serial address

    Returns:
        The parsed response
    """
    payload = _servlink_rw_fmt(rw_dict["cmd"], [addr_dict])
    response_type = addr_dict["type"]
    return _servlink_seq_txn(payload, address, response_type)
# Not sure if this is 2301E-specific
def _servlink_seq_txn(
    cmd_bytes: bytes, address: str, fmt: "type | None" = None, options: bool = False
):
    """Send a sequential Servlink serial message and parse the reply.

    The command is prefixed with the current sequence byte, sent, and the
    response is transformed to the requested type. The module-level
    sequence flag is flipped afterwards.

    Args:
        cmd_bytes: The data to write (data payload)
        address: The serial address
        fmt: The desired response type
        options: Forwarded to _servlink_raw_txn() as its sequence-reset flag

    Returns:
        The parsed response
    """
    global svl_seq_odd

    sequenced = _servlink_seq_fmt(cmd_bytes)
    raw_response = _servlink_raw_txn(sequenced, address, options)
    parsed = _servlink_rsp_trns(raw_response, fmt)

    # Only reached when the exchange and parsing did not raise
    svl_seq_odd = not svl_seq_odd
    return parsed
# Not sure if this is 2301E-specific
def _servlink_raw_txn(wr_bytes: bytes, address: str, reset: bool = False):
    """Send a raw Servlink serial message, optionally resetting the sequence.

    The payload is wrapped in the Servlink frame format and padded with
    three NUL bytes of dead time on either side before being sent.

    Args:
        wr_bytes: The data to write (data payload)
        address: The serial address
        reset: Whether to reset the Servlink sequence

    Returns:
        The raw serial response
    """
    global svl_seq_odd

    if reset:
        svl_seq_odd = True

    dead_time = b"\x00\x00\x00"  # 3-char dead time
    frame = dead_time + _servlink_raw_fmt(wr_bytes) + dead_time
    return serial_txn(frame, address)
# Not sure if this is 2301E-specific
def _servlink_seq_fmt(cmd_bytes: bytes) -> bytes:
    """Apply Servlink sequence byte.

    The prefix is 0x21 while the module-level ``svl_seq_odd`` flag is set
    and 0x20 otherwise.

    Args:
        cmd_bytes: The command bytes to prefix

    Returns:
        The sequence byte followed by cmd_bytes
    """
    seq = b"\x21" if svl_seq_odd else b"\x20"
    return seq + cmd_bytes
# Not sure if this is 2301E-specific
def _servlink_raw_fmt(wr_bytes: bytes) -> bytes:
    """Apply Servlink format.

    Frame layout: 0x01, the payload, 0x03, then the two-byte checksum of
    the payload.

    Args:
        wr_bytes: The payload to frame

    Returns:
        The framed message
    """
    start = b"\x01"  # slave address?
    end = b"\x03"  # delimiter?
    # TODO: need to inject 0x10 0x20 sometimes, just not sure when
    checksum = _servlink_crc(wr_bytes)
    return start + wr_bytes + end + checksum
# Not sure if this is 2301E-specific
def _servlink_rw_fmt(cmd_byte: bytes, addrs: list) -> bytes:
    """Compose Servlink R/W commands.

    Multiple address specifiers per command are allowed, but parsing the
    reply is not implemented in _servlink_rsp_trns() yet, so you'll just get
    back bytes.

    Args:
        cmd_byte: The read/write command byte
        addrs: Address specifiers, each a dict with "type" and "addr" entries

    Returns:
        The command byte followed, for every specifier, by its data type
        byte, the bytes 0xF0 0x09 and its address
    """
    parts = [cmd_byte]
    for spec in addrs:
        parts.append(svl_data_type[spec["type"]])
        parts.append(b"\xf0\x09")  # ?
        parts.append(spec["addr"])
    return b"".join(parts)
# Not sure if this is 2301E-specific
def _servlink_rsp_trns(rd_bytes: bytes, fmt: "type") -> str:
"""Parse the Servlink response to transform it to the specified type.
Multi-address responses are not yet implemented.
Args:
rd_bytes: The data to parse (raw response bytes)
fmt: The desired response type
Returns:
The parsed response in the specified type
"""
# Remove formatting bytes
if len(rd_bytes) >= 7 and rd_bytes[-3] == 16 and rd_bytes[-4] == 3:
# Sometimes the checksum has a 0x10 inserted before and a 0x20 ORed
# with the MSB of the data. Note that this works just like, but is NOT
# the same as, the 0x10 0x20 occurrence in the data described below.
# Discard these bytes with the "normal" Servlink format bytes for now.
# 0 - echoes byte 0 of command
# 1 - echoes byte 1 of command
# -4 - 0x03 - delimiter?
# -3 - 0x10 - ?
# -2 - checksum first byte ORed with 0x20?
# -1 - checksum second byte
data_bytes = rd_bytes[2:-4]
elif len(rd_bytes) >= 6:
# Normal response, discard Servlink format bytes for now.
# 0 - echoes byte 0 of command
# 1 - echoes byte 1 of command
# -3 - always 0x03 - delimiter?
# -2 - checksum first byte
# -1 - checksum second byte
data_bytes = rd_bytes[2:-3]
else:
# Not enough bytes left! No data?
return ""
# Sometimes the data has a 0x10 inserted before and a 0x20 ORed with the
# MSB of the data. For example, instead of looking like:
# 0x 01 21 00 00 01 03 D8 CB
# it looks like:
# 0x 01 21 00 00 10 21 03 D8 CB
# where the real data is "00 00 01", not "00 00 10 21" (the checksum agrees
# with this as well).
# This is currently detected by length, since each data type has an
# expected length. It COULD be checked for when the CRC doesn't match, but
# that starts to get muddy...
# Note that this works just like, but is NOT the same as, the 0x10 0x20
# occurrence in the checksum described above.
#
# Remove these "extra" values.
if (fmt is int and len(data_bytes) == 7) or (fmt is bool and len(data_bytes) == 4):
# Find the first non-zero byte, this should be the extra 0x10
nz = next((i for i, x in enumerate(data_bytes) if x), None)
# Remove the byte at index nz
del data_bytes[nz]
# The 0x20-ORed byte should now be at index nz, un-OR it
data_bytes[nz] -= 32
# Parse the data
try:
if fmt is str:
if data_bytes[0] == 0 and data_bytes[-1] == 0:
# strip pre/post 0x00's if any
data_bytes = data_bytes[1:-1]
return data_bytes.decode("utf-8", "replace").strip()
elif fmt is int and len(data_bytes) == 6:
return int.from_bytes(data_bytes, "big")
elif fmt is bool and len(data_bytes) == 3:
return int.from_bytes(data_bytes, "big") != 0
elif (
fmt is float and len(data_bytes) == 6 and data_bytes[0] == 0 and data_bytes[1] == 0
): # we get back 6 bytes but floats are 4
return struct.unpack(">f", data_bytes[2:])[0]
elif fmt is not bytes:
# Working as designed, but log to debug
log.debug(f"Unmatched response type for {fmt} {pretty_hex_bytes(data_bytes)}")
except Exception as ex:
log.error(f"Error parsing response for {pretty_hex_bytes(data_bytes)}: {ex}")
# If we didn't return some other type, just return the bytes
return pretty_hex_bytes(data_bytes)
# Not sure if this is 2301E-specific
def _servlink_crc(data_bytes: bytes):
"""Calculate Servlink CRC (little-endian CRC-16-MODBUS of the bytes
between 0x01 and 0x03). Use int's so we can use bitwise operations.
Args:
data_bytes: The data bytes to checksum
Returns:
The two-byte checksum of data_bytes
"""
crc = 65535 # 0xFFFF' -> 65535
for i in range(len(data_bytes)):
crc ^= data_bytes[i]
for _j in range(8):
if (crc & 1) != 0: # 0x0001' -> 1
crc >>= 1
crc ^= 40961 # 0xA001' -> 40961
else:
crc >>= 1
# byteorder='big' is the endianness of the Python int->byte conversion,
# NOT the endianness of the checksum calculation!
return crc.to_bytes(2, byteorder="big")
# Public API of this module: only the device module class is exported
__all__ = ["WDW2301E"]