Source code for peat.protocols.addresses

import itertools
import json
import re
import socket
from functools import lru_cache
from ipaddress import (
    AddressValueError,
    IPv4Address,
    IPv4Network,
    _BaseV4,
    ip_address,
    ip_network,
)
from pathlib import Path

from peat import ParseError, log, state

# Matches a dotted-quad host string in which each of the four octets is either
# a 1-3 digit number or a "start-end" pair of such numbers (Nmap-style range),
# e.g. "192.0.2-3.140-150". Group 1 is the whole address and groups 2-5 are
# the individual octets. Octet values are NOT range-checked by the pattern,
# so out-of-range numbers such as "999" still match.
ADDR_RANGE_REGEX = re.compile(
    r"((\d{1,3}|\d{1,3}-\d{1,3})"
    r"\.(\d{1,3}|\d{1,3}-\d{1,3})"
    r"\.(\d{1,3}|\d{1,3}-\d{1,3})"
    r"\.(\d{1,3}|\d{1,3}-\d{1,3}))",
    re.IGNORECASE | re.ASCII,
)


def address_to_pathname(address: str) -> str:
    """
    Converts an address or other host identifier to a suitable filepath name.

    Surrounding whitespace is removed, the text is lower-cased, and every
    ``/`` and space character is replaced with ``_``.

    Args:
        address: Address or host identifier to convert

    Returns:
        The converted string
    """
    normalized = address.strip().lower()
    # Single pass that maps both separators to underscores
    return normalized.translate(str.maketrans({"/": "_", " ": "_"}))
@lru_cache(maxsize=2048)
def resolve_hostname_to_ip(hostname: str) -> str:
    """
    Resolves a hostname to an IPv4 address string.

    Results, including failed lookups, are memoized by
    :func:`functools.lru_cache`.

    Args:
        hostname: Hostname or domain name to resolve

    Returns:
        The resolved IPv4 address, ``"127.0.0.1"`` for ``"localhost"``,
        or an empty string if the hostname is empty or can't be resolved
    """
    if not hostname:
        return ""

    # Skip the resolver entirely for the loopback name
    if hostname == "localhost":
        return "127.0.0.1"

    try:
        return socket.gethostbyname(hostname)
    except (OSError, UnicodeError) as ex:
        # UnicodeError (not an OSError) is raised when the name can't be
        # IDNA-encoded, e.g. a label that is empty or longer than 63
        # characters. Treat it like any other failed lookup.
        log.debug(f"Failed to resolve hostname '{hostname}': {ex}")
        return ""
@lru_cache(maxsize=1024)
def resolve_ip_to_hostname(ip: str) -> str:
    """
    Resolves an IP address string to a hostname via a reverse lookup.

    Results, including failed lookups, are memoized by
    :func:`functools.lru_cache`.

    Args:
        ip: IP address string to look up

    Returns:
        The hostname, ``"localhost"`` for ``"127.0.0.1"``, or an empty
        string if the IP is empty, is ``"0.0.0.0"``, or the lookup fails
    """
    if not ip:
        return ""

    # Addresses that are answered without performing a lookup
    fixed_answers = {"127.0.0.1": "localhost", "0.0.0.0": ""}
    if ip in fixed_answers:
        return fixed_answers[ip]

    try:
        lookup_result = socket.gethostbyaddr(ip)
        return lookup_result[0]
    except (OSError, IndexError) as ex:
        log.debug(f"Failed to resolve IP '{ip}': {ex}")
        return ""
