# Source code for peat.protocols.enip.enip_socket
from __future__ import annotations
import io
import socket
import struct
from pathlib import Path
from peat import config, exit_handler, log, state, utils
from .enip_const import HEADER_SIZE, EnipCommError
[docs]
class EnipSocket:
    """
    Ethernet/IP (ENIP) socket.

    Wraps a TCP socket and exchanges whole ENIP messages with a single
    target. Can be used as a context manager, which connects on entry and
    closes on exit. When ``config.DEBUG >= 2`` and ``config.LOG_DIR`` is set,
    every message sent or received is also written to two per-IP log files
    (a formatted hexdump and a raw CSV).

    Authors

    - Christopher Goes
    """

    def __init__(self, ip: str, port: int, timeout: float = 5.0) -> None:
        """
        Create the (not yet connected) TCP socket.

        Args:
            ip: IPv4 address of the target
            port: TCP port of the target
            timeout: Socket timeout, in seconds
        """
        self.ip: str = ip
        self.port: int = port
        self.timeout: float = timeout
        self.log = log.bind(
            classname=self.__class__.__name__,
            target=f"{self.ip}:{self.port}",
        )
        self.is_connected: bool = False

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(self.timeout)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        # Protocol message logging. The files are opened lazily on the first
        # logged message, in text mode.
        self.fmt_log_fp: io.TextIOWrapper | None = None
        self.raw_log_fp: io.TextIOWrapper | None = None
        self.fmt_log_path: Path | None = None
        self.raw_log_path: Path | None = None
        self.log_to_file: bool = False

        if config.DEBUG >= 2 and config.LOG_DIR:
            self.log_to_file = True
            d_ip = self.ip.replace(".", "-")
            self.fmt_log_path = config.LOG_DIR / "enip" / f"{d_ip}_enip-data-formatted.log"
            self.raw_log_path = config.LOG_DIR / "enip" / f"{d_ip}_enip-data-raw.csv"

        self.log.trace(f"Initialized {repr(self)}")

    def __enter__(self) -> "EnipSocket":
        """
        Connect and return this object.

        Raises:
            EnipCommError: if the connection could not be established
        """
        try:
            connected = self.connect()
        except Exception:
            # __exit__ is not called when __enter__ raises, so release the
            # socket here instead of leaking it.
            self.close()
            raise

        if not connected:
            self.close()
            raise EnipCommError(f"failed to connect to {str(self)}")

        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the socket; any in-flight exception is logged, not suppressed."""
        self.close()
        if exc_type:
            self.log.debug(f"{exc_type.__name__}: {exc_val}")

    def __str__(self) -> str:
        return f"{self.ip}:{self.port}"

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.ip}, {self.port}, {self.timeout})"

    def connect(self) -> bool:
        """
        Connect to ENIP socket.

        Only timeouts are translated; any other error raised by the
        underlying socket ``connect`` call propagates unchanged.

        Returns:
            If connection was successful

        Raises:
            EnipCommError: if the socket timed out during connection
        """
        try:
            self.sock.connect((self.ip, self.port))
        except TimeoutError:
            self.log.debug(f"Socket timed out during connect (timeout: {self.timeout} seconds)")
            raise EnipCommError(f"socket timeout during connection to {str(self)}") from None

        self.is_connected = True
        return self.is_connected

    def close(self) -> None:
        """
        Close the Python socket and log files (if debugging).

        The socket is closed even if it never connected, since its file
        descriptor is allocated in ``__init__``. Closing is best-effort:
        errors are logged at debug level and never raised.
        """
        self.is_connected = False
        try:
            self.sock.close()
        except Exception as err:
            self.log.debug(f"Ignoring error while closing socket: {err}")
        finally:
            self._close_log_files()

    def send(self, message: bytes) -> int:
        """
        Send an ENIP message.

        Loops until every byte has been handed to the socket.

        Args:
            message: The complete ENIP message to send

        Returns:
            Number of bytes sent

        Raises:
            EnipCommError: Exception occurred while sending the message
        """
        if self.log_to_file:
            self._log_protocol_msg(message, "SEND")

        total_sent = 0
        while total_sent < len(message):
            try:
                sent = self.sock.send(message[total_sent:])
            except TimeoutError:
                # TimeoutError is an OSError subclass, so it must be handled first
                self.log.warning(f"Socket timed out during send (timeout: {self.timeout} seconds)")
                raise EnipCommError(f"socket timeout during send to {str(self)}") from None
            except OSError:
                raise EnipCommError(
                    f"socket connection broken during send to {str(self)}"
                ) from None

            if not sent:
                raise EnipCommError(f"socket connection broken during send to {str(self)}")
            total_sent += sent

        return total_sent

    def receive(self) -> bytes:
        """
        Receive and unpack an ENIP response.

        The total message length is ``HEADER_SIZE`` plus the little-endian
        16-bit length stored in bytes 2-3 of the message. Until those bytes
        have arrived, a fixed initial read size is used.

        Returns:
            The ENIP response as bytes

        Raises:
            EnipCommError: Exception occurred while receiving the response
        """
        length_field_end = 4  # the length field occupies bytes 2-3
        msg_len = 28  # provisional size until the length field has been read
        received = bytearray()
        length_known = False

        while len(received) < msg_len:
            try:
                chunk = self.sock.recv(min(msg_len - len(received), 2048))
            except TimeoutError:
                self.log.warning(
                    f"Socket timed out during receive (timeout: {self.timeout} seconds)"
                )
                raise EnipCommError(f"socket timeout during receive from {str(self)}") from None
            except OSError:
                raise EnipCommError(
                    f"socket connection broken during receive from {str(self)}"
                ) from None

            if not chunk:
                raise EnipCommError(
                    f"socket connection broken during receive from {str(self)}"
                )
            received += chunk

            # TCP may deliver fewer than 4 bytes in the first chunk, so only
            # parse the length once the whole field is buffered.
            if not length_known and len(received) >= length_field_end:
                (data_size,) = struct.unpack_from("<H", received, 2)
                msg_len = HEADER_SIZE + data_size
                length_known = True

        message = bytes(received)
        if self.log_to_file:
            self._log_protocol_msg(message, "RECEIVE")

        return message

    def _close_log_files(self) -> None:
        """Close whichever protocol log files are open and forget the handles."""
        if self.fmt_log_fp:
            self.fmt_log_fp.close()
            self.fmt_log_fp = None
        if self.raw_log_fp:
            self.raw_log_fp.close()
            self.raw_log_fp = None

    def _open_log_files(self) -> None:
        """Create the log directory and open both protocol log files for appending."""
        self._close_log_files()  # never leave a half-open pair behind

        self.fmt_log_path.parent.mkdir(parents=True, exist_ok=True)
        state.written_files.add(self.fmt_log_path.as_posix())
        state.written_files.add(self.raw_log_path.as_posix())

        # The CSV header is only needed when the file is new or empty
        try:
            add_csv_header = self.raw_log_path.stat().st_size == 0
        except FileNotFoundError:
            add_csv_header = True

        # Append rather than truncate: another socket for the same IP, or a
        # reopen after close(), must not wipe what was already logged.
        self.fmt_log_fp = self.fmt_log_path.open("a")
        self.raw_log_fp = self.raw_log_path.open("a")

        # Close files when peat exits
        exit_handler.register(self._close_log_files, "FILE")

        if add_csv_header:
            self.raw_log_fp.write("TIMESTAMP,IP,PORT,DIRECTION,BYTES\n")

    @staticmethod
    def _format_bytes_msg(msg: bytes, header: str = "") -> str:
        """
        Format bytes as a hexdump, 16 space-separated bytes per row.

        Each row is indented by one space so a full row is as wide as the
        49-character header lines built in ``_log_protocol_msg``.
        NOTE(review): the original formatter was not present in this class;
        this layout is inferred from the header width - confirm it matches
        the intended output.

        Args:
            msg: The bytes to format
            header: Optional text placed on the line(s) before the dump

        Returns:
            The header (if any) followed by the hexdump rows, newline-separated
        """
        rows = [header] if header else []
        rows.extend(" " + msg[start : start + 16].hex(" ") for start in range(0, len(msg), 16))
        return "\n".join(rows)

    def _log_protocol_msg(self, msg: bytes, direction: str) -> None:
        """
        Logs protocol messages to two files: formatted bytes and raw bytes.

        Args:
            msg: The message bytes that were sent or received
            direction: ``"SEND"`` for outgoing messages; anything else is
                labelled as ``RECEIVE`` in the formatted log
        """
        if not self.log_to_file:  # Skip if file output is disabled
            return

        if not self.fmt_log_fp or not self.raw_log_fp:
            # Create directory and open files on first write
            # We don't do this in __init__ in case no sends/receives occur
            self._open_log_files()

        ts = utils.utc_now()

        # write raw bytes in a comma-separated format to a single line
        raw = f"{ts.isoformat()},{self.ip},{self.port},{direction},{msg.hex()}\n"
        self.raw_log_fp.write(raw)

        # format bytes in a hexdump format
        header = f" {'-' * 16} {ts.strftime('%H:%M:%S.%f')} {'-' * 15}\n"
        if direction == "SEND":
            header += " --------------------- SEND ---------------------"
        else:
            header += " -------------------- RECEIVE -------------------"

        fmt = f"{self._format_bytes_msg(msg, header)}\n\n\n"
        self.fmt_log_fp.write(fmt)
# Names exported by ``from <this module> import *``.
__all__ = ["EnipSocket"]