"""
Discovery of hosts on a Ethernet network (commonly called "scanning").
**Raw Sockets**
Several functions require the use of raw sockets. On Linux, these require
either root permissions, or the ``cap_net_raw`` capability applied to the
Python interpreter. On Windows, the program must be running with Administrator
permissions, usually from an elevated command prompt.
Further reading about getting raw socket permissions on Linux
- https://stackoverflow.com/a/27059188
- https://stackoverflow.com/a/47982075
- https://stackoverflow.com/a/30826137
- https://gist.github.com/tomix86/32394a43be70c337cbf1e0c0a56cbd8d
- http://man7.org/linux/man-pages/man7/capabilities.7.html
"""
import timeit
from collections.abc import Iterable
from concurrent import futures
from scapy.all import RandShort, sr1
from scapy.layers.inet import ICMP, IP
from scapy.layers.l2 import arping
from peat import config, log, state, utils
from .addresses import ip_in_local_subnet, ip_is_local_interface, sort_ips
from .ip import check_tcp_port
[docs]
def check_host(ip: str, timeout: float = 1.0, icmp_fallback_tcp_syn: bool | None = None) -> bool:
"""
Checks if a network host is online.
.. note::
Requests to ``127.0.0.1`` will always return :obj:`True`
Args:
ip: IPv4 address of host
timeout: Number of seconds to wait for a response
icmp_fallback_tcp_syn: In the case of a :term:`ICMP` failure, fallback to
attempting a TCP SYN RST to check if the host is online.
The edge case is this: if we are able to use raw sockets and
the host is NOT in a local subnet, then ICMP requests will be used.
Certain devices (such as the SEL RTAC) and many firewalls drop
ICMP requests. We still want to check if the host is online,
therefore we use TCP as a fallback. If the device is sensitive
to TCP RST's, then this argument should be set to :obj:`False`. The
TCP port used is configured by the ``SYN_PORT`` PEAT configuration
option (e.g. ``export PEAT_SYN_PORT=80``).
Returns:
If the host is online
"""
if icmp_fallback_tcp_syn is None:
icmp_fallback_tcp_syn = config.ICMP_FALLBACK_TCP_SYN
# Minor hack to skip ARP/ICMP check for quick testing using localhost
if ip == "127.0.0.1":
log.debug("Skipping online check of 127.0.0.1 since it's localhost")
return True
# If it's actually a local interface, skip
if ip_is_local_interface(ip):
log.debug(f"Skipping online check of {ip} since it's an interface on the local system")
return False
# If we're able to use raw sockets, try using better methods
if not config.FORCE_ONLINE_METHOD_TCP and (
config.FORCE_ONLINE_METHOD_PING or state.raw_socket_capable
):
# Use ARP if IP is in a local subnet (lightest and safest)
if ip_in_local_subnet(ip):
return check_host_arp(ip, timeout)
# Otherwise, use pings (ICMP requests)
else:
remote_result = check_host_icmp(ip, timeout)
# Fallback to TCP SYN-RST if ICMP check fails and flag is enabled
if not remote_result and icmp_fallback_tcp_syn:
remote_result = check_tcp_port(
ip, config.SYN_PORT, timeout, reset=True, syn_sweep=True
)
return remote_result
# Fallback to the old unsafe and noisy TCP SYN sweep method
log.trace3(f"Falling back to TCP SYN online check for {ip}")
return check_tcp_port(ip, config.SYN_PORT, timeout, reset=True, syn_sweep=True)
[docs]
def check_host_arp(ip: str, timeout: float = 1.0):
"""
Check if a host is online using :term:`ARP` ``who-has`` requests.
.. note::
This function requires the ability to use raw sockets.
Example ``tcpdump`` output of lookups for ``192.0.2.200`` and ``192.0.2.201``:
.. code-block::
ARP, Request who-has 192.0.2.200 tell 192.0.2.20, length 28
ARP, Request who-has 192.0.2.201 tell 192.0.2.20, length 28
ARP, Reply 192.0.2.200 is-at 00:00:00:00:00:00, length 46
ARP, Reply 192.0.2.201 is-at 00:00:00:00:00:01, length 46
Args:
ip: IPv4 address of host
timeout: Number of seconds to wait for a response
Returns:
If the host is online
"""
if config.DEBUG >= 3:
log.debug(f"Using ARP to check {ip} (timeout: {timeout})")
try:
# This will automatically use the proper interface for the who-has
# request even if the network being queried isn't on the default
# interface.
answers = arping(ip, verbose=0, timeout=timeout, cache=True)[0]
if answers is not None and len(answers) >= 1:
log.trace2(f"ARP check of {ip} succeeded")
return True
except OSError:
log.debug(f"Lacking system permissions to send ARP request to {ip}")
except Exception:
log.exception(f"failed to arping {ip}")
return False
[docs]
def check_host_icmp(ip: str, timeout: float = 1.0):
"""
Check if a host is online using an :term:`ICMP` request (``ping``).
.. note::
This function requires the ability to use raw sockets.
Pings can be accomplished without raw sockets via the ``ping``
command. However, calling a system command is extremely costly,
and would result in dramatically increased scanning times.
Therefore, the use of raw sockets is preferred to ensure the
check finishes in a reasonable amount of time.
Args:
ip: IPv4 address of host
timeout: Number of seconds to wait for a response
Returns:
If the host is online
"""
if config.DEBUG >= 3:
log.debug(f"Using ICMP to check {ip} (timeout: {timeout})")
# Setting ID fields prevents some filtering by routers and firewalls
packet = IP(dst=ip, id=int(RandShort())) / ICMP(id=int(RandShort()))
result = None
try:
# sr1 = "Send/Receive 1"
result = sr1(packet, timeout=timeout, verbose=0)
except OSError:
log.debug(f"Lacking system permissions to send ICMP request to {ip}")
except Exception:
log.exception(f"failed to send ICMP request to {ip}")
if result:
code = result.getlayer(ICMP).code
if code == 0:
log.trace2(f"ICMP check of {ip} succeeded")
return True
log.trace2(f"ICMP request to {ip} failed with code {code} (timeout: {timeout:.2f})")
return False
[docs]
def check_host_syn_sweep(ip: str, ports: list[int], timeout: float = 1.0) -> bool:
"""
Checks if a host is online using TCP SYN requests to a range of ports.
TCP SYN requests are sent to the specified ports, and if the device
responds in any way, it is considered to be "online".
Args:
ip: IPv4 address of host
ports: TCP ports to check
timeout: Number of seconds to wait for a response
Returns:
If the host is online
"""
if config.DEBUG >= 3:
log.debug(f"Using TCP SYNs to check {ip} (timeout: {timeout})")
for port in ports:
try:
if check_tcp_port(ip, port, timeout, reset=True, syn_sweep=True):
log.trace2(f"TCP SYN check of {ip} succeeded")
return True
except Exception as err:
log.debug(f"failed to TCP SYN port {port} on {ip}: {err}")
return False # Critical error, exit out of loop early
return False # All hosts failed
[docs]
def get_reachable_hosts(ip_list: list[str], ports: Iterable[int] | None = None) -> list[str]:
"""
Checks for online hosts.
.. note::
If the ports parameter is specified, then purely TCP SYN requests will
be used. Otherwise, :term:`ARP` and/or :term:`ICMP` requests will be used.
Args:
ip_list: IPv4 addresses to check
ports: Ports to attempt to check if hosts are responding
Returns:
Sorted :class:`list` of IP addresses of hosts that are responding
"""
# Warn if user is forcing pings but we're not able to do them
if config.FORCE_ONLINE_METHOD_PING and not state.raw_socket_capable:
log.warning(
"FORCE_ONLINE_METHOD_PING is enabled but the system "
"is not able to use RAW sockets, the uptime check will "
"likely fail with an exception."
)
elif config.FORCE_ONLINE_METHOD_TCP and state.raw_socket_capable:
log.warning(
"FORCE_ONLINE_METHOD_TCP is enabled when the system is "
"able to use ARP/ICMP, online checks will only use TCP SYNs."
)
hosts = sort_ips(ip_list) # Sort IPs for determinism
valid_hosts: list[str] = []
start_time = timeit.default_timer()
with futures.ThreadPoolExecutor(config.MAX_THREADS) as executor:
if ports:
ports = sorted(ports) # Sort ports for determinism
log.info(
f"Checking online status of {len(hosts)} hosts using "
f"{len(ports)} TCP ports: {str(ports).strip('[]')}"
)
results: list[tuple[futures.Future, str]] = [
(executor.submit(check_host_syn_sweep, ip, ports), ip) for ip in hosts
]
else:
if not config.FORCE_ONLINE_METHOD_TCP:
log.info(
f"Checking online status of {len(hosts)} hosts using ARP and/or ICMP requests"
)
else:
log.warning(
f"Forcing check of {len(hosts)} hosts using TCP SYNs "
f"since FORCE_ONLINE_METHOD_TCP is enabled"
)
results: list[tuple[futures.Future, str]] = [
(executor.submit(check_host, ip), ip) for ip in hosts
]
for res in results:
try:
if res[0].result() is True:
valid_hosts.append(res[1])
except Exception as err:
log.trace(f"Error in get_reachable_hosts: {err}")
continue
time_elapsed = timeit.default_timer() - start_time
if valid_hosts: # Sort the results for determinism
valid_hosts = sort_ips(valid_hosts)
log.info(
f"{len(valid_hosts)} hosts are responding (checked "
f"{len(hosts)} hosts in {utils.fmt_duration(time_elapsed)})"
)
return valid_hosts
__all__ = [
"check_host",
"check_host_arp",
"check_host_icmp",
"check_host_syn_sweep",
"get_reachable_hosts",
]