[docs] def expand_commas_and_clean_strings( host_list: list[str | bytes | IPv4Address | IPv4Network | Path], ) -> list[str | bytes | IPv4Address | IPv4Network | Path]: """ Expand strings with ',' characters into separate items, convert bytes to str, and remove items that are only whitespace or empty strings. """ expanded = [] for item in host_list: if isinstance(item, (bytes, str)): # Convert bytes to str if isinstance(item, bytes): item = item.decode() item = item.strip() if not item: continue # Expand strings with commas in them if "," in item: for sub_item in item.strip().split(","): sub_item = sub_item.strip() if sub_item: expanded.append(sub_item) continue expanded.append(item) return expanded
[docs] def expand_filenames_to_hosts( host_list: list[str | bytes | IPv4Address | IPv4Network | Path], ) -> list[str | bytes | IPv4Address | IPv4Network]: """ Read host data from any hosts that are filenames, and remove them from the list. """ expanded = [] for list_item in host_list: is_path = False if isinstance(list_item, (bytes, str, Path)): # ensure byte conversion to check filename doesn't affect source data to_test = list_item if isinstance(to_test, bytes): to_test = to_test.decode() # Ensure filenames don't get included in list of hosts # if the file doesn't exist if isinstance(to_test, str) and ( to_test.endswith(".txt") or to_test.endswith(".json") ): is_path = True try: if isinstance(to_test, Path): pth = to_test.resolve() else: pth = Path(to_test).resolve() if pth.is_file(): log.info(f"Reading hosts from file {pth.as_posix()}") is_path = True file_text = pth.read_text(encoding="utf-8") if pth.suffix == ".json": # Note: this assumes JSON data is a list raw_data = json.loads(file_text) else: # text data can be space, tab, or newline delimited raw_data = file_text.split() data = [str(x).strip() for x in raw_data if x] expanded.extend(data) log.info(f"{len(data)} hosts were read from file {pth.name}") elif pth.exists(): log.error(f"Host file '{str(pth)}' exists but isn't a valid file") state.error = True except Exception: log.exception( f"unknown error occurred while checking if hosts " f"are filepaths, host_list={host_list}" ) state.error = True if not is_path and not isinstance(list_item, Path): expanded.append(list_item) return expanded
def hosts_to_ips(
    host_list: list[str | bytes | IPv4Address | IPv4Network],
) -> list[str]:
    """
    Converts a list of mixed host strings to a list of unique IPv4 addresses.

    The mixed host strings can consist of IPv4 addresses in "dotted-decimal"
    format, IPv4 subnets in :term:`CIDR` notation, Nmap-style IPv4 address
    ranges, Nmap-style IPv4 network ranges, combinations of ranges,
    hostnames, and domain names (e.g. a :term:`FQDN`).

    If a file or set of files is specified, they will be read and the hosts
    they contain are added to the list of hosts to parse. Host strings in
    text files can be space, tab, or newline-separated (the file contents
    are simply passed through ``.split()``). JSON files are loaded as JSON
    and treated as an array of strings.

    Hostnames and domain names are resolved into IPv4 addresses, and
    duplicate, malformed, or invalid addresses are removed.

    Examples

    - ``localhost`` (Hostname)
    - ``docs.python.org`` (Domain name)
    - ``192.0.2.23`` (Standard dotted-decimal IPv4 address)
    - ``192.0.2.0/24`` (:term:`CIDR`-specified subnet)
    - ``192.0.2.20-40`` (Nmap-style host range)
    - ``192.0.2-3.0`` (Nmap-style network range)
    - ``192.0.2-9.14-17`` (Combination of network and host ranges)
    - ``172-192.16-30.80-90.12-14`` (Multiple combinations)
    - ``targets.txt`` (Text file with hosts)
    - ``hosts.json`` (JSON file with array of hosts)
    - ``192.0.2.20,192.0.2.30`` (Comma-separated hosts)

    .. warning::
       Valid addresses that are not generally used, such as multicast
       or reserved address spaces, will result in warnings emitted to
       logging and will still be returned in the list of results.

    Args:
        host_list: Mixed host strings to convert. These can include
            IPv4 address strings (dotted-decimal notation), hostnames,
            subnets (:term:`CIDR` notation), Nmap-style host address ranges,
            and/or :mod:`ipaddress` objects
            (:class:`~ipaddress.IPv4Address`/:class:`~ipaddress.IPv4Network`).

    Returns:
        List of unique dotted-decimal IPv4 address strings. An empty list
        is returned (and ``state.error`` is set) if ``host_list`` is not
        a :class:`list`.
    """
    if not isinstance(host_list, list):
        log.error(f"Expected a list of hosts, got '{type(host_list).__name__}'")
        state.error = True
        return []

    # Each stage consumes the output of the previous one.
    # note: assumes someone isn't trying to pass a filename with a ',' in it,
    # since commas are expanded before filenames are checked
    pipeline = (
        expand_commas_and_clean_strings,
        expand_filenames_to_hosts,
        hosts_to_objs,
        ip_objs_to_ips,
    )

    result = host_list
    for stage in pipeline:
        result = stage(result)

    return result
