import errno
import os
import socket
import struct
from collections.abc import Callable
from pprint import pformat
from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM
from peat import config, consts, log
from peat.protocols.snmp import SNMP
[docs]
def check_tcp_port(
    ip: str,
    port: int,
    timeout: float | None = None,
    reset: bool = False,
    syn_sweep: bool = False,
) -> bool:
    """
    Check if a TCP port is open on the given IP address.

    By default, this is essentially a bargain-basement ``nmap -sT`` ("Connect scan").
    It will finalize connections using TCP FIN, which can sometimes result
    in issues with devices continuing to send data. Therefore, it is recommended
    to enable ``reset``, which closes connections with TCP RST instead of FIN.

    In other words (from Stack Overflow FIN vs RST):

    - FIN says: "I finished talking to you, but I'll still listen to
      everything you have to say until you say that you're done."
    - RST says: "There is no conversation. I won't say anything and
      I won't listen to anything you say."

    Further reading

    - nmap connect scan: https://nmap.org/book/scan-methods-connect-scan.html
    - SO_LINGER: https://www.nybek.com/blog/2015/03/05/cross-platform-testing-of-so_linger/
    - SO_LINGER Python: https://stackoverflow.com/a/6440364
    - FIN vs RST: https://stackoverflow.com/a/13050021

    .. warning::
       VMware VM interfaces will cause this function to return :obj:`True` for most ports!

    Args:
        ip: IPv4 address of the host to check
        port: TCP port number to check
        timeout: Number of seconds to wait for a response. If :obj:`None`,
            no timeout is set on the socket by this function.
        reset: Close the connection with [ACK, RST] instead of a [FIN]
            by setting SO_LINGER timeout to 0. This is generally useful
            to prevent issues with devices that will respond with data
            and cause issues (like errno 11 "Resource temporarily unavailable").
            This is similar to ``nmap -sS`` ("SYN scan").
        syn_sweep: If connection refused (``ECONNREFUSED``, e.g. code 111 on
            Linux or 10061 on Windows) should be considered "open".
            Commonly used when checking if a host is online and responding.
            This also changes the amount of logging output to accommodate
            SYN sweep scanning, which generates a lot of errors.

    Returns:
        If the port is open, or the host is responding (if ``syn_sweep`` is set)
    """
    # Codes that mean "connection refused" (something is there, but the port
    # is closed). The literal 111 (Linux) and 10061 (Winsock) values are kept
    # for backward compatibility; errno.ECONNREFUSED adds whatever value the
    # running platform uses (e.g. 61 on macOS/BSD).
    refused_codes = {errno.ECONNREFUSED, 111, 10061}

    with socket.socket(AF_INET, SOCK_STREAM) as sock:
        # NOTE(cegoes): Setting timeout on Windows results in code 10035 errors
        # even if the host is online. I don't know why this is happening and
        # can't afford to spend any more time investigating at the moment.
        # This means scanning as a standard user (not admin) will likely
        # not work reliably or at all. The workaround for now is to
        # run PEAT as an Administrator.
        # NOTE 2(cegoes): Setting timeout is also causing issues on POSIX
        # platforms. Some of the calls were resulting in socket error code
        # 11 ("EWOULDBLOCK/EAGAIN" or "Resource Temporarily Unavailable").
        # The issue only seemed to occur when running threaded in
        # concurrent.futures, which I'm still not sure why it's occurring.
        # The issue was manifesting primarily with the FTP service on SEL
        # devices, which seem to REALLY want to open a full FTP connection
        # when you attempt a TCP connect, and won't let you go.
        # NOTE(review): CPython's connect_ex() appears to report an expired
        # socket timeout as an EWOULDBLOCK/EAGAIN return code (10035 on
        # Windows) instead of raising, which may explain both notes above.
        # TODO confirm against the CPython socket implementation.
        if timeout is not None:
            sock.settimeout(timeout)

        if reset:
            # "Turn the SO_LINGER socket option on and set the linger time
            # to 0 seconds. This will cause TCP to abort the connection
            # when it is closed, flush the data and send a RST."
            # https://stackoverflow.com/a/6440364
            # "Windows uses shorts in struct linger, where Linux uses ints"
            linger = struct.pack("hh" if consts.WINDOWS else "ii", 1, 0)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger)

        try:
            result = sock.connect_ex((ip, port))
        except TimeoutError:  # Port is filtered
            # timeout may be None, which cannot be formatted with ".2f"
            timeout_str = "None" if timeout is None else f"{timeout:.2f}"
            log.log(
                "TRACE4" if syn_sweep else "TRACE2",
                f"TCP port check timed out to {ip}:{port} (timeout: {timeout_str})",
            )
            return False
        except Exception as err:  # Something unexpected happened
            log.debug(f"TCP scan of {ip}:{port} failed due to an exception: {err}")
            return False

    # 0 => Success, ECONNREFUSED => Connection refused (something is there)
    # socket errno reference: https://gist.github.com/gabrielfalcao/4216897
    # WinError codes reference: https://support.microsoft.com/en-us/help/819124
    if result == 0 or (syn_sweep and result in refused_codes):
        if config.DEBUG >= 4:
            log.trace4(f"{ip}:{port} scan succeeded: {err_code_to_str(result)}")
        return True

    # SYN sweeps generate many failures, so only log those at the highest verbosity
    if (not syn_sweep and config.DEBUG >= 3) or config.DEBUG >= 4:
        log.trace3(f"TCP scan of {ip}:{port} failed due to an error: {err_code_to_str(result)}")

    return False
