import os.path
import re
import socket
import subprocess
from collections import namedtuple
from functools import lru_cache
from typing import Final
# NOTE: the reason we use the manuf package instead of Scapy's conf.manufdb
# is the manuf package bundles the manuf file in the package itself instead
# of relying on it being on the system, as well as having a more robust parser.
# TODO (05/06/2024): scapy started bundling manuf in April 2024, maybe use that?
try:
from manuf.manuf import MacParser as MacParserClass
except ImportError:
MacParserClass = None
from scapy.packet import Packet
from peat import config, consts, log, state, utils
from .addresses import clean_mac
# Regex with a single capture group matching a MAC address written as six
# two-digit hexadecimal octets separated by colons (e.g. "00:1a:2b:3c:4d:5e").
MAC_RE_COLON: Final[str] = r"([0-9a-fA-F]{2}(?::[0-9a-fA-F]{2}){5})"
# Same as MAC_RE_COLON, but with dash-separated octets (the format shown in
# the sample "arp.exe -a" output documented in _get_arpexe_output).
MAC_RE_DASH: Final[str] = r"([0-9a-fA-F]{2}(?:-[0-9a-fA-F]{2}){5})"
# Regex with a single capture group matching four dot-separated runs of
# 1-3 digits. NOTE: octet values are not range-checked, so strings such as
# "999.1.1.1" also match.
IPV4_RE: Final[str] = r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"
# Location of the manuf file; filled in by the first call to mac_to_vendor()
MANUF_PATH = None
# Parser instance; created by the first call to mac_to_vendor() and reused
MAC_PARSER = None
# Vendor tuple
# Manually define here instead of importing from manuf because manuf
# might not always be present.
Vendor = namedtuple("Vendor", ["manuf", "manuf_long", "comment"])
[docs]
def scapy_human_field(obj: Packet, field: str) -> str:
    """
    Render one field of a Scapy packet in its "human-friendly" form.

    Args:
        obj: Packet that owns the field
        field: Name of the field to render

    Returns:
        Whatever the field's ``i2repr()`` method produces for the value
        currently stored on the packet
    """
    render = obj.get_field(field).i2repr
    current_value = getattr(obj, field)
    return render(obj, current_value)
[docs]
@lru_cache(maxsize=1024)
def mac_to_vendor(mac: str) -> Vendor | None:
    """
    Look up the vendor of a MAC address from its :term:`OUI`.

    The manuf file location and the parser built from it are created on
    the first call and reused by later calls.

    To update the manuf file bundled with peat:

    .. code-block:: shell

       # On Linux
       pdm run update-manuf-linux

       # On Windows
       pdm run update-manuf-windows

    Examples:

    .. code-block:: python

       >>> from peat.protocols.common import mac_to_vendor
       >>> mac_to_vendor("00:30:A7:00:00:01")
       Vendor(manuf='SchweitzerEn', manuf_long='Schweitzer Engineering', comment=None)
       >>> mac_to_vendor("00:16:3E:00:00:01")
       Vendor(manuf='Xensource', manuf_long='Xensource, Inc.', comment=None)
       >>> mac_to_vendor("B4:B1:5A:00:00:01")
       Vendor(manuf='SiemensEnerg', manuf_long='Siemens AG Energy Management Division', comment=None)
       >>> mac_to_vendor("a4:4c:c8:00:00:01")
       Vendor(manuf='Dell', manuf_long='Dell Inc.', comment=None)
       >>> mac_to_vendor("f8:59:71:00:00:01")
       Vendor(manuf='Intel', manuf_long='Intel Corporate', comment=None)
       >>> mac_to_vendor("00:50:56:00:00:01")
       Vendor(manuf='VMware', manuf_long='VMware, Inc.', comment=None)

    Args:
        mac: Colon-separated 48-bit (6-octet) MAC address to look up

    Returns:
        Vendor information as a :class:`~collections.namedtuple`, or
        :obj:`None` if the 'manuf' package is unavailable, the lookup found
        nothing, or the address could not be parsed (likely malformed).

        The vendor object has 3 attributes:

        - ``manuf``
        - ``manuf_long``
        - ``comment``
    """  # noqa: E501
    global MAC_PARSER, MANUF_PATH

    # Without the optional 'manuf' dependency there is nothing to query
    if MacParserClass is None:
        log.warning(
            f"Failed OUI lookup for MAC address {mac}: the 'manuf' package is not installed"
        )
        return None

    # Resolve the manuf file and build the parser the first time through
    if not MANUF_PATH:
        MANUF_PATH = utils.get_resource(__package__, "manuf")
    if not MAC_PARSER:
        if config.DEBUG >= 2:
            log.debug(f"manuf source: {MANUF_PATH}")
        MAC_PARSER = MacParserClass(manuf_name=MANUF_PATH, update=False)

    try:
        mac = clean_mac(mac)  # Cleanup address for lookup
        vendor_info = MAC_PARSER.get_all(mac)
    except ValueError:
        log.exception(f"Failed to parse MAC address '{mac}' (likely malformed)")
    else:
        # A failed lookup yields a tuple whose members are all None
        if any(member is not None for member in vendor_info):
            return vendor_info
        log.warning(f"OUI lookup failed for MAC address '{mac}'")

    return None