def hosts_to_objs(
    host_list: list[str | bytes | IPv4Address | IPv4Network],
) -> list[_BaseV4]:
    """
    Converts a list of mixed host strings into :mod:`ipaddress` objects.

    Host strings that fail to parse or resolve are logged and skipped.
    Entries of an unsupported type are logged as critical and flagged
    via ``state.error``. The literal string ``"all"`` is ignored.

    Args:
        host_list: Mixed host strings to convert (refer to
            :func:`~peat.protocols.addresses.hosts_to_ips` for details)

    Returns:
        List of unique :mod:`ipaddress` objects
        (:class:`~ipaddress.IPv4Address` and :class:`~ipaddress.IPv4Network`),
        in no particular order
    """
    log.trace(f"Converting {len(host_list)} hosts into ipaddress objects")

    # A set is used so duplicates collapse automatically
    found: set[_BaseV4] = set()

    for entry in host_list:
        # Already an ipaddress object, nothing to convert
        if isinstance(entry, (IPv4Address, IPv4Network)):
            found.add(entry)
            continue

        if not isinstance(entry, (str, bytes)):
            log.critical(
                f"Cannot convert '{repr(entry)}' to an IP address, "
                f"invalid type '{type(entry).__name__}'"
            )
            state.error = True
            continue

        if entry == "all":
            continue

        try:
            parsed = host_string_to_objs(entry)
        except (ValueError, AddressValueError, OSError, socket.gaierror) as err:
            log.warning(f"Failed to process host string '{entry}', skipping...")
            log.debug(f"Error that occurred for host string '{entry}': {err}")
            continue

        # Ranges come back as a collection, everything else as a single object
        if isinstance(parsed, (set, list)):
            found.update(parsed)
        else:
            found.add(parsed)

    log.trace(f"Converted {len(host_list)} host strings into {len(found)} ipaddress objects")

    return list(found)