[docs]
def check_udp_service(
    ip: str, service: str, port: int | None = None, timeout: float = 1.0
) -> bool:
    """
    Probe a host to see whether a named UDP service answers.

    Only ``snmp`` (matched case-insensitively) is currently handled. For SNMP,
    a GET of OID ``1.3.6.1.2.1.1.1.0`` is attempted with each known community
    string in turn, stopping at the first truthy response.

    Args:
        ip: IPv4 address of the host to check
        service: Name of the service to check
        port: UDP port number to check. For SNMP, defaults to 161 when
            :obj:`None`.
        timeout: Number of seconds to wait for a response

    Returns:
        :obj:`True` if the service responded, :obj:`False` if it did not
        or if the service name is not recognized
    """
    # Guard clause: reject anything that isn't a service we know how to probe
    if service.lower() != "snmp":
        log.error(f"Unknown UDP service passed to check_udp_service: {service}")
        return False

    target_port = 161 if port is None else port  # 161 is the SNMP default port

    # Full list of community strings: goo.gl/4kANPb
    communities = ["public", "private"]

    # any() short-circuits, so no further communities are tried after a hit
    responded = any(
        SNMP(ip, target_port, timeout, community=name).get(identity="1.3.6.1.2.1.1.1.0")
        for name in communities
    )
    if responded:
        return True

    log.trace2(
        f"SNMP service check failed for {ip}\nCommunity "
        f"strings attempted: {pformat(communities)}"
        f"\tPort: {target_port}\tTimeout: {timeout:.2f}"
    )
    return False
