from __future__ import annotations
import random
from peat import log
from peat.protocols.cip.cip_const import (
CIP_SC_BYTES,
CLASS_CODE,
CLASS_ID,
CONNECTION_MANAGER_INSTANCE,
INSTANCE_ID,
)
from peat.protocols.common import scapy_human_field
from peat.protocols.data_packing import *
from .enip_const import (
ADDRESS_ITEM,
CONNECTION_PARAMETER,
CONNECTION_SIZE,
DATA_ITEM,
ENIP_SEQ_MAX,
ENIP_SEQ_MIN,
PRIORITY,
TIMEOUT_MULTIPLIER,
TIMEOUT_TICKS,
TRANSPORT_CLASS,
EnipCommError,
)
from .enip_packets import (
ENIP,
ENIPListIdentityResponse,
ENIPListInterfacesResponse,
ENIPListServicesResponse,
)
from .enip_socket import EnipSocket
# TODO (cegoes, 06/02/2023): rewrite packet generation using Scapy
# This will be transformed into a interface for working with ENIP endpoints
# by wrapping the packet construction, session handling, etc. in a class
# basically what pycomm did, but simpler and easier to maintain, with better
# error handling and logging.
# Combine much of the functionality currently in ab_scan and elsewhere into here.
# NOTE: this assumes unicast, need to properly handle broadcast. maybe separate file
# with generalized functions (like discovery) that can be used for unicast or broadcast.
# TODO: leverage scapy to take PCAPs?
[docs]
class EnipDriver:
"""
Ethernet/IP (ENIP) protocol handler.
"""
def __init__(
self,
enip_socket: EnipSocket,
cpu_slot: int = 0,
rpi: int = 5000,
timeout: int = 0xAF12,
backplane: int = 1,
cid_ot: bytes = b"\x00\x00\x00\x00",
cid_to: bytes = b"\x27\x04\x19\x71",
csn: bytes = b"\x8f\x00",
vid: bytes = b"\x4d\x00",
vsn: bytes = b"\x71\x6e\x4c\x0c",
) -> None:
self.log = log.bind(
classname=self.__class__.__name__,
target=f"{enip_socket.ip}:{enip_socket.port}",
)
self.enip_socket = enip_socket
self.cpu_slot = cpu_slot
self.rpi = rpi
self.timeout = timeout
self.backplane = backplane
self.cid_ot = cid_ot
self.cid_to = cid_to
self.csn = csn
self.vid = vid
self.vsn = vsn
self.sequence = random.randint(ENIP_SEQ_MIN, ENIP_SEQ_MAX)
self.session = 0
self.is_forward_opened = False
self.log.trace(f"Initialized {self.__class__.__name__}")
def __enter__(self) -> EnipDriver:
if not self.open():
raise ConnectionError(f"failed to connect to {str(self.enip_socket)}")
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
retval = self.close()
if exc_type:
self.log.debug(f"{exc_type.__name__}: {exc_val}")
return retval
def __str__(self) -> str:
return f"{self.enip_socket.ip}:{self.enip_socket.port}"
[docs]
def open(self) -> bool:
"""
Connect and open a session with the device.
Raises:
EnipCommError: exception occurred while opening the session
"""
if self.enip_socket.connect() and self.register_session() is not None:
return self.forward_open()
self.log.warning("Sessions not registered")
return False
[docs]
def close(self) -> bool:
"""
Close the session with the device and the underlying socket.
Returns:
If the close was successful
Raises:
EnipCommError: Exception occurred while unregistering the session
"""
retval = True
try:
if self.is_forward_opened:
try:
retval = self.forward_close()
except EnipCommError as ex:
self.log.warning(
f"Exception occurred during forward_close, the connection "
f"may not have been cleaned up properly. Exception: {ex}"
)
if self.enip_socket.is_connected:
self.enip_socket.close()
return False
if self.session:
self.unregister_session()
if self.enip_socket.is_connected:
self.enip_socket.close()
except Exception as err:
raise EnipCommError(err) from None
return retval
[docs]
def send(self, msg: bytes) -> int:
"""
Sends a message through the ENIP socket.
Returns:
Number of bytes sent
Raises:
EnipCommError: if the send failed
"""
return self.enip_socket.send(msg)
[docs]
def recv(self) -> bytes:
"""
Receives a message from the ENIP socket.
Returns:
The message received as bytes
Raises:
EnipCommError: if the received failed
"""
return self.enip_socket.receive()
[docs]
def _get_sequence(self) -> int:
"""
Increment and return the sequence number used with
connected messages.
Returns:
The current sequence number
"""
if self.sequence < ENIP_SEQ_MAX:
self.sequence += 1
else:
self.sequence = 0
return self.sequence
[docs]
def send_nop(self) -> None:
"""
Send a NOP to the target, gets no reply.
A NOP provides a way for either an originator or target to determine
if the TCP connection is still open.
Raises:
EnipCommError: if the message send failed
"""
packet = ENIP(
commandCode="NOP",
sessionHandle=self.session,
)
self.send(bytes(packet))
# TODO: unit test in CI
# TODO: use this function instead of the currently used one
[docs]
def list_identity(self) -> ENIPListIdentityResponse:
"""
ListIdentity command to locate and identify potential target.
Raises:
EnipCommError: if the message send failed
"""
packet = ENIP(commandCode="ListIdentity")
self.send(bytes(packet))
reply = ENIP(self.recv())
# TODO: check error code
return ENIPListIdentityResponse(reply.data)
# TODO: unit test in CI
[docs]
def list_services(self) -> list[str]:
"""
Returns list of service names returned by ListServices command.
"""
services_response = self._get_services()
service_names = [
svc.serviceName.decode().replace("\x00", "") for svc in services_response.listItems
]
return service_names
[docs]
def _get_services(self) -> ENIPListServicesResponse:
packet = ENIP(commandCode="ListServices")
self.send(bytes(packet))
reply = ENIP(self.recv())
# TODO: check error code
return ENIPListServicesResponse(reply.data)
[docs]
def list_interfaces(self) -> list[str]:
"""
Returns list of interfaces returned by ListInterfaces command.
"""
# TODO: don't know if this actually works
interfaces_response = self._get_interfaces()
interfaces = []
if interfaces_response.listItems:
interfaces = [
svc.itemData.decode().replace("\x00", "") for svc in interfaces_response.listItems
]
return interfaces
[docs]
def _get_interfaces(self) -> ENIPListServicesResponse:
packet = ENIP(commandCode="ListInterfaces")
self.send(bytes(packet))
reply = ENIP(self.recv())
# TODO: check error code
return ENIPListInterfacesResponse(reply.data)
[docs]
def send_rr_data(self, message: bytes) -> tuple[bytes, bytes]:
"""
SendRRData transfer an encapsulated request/reply packet between
the originator and target.
Args:
message: The message to be send to the target
Returns:
a :class:`tuple` where:
0: (boolean) reply is valid
1: the target CID T->O value from the response
Raises:
EnipCommError: if the message send failed
"""
rr_message = ENIP(
commandCode="SendRRData",
sessionHandle=self.session,
data=bytes(message),
)
self.send(bytes(rr_message))
reply = ENIP(self.recv())
# TODO: generalize error handling
status = scapy_human_field(reply, "status")
if status != "Success":
raise EnipCommError(f"Failed SendRRData: {status}")
target_cid_to = reply.data[20:24]
return reply.data[16:], target_cid_to
[docs]
def send_unit_data(self, message: bytes) -> bytes:
"""
SendUnitData send encapsulated connected messages.
Args:
message: The message to be sent to the target
Returns:
The reply data
Raises:
EnipCommError: if the message send failed
"""
unit_message = ENIP(
commandCode="SendUnitData",
sessionHandle=self.session,
data=bytes(message),
)
self.send(bytes(unit_message))
reply = ENIP(self.recv())
# TODO: generalize error handling
status = scapy_human_field(reply, "status")
if status != "Success":
raise EnipCommError(f"Failed SendUnitData: {status}")
return reply.data[22:]
[docs]
def send_connected_command(self, service: int, path: bytes, cmd_data: bytes) -> bytes:
"""
Sends a connected command to the device.
Args:
service: One byte value indicating the request service
path: The path to the specified instance (including path length)
cmd_data: Additional command data
Returns:
The reply data
Raises:
EnipCommError: if the message send failed
"""
message_request = [
pack_uint(self._get_sequence()),
bytes([service]),
path,
cmd_data,
]
message = self._build_common_packet_format(
DATA_ITEM["Connected"],
b"".join(message_request),
ADDRESS_ITEM["Connection Based"],
addr_data=self.cid_to,
)
return self.send_unit_data(message)
[docs]
def register_session(self) -> int:
"""
Register a new session with the communication partner.
Returns:
The session number
Raises:
EnipCommError: if the registration message send failed
"""
if self.session:
return self.session
message = bytes(ENIP(commandCode="RegisterSession"))
self.send(message)
reply = ENIP(self.recv())
# check if session registered successfully
status = scapy_human_field(reply, "status")
if status != "Success":
raise EnipCommError(f"Failed to register session: {status}")
self.session = reply.sessionHandle
self.log.debug(f"Session = 0x{self.session:0>8x} has been registered")
return self.session
[docs]
def unregister_session(self) -> None:
"""
Unregister a connection.
Raises:
EnipCommError: if the unregister message send failed
"""
message = ENIP(
commandCode="UnregisterSession",
sessionHandle=self.session,
)
self.send(bytes(message))
self.session = 0
[docs]
def forward_open(self) -> bool:
"""
CIP implementation of the forward open message.
Refer to ODVA documentation Volume 1 3-5.5.2
Returns:
:obj:`False` if any error in the replayed message
Raises:
EnipCommError: No session was registered before calling forward open
"""
if not self.session:
raise EnipCommError(
f"a session need to be registered before call to forward open to {str(self)}"
)
forward_open_msg = [
CIP_SC_BYTES["FORWARD_OPEN"],
pack_usint(2),
CLASS_ID["8-bit"],
pack_usint(CLASS_CODE["Connection Manager"]), # Volume 1: 5-1
INSTANCE_ID["8-bit"],
CONNECTION_MANAGER_INSTANCE["Open Request"],
PRIORITY,
TIMEOUT_TICKS,
self.cid_ot,
self.cid_to,
self.csn,
self.vid,
self.vsn,
TIMEOUT_MULTIPLIER,
b"\x00\x00\x00",
pack_dint(self.rpi * 1000),
pack_uint(CONNECTION_PARAMETER["Default"]),
pack_dint(self.rpi * 1000),
pack_uint(CONNECTION_PARAMETER["Default"]),
TRANSPORT_CLASS, # Transport Class
CONNECTION_SIZE["Backplane"],
pack_usint(self.backplane),
pack_usint(self.cpu_slot),
CLASS_ID["8-bit"],
pack_usint(CLASS_CODE["Message Router"]),
INSTANCE_ID["8-bit"],
pack_usint(1),
]
message = self._build_common_packet_format(
DATA_ITEM["Unconnected"], b"".join(forward_open_msg), ADDRESS_ITEM["UCMM"]
)
send_rr_data_success, target_cid_to = self.send_rr_data(message)
if send_rr_data_success:
self.cid_to = target_cid_to
self.is_forward_opened = True
self.log.info("forward_open successful")
return True
self.log.warning("send_rr_data failed")
return False
[docs]
def forward_close(self) -> bool:
"""
CIP implementation of the forward close message
Each connection opened with the froward open message need to be closed.
Refer to ODVA documentation Volume 1 3-5.5.3
Returns:
:obj:`False` if any error in the message reply
Raises:
EnipCommError: No session was registered before calling forward close
"""
if not self.session:
raise EnipCommError(
f"a session need to be registered before call to forward_close to {str(self)}"
)
forward_close_msg = [
CIP_SC_BYTES["FORWARD_CLOSE"],
pack_usint(2),
CLASS_ID["8-bit"],
pack_usint(CLASS_CODE["Connection Manager"]), # Volume 1: 5-1
INSTANCE_ID["8-bit"],
CONNECTION_MANAGER_INSTANCE["Open Request"],
PRIORITY,
TIMEOUT_TICKS,
self.csn,
self.vid,
self.vsn,
CONNECTION_SIZE["Backplane"],
b"\x00", # Reserved
pack_usint(self.backplane),
pack_usint(self.cpu_slot),
CLASS_ID["8-bit"],
pack_usint(CLASS_CODE["Message Router"]),
INSTANCE_ID["8-bit"],
pack_usint(1),
]
message = self._build_common_packet_format(
DATA_ITEM["Unconnected"], b"".join(forward_close_msg), ADDRESS_ITEM["UCMM"]
)
send_rr_data_success, _ = self.send_rr_data(message)
if send_rr_data_success:
self.is_forward_opened = False
self.log.debug("forward_close successful")
return True
self.log.warning("failed forward_close")
return False
__all__ = ["EnipDriver"]