[docs] def host_string_to_objs( host_string: str | bytes, strict_network: bool = True ) -> _BaseV4 | set[IPv4Address]: """ Converts a mixed host string into :mod:`ipaddress` object(s). .. code-block:: python :caption: Examples converting strings to IPv4Address objects >>> from pprint import pprint >>> from peat.protocols.addresses import host_string_to_objs >>> pprint(host_string_to_objs("192.168.2-3.142-144")) {IPv4Address('192.168.2.142'), IPv4Address('192.168.2.143'), IPv4Address('192.168.2.144'), IPv4Address('192.168.3.142'), IPv4Address('192.168.3.143'), IPv4Address('192.168.3.144')} >>> pprint(host_string_to_objs("192.168.3.140-142")) {IPv4Address('192.168.3.140'), IPv4Address('192.168.3.141'), IPv4Address('192.168.3.142')} >>> host_string_to_objs("172.16.0.0/30") IPv4Network('172.16.0.0/30') >>> host_string_to_objs("localhost") IPv4Address('127.0.0.1') >>> host_string_to_objs(b"192.168.3.1") IPv4Address('192.168.3.1') .. warning:: The IP range parsing isn't robust and can match bogus values like ``999.353.23-35.22`` or ```000-000.999-999.-1.0`` Args: host_string: Host string to convert (refer to :func:`~peat.protocols.addresses.hosts_to_ips` for details) strict_network: If network parsing is strict, in other words host bits being set in a network address will result in an error if this is :obj:`True`. Returns: Either a single instance or :class:`set` of :mod:`ipaddress` objects (:class:`~ipaddress.IPv4Address` and :class:`~ipaddress.IPv4Network`) """ # Convert bytes to str if isinstance(host_string, bytes): host_string = host_string.decode() # Strip excess whitespace host = host_string.strip().replace(" ", "") # Process as a network/subnet # Example: 192.0.2.0/24 if host.count("/") == 1 and host.count(".") == 3: return ip_network(host, strict=strict_network) # Process as a range of dotted-decimal IPv4 addresses match = None groups = () # '-' is checked first to short-circuit and skip matching for the # common case (e.g. not a range). 
if "-" in host and host.count(".") == 3: # NOTE: technically this will match bogus values like "999.353.23-35.22" match = ADDR_RANGE_REGEX.fullmatch(host) if match: groups = match.groups() # If there's a '-' and it matched the regex, then it's definitely an IP range # Group 1: "192.0.2-3.140-150" # Group 2-5: "192", "168", "2-3", "140-150" if match and groups and len(groups) == 5 and "-" in groups[0]: # This bit is somewhat complicated... # Essentially, we carve the string into 4 octets, # then convert each of those octets into a iterable. # The iterables are then passed to itertools.product(). processed_octets = [] range_encountered = False for i, octet in enumerate(groups[1:]): if "-" in octet: # 140-145 => range(140, 146) (Python's range() needs +1) start, end = octet.split("-") processed_octets.append(range(int(start), int(end) + 1)) range_encountered = True elif octet == "0" and range_encountered and i != 3: # Any zero-octet occurring after a range is encountered # is converted to a list of 256 integers (0 to 255). # We assume they are all valid hosts since subnet # information is not provided. processed_octets.append(range(256)) # .0 - .255 elif octet == "0" and range_encountered and i == 3: # Exclude subnet and broadcast from the final zero-set processed_octets.append(range(1, 254)) # .1 - .254 else: # Since strings are interpreted as iterables by product(), # we convert them to an integer and make a 1-element list. processed_octets.append([int(octet)]) # Extract addresses from the range addr_set = set() for addr_tuple in itertools.product(*processed_octets): addr_string = ".".join(str(t) for t in addr_tuple) addr_set.add(ip_address(addr_string)) return addr_set # We've eliminated subnets and address ranges, so it's likely a single host try: # Attempt to parse as an IP address ip = ip_address(host) except ValueError: # If it's not a valid IP, let's try and see if it's a hostname. 
# If it's a hostname or FQDN, the address will be resolved # and returned as a string. Then we create a IPv4 address # object as normal. resolved_address = socket.gethostbyname(host) ip = ip_address(resolved_address) # Warn about host address classes that are likely not valid for attr in ["is_multicast", "is_unspecified", "is_reserved", "is_link_local"]: if getattr(ip, attr) is True: log.warning(f"IP {str(ip)} {attr.replace('_', ' ')}") return ip
def ip_objs_to_ips(ip_obj_list: list[_BaseV4]) -> list[str]:
    """
    Converts :mod:`ipaddress` objects to unique IPv4 address strings.

    Addresses are converted directly, while networks are expanded into
    the host addresses they contain.

    Args:
        ip_obj_list: :mod:`ipaddress` objects to convert

    Returns:
        Sorted list of unique IPv4 address strings

    Raises:
        ParseError: One of the objects is not a
            :class:`~ipaddress.IPv4Address` or
            :class:`~ipaddress.IPv4Network` instance
    """
    unique_addresses: set[str] = set()

    for obj in ip_obj_list:
        if isinstance(obj, IPv4Network):
            # Expand the subnet into its host addresses
            unique_addresses.update(str(member) for member in obj.hosts())
        elif isinstance(obj, IPv4Address):
            unique_addresses.add(str(obj))
        else:
            raise ParseError(f"Invalid IP object type: {type(obj).__name__}")

    log.trace(
        f"Converted {len(ip_obj_list)} ipaddress objects "
        f"into {len(unique_addresses)} IPv4 address strings"
    )

    # sort_ips() turns the set into an ordered list
    return sort_ips(unique_addresses)
@lru_cache
def ip_is_local_interface(ip: str) -> bool:
    """
    Checks if an IP matches any of the local machine's :term:`NIC` IPs.

    The result for each IP is memoized by :func:`functools.lru_cache`.

    Args:
        ip: IPv4 address string to check

    Returns:
        If the IP matches that of a network interface on the local machine
    """
    return any(str(nic.ip) == ip for nic in state.local_interface_objects)