[docs]
@lru_cache(maxsize=1024)
def mac_to_vendor_string(mac: str) -> str:
    """
    Turn a MAC address into the vendor string stored in the "mac_vendor"
    field of the data model.

    This is a thin convenience layer over ``mac_to_vendor()``: the long-form
    vendor name is preferred, and the short form is used only when no long
    form is available.

    Args:
        mac: Colon-separated 48-bit (6-octet) MAC address to look up

    Returns:
        Vendor string, or an empty string if the lookup failed
    """
    # Empty and all-zero addresses are never looked up
    if not mac or mac == "00:00:00:00:00:00":
        return ""

    try:
        vendor = mac_to_vendor(mac)
    except Exception:
        log.exception(f"exception resolving vendor for MAC address '{mac}'")
        return ""

    if not vendor:
        return ""

    # Long name first, then short name, then nothing
    return vendor.manuf_long or vendor.manuf or ""
[docs]
@lru_cache
def mac_to_ip(mac: str) -> str:
    """
    Find the IPv4 address associated with a MAC address.

    On WSL and Windows the output of ``arp.exe`` is searched. On other
    POSIX platforms the local ARP cache (``/proc/net/arp``) is searched.

    Args:
        mac: MAC address of the device, colon-separated

    Returns:
        IPv4 address (dotted-decimal), or an empty string
        if the IP address could not be determined
    """
    if not mac:
        return ""

    log.trace(f"Getting IP address for {mac}")

    # Anything that is not a colon-separated string is a caller error
    if not (isinstance(mac, str) and ":" in mac):
        log.critical(f"Invalid MAC address passed to mac_to_ip: {mac}")
        state.error = True
        return ""

    try:
        mac = mac.lower()
        if consts.WSL or consts.WINDOWS:
            found_ip = _get_ip_from_mac_arpexe(mac)
        elif consts.POSIX:
            pattern = IPV4_RE + r".*" + re.escape(mac)
            found_ip = _search_arptable(pattern)
        else:
            log.error("Failed mac_to_ip: unsupported platform")
            return ""
    except Exception as err:
        log.warning(f"Could not get IP address for MAC '{mac}': {err}")
        return ""

    if not found_ip:
        log.debug(f"Failed to find IPv4 address for MAC '{mac}'")

    return found_ip