[docs]
def fingerprint(
    ip: str, port: int, timeout: float, payload: bytes, finger_func: Callable
) -> dict | None:
    """
    Fingerprints (verifies) a UDP device.

    A fresh UDP socket is created with the given timeout and used to send
    the discovery ``payload`` to ``ip:port``. If the send succeeds, the same
    socket is handed to ``finger_func``, whose return value is passed back
    to the caller. The socket is closed before this function returns.

    Args:
        ip: IPv4 address of device
        port: UDP port to use
        timeout: Timeout applied to the UDP socket, in seconds
        payload: Payload to send in the discovery
        finger_func: Function to use to verify the device. It is called
            with the socket as its only argument.

    Returns:
        Whatever ``finger_func`` returns (expected to be the device
        description :class:`dict`), or :obj:`None` if the socket could not
        be created, the discovery packet could not be sent, or
        ``finger_func`` timed out

    Raises:
        Exception: Any non-timeout exception raised by ``finger_func`` is
            logged and re-raised with its context suppressed
    """
    log.trace2(f"Fingerprinting device {ip}:{port}")

    udp_sock = make_udp_socket(timeout)
    if udp_sock is None:
        return None

    with udp_sock:
        # Nothing to verify if the hello packet never left
        if not send_discovery_packet(udp_sock, ip, port, payload):
            return None

        try:
            details = finger_func(udp_sock)
        except TimeoutError:
            log.trace2(
                f"Discovery packet succeeded, but fingerprint "
                f"function timed out to device {ip}:{port}"
            )
            return None
        except Exception as err:
            log.debug(
                f"Failed to receive a response from an unknown device due to an error: {err}"
            )
            raise err from None
        else:
            return details
[docs]
def make_udp_socket(timeout: float | None = None, broadcast: bool = False) -> socket.socket | None:
"""
Creates and binds a IPv4 UDP :class:`~socket.socket`.
Args:
timeout: Timeout to set for the socket, in seconds
broadcast: If the socket should be a broadcast socket
Returns:
The created UDP :class:`~socket.socket`
"""
sock = socket.socket(AF_INET, SOCK_DGRAM)
if timeout is not None:
sock.settimeout(timeout)
if broadcast:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
try:
# '' : Equivalent to '0.0.0.0', which is 'all interfaces'
# 0 : Let the OS choose a port
sock.bind(("", 0))
except Exception as ex:
log.error(f"Failed to bind UDP socket: {ex}")
return None
return sock
[docs]
def send_discovery_packet(sock: socket.socket, ip: str, port: int, payload: bytes) -> bool:
    """
    Transmit the initial hello packet used to fingerprint a device.

    Any exception raised while sending is logged as an error and reported
    to the caller through the return value instead of being propagated.

    Args:
        sock: :class:`~socket.socket` to send the packet with
        ip: IPv4 address to send packet to
        port: Port to send packet to
        payload: Payload to send

    Returns:
        :obj:`True` if the send was successful, :obj:`False` otherwise
    """
    log.trace2(f"Sending discovery packet to {ip}:{port}")

    destination = (ip, port)
    try:
        sock.sendto(payload, destination)
    except Exception as send_err:
        log.error(f"Failed to send discovery packet to {ip}:{port}: {send_err}")
        return False
    else:
        return True
[docs]
def err_code_to_str(code: int) -> str:
    """
    Translates a :mod:`socket` error code to a human-readable string.

    On Windows the lookup uses ``socket.errorTab`` when the :mod:`socket`
    module provides it. Otherwise :func:`os.strerror` is used, falling
    back to the symbolic name in :data:`errno.errorcode` if ``strerror``
    rejects the code.

    Args:
        code: Integer code to lookup

    Returns:
        Human-readable form of the error code followed by the numeric
        code, such as ``Operation not permitted (Code: 1)`` or
        ``EPERM (Code: 1)``. If the code is unable to be looked up,
        the original code will be returned as a string.
    """
    if consts.WINDOWS and hasattr(socket, "errorTab"):
        err_str = socket.errorTab.get(code)
    else:
        try:
            err_str = os.strerror(code)
        except (ValueError, OverflowError):
            # ValueError: the platform's strerror() doesn't know the code.
            # OverflowError: the code doesn't fit in a C int.
            # Either way, fall back to the symbolic name (or None).
            err_str = errno.errorcode.get(code)

    if err_str:
        return f"{err_str} (Code: {code})"

    return str(code)
# Public API of this module: the names exported by a wildcard
# (``from <module> import *``) import.
__all__ = [
    "check_tcp_port",
    "check_udp_service",
    "err_code_to_str",
    "fingerprint",
    "make_udp_socket",
    "send_discovery_packet",
]