@lru_cache(maxsize=1024)
def ip_in_local_subnet(ip: str | IPv4Address) -> bool:
    """
    Checks if an IP is in any of the locally connected subnets.

    .. note::
       This function does NOT check that a host actually exists! It only
       asserts that an address mathematically falls into the range of
       local subnets connected to the system running PEAT.

    Args:
        ip: IPv4 address (string or :class:`~ipaddress.IPv4Address`) to check

    Returns:
        If the address is in a locally connected network
    """
    address = ip if isinstance(ip, IPv4Address) else IPv4Address(ip)

    return any(address in subnet for subnet in state.local_networks)
@lru_cache
def network_is_local(net: IPv4Network) -> bool:
    """
    Checks if a network address space is a subset of a local subnet.

    The network is compared against each of the subnets connected to the
    local system using :meth:`~ipaddress.IPv4Network.subnet_of`.

    Args:
        net: IPv4 network to check

    Returns:
        If the network is a subnet of any locally connected network
    """
    for local_subnet in state.local_networks:
        if net.subnet_of(local_subnet):
            return True

    return False
[docs] def sort_ips(ip_list: list[str] | set[str]) -> list[str]: """ Sort IPv4 address strings in ascending order by integer IPs. """ return sorted(ip_list, key=lambda x: socket.inet_pton(socket.AF_INET, x))
@lru_cache
def split_ipv4_cidr(addr: str) -> tuple[str, str]:
    """
    Split a :term:`CIDR` address into its host address and dotted-decimal
    subnet mask.

    Args:
        addr: IPv4 address with :term:`CIDR` subnet, e.g. ``172.16.0.20/24``

    Returns:
        Tuple with the host IPv4 address (``172.16.0.20``)
        and dotted-decimal subnet mask (``255.255.255.0``)
    """
    host_part, _, _ = addr.partition("/")

    # strict=False since the address is expected to have host bits set
    network = IPv4Network(addr, strict=False)

    return host_part, str(network.netmask)
def clean_ipv4(addr: str) -> str:
    """
    Strip leading zeros from an IPv4 address.

    Example: ``192.000.002.004`` => ``192.0.2.4``

    Args:
        addr: Dotted-decimal IPv4 address string to clean

    Returns:
        The address with each octet normalized, or an empty
        string if ``addr`` is empty
    """
    if not addr:
        return ""

    # Round-tripping each octet through int() drops the zero padding
    octets = [str(int(piece)) for piece in addr.split(".")]
    return ".".join(octets)
def clean_mac(mac: str) -> str:
    """
    Clean and format MAC address strings.

    Windows-style ``-`` separators are replaced with ``:``, the string is
    stripped and upper-cased, and, if the result is not 17 characters
    long, each ``:``-separated part shorter than two characters is
    left-padded with zeros.

    Args:
        mac: MAC address string to clean

    Returns:
        The cleaned MAC address, or an empty string
        if ``mac`` is empty or only whitespace
    """
    if not (mac and mac.strip()):
        return ""

    # Replace Windows-style '-' characters with ':'
    cleaned = mac.replace("-", ":").strip().upper()

    # A complete MAC is 17 characters, nothing to fill in
    if len(cleaned) == 17:
        return cleaned

    # Fill in missing characters: "" => "00", "A" => "0A"
    return ":".join(part.rjust(2, "0") for part in cleaned.split(":"))
# Names exported by "from peat.protocols.addresses import *"
__all__ = [
    "address_to_pathname",
    "clean_ipv4",
    "clean_mac",
    "expand_commas_and_clean_strings",
    "expand_filenames_to_hosts",
    "host_string_to_objs",
    "hosts_to_ips",
    "hosts_to_objs",
    "ip_in_local_subnet",
    "ip_is_local_interface",
    "ip_objs_to_ips",
    "network_is_local",
    "resolve_hostname_to_ip",
    "resolve_ip_to_hostname",
    "sort_ips",
    "split_ipv4_cidr",
]