[docs]
@lru_cache
def ip_to_mac(ip: str) -> str:
    """
    Resolve the MAC address for an IPv4 address.

    On WSL, the output of ``arp.exe`` is searched.
    On other POSIX platforms, the local :term:`ARP` cache is searched
    (``/proc/net/arp``).
    On Windows, Scapy is used, with ``arp.exe`` used as a fallback.

    .. warning::
       On Windows, this may result in an :term:`ARP` request being made on
       the local subnet if the PEAT configuration option ``RESOLVE_MAC``
       is True.

    Args:
        ip: IPv4 address of the device (Note: this cannot be a hostname!)

    Returns:
        MAC address as a colon-delimited string, or an empty string
        if the MAC address could not be determined
    """
    if not ip:
        return ""

    # The unspecified and loopback addresses never have an ARP entry
    if ip in ["0.0.0.0", "127.0.0.1"]:
        return ""

    log.trace(f"Getting MAC address for {ip}")

    if not isinstance(ip, str) or "." not in ip:
        log.critical(f"Invalid IPv4 address passed to ip_to_mac: {ip}")
        state.error = True
        return ""

    try:
        if consts.WSL:
            mac_address = _get_mac_from_ip_arpexe(ip)
        elif consts.POSIX:
            # The lookarounds keep the address from matching as a substring
            # of a longer one: without them a lookup for 10.0.0.1 could
            # return the MAC of 10.0.0.10 or 110.0.0.1 if that row appears
            # first in the table.
            mac_address = _search_arptable(
                r"(?<![\d.])" + re.escape(ip) + r"(?![\d.]).*" + MAC_RE_COLON
            )
        elif consts.WINDOWS:
            mac_address = ""

            # Use Scapy getmacbyip() to send a ARP request to resolve the MAC,
            # if RESOLVE_MAC configuration option is enabled.
            #
            # Scapy getmacbyip() does not inherently require any special
            # permissions on Windows. On POSIX systems, however, it requires
            # root permissions.
            #
            # cegoes, 09/20/2024:
            # on Windows, scapy may need Administrator anyway to be able to
            # determine the default route and send a ARP request.
            if config.RESOLVE_MAC:
                if config.DEBUG >= 3:
                    log.debug(f"Using Scapy getmacbyip() to get MAC for {ip}")

                # import here to avoid triggering a scapy import unless it's needed (slow)
                from scapy.layers.l2 import getmacbyip

                res = getmacbyip(ip)

                # Handle weird edge case where Scapy will sometimes return
                # a tuple when running as Administrator on Windows.
                mac_address = res[0] if isinstance(res, tuple) else res
                if mac_address is None:
                    mac_address = ""

                # 09/20/2024:
                # If scapy isn't able to determine default route, it may
                # return "ff:ff:ff:ff:ff:ff" as the MAC address.
                #
                # Scapy will sometimes return a zero MAC address.
                if mac_address and mac_address.upper() in [
                    "00:00:00:00:00:00",
                    "FF:FF:FF:FF:FF:FF",
                ]:
                    mac_address = ""

            # Fall back to Windows arp.exe if Scapy fails or ARP lookups disabled
            if not mac_address:
                mac_address = _get_mac_from_ip_arpexe(ip)
        else:
            log.error("Failed ip_to_mac: unsupported platform")
            return ""
    except Exception as ex:
        log.warning(f"Could not get MAC address for {ip} due to an error: {ex}")
        return ""

    # Ensure we return an empty string on failures
    if mac_address is None or mac_address == "00:00:00:00:00:00":
        mac_address = ""

    if mac_address == "":
        # MAC was not found
        log.debug(f"Failed to find MAC address for {ip}")
    elif isinstance(mac_address, str):
        # A MAC was found: normalize it with clean_mac()
        mac_address = clean_mac(mac_address)
    else:
        log.warning(
            f"Failed to get MAC address for {ip}: invalid type "
            f"'{type(mac_address).__name__}' for MAC address {mac_address}"
        )
        mac_address = ""

    return mac_address
# TODO: do a timed cache for the ARP table lookups.
# https://cachetools.readthedocs.io/en/latest/#cachetools.TTLCache
#
# Currently, if there are 300 IPs being searched for, the ARP file is read
# 300 times, which is really inefficient. Instead, we should
# cache the result and expire it after 15 seconds or so.
# ARP tables usually refresh every 30 or 60 seconds anyway.
#
# This is even more pronounced on Windows, because it's running
# a subprocess vs just reading a file.
def _search_arptable(regex: str) -> str:
    """
    Run a regex over the contents of ``/proc/net/arp`` and return the
    first capture group of the first match.

    The text read from the file is also stored in ``state.arp_table``
    for debugging purposes.

    Args:
        regex: Pattern to search for; the address wanted by the caller
            (IP or MAC) is expected to be its first capture group

    Returns:
        The first captured group, or an empty string if the file is
        missing, empty, or contains no match
    """
    if config.DEBUG >= 3:
        log.debug(f"Searching /proc/net/arp using regex: '{regex}'")

    arp_path = "/proc/net/arp"
    if not os.path.exists(arp_path):
        log.error(
            "Failed arp table lookup: /proc/net/arp does not exist, "
            "your platform is abnormal or unsupported"
        )
        return ""

    with open(arp_path) as handle:
        table_text = handle.read()

    if not table_text:
        log.error("No data in /proc/net/arp")
        return ""

    # Update ARP table in state for debugging purposes
    state.arp_table = table_text

    # IP address       HW type     Flags       HW address            Mask     Device
    # 192.0.2.1        0x1         0x2         00:00:00:00:00:01     *        ens33
    hit = re.search(regex, table_text)
    if hit is None:
        if config.DEBUG >= 3:
            log.debug(f"Failed to search /proc/net/arp using '{regex}'")
        return ""

    return hit.groups()[0]
