Source code for peat.modules.schneider.m340.umas_packets

"""
Scapy packets for the Schneider-proprietary UMAS protocol.

This protocol is used by the Schneider Modicon M340 PLC.

Authors

- Patrica Schulz
- Christopher Goes
"""

import binascii
import socket

from scapy.all import (
    ByteField,
    FieldLenField,
    LEShortField,
    MayEnd,
    Packet,
    ShortField,
    StrLenField,
    XByteField,
)

from peat import CommError, config, utils
from peat import log as peat_logger

# TODO: refactor the UMAS packets to properly implement the UMAS protocol
#   based on what we know now about it. hopefully improve reliability.


[docs] class Modbus(Packet): name = "Modbus/TCP" fields_desc = [ ShortField("transactionID", 1), ShortField("protocolID", 0), FieldLenField( "length", None, length_of="data", adjust=lambda pkt, x: x + 2, # noqa: ARG005 ), XByteField("unitID", 0), ByteField("functionCode", 90), StrLenField("data", "", length_from=lambda pkt: pkt.length - 2), ]
[docs] class UMASQuery(Packet): name = "UMAS query" fields_desc = [ XByteField("connectionCode", 0x6D), XByteField("commandCode", 0x00), ShortField("num", 0x01), LEShortField("data", 0x0000), ]
[docs] class UMASResponse(Packet): name = "UMAS response" fields_desc = [ XByteField("connectionCode", 0x6D), # NOTE: MayEnd required for scapy 2.6 # num and dataLen are optional? MayEnd(XByteField("commandCode", 0x00)), ShortField("num", 0x00), LEShortField("dataLen", 0x00), ]
[docs] class UMASPoll(Packet): name = "UMAS poll" fields_desc = [ XByteField("connectionCode", 0x00), XByteField("commandCode", 0x00), ]
[docs] class UMASConnection(Packet): name = "UMAS connection" fields_desc = [ XByteField("connectionCode", 0x00), XByteField("commandCode", 0x10), LEShortField("unknown1", 0xAF8C), LEShortField("unknown2", 0x0204), FieldLenField("length", None, length_of="name"), StrLenField("name", "", length_from=lambda pkt: pkt.length), ]
[docs] class UMASConnectionResponse(Packet): name = "UMAS connection response" fields_desc = [ XByteField("connectionCode", 0x00), XByteField("commandCode", 0x10), XByteField("newConnectionCode", 0x10), ]
[docs] def start_pull_packet(cid: int) -> Modbus: p = {"connectionCode": cid, "commandCode": 0x33, "num": 1, "data": 0x03FB} mp = {"data": bytes(UMASQuery(**p))} return Modbus(**mp)
[docs] def pull_packet(cid: int, seq: int) -> Modbus: p = {"connectionCode": cid, "commandCode": 0x34, "num": 0x1, "data": seq} mp = {"data": bytes(UMASQuery(**p))} return Modbus(**mp)
[docs] def stop_pull_packet(cid: int, seq: int) -> Modbus: p = {"connectionCode": cid, "commandCode": 0x35, "num": 0x1, "data": seq} mp = {"data": bytes(UMASQuery(**p))} return Modbus(**mp)
[docs] def poll_packet(cid: int) -> Modbus: p = {"connectionCode": cid, "commandCode": 0x04} mp = {"data": bytes(UMASPoll(**p))} return Modbus(**mp)
[docs] def connect_packet() -> Modbus: # NOTE(cegoes): must call bytes() on a packet # layer before passing it to the next layer sch_connect = bytes(UMASConnection(name=b"PEAT - Unity Loader")) return Modbus(data=sch_connect)
[docs] def send_umas_packet( sock: socket.socket, packet: Packet | bytes, response_class: type[Packet], tracker: list | None = None, ) -> Packet: """ Send a packet and receive the response from a UMAS device. Args: sock: The TCP socket to use packet: The packet to send response_class: The Packet subclass to process the response as tracker: tracks all packets sent, including raw payloads and metadata Returns: The data field of the response """ payload = bytes(packet) # for logging ip, port = sock.getpeername() device = f"{ip}:{port}" log = peat_logger.bind(target=device) if config.DEBUG >= 3: log.trace3( f"Sending packet from local interface {sock.getsockname()} to " f"remote device {device}\nLength: {len(packet)}" f"\nContents: {binascii.hexlify(payload)}" ) try: update_tracker("send", packet, payload, sock, tracker) bytes_sent = sock.send(payload) if config.DEBUG >= 2: log.trace2(f"{bytes_sent} bytes were sent to {device}") if bytes_sent != len(payload): log.error(f"Only {bytes_sent} bytes were sent out of {len(payload)} total bytes") except OSError as err: log.exception("Could not send UMAS packet") raise err # TODO: fix for scapy 2.6.1 raw_response = sock.recv(4096) # type: bytes modbus_layer = Modbus(raw_response) data = response_class(modbus_layer.data) # type: Packet update_tracker("receive", data, raw_response, sock, tracker) if config.DEBUG >= 3: log.trace3( f"Raw data from remote device {device}" f"\nLength: {len(packet)}\nContents: " f"{binascii.hexlify(raw_response)}" ) if data.commandCode != 0xFE: if data.commandCode == 253: # 0xfd raise CommError( f"Response code 0xfd (253). Someone is currently connected to the " f"device in the Unity editor or a PEAT pull is happening at " f"the same time against this module or another communication " f"module on the same chassis! (device: {device})" ) log.warning(f"Received an invalid command code: {hex(data.commandCode)}") if config.DEBUG: data.show2() return data
[docs] def update_tracker( direction: str, packet: Packet, payload: bytes, sock: socket.socket, tracker: list | None, ): # TODO: expand this technique globally to PEAT for traffic capturing/logging if tracker is not None: # TODO: store UMAS function code # TODO: store UMAS response status (for responses) # TODO: resolve UMAS function name (from .umas_codes.py) dataset = { "direction": direction, "local_ip": sock.getsockname()[0], "local_port": sock.getsockname()[1], "remote_ip": sock.getpeername()[0], "remote_port": sock.getpeername()[1], # "packet_object": packet, "packet_length": len(packet), "payload": binascii.hexlify(payload), "data": binascii.hexlify(packet.data) if hasattr(packet, "data") else b"", "load": binascii.hexlify(packet.load) if hasattr(packet, "load") else b"", "timestamp": utils.utc_now(), } tracker.append(dataset)