def _get_ip_from_mac_arpexe(mac: str) -> str:
    """
    Find the IP address listed next to a MAC address in the output
    of ``arp.exe -a`` on Windows.

    Args:
        mac: Colon-separated MAC address; the colons are swapped for
            dashes before the output is searched

    Returns:
        The IPv4 address found on the same line as the MAC address,
        or an empty string if there was no output or no match
    """
    if config.DEBUG >= 3:
        log.debug(f"Running 'arp.exe -a' to find IP for {mac}")

    table_text = _get_arpexe_output()
    if table_text:
        dashed_mac = mac.replace(":", "-")
        pattern = IPV4_RE + r".*" + re.escape(dashed_mac)
        hit = re.search(pattern, table_text)
        if hit and hit.groups():
            return hit.groups()[0]

    log.debug(f"Failed to get IP using 'arp.exe -a' for {mac}")
    return ""
def _get_mac_from_ip_arpexe(ip: str) -> str:
    """
    Lookup MAC from an IP using arp.exe on Windows.

    Args:
        ip: IPv4 address to find in the output of ``arp.exe -a``

    Returns:
        The MAC address listed on the same line as ``ip``, converted from
        dash-separated to colon-separated form, or an empty string if
        there was no output or no entry for that address
    """
    if config.DEBUG >= 3:
        log.debug(f"Running 'arp.exe -a' to find find MAC for IP {ip}")

    arp_output = _get_arpexe_output()
    if arp_output:
        # The pattern must be tied to the requested address, otherwise the
        # first MAC in the table is returned no matter which IP was asked
        # for. The lookarounds stop the address from matching as a substring
        # of a longer one (e.g. 10.0.0.1 inside 10.0.0.10), and ".*" does not
        # cross line ends, so the MAC has to be on the same line as the IP.
        regex = r"(?<![\d.])" + re.escape(ip) + r"(?![\d.]).*" + MAC_RE_DASH
        m = re.search(regex, arp_output)
        if m and m.groups():
            return m.groups()[0].replace("-", ":")

    log.debug(f"Failed to get MAC using 'arp.exe -a' for {ip}")
    return ""
def _get_arpexe_output() -> str:
    """
    Run ``arp.exe -a`` and return what it printed.

    On success the decoded output is also stored in ``state.arp_table``.

    Returns:
        Output of ``arp.exe -a`` as text, or an empty string if the
        command exited with a non-zero code or printed nothing
    """
    # Example output:
    #
    # Interface: 192.168.215.1 --- 0x13
    #   Internet Address      Physical Address      Type
    #   224.0.0.22            00-00-00-00-00-01     static
    #   224.0.0.251           00-00-00-00-00-02     static
    #   239.255.255.250       00-00-00-00-00-03     static
    proc = subprocess.run(["arp.exe", "-a"], stdout=subprocess.PIPE, check=False)

    if proc.returncode == 0 and proc.stdout:
        # The headers can contain non-ASCII bytes in the console code page
        # on localized Windows installs, which a strict UTF-8 decode rejects.
        # The addresses themselves are plain ASCII, so replacing undecodable
        # bytes keeps the lookup working.
        arp_output = proc.stdout.decode(errors="replace")
        state.arp_table = arp_output
        return arp_output

    if config.DEBUG >= 3:
        log.debug(f"** arp.exe failed, proc object dump below **\n{proc}")

    return ""
[docs]
def raw_socket_capable() -> bool:
    """
    Determines if PEAT has permissions to use RAW sockets (``SOCK_RAW``).

    A raw socket is opened as a probe (``AF_PACKET`` on Linux, ``AF_INET``
    elsewhere) and closed again straight away.

    Returns:
        :obj:`True` if the probe socket could be created, :obj:`False` if
        creating it raised any exception
    """
    try:
        if consts.LINUX:
            probe = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
        else:
            probe = socket.socket(socket.AF_INET, socket.SOCK_RAW)
    except OSError:
        log.debug("Unable to use RAW sockets")
        return False
    except Exception as ex:
        log.warning(
            "Unknown exception occurred while checking raw socket "
            "capabilities. You might be on an unsupported platform."
        )
        log.debug(str(ex))
        return False
    else:
        # Only the ability to create the socket matters; close it so the
        # descriptor is not leaked
        probe.close()
        return True
# Names exported by a star-import of this module. The underscore-prefixed
# ARP helpers and scapy_human_field are not listed here.
__all__ = [
"ip_to_mac",
"mac_to_ip",
"mac_to_vendor",
"mac_to_vendor_string",
"raw_socket_capable",
]