8.6. General APIs

Functions used by device module implementations and internally by PEAT.

8.6.1. Datastore

class Datastore[source]

Registry of DeviceData instances.

objects: list[DeviceData] = None

DeviceData instances in this datastore.

global_options: dict[str, Any] = None

Options that will be applied and used by all devices in this datastore. Changes to the values in this dict will be reflected in the options of all devices in this datastore.
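
One way to picture how a shared options dict can be reflected in every device is a layered mapping. The sketch below uses the standard library's collections.ChainMap as a stand-in. It is not PEAT's DeepChainMap, and the option name timeout is a hypothetical chosen for illustration.

```python
from collections import ChainMap

# Hypothetical option name, for illustration only.
global_options = {"timeout": 5.0}

# Each device view layers its own overrides on top of the shared dict.
device_a = ChainMap({}, global_options)
device_b = ChainMap({"timeout": 1.0}, global_options)

# A change to the shared dict is visible through every view that does
# not override the key.
global_options["timeout"] = 10.0
```

Here device_a sees the updated global value, while device_b keeps its own override.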

create(ident, ident_type)[source]

Create a new DeviceData instance, add it to the datastore, and return it.

Return type:

DeviceData

get(ident, ident_type='ip')[source]

Get a DeviceData instance from the datastore.

Warning

If you want to search for an existing device and fail if one isn’t found, then use search() instead.

Examples of datastore.get()
>>> from pprint import pprint
>>> from peat import SCEPTRE, datastore

>>> device = datastore.get("192.0.2.20")
>>> device.ip
'192.0.2.20'

>>> device = datastore.get("192.0.2.123")  
>>> SCEPTRE.pull(device)  
>>> pprint(device.export())  
>>> parsed_name = "relay_1"  # Name extracted from the pulled config
>>> device = datastore.get(parsed_name, "name")  
>>> device.data.name == parsed_name  
True
Parameters:
  • ident (str) -- Identifier to search for, such as 192.168.0.1

  • ident_type (str) -- What ident is, e.g. "serial_port" or "ip"

Return type:

DeviceData

Returns:

The DeviceData object found or a new object if there wasn’t a device found that matched the arguments.

search(ident, ident_type)[source]

Search for the device with a given identifier.

Example: During passive scanning, a device was initialized with a MAC address, which was then resolved to an IP. Later, during active scanning, we want to add information to the device. We can look it up by its IP, even if it was originally added to the datastore using its MAC.

Parameters:
  • ident (str) -- Identifier to search for, such as 192.168.0.1

  • ident_type (str) -- What ident is, e.g. "serial_port" or "ip"

Return type:

DeviceData | None

Returns:

The DeviceData found or None if the search failed
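
The difference between get() (return an existing device or create one) and search() (return an existing device or None) can be sketched with a small stand-in registry. MiniDevice and MiniStore below are hypothetical classes written for this illustration; they are not PEAT's DeviceData or Datastore, and the real lookup logic may differ.

```python
from dataclasses import dataclass, field


@dataclass
class MiniDevice:
    """Hypothetical stand-in for DeviceData with two identifiers."""
    ip: str = ""
    mac: str = ""


@dataclass
class MiniStore:
    """Hypothetical stand-in for Datastore."""
    objects: list = field(default_factory=list)

    def search(self, ident, ident_type):
        # Return the first device whose identifier matches, else None.
        for dev in self.objects:
            if getattr(dev, ident_type, "") == ident:
                return dev
        return None

    def get(self, ident, ident_type="ip"):
        # Return an existing device, or create and register a new one.
        dev = self.search(ident, ident_type)
        if dev is None:
            dev = MiniDevice(**{ident_type: ident})
            self.objects.append(dev)
        return dev


store = MiniStore()
dev = store.get("00:30:A7:00:00:01", "mac")  # added by MAC
dev.ip = "192.0.2.20"                         # MAC later resolved to an IP
found = store.search("192.0.2.20", "ip")      # looked up by IP
missing = store.search("192.0.2.99", "ip")    # no such device
```

In this sketch, found is the same object that was added by MAC, and missing is None because search() never creates a device.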

remove(to_remove)[source]

Remove a device from the datastore.

Parameters:

to_remove (DeviceData) -- DeviceData instance to remove

Return type:

bool

Returns:

If the device was successfully found and removed

prune_inactive()[source]

Remove inactive devices from the datastore (dev._is_active == False).

Return type:

None

deduplicate(prune_inactive=True)[source]

Clean up duplicate devices in the datastore.

Duplicates are only merged if they have the same IP, MAC or serial port.

Any duplicates found are merged into a single DeviceData object, and the additional copies are removed from the list of objects.

Parameters:

prune_inactive (bool) -- If inactive devices should be removed before beginning deduplication

Return type:

None
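
The merge rule described above (same IP, MAC, or serial port) can be sketched on plain dicts. This is a simplified single-pass illustration under the assumption that devices are dicts with ip, mac, and serial_port keys. It is not PEAT's implementation, and it does not handle chains of duplicates that only become linked after a merge.

```python
def merge_duplicates(devices):
    """Merge dicts that share a non-empty ip, mac, or serial_port."""
    keys = ("ip", "mac", "serial_port")
    merged = []
    for dev in devices:
        match = None
        for kept in merged:
            if any(dev.get(k) and dev.get(k) == kept.get(k) for k in keys):
                match = kept
                break
        if match is None:
            merged.append(dict(dev))
        else:
            # Fill in fields the kept entry is missing.
            for name, value in dev.items():
                if value and not match.get(name):
                    match[name] = value
    return merged


devices = [
    {"mac": "00:00:00:00:00:01"},
    {"ip": "192.0.2.20", "mac": "00:00:00:00:00:01"},
    {"ip": "192.0.2.30"},
]
result = merge_duplicates(devices)
```

The first two entries share a MAC and collapse into one entry that carries both the MAC and the IP; the third entry is kept as-is.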

property verified: list[DeviceData]

Devices that have been verified (dev._is_verified == True).

property device_options: DeepChainMap

Get global options with module defaults and injects applied.

This is a hack, to be sure, but refactoring will take more time than it’s worth.

datastore = <peat.data.store.Datastore object>

Global singleton for managing DeviceData instances and making them available throughout PEAT.

8.6.2. High-level APIs

8.6.2.1. Scan API

portscan(dev, methods, finish_on_first_success=False, full_check_snmp=False)[source]

Check service status on a host using a provided set of methods.

Parameters:
  • dev (DeviceData) -- Device to scan

  • methods (list[tuple[IPMethod, type[DeviceModule]]]) -- List of methods to use. DeviceModule classes can be provided and any methods associated with them will be added to the list of methods to use.

  • finish_on_first_success (bool) -- Return immediately once a method succeeds and skip the remaining methods.

  • full_check_snmp (bool) -- Check SNMP using full protocol messages. If False, simple TCP SYN-RSTs are used.

Return type:

None

unicast_ip_scan(hosts, device_types=None)[source]

Scan a single host directly (“unicast” messages).

Parameters:
Return type:

tuple[dict[str, bool], list[type[DeviceModule]], list[str]] | None

Returns:

Tuple with results for each device as a dict and the modules used for the scan as a list. If the scan failed, None is returned.

check_host_unicast_ip(ip, modules)[source]

Identifies an unknown device using unicast IP communication.

TCP SYN checks are used if ARP/ICMP scanning is unavailable on the host PEAT is running on (if state.raw_socket_capable is False).

Parameters:
Return type:

bool

Returns:

True if successfully identified, False if identification failed

Raises:

DeviceError -- Critical error occurred during identification

broadcast_scan(targets, device_types=None)[source]

Discover devices via network broadcasts.

Warning

Layer 2 broadcast scanning is not currently supported (as of December 2021)

This applies if the module supports discovery via Layer 2 (e.g. broadcast MAC address or network interface) and/or Layer 3 broadcasts (192.168.0.255, etc.). Multicast is also considered to be a form of broadcast from PEAT’s perspective.

# These should work for "peat pull" as well as scan
peat scan -d clx -b eno2np1
peat scan -d clx -b 192.168.0.255
peat scan -d clx -b 192.168.0.0/24
peat scan -d clx -b 192.168.0.255/24
peat scan -d clx -b 192.168.0.255/24 192.168.0.0/24
peat scan -d clx -b eno2np1 192.168.0.0/24 examples/broadcast_targets.txt
Parameters:
  • targets (list[str]) -- Targets for broadcast scanning. These can be IPv4 subnets, IPv4 broadcast addresses, network interfaces, or paths to text files containing targets (one target per line).

  • device_types (list[str | type[DeviceModule]] | None) -- Module names (strings) or DeviceModule classes (objects) to use for identification

Return type:

tuple[dict[str, bool], list[type[DeviceModule]], list[str]] | None

Returns:

Tuple with results for each device as a dict, the modules used for the scan as a list, and the resolved targets as a list. If the scan failed, None is returned.
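
As an illustration of how a subnet target maps to a Layer 3 broadcast address, the sketch below uses the standard library's ipaddress module. The helper name broadcast_address is hypothetical, and how PEAT actually resolves broadcast targets is not shown in this documentation.

```python
import ipaddress


def broadcast_address(target):
    """Return the broadcast address for an IPv4 subnet in CIDR notation."""
    # strict=False accepts host bits, e.g. "192.168.0.255/24".
    network = ipaddress.ip_network(target, strict=False)
    return str(network.broadcast_address)
```

With this helper, both 192.168.0.0/24 and 192.168.0.255/24 resolve to 192.168.0.255.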

serial_scan(serial_ports, device_types=None)[source]

Scan serial ports for devices.

Parameters:
Return type:

tuple[dict[str, bool], list[type[DeviceModule]], list[str]] | None

Returns:

Tuple with results for each device as a dict, the modules used for the scan as a list, and the resolved targets as a list. If the scan failed, None is returned.

check_host_serial(port, modules)[source]

Identifies an unknown device using serial communication.

Parameters:
Return type:

bool

Returns:

True if successfully identified, False if identification failed

Raises:

DeviceError -- Critical error occurred during identification

run_identify(scanning_function, addresses, modules)[source]

Generic function to execute scanning functions in parallel.

This works with IP and serial scanning functions.

Parameters:
Return type:

dict[str, bool]

Returns:

The result of identification for each host as a dict, with keys being host strings and values booleans indicating the result of the identification.
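
The general pattern of running a per-host check in parallel and collecting a dict of booleans can be sketched with concurrent.futures. This is an assumption about the approach rather than PEAT's code: run_in_parallel, fake_check, and max_workers are hypothetical names introduced for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def run_in_parallel(scanning_function, addresses, modules, max_workers=8):
    """Call scanning_function(address, modules) for each address in parallel."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            address: pool.submit(scanning_function, address, modules)
            for address in addresses
        }
        # fut.result() blocks until that task has finished.
        return {addr: bool(fut.result()) for addr, fut in futures.items()}


def fake_check(address, modules):
    # Hypothetical scanning function: "identifies" addresses ending in .20
    return address.endswith(".20")


results = run_in_parallel(fake_check, ["192.0.2.20", "192.0.2.21"], [])
```

The keys of results are the host strings and the values are the per-host outcomes, matching the shape described above.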

scan(scan_targets, scan_type, device_types=None)[source]

Scan IP networks and hosts and/or serial ports for OT devices.

Parameters:
  • scan_targets (list[str]) -- What to scan, such as network hosts or serial ports

  • scan_type (Literal['unicast_ip', 'broadcast_ip', 'serial']) --

    Communication type of the targets. Allowed values:

    • unicast_ip

    • broadcast_ip

    • serial

  • device_types (list[str | type[DeviceModule]] | None) -- mixed device type strings, alias strings or DeviceModule classes to scan using. If None, all currently imported DeviceModule modules are used.

Return type:

dict[str, dict | list | str | float | int] | None

Returns:

Scan summary as a dict, or None if an error occurred

8.6.2.2. Pull API

pull(targets, comm_type, device_types)[source]

Pull from devices.

Parameters:
  • targets (list[str]) -- Devices to pull from, such as network hosts or serial ports

  • comm_type (Literal['unicast_ip', 'broadcast_ip', 'serial']) --

    Method of communication for the pull. Allowed values:

    • unicast_ip

    • broadcast_ip

    • serial

  • device_types (list[str]) -- Names of device modules or module aliases to use

Return type:

dict[str, dict | list | str | float | int] | None

Returns:

Pull summary as a dict, or None if an error occurred

8.6.2.3. Push API

push(targets, comm_type, device_types, input_source, push_type, skip_scan=False)[source]

Push (upload) configuration or firmware to devices.

Note

By default all targets are scanned and verified, and only the devices that are successfully verified are pushed to. This enables this function to be used without requiring a scan to be done and ensuring pushes are not performed to invalid devices. This behavior can be disabled by setting PUSH_SKIP_SCAN to True or passing --push-skip-scan on the CLI.

Parameters:
  • targets (list[str]) -- Devices to push to, such as network hosts or serial ports

  • comm_type (Literal['unicast_ip', 'broadcast_ip', 'serial']) --

    Method of communication for the push. Allowed values:

    • unicast_ip

    • broadcast_ip

    • serial

  • device_types (list[str]) -- Names of device modules or module aliases to use. If scanning is disabled this should be a single device type.

  • input_source (Path | str) -- Path of the file to push, as a string

  • push_type (Literal['config', 'firmware']) -- Type of push being performed. Valid push types are “config” and “firmware”.

  • skip_scan (bool) -- If device verification (scanning) should be skipped. NOTE: this currently only applies to unicast_ip devices.

Return type:

bool

Returns:

If the push was successful

Raises:

PeatError -- If the push failed due to an issue with configuration or arguments, such as invalid device types or push type.

8.6.2.4. Parse API

parse(filepaths, device_types=None, sub_dirs=True)[source]

Find and parse device and/or project files.

Parameters:
  • filepaths (str | Path | list[str | Path]) -- File or directory paths to parse, or "-" to read from standard input (stdin).

  • device_types (list[type[DeviceModule] | str] | None) -- names, aliases, or classes of PEAT device modules to use. If None, all currently imported modules are used.

  • sub_dirs (bool) -- If sub-directories of a directory path should be searched

Return type:

dict[str, dict | list | str | float | int] | None

Returns:

Parse summary as a dict, or None if an error occurred

parse_data(source, dev_cls)[source]
Return type:

DeviceData | None

find_parsable_files(files, dev_cls)[source]

Find files that are parsable by a device type.

Return type:

list[Path]

8.6.2.5. Pillage API

pillage(source)[source]

peat pillage.

Parameters:
  • config_file -- Path to pillage config file

  • source (str) -- Path of directory to pillage

Return type:

bool

Returns:

If pillaging was successful

validate_image_mounting()[source]

Ensure all the requirements needed to mount an image are satisfied.

In order to mount an image and search, certain conditions must be met:

  • PEAT must be run as root

  • PEAT must be run on a Linux system and not on Windows Subsystem for Linux (WSL)

  • kmodpy Python package must be installed

  • qemu-nbd must be installed and available in the system PATH

Return type:

bool

Returns:

If the requirements for mounting are met

mount_image(source, mount_point)[source]
Return type:

bool

unmount_image(mount_point)[source]
Return type:

bool

remove_nbd_module()[source]
Return type:

bool

remove_nbd_device()[source]
Return type:

bool

list_loaded_modules()[source]
Return type:

list[str]

search(source, results)[source]
Return type:

bool

copy_file(src, dst)[source]
Return type:

None

is_valid_file(_file)[source]

Determine if a file is valid based on the search criteria in the Pillage configuration.

check_file_conditions(_file, conditions)[source]
Return type:

tuple

8.6.3. Network Discovery

Discovery of hosts on an Ethernet network (commonly called “scanning”).

Raw Sockets

Several functions require the use of raw sockets. On Linux, these require either root permissions, or the cap_net_raw capability applied to the Python interpreter. On Windows, the program must be running with Administrator permissions, usually from an elevated command prompt.

Further reading about getting raw socket permissions on Linux

check_host(ip, timeout=1.0, icmp_fallback_tcp_syn=None)[source]

Checks if a network host is online.

Note

Requests to 127.0.0.1 will always return True

Parameters:
  • ip (str) -- IPv4 address of host

  • timeout (float) -- Number of seconds to wait for a response

  • icmp_fallback_tcp_syn (bool | None) -- In the case of an ICMP failure, fall back to attempting a TCP SYN RST to check if the host is online. The edge case is this: if we are able to use raw sockets and the host is NOT in a local subnet, then ICMP requests will be used. Certain devices (such as the SEL RTAC) and many firewalls drop ICMP requests. We still want to check if the host is online, therefore we use TCP as a fallback. If the device is sensitive to TCP RSTs, then this argument should be set to False. The TCP port used is configured by the SYN_PORT PEAT configuration option (e.g. export PEAT_SYN_PORT=80).

Return type:

bool

Returns:

If the host is online

check_host_arp(ip, timeout=1.0)[source]

Check if a host is online using ARP who-has requests.

Note

This function requires the ability to use raw sockets.

Example tcpdump output of lookups for 192.0.2.200 and 192.0.2.201:

ARP, Request who-has 192.0.2.200 tell 192.0.2.20, length 28
ARP, Request who-has 192.0.2.201 tell 192.0.2.20, length 28
ARP, Reply 192.0.2.200 is-at 00:00:00:00:00:00, length 46
ARP, Reply 192.0.2.201 is-at 00:00:00:00:00:01, length 46
Parameters:
  • ip (str) -- IPv4 address of host

  • timeout (float) -- Number of seconds to wait for a response

Returns:

If the host is online

check_host_icmp(ip, timeout=1.0)[source]

Check if a host is online using an ICMP request (ping).

Note

This function requires the ability to use raw sockets.

Pings can be accomplished without raw sockets via the ping command. However, calling a system command is extremely costly, and would result in dramatically increased scanning times. Therefore, the use of raw sockets is preferred to ensure the check finishes in a reasonable amount of time.

Parameters:
  • ip (str) -- IPv4 address of host

  • timeout (float) -- Number of seconds to wait for a response

Returns:

If the host is online

check_host_syn_sweep(ip, ports, timeout=1.0)[source]

Checks if a host is online using TCP SYN requests to a range of ports.

TCP SYN requests are sent to the specified ports, and if the device responds in any way, it is considered to be “online”.

Parameters:
  • ip (str) -- IPv4 address of host

  • ports (list[int]) -- TCP ports to check

  • timeout (float) -- Number of seconds to wait for a response

Return type:

bool

Returns:

If the host is online

get_reachable_hosts(ip_list, ports=None)[source]

Checks for online hosts.

Note

If the ports parameter is specified, then purely TCP SYN requests will be used. Otherwise, ARP and/or ICMP requests will be used.

Parameters:
  • ip_list (list[str]) -- IPv4 addresses to check

  • ports (Iterable[int] | None) -- Ports to attempt to check if hosts are responding

Return type:

list[str]

Returns:

Sorted list of IP addresses of hosts that are responding

8.6.4. Address Parsing

address_to_pathname(address)[source]

Converts an address or other host identifier to a suitable filepath name.

Return type:

str

resolve_hostname_to_ip(hostname)[source]
Return type:

str

resolve_ip_to_hostname(ip)[source]
Return type:

str

expand_commas_and_clean_strings(host_list)[source]

Expand strings with ‘,’ characters into separate items, convert bytes to str, and remove items that are only whitespace or empty strings.

Return type:

list[str | bytes | IPv4Address | IPv4Network | Path]

expand_filenames_to_hosts(host_list)[source]

Read host data from any hosts that are filenames, and remove them from the list.

Return type:

list[str | bytes | IPv4Address | IPv4Network]

hosts_to_ips(host_list)[source]

Converts a list of mixed host strings to a list of unique IPv4 addresses.

The mixed host strings can consist of IPv4 addresses in “dotted-decimal” format, IPv4 subnets in CIDR notation, Nmap-style IPv4 address ranges, Nmap-style IPv4 network ranges, combinations of ranges, hostnames, and domain names (e.g. a FQDN).

If a file or set of files is specified, they will be read and the hosts will be added to the list of hosts to parse. Host strings in files can be space, tab, or newline-separated. Basically, PEAT will call .split() on whatever is in the file. JSON files will be loaded as JSON and treated as an array of strings.

Hostnames and domain names will be resolved into IPv4 addresses, and duplicate, malformed, or invalid addresses will be removed.

Examples

  • localhost (Hostname)

  • docs.python.org (Domain name)

  • 192.0.2.23 (Standard dotted-decimal IPv4 address)

  • 192.0.2.0/24 (CIDR-specified subnet)

  • 192.0.2.20-40 (Nmap-style host range)

  • 192.0.2-3.0 (Nmap-style network range)

  • 192.0.2-9.14-17 (Combination of network and host ranges)

  • 172-192.16-30.80-90.12-14 (Multiple combinations)

  • targets.txt (Text file with hosts)

  • hosts.json (JSON file with array of hosts)

  • 192.0.2.20,192.0.2.30 (Comma-separated hosts)

Warning

Valid addresses that are not generally used, such as multicast or reserved address spaces, will result in warnings emitted to logging and will still be returned in the list of results.

Parameters:

host_list (list[str | bytes | IPv4Address | IPv4Network]) -- Mixed host strings to convert. These can include IPv4 address strings (dotted-decimal notation), hostnames, subnets (CIDR notation), Nmap-style host address ranges, and/or ipaddress objects (IPv4Address/IPv4Network).

Return type:

list[str]

Returns:

List of unique dotted-decimal IPv4 address strings
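
The file handling described above (whitespace-separated text, or a JSON array of strings) can be sketched as follows. The helper read_hosts_file is hypothetical, and detecting JSON by the .json file extension is an assumption; the documentation does not say how PEAT decides that a file is JSON.

```python
import json
import tempfile
from pathlib import Path


def read_hosts_file(path):
    """Read host strings from a text or JSON file."""
    path = Path(path)
    text = path.read_text(encoding="utf-8")
    if path.suffix.lower() == ".json":
        # JSON files are treated as an array of strings.
        return [str(item) for item in json.loads(text)]
    # str.split() with no arguments splits on spaces, tabs, and newlines.
    return text.split()


with tempfile.TemporaryDirectory() as tmp:
    txt = Path(tmp, "targets.txt")
    txt.write_text("192.0.2.20 192.0.2.30\n192.0.2.40\tlocalhost\n", encoding="utf-8")
    js = Path(tmp, "hosts.json")
    js.write_text('["192.0.2.50", "192.0.2.0/24"]', encoding="utf-8")
    from_text = read_hosts_file(txt)
    from_json = read_hosts_file(js)
```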

hosts_to_objs(host_list)[source]

Converts a list of mixed host strings into ipaddress objects.

Parameters:

host_list (list[str | bytes | IPv4Address | IPv4Network]) -- Mixed host strings to convert (refer to hosts_to_ips() for details)

Return type:

list[_BaseV4]

Returns:

List of ipaddress objects (IPv4Address and IPv4Network)

host_string_to_objs(host_string, strict_network=True)[source]

Converts a mixed host string into ipaddress object(s).

Examples converting strings to IPv4Address objects
>>> from pprint import pprint
>>> from peat.protocols.addresses import host_string_to_objs
>>> pprint(host_string_to_objs("192.168.2-3.142-144"))
{IPv4Address('192.168.2.142'),
 IPv4Address('192.168.2.143'),
 IPv4Address('192.168.2.144'),
 IPv4Address('192.168.3.142'),
 IPv4Address('192.168.3.143'),
 IPv4Address('192.168.3.144')}
>>> pprint(host_string_to_objs("192.168.3.140-142"))
{IPv4Address('192.168.3.140'),
 IPv4Address('192.168.3.141'),
 IPv4Address('192.168.3.142')}
>>> host_string_to_objs("172.16.0.0/30")
IPv4Network('172.16.0.0/30')
>>> host_string_to_objs("localhost")
IPv4Address('127.0.0.1')
>>> host_string_to_objs(b"192.168.3.1")
IPv4Address('192.168.3.1')

Warning

The IP range parsing isn’t robust and can match bogus values like 999.353.23-35.22 or 000-000.999-999.-1.0

Parameters:
  • host_string (str | bytes) -- Host string to convert (refer to hosts_to_ips() for details)

  • strict_network (bool) -- If True, network parsing is strict: host bits set in a network address will result in an error.

Return type:

_BaseV4 | set[IPv4Address]

Returns:

Either a single instance or set of ipaddress objects (IPv4Address and IPv4Network)
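
To make the Nmap-style range notation concrete, here is a minimal sketch that expands each octet range and takes the Cartesian product. It is not PEAT's parser: expand_nmap_range is a hypothetical helper, and unlike the behavior described in the warning above, it rejects out-of-range octets.

```python
import ipaddress
from itertools import product


def expand_nmap_range(host_string):
    """Expand an Nmap-style IPv4 range into a set of IPv4Address objects."""
    parts = host_string.split(".")
    if len(parts) != 4:
        raise ValueError(f"expected 4 octets: {host_string!r}")
    octet_choices = []
    for part in parts:
        # "142-144" -> ("142", "-", "144"); "168" -> ("168", "", "")
        start, _, end = part.partition("-")
        low = int(start)
        high = int(end) if end else low
        if not (0 <= low <= high <= 255):
            raise ValueError(f"octet range out of bounds: {part!r}")
        octet_choices.append(range(low, high + 1))
    return {
        ipaddress.IPv4Address(".".join(str(o) for o in octets))
        for octets in product(*octet_choices)
    }


hosts = expand_nmap_range("192.168.2-3.142-144")
```

For the input shown, hosts contains the same six addresses as the first doctest example above.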

ip_objs_to_ips(ip_obj_list)[source]

Converts ipaddress objects to unique IPv4 address strings.

Parameters:

ip_obj_list (list[_BaseV4]) -- ipaddress objects to convert

Return type:

list[str]

Returns:

Sorted list of unique IPv4 address strings

Raises:

ParseError -- One of the objects is not an ipaddress object instance

ip_is_local_interface(ip)[source]

Checks if an IP matches any of the local machine’s NIC IPs.

Parameters:

ip (str) -- IPv4 address string to check

Return type:

bool

Returns:

If the IP matches that of a network interface on the local machine

ip_in_local_subnet(ip)[source]

Checks if an IP is in any of the locally connected subnets.

Note

This function does NOT check that a host actually exists! It only asserts that an address mathematically falls into the range of local subnets connected to the system running PEAT.

Parameters:

ip (str | IPv4Address) -- IPv4 address string to check

Return type:

bool

Returns:

If the address is in a locally connected network
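
The "mathematically falls into the range" check can be illustrated with the ipaddress module. The subnet list below is a hypothetical placeholder; a real check would need to read the subnets from the local machine's network interfaces, which this sketch does not attempt.

```python
import ipaddress

# Hypothetical subnets; a real check would read them from the
# network interfaces of the local machine.
LOCAL_SUBNETS = [
    ipaddress.ip_network("192.0.2.0/24"),
    ipaddress.ip_network("172.16.0.0/16"),
]


def in_any_subnet(ip, subnets=LOCAL_SUBNETS):
    """Return True if ip falls inside any of the given subnets."""
    address = ipaddress.ip_address(ip)
    return any(address in subnet for subnet in subnets)
```

As with the documented function, a True result here says nothing about whether a host actually exists at that address.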

network_is_local(net)[source]

Checks if a network address space is a subset of a local subnet.

This checks to see if the network fits partially into or is equal to any of the subnets connected to the local system.

Parameters:

net (IPv4Network) -- IPv4 network to check

Return type:

bool

Returns:

If the network address space fits in a local subnet
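
A containment check between networks can be sketched with IPv4Network.subnet_of() from the standard library (available since Python 3.7). The helper fits_in_any and the example subnet are assumptions for illustration, not PEAT's implementation.

```python
import ipaddress


def fits_in_any(net, local_subnets):
    """Return True if net is equal to or contained in a local subnet."""
    return any(net.subnet_of(local) for local in local_subnets)


# Hypothetical local subnet for the example.
local = [ipaddress.ip_network("192.0.2.0/24")]
half = ipaddress.ip_network("192.0.2.128/25")
wider = ipaddress.ip_network("192.0.0.0/16")
```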

sort_ips(ip_list)[source]

Sort IPv4 address strings in ascending order by integer IPs.

Return type:

list[str]
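
Sorting by integer value matters because plain string sorting puts 192.0.2.100 before 192.0.2.20. A minimal sketch of integer-based sorting, using a hypothetical helper name:

```python
import ipaddress


def sort_ipv4_strings(ip_list):
    """Sort dotted-decimal IPv4 strings by their integer value."""
    return sorted(ip_list, key=lambda ip: int(ipaddress.IPv4Address(ip)))


unsorted_ips = ["192.0.2.100", "192.0.2.20", "10.0.0.1"]
by_integer = sort_ipv4_strings(unsorted_ips)
by_string = sorted(unsorted_ips)
```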

split_ipv4_cidr(addr)[source]

Convert subnet mask from CIDR bits to full dotted-decimal.

Parameters:

addr (str) -- IPv4 address with CIDR subnet, e.g. 172.16.0.20/24

Return type:

tuple[str, str]

Returns:

Tuple with the host IPv4 address (172.16.0.20) and dotted-decimal subnet mask (255.255.255.0)
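
The same conversion can be sketched with ipaddress.ip_interface from the standard library. The helper name split_cidr is hypothetical, and PEAT's implementation may differ.

```python
import ipaddress


def split_cidr(addr):
    """Split 'a.b.c.d/nn' into (host address, dotted-decimal netmask)."""
    interface = ipaddress.ip_interface(addr)
    return str(interface.ip), str(interface.netmask)
```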

clean_ipv4(addr)[source]

Strip leading zeros from an IPv4 address, e.g. 192.000.002.004 => 192.0.2.4

Return type:

str
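
A minimal sketch of this normalization, assuming the input is already a four-octet dotted-decimal string (the helper name is hypothetical). Normalizing first can be useful because recent versions of Python's ipaddress module reject octets with leading zeros.

```python
def strip_leading_zeros(addr):
    """Normalize each octet, e.g. '192.000.002.004' -> '192.0.2.4'."""
    # int(..., 10) parses each octet as decimal and drops the zero padding.
    return ".".join(str(int(octet, 10)) for octet in addr.split("."))
```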

clean_mac(mac)[source]

Clean and format MAC address strings.

Return type:

str

8.6.5. Common networking functions

class Vendor(manuf, manuf_long, comment)
comment

Alias for field number 2

manuf

Alias for field number 0

manuf_long

Alias for field number 1

scapy_human_field(obj, field)[source]

Returns “human-friendly” version of a Packet field.

Return type:

str

mac_to_vendor(mac)[source]

Lookup the vendor using the OUI of a MAC address.

To update the manuf file bundled with peat:

# On Linux
pdm run update-manuf-linux

# On Windows
pdm run update-manuf-windows

Examples:

>>> from peat.protocols.common import mac_to_vendor
>>> mac_to_vendor("00:30:A7:00:00:01")
Vendor(manuf='SchweitzerEn', manuf_long='Schweitzer Engineering', comment=None)
>>> mac_to_vendor("00:16:3E:00:00:01")
Vendor(manuf='Xensource', manuf_long='Xensource, Inc.', comment=None)
>>> mac_to_vendor("B4:B1:5A:00:00:01")
Vendor(manuf='SiemensEnerg', manuf_long='Siemens AG Energy Management Division', comment=None)
>>> mac_to_vendor("a4:4c:c8:00:00:01")
Vendor(manuf='Dell', manuf_long='Dell Inc.', comment=None)
>>> mac_to_vendor("f8:59:71:00:00:01")
Vendor(manuf='Intel', manuf_long='Intel Corporate', comment=None)
>>> mac_to_vendor("00:50:56:00:00:01")
Vendor(manuf='VMware', manuf_long='VMware, Inc.', comment=None)
Parameters:

mac (str) -- Colon-separated 48-bit (6-octet) MAC address to lookup

Return type:

Vendor | None

Returns:

Vendor information as a namedtuple, or None if the lookup failed (likely due to a malformed MAC address). The vendor object has 3 attributes:

  • manuf

  • manuf_long

  • comment

mac_to_vendor_string(mac)[source]

Resolve a MAC address to a vendor string for use with “mac_vendor” field in data model. Simplified wrapper around mac_to_vendor().

This will use the long-form vendor name if resolved, and fall back to the short form if there is no long form.

Parameters:

mac (str) -- Colon-separated 48-bit (6-octet) MAC address to lookup

Return type:

str

Returns:

Vendor string, or empty string if lookup failed
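
The fallback order described above can be sketched on a stand-in namedtuple with the documented field names. The function vendor_to_string is hypothetical and takes the lookup result directly rather than a MAC address, so no manuf file is needed.

```python
from collections import namedtuple

# Stand-in that mirrors the documented Vendor fields.
Vendor = namedtuple("Vendor", ["manuf", "manuf_long", "comment"])


def vendor_to_string(vendor):
    """Prefer the long-form name, then the short form, else ''."""
    if vendor is None:
        return ""
    return vendor.manuf_long or vendor.manuf or ""
```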

mac_to_ip(mac)[source]

Lookup the IPv4 address for a MAC address.

On Linux (and OSX), the local ARP cache is searched (/proc/net/arp). On Windows, arp.exe is used.

Parameters:

mac (str) -- MAC address of the device, colon-separated

Return type:

str

Returns:

IPv4 address (dotted-decimal), or an empty string if the IP address could not be determined

ip_to_mac(ip)[source]

Resolve the MAC address for an IPv4 address.

On Linux (and OSX), the local ARP cache is searched (/proc/net/arp). On Windows, Scapy is used, with arp.exe used as a fallback.

Warning

On Windows, this may result in an ARP request being made on the local subnet if the PEAT configuration option RESOLVE_MAC is True.

Parameters:

ip (str) -- IPv4 address of the device (Note: this cannot be a hostname!)

Return type:

str

Returns:

MAC address as a colon-delimited string, or an empty string if the MAC address could not be determined

raw_socket_capable()[source]

Determines if PEAT has permissions to use RAW sockets (SOCK_RAW).

Return type:

bool
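
One common way to test for this permission is simply to try opening a raw socket and see whether the operating system refuses. The sketch below takes that approach; it is an assumption about the technique, not PEAT's implementation, and can_use_raw_sockets is a hypothetical name.

```python
import socket


def can_use_raw_sockets():
    """Return True if this process can open a raw IPv4 socket."""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    except OSError:
        # PermissionError is a subclass of OSError.
        return False
    sock.close()
    return True


capable = can_use_raw_sockets()
```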

8.6.6. Communication Protocols

8.6.6.1. IP

check_tcp_port(ip, port, timeout=None, reset=False, syn_sweep=False)[source]

Check if a TCP port is open on the given IP address.

By default, this is essentially a bargain-basement nmap -sT (“Connect scan”). It will finalize connections using TCP FIN, which can sometimes result in issues with devices continuing to send data. Therefore, I recommend enabling reset, which closes connections with TCP RST instead of FIN.

In other words (from Stack Overflow FIN vs RST):

  • FIN says: “I finished talking to you, but I’ll still listen to everything you have to say until you say that you’re done.”

  • RST says: “There is no conversation. I won’t say anything and I won’t listen to anything you say.”

Further reading

Warning

VMware VM interfaces will cause this function to return True for most ports!

Parameters:
  • ip (str) -- IPv4 address of the host to check

  • port (int) -- TCP port number to check

  • timeout (float | None) -- Number of seconds to wait for a response

  • reset (bool) -- Close the connection with [ACK, RST] instead of a [FIN] by setting SO_LINGER timeout to 0. This is generally useful to prevent issues with devices that will respond with data and cause issues (like errno 11 “Resource temporarily unavailable”). This is similar to nmap -sS (“SYN scan”).

  • syn_sweep (bool) -- If connection refused (code 111) should be considered “open”. Commonly used when checking if a host is online and responding. This also changes the amount of logging output to accommodate SYN sweep scanning, which generates a lot of errors.

Return type:

bool

Returns:

If the port is open, or the host is responding (if syn_sweep is set)
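
The SO_LINGER technique mentioned for the reset parameter can be sketched with the standard socket module. This is a simplified illustration, not PEAT's code: tcp_port_open is a hypothetical helper, it omits the syn_sweep behavior, and the struct layout shown is the one used on Linux.

```python
import socket
import struct


def tcp_port_open(ip, port, timeout=1.0, reset=False):
    """Return True if a TCP connection to ip:port succeeds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        if reset:
            # l_onoff=1, l_linger=0: close() sends RST instead of FIN.
            # The "ii" layout matches struct linger on Linux; other
            # platforms may use a different layout.
            sock.setsockopt(
                socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0)
            )
        # connect_ex returns 0 on success, otherwise an errno value.
        return sock.connect_ex((ip, port)) == 0


# Demonstrate against a listener on the loopback interface.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(1)
    open_port = server.getsockname()[1]
    is_open = tcp_port_open("127.0.0.1", open_port, reset=True)
```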

check_udp_service(ip, service, port=None, timeout=1.0)[source]

Check if a specific UDP service is listening.

Parameters:
  • ip (str) -- IPv4 address of the host to check

  • service (str) -- Name of the service to check

  • port (int | None) -- UDP port number to check

  • timeout (float) -- Number of seconds to wait for a response

Return type:

bool

Returns:

If the service is listening

fingerprint(ip, port, timeout, payload, finger_func)[source]

Fingerprints (verifies) a UDP device.

The provided socket, ip, port, and byte payload are used in a discovery packet to determine if the device is eligible to be fingerprinted. If so, it then uses the provided fingerprint function object to verify and return details about the device.

Parameters:
  • ip (str) -- IPv4 address of device

  • port (int) -- UDP port to use

  • timeout (float) -- Timeout for function

  • payload (bytes) -- Payload to send in the discovery

  • finger_func (Callable) -- Function to use to verify the device

Return type:

dict | None

Returns:

The device description dict, or None if it failed or there was an error

make_udp_socket(timeout=None, broadcast=False)[source]

Creates and binds an IPv4 UDP socket.

Parameters:
  • timeout (float | None) -- Timeout to set for the socket, in seconds

  • broadcast (bool) -- If the socket should be a broadcast socket

Return type:

socket | None

Returns:

The created UDP socket
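
A minimal sketch of creating and binding such a socket with the standard library follows. The helper make_udp and the choice to bind to all interfaces on an OS-assigned port are assumptions for illustration; the address PEAT binds to is not stated here.

```python
import socket


def make_udp(timeout=None, broadcast=False):
    """Create an IPv4 UDP socket bound to an OS-assigned port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    if broadcast:
        # Required before sending to a broadcast address.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    sock.bind(("", 0))
    return sock


udp = make_udp(timeout=2.0, broadcast=True)
timeout_set = udp.gettimeout()
broadcast_enabled = udp.getsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST) != 0
udp.close()
```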

send_discovery_packet(sock, ip, port, payload)[source]

Send initial hello packet used to fingerprint the device.

Parameters:
  • sock (socket) -- socket to use

  • ip (str) -- IPv4 address to send packet to

  • port (int) -- Port to send packet to

  • payload (bytes) -- Payload to send

Return type:

bool

Returns:

If the send was successful

err_code_to_str(code)[source]

Translates a socket error code to a human-readable string.

Parameters:

code (int) -- Integer code to lookup

Return type:

str

Returns:

Human-readable form of the error code, such as Operation not permitted or EPERM. If the code is unable to be looked up, the original code will be returned as a string.
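
A comparable lookup can be sketched with the errno and os modules from the standard library. The helper describe_errno and its output format are assumptions; the exact string PEAT returns may differ, and the message text from os.strerror() is platform-dependent.

```python
import errno
import os


def describe_errno(code):
    """Return the symbolic name and message for code, or str(code)."""
    name = errno.errorcode.get(code)
    if name is None:
        # Unknown code: return the original code as a string.
        return str(code)
    return f"{name} ({os.strerror(code)})"


known = describe_errno(errno.EPERM)
unknown = describe_errno(-12345)
```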

8.6.6.2. FTP

class FTP(ip, port=21, timeout=5.0)[source]

Generic wrapper for File Transfer Protocol (FTP) functionality.

property ftp: FTP

ftplib.FTP instance used for interacting with the server.

property address: str

Alias for ip to make code cleaner for some PEAT modules.

disconnect()[source]

Attempt to cleanly disconnect from the device.

Return type:

None

login(user='', passwd='')[source]

Login to the FTP server.

Note

This function should be called only once for each instance, per the documentation for ftplib.

Parameters:
  • user (str) -- Username to login with (default: anonymous)

  • passwd (str) -- Password to login with (default: anonymous@)

Return type:

bool

Returns:

If the login was successful

download_binary(filename, save_path=None, save_to_file=True)[source]

Download a binary file from the FTP server.

Parameters:
  • filename (str) -- Name or path of the file to download. This path must be valid on the server. Relative paths generally work best.

  • save_path (Path | None) -- Path on the local system to save the file to. If not specified then it’s saved to the device’s standard output directory and filename.

  • save_to_file (bool) -- If the data should be automatically written to a file as defined by save_path

Return type:

bytes

Returns:

The binary file data as bytes

download_text(filename, save_path=None, save_to_file=True)[source]

Download a text file from the FTP server.

Parameters:
  • filename (str) -- Name or path of the file to download. This path must be valid on the server. Relative paths generally work best.

  • save_path (Path | None) -- Path on the local system to save the file to. If not specified then it’s saved to the device’s standard output directory and filename.

  • save_to_file (bool) -- If the data should be automatically written to a file as defined by save_path

Return type:

str

Returns:

Text file data as a str

find_file(check_for, ext, directory=None)[source]
Return type:

str | None

upload_text(filename, content)[source]
Return type:

None

upload_binary(filename, content)[source]
Return type:

None

cmd(command)[source]

Execute a raw FTP command.

Return type:

str | None

cd(directory)[source]

Change directory on the server (cwd).

Return type:

bool

cwd(directory)[source]
Return type:

bool

pwd()[source]

Get the current working directory on the server.

Return type:

str | None

mkdir(directory)[source]

Create a directory on the server.

Return type:

bool

rmdir(directory)[source]

Remove a directory on the server.

Return type:

bool

file_size(filename)[source]

Get the size of a file on the server.

Return type:

int | None

file_delete(filename)[source]

Remove a file from the server.

Return type:

int | None

file_rename(filename, new_name)[source]

Rename a file on the server.

Return type:

int | None

dir(directory=None)[source]

List files on the FTP server, including file metadata (dir command).

This returns two objects:

  • List of filenames

  • List of dicts with detailed information about each file, including type of file, modification time, and size.

Return type:

tuple[list[str], list[dict]] | None

Returns:

tuple with list of filenames and list of dicts with file metadata, or None if the command failed.

rdir(directory=None, _paths_done=None)[source]

Recursively lists files on the server and parses metadata about those files (the dir command).

This recursively calls dir() on any directories and returns the consolidated output of the calls. Refer to that method’s docstring for further details about the returned data.

Return type:

tuple[list[str], list[dict]] | None

Returns:

Tuple with list of filenames and list of dicts with file metadata, or None if the command failed.
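The recursion described for rdir() can be sketched without a live FTP server. This is a minimal sketch, not PEAT's implementation: fake_dir is a hypothetical stand-in for dir() backed by an in-memory tree, and the metadata keys name, type, and path are assumptions made for illustration.

```python
from typing import Dict, List, Optional, Set, Tuple

# Hypothetical in-memory "server": directory path -> (name, type) entries.
TREE: Dict[str, List[Tuple[str, str]]] = {
    "/": [("logs", "dir"), ("boot.cfg", "file")],
    "/logs": [("old", "dir"), ("app.log", "file")],
    "/logs/old": [("app.1.log", "file")],
}

Listing = Tuple[List[str], List[dict]]


def fake_dir(directory: str) -> Optional[Listing]:
    """Stand-in for dir(): return (names, metadata), or None on failure."""
    entries = TREE.get(directory)
    if entries is None:
        return None
    base = directory.rstrip("/")
    names = [name for name, _ in entries]
    info = [{"name": n, "type": t, "path": f"{base}/{n}"} for n, t in entries]
    return names, info


def recursive_dir(
    directory: str = "/", _paths_done: Optional[Set[str]] = None
) -> Optional[Listing]:
    """Consolidate fake_dir() output for a directory and all subdirectories."""
    done = set() if _paths_done is None else _paths_done
    if directory in done:  # never list the same directory twice
        return [], []
    done.add(directory)

    result = fake_dir(directory)
    if result is None:
        return None
    names, info = list(result[0]), list(result[1])

    for entry in result[1]:
        if entry["type"] == "dir":
            sub = recursive_dir(entry["path"], done)
            if sub is not None:  # skip subdirectories that failed to list
                names.extend(sub[0])
                info.extend(sub[1])
    return names, info
```

The set passed through _paths_done guards against listing a directory more than once, which is one plausible reason for a parameter of that name.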

download_files(local_dir, files)[source]

Download files from a device to a directory on the local system.

The file structure of the local files will match that on the device, if possible.

Warning

On Windows, there are restrictions on characters allowed in paths, so the paths may vary on that platform.

Parameters:
  • local_dir (Path) -- Path to local directory to save downloaded files to.

  • files (list[dict]) -- Listing of files to download, as returned from dir() or rdir().
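The Windows restriction mentioned in the warning exists because characters such as < > : " | ? * are not allowed in Windows file names. The sketch below shows one way a server path could be mapped to a Windows-safe local path. windows_safe_path and the "_" replacement character are hypothetical, and PEAT's actual substitution rules may differ.

```python
import re
from pathlib import PurePosixPath, PureWindowsPath

# Characters that Windows does not allow inside a file or directory name.
WINDOWS_RESERVED = re.compile(r'[<>:"\\|?*]')


def windows_safe_path(remote_path: str, replacement: str = "_") -> PureWindowsPath:
    """Map a POSIX-style server path to a relative Windows-safe path."""
    # Drop the root so the result can be joined onto a local directory.
    parts = [p for p in PurePosixPath(remote_path).parts if p != "/"]
    cleaned = [WINDOWS_RESERVED.sub(replacement, part) for part in parts]
    return PureWindowsPath(*cleaned)
```

The directory structure is preserved, but individual names can change, which is why local paths may not match the device exactly on that platform.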

nlst(directory=None)[source]

nlst command to list files on the server.

Return type:

list[str] | None

nlst_files(directory=None)[source]

List files in a directory on the device using NLST.

This is a wrapper around nlst().

Parameters:

directory (str | None) -- Case-insensitive name of directory to list the contents of. If None or empty string, the current directory is listed.

Return type:

list[str]

Returns:

List of names of files in the directory.

list_command(directory)[source]

Get list of files with file type, permission, and timestamp information.

This uses the LIST FTP command directly, instead of nlst or dir.

Parameters:

directory (str) -- Case-insensitive name of directory to list the contents of.

Return type:

list[str]

Returns:

List of names of files in the directory, or empty list if the command failed.

getwelcome()[source]

Return the welcome message sent by the server.

Return type:

str | None

process_vxworks_ftp_welcome(welcome, dev=None)[source]

Extract the VxWorks version from the FTP welcome message.

Parameters:
  • welcome (str) -- FTP welcome message string to parse

  • dev (Optional[DeviceData]) -- DeviceData object to annotate with extracted information. If None, nothing is annotated and the version is simply returned.

Return type:

str | None

Returns:

The version number as a string, or None if the parse failed.
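A minimal sketch of this kind of banner parsing is shown below. The regular expression and the sample banners are assumptions for illustration and are not taken from PEAT. Real VxWorks welcome messages vary between versions and vendors.

```python
import re
from typing import Optional

# Assumed pattern: the word "VxWorks", optional whitespace or "(",
# then a dotted version number such as 5.4.2.
VXWORKS_VERSION = re.compile(r"VxWorks[\s(]*(\d+(?:\.\d+)+)", re.IGNORECASE)


def extract_vxworks_version(welcome: str) -> Optional[str]:
    """Return the dotted version string, or None if none is found."""
    match = VXWORKS_VERSION.search(welcome)
    return match.group(1) if match else None
```

Returning None rather than raising mirrors the documented behavior of returning None when the parse fails.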

_do(func, *args)[source]
Return type:

list | str | None

8.6.6.3. Telnet

class Telnet(ip, port=23, timeout=5.0)[source]

Telnet functionality for interacting with devices.

Added features:

  • Improved error handling

  • Improved cleanup of connections on exit, even if exceptions happen

  • Logging messages

  • Records all commands and responses and saves to a file

  • Simpler function calls

ENCODING: str = 'utf-8'
PRE_WRITE_SLEEP: float = 0.15
POST_WRITE_SLEEP: float = 0.1
READ_DELAY: float = 0.15
LINE_TERMINATOR: str = '\r\n'
property comm: Telnet

Python Telnet instance used for interacting with the device.

test_connection()[source]

Test connection to the device.

Return type:

bool

disconnect()[source]

Cleanly disconnect from the device.

Return type:

None

write(command, flush=False)[source]

Send a Telnet command to the device.

Parameters:
  • command (bytes | str | int) -- Command to send (this will be automatically encoded)

  • flush (bool) -- If the responses to the command should be dropped

Return type:

None

login(user, passwd)[source]

Log in to the device over Telnet.

Return type:

dict | None

read(delay=None, strip_whitespace=True)[source]

Read all data currently in the telnet response buffer.

This is a stateful operation, so calling this method again will not result in the same information. The results are saved in the all_output class attribute for future access.

Parameters:
  • delay (float | None) -- Seconds to sleep before querying for data

  • strip_whitespace (bool) -- If str.strip() should be called on the results

Return type:

str

Returns:

Decoded data read from the Telnet receive stream

read_until(until, delay=0.15, timeout=None, strip_whitespace=True)[source]

Read the telnet response buffer until the specified string is reached.

This is a stateful operation, so calling this method again will not result in the same information. The results are saved in the all_output class attribute for future access.

Parameters:
  • until (bytes | str) -- String to read all data up to

  • delay (float) -- Seconds to sleep before querying for data

  • timeout (float | None) -- Seconds to wait for the string before timing out. If None, this defaults to the Telnet class's timeout configuration.

  • strip_whitespace (bool) -- If strip() should be called on the results

Return type:

str

Returns:

Decoded data read from the telnet receive stream
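The stateful behavior described for read() and read_until() can be illustrated with a toy receive buffer. This is a sketch only: BufferedReader is a hypothetical class, not PEAT's Telnet class, and storing all_output as a list of decoded chunks is an assumption.

```python
class BufferedReader:
    """Toy receive buffer that illustrates stateful reads."""

    def __init__(self, incoming: bytes, encoding: str = "utf-8") -> None:
        self._buffer = incoming
        self.encoding = encoding
        self.all_output: list = []  # every decoded chunk read so far

    def read_until(self, until, strip_whitespace: bool = True) -> str:
        marker = until.encode(self.encoding) if isinstance(until, str) else until
        index = self._buffer.find(marker)
        # Consume through the marker, or everything if it never appears.
        end = len(self._buffer) if index == -1 else index + len(marker)
        raw, self._buffer = self._buffer[:end], self._buffer[end:]
        text = raw.decode(self.encoding, errors="replace")
        if strip_whitespace:
            text = text.strip()
        self.all_output.append(text)
        return text
```

Because each call consumes data from the buffer, a second call with the same argument returns different text. The earlier results remain available through all_output.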

_add_data(raw_data, strip_whitespace=True)[source]

Decodes and saves responses from device.

Return type:

str

8.6.6.4. HTTP

class HTTP(ip, port=80, timeout=5.0, dev=None, protocol='')[source]

Basic set of reusable HTTP functionality.

page_cache: dict[str, Response] = {}
DEFAULT_HEADERS: dict = {}
property url: str
property session: Session
property connected: bool
disconnect()[source]
Return type:

None

_save_response_to_file(response, page, url, dev)[source]

Save raw text data from response to disk, even if bad status code.

Return type:

Path | None

get(page='', protocol='', url='', use_cache=True, params=None, auth=None, allow_errors=False, dev=None, timeout=None, **kwargs)[source]

Perform an HTTP GET request and return the response.

Warning

Results of queries for an identical URL are cached by default for a single run of PEAT. If your tool is querying the status or looking for changes within a single run of PEAT, then set use_cache to False.

Note

The response object will have three additional attributes: request_timestamp, response_timestamp, file_path.

Parameters:
  • page (str) -- URL path of the page to get

  • protocol (Literal['http', 'https', '']) -- Name of the protocol to use. If empty string (default), the HTTP instance’s “protocol” will be used, if set. Otherwise, it will default to “http”.

  • url (str) -- URL to use instead of the auto-constructed one

  • use_cache (bool) -- If the internal page cache should be used.

  • params (dict | None) -- Additional HTTP parameters to include in the request

  • auth -- Authentication to use for the request (refer to Requests docs)

  • dev (DeviceData | None) -- DeviceData object to save files to

  • timeout (float | None) -- Timeout for the query. If None, the default timeout for this class instance is used instead.

  • kwargs -- Additional keyword arguments that will be passed directly to Requests.get()

Return type:

Response | None

Returns:

The response object, or None if the request failed. The response object will have three additional attributes: request_timestamp, response_timestamp, file_path.
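The effect of use_cache can be sketched with a small cache keyed by URL. CachedFetcher and fake_fetch are hypothetical names and this is not PEAT's HTTP class. Whether PEAT updates its cache when use_cache is False is also an assumption here.

```python
from typing import Callable, Dict


class CachedFetcher:
    """Toy page cache keyed by URL."""

    def __init__(self, fetch: Callable[[str], str]) -> None:
        self._fetch = fetch
        self.page_cache: Dict[str, str] = {}

    def get(self, url: str, use_cache: bool = True) -> str:
        if use_cache and url in self.page_cache:
            return self.page_cache[url]  # no request is made
        response = self._fetch(url)
        self.page_cache[url] = response
        return response


counter = {"calls": 0}


def fake_fetch(url: str) -> str:
    """Stand-in for a network request whose result changes on every call."""
    counter["calls"] += 1
    return f"{url} -> status #{counter['calls']}"


http = CachedFetcher(fake_fetch)
first = http.get("http://192.0.2.1/status")
cached = http.get("http://192.0.2.1/status")  # served from the cache
fresh = http.get("http://192.0.2.1/status", use_cache=False)  # new request
```

A tool that polls a status page within a single run would otherwise keep seeing the first response, which is the situation the warning describes.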

post(url, timeout=None, dev=None, use_cache=False, **kwargs)[source]

Perform an HTTP POST request and return the response.

Note

The response object will have three additional attributes: request_timestamp, response_timestamp, file_path.

Parameters:
  • url (str) -- URL to use for the request

  • timeout (float | None) -- Timeout for the query. If None, the default timeout for this class instance is used instead.

  • dev (DeviceData | None) -- DeviceData object to save files to

  • use_cache (bool) -- If the internal page cache should be used

  • kwargs -- Additional keyword arguments that will be passed directly to Requests.post()

Return type:

Response | None

Returns:

The response object, or None if the request failed. The response object will have three additional attributes: request_timestamp, response_timestamp, file_path.

get_ssl_certificate()[source]

Retrieve and parse the server’s SSL certificate.

Return type:

X509 | None

Returns:

SSL certificate data in Elastic Common Schema (ECS)-compliant format

decode_ssl_certificate(source)[source]

Decode a raw SSL certificate retrieved from a server into a raw dict.

Parameters:

source (str | bytes | Path) -- SSL certificate in string or bytes format, or the file path to a certificate (as a Path object).

Return type:

tuple[dict[str, str | tuple | int], str]

Returns:

Decoded SSL certificate data as a dict

parse_decoded_ssl_certificate(decoded, raw)[source]

Parse a decoded SSL certificate into Elastic Common Schema (ECS) format usable with the x509 data model (X509).

Parameters:
  • decoded (dict[str, str | tuple | int]) -- Decoded SSL certificate, usually obtained from calling decode_ssl_certificate().

  • raw (str) -- The original SSL certificate text

Return type:

X509

Returns:

SSL certificate data in Elastic Common Schema (ECS)-compliant format

static gen_soup(text)[source]

Generate a BeautifulSoup instance from the text using the efficient lxml library if it’s available or html.parser otherwise.

Return type:

BeautifulSoup

Returns:

A bs4.BeautifulSoup instance with the parser set to the value of peat.consts.BS4_PARSER

static gen_session()[source]

Session with SSL certificate verification disabled and no proxies from environment (e.g. http_proxy/https_proxy).

Return type:

Session

8.6.6.5. SNMP

Simple Network Management Protocol (SNMP) functionality for PEAT.

Under the hood, PEAT uses the lovely open-source PySNMP package (NOTE: as of May 2024, we now use a fork).

The code in this file is essentially a wrapper around pysnmp with error handling, logging, and other additions.

1.3.6.1. = iso.org.dod.internet = Any network device.

Building custom MIBs

Use mibdump.py or build-pysnmp-mib (included with PySNMP) to build a MIB in a format usable by PySNMP for use with PEAT.

NOTE: once the MIBs are generated, make sure they’re included in the PyInstaller spec file that’s used to build the PEAT executable, in distribution/peat.spec.

  • Example 1: mibdump.py --generate-mib-texts SIEMENS-SMI. This should generate a PySNMP module for the SIEMENS-SMI.mib file in ~/.pysnmp/mibs.

  • Example 2: build-pysnmp-mib -o SipOptical.py SipOptical.mib

Authors:

  • Christopher Goes

class SNMP(ip, port=161, timeout=1.0, community='public', snmp_version=1, mib_paths=None)[source]

Generic wrapper for Simple Network Management Protocol (SNMP) functionality.

get(identity, single_query=True, query_limit=0, walk_whole_mib=False)[source]

Gets the value(s) of a SNMP object.

The SNMP object is referenced by passing a string or tuple to the identity argument, which is then passed directly to PySNMP. Additionally, a custom MIB definition can be used by providing an absolute file path in the mib_paths argument of the SNMP class.

The object identity can be one of the following:

  • OID

  • MIB name (this will enumerate the entire MIB)

  • MIB name + object name

  • MIB name + object name + instance number (for an enumeration/table)

  • Any of the above + a custom MIB definition file to use

Examples:

from pathlib import Path
from peat.protocols.snmp import SNMP

snmp = SNMP(ip="192.0.2.1")
# OID
snmp.get("1.3.6.1.2.1.1.1.0")
# "Walk" all values out of a MIB
snmp.get("SNMPv2-MIB", single_query=False)
# MIB + Object
snmp.get(("SNMPv2-MIB", "sysDescr"))
# MIB + Object + Instance
snmp.get(("SNMPv2-MIB", "sysDescr", 0))
# Custom MIB definition file with a Path object (absolute path)
snmp_with_mib = SNMP(ip="192.0.2.1", mib_paths=[Path("/path/myMib.py")])
snmp_with_mib.get(("MyMib", "myObj"))
# Custom MIB definition file with a raw string (absolute path)
snmp_with_mib = SNMP(ip="192.0.2.1", mib_paths=["/path/myMib.py"])
snmp_with_mib.get(("MyMib", "myObj"))
Parameters:
  • identity (str | tuple) -- Tuple used to reference the SNMP object. Refer to function documentation above for details.

  • single_query (bool) -- If True, the OID will be queried once with getCmd. Otherwise, it will be iterated over using nextCmd until there is no more data (for OIDs with “sub values”, e.g. tables or enumerations).

  • query_limit (int) -- If single_query is False, limit the iteration to this number of queries and stop. This is a cheap workaround if you don’t have a MIB.

  • walk_whole_mib (bool) -- If all values in a MIB should be walked. This sets lexicographicMode=True in PySNMP nextCmd.

Return type:

list[dict]

Returns:

list of values, or an empty list if the get failed.

Raises:

ValueError -- If a critical error occurred, such as an invalid type being passed

verify(identity, to_find)[source]

Checks if a string is in the response data for an SNMP query.

Parameters:
  • identity (str | tuple) -- Tuple used to reference the SNMP object.

  • to_find -- String(s) to search the response for

Return type:

bool

Returns:

If the verification succeeded

Raises:

ValueError -- If a critical error occurred, such as an invalid type being passed

snmp_walk(ip, mib_name, mib_src, community='public', timeout=0.5, port=161, snmp_version=1)[source]

Walks an SNMP MIB and returns a dictionary of names and values.

Parameters:
  • ip (str) -- IPv4 address of the SNMP device

  • mib_name (str) -- Name of the mib to reference

  • mib_src (str | Path) -- Filesystem path to the MIB file, which must be a PySNMP module (optional)

  • timeout (float) -- Number of seconds to wait for a response

  • community (str) -- SNMP Community string to use

  • port (int) -- Port the device’s SNMP server is listening on

  • snmp_version (int) -- Version of SNMP to use (1 = v1 | 2 = v2c)

Return type:

dict

Returns:

A dictionary of names and values. An empty dict is returned if there was an error.

8.6.6.6. Serial

find_serial_ports(filter_list=None)[source]

Find host serial ports enumerated by the operating system. Filter by specified list, if any.

Return type:

list[str]

open_serial_port(address, baudrate, timeout)[source]

Open the specified serial port.

Since the code where the connection should be closed may not be able to communicate with the code where it needs to be opened, just be robust about closing and re-opening the connection.

Parameters:
  • address (str) -- The address string of the port to open

  • baudrate (int) -- The baud rate

  • timeout (float) -- The timeout in seconds

Return type:

bool

Returns:

True if the port is opened successfully, False otherwise

close_serial_port(address)[source]

Close the specified serial port. Handle closing an un-opened port gracefully.

Parameters:

address (str) -- The address string of the port to close

Return type:

bool

Returns:

True if the port is closed successfully, False otherwise

port_nums_to_addresses(port_values)[source]

Converts a list of mixed port value strings to a list of unique serial port address strings.

>>> import platform
>>> from peat.protocols.serial import port_nums_to_addresses
>>> platform.system()  
'Linux'
>>> port_nums_to_addresses(["0-1", "0", "1-1"])  
['/dev/ttyS0', '/dev/ttyS1', '/dev/ttyUSB0', '/dev/ttyUSB1']
>>> port_nums_to_addresses(["/dev/ttyS0", "/dev/ttyUSB0"])  
['/dev/ttyS0', '/dev/ttyUSB0']
Parameters:

port_values (list[str]) -- Mixed port number or dash-separated range strings to convert

Return type:

list[str]

Returns:

Sorted list of unique platform-specific serial port address strings
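The conversion shown in the doctest above can be approximated as follows. This is a sketch under stated assumptions: expand_port_values is a hypothetical name, only Linux-style device names are produced, and each number is assumed to map to both a ttyS and a ttyUSB device, as the example output suggests.

```python
import re
from typing import Iterable, List

# A single number ("0") or a dash-separated range ("0-1").
PORT_RANGE = re.compile(r"(\d+)(?:-(\d+))?")


def expand_port_values(port_values: Iterable[str]) -> List[str]:
    """Expand numbers and ranges into sorted, unique Linux port addresses."""
    addresses = set()
    for value in port_values:
        match = PORT_RANGE.fullmatch(value)
        if match is None:
            addresses.add(value)  # already an address such as /dev/ttyS0
            continue
        start = int(match.group(1))
        end = int(match.group(2) or start)
        for num in range(start, end + 1):
            # Assumed Linux naming; Windows would use names such as COM1.
            addresses.update((f"/dev/ttyS{num}", f"/dev/ttyUSB{num}"))
    return sorted(addresses)
```

Collecting into a set before sorting removes duplicates when ranges overlap, as with "0-1" and "0".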

platform_port_fmt(num)[source]

Formats an integer into a platform-specific serial port address string.

Parameters:

num (int) -- An integer to format into a platform-specific serial port string

Return type:

list[str]

Returns:

A list of platform-specific serial port strings

isint(s)[source]

Check if a string is a valid int.

Return type:

bool

parse_baudrates(baudrate_values)[source]

Converts a list of mixed baud rate value strings to a list of standardized baud rates with duplicates removed.

>>> from peat.protocols.serial import parse_baudrates
>>> parse_baudrates(["9600"])
[9600]
>>> parse_baudrates(["9600-115200"])
[9600, 19200, 38400, 57600, 115200]
>>> parse_baudrates(["9600-115200", "57600"])
[9600, 19200, 38400, 57600, 115200]
Parameters:

baudrate_values (list[str]) -- Mixed baud rate number or dash-separated range strings to convert

Return type:

list[int]

Returns:

A list of unique standardized baud rate integers sorted in reverse order (highest to lowest)
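A simplified sketch of the range expansion is shown below. The table of standard rates is an assumption and PEAT's own table may differ. The sketch also does not snap arbitrary values to the nearest standard rate. Results are returned in ascending order here to match the doctest output above. Passing reverse=True to sorted() would give highest-to-lowest order instead.

```python
import re
from typing import Iterable, List

# Assumed table of common baud rates.
STANDARD_BAUDRATES = (300, 600, 1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200)


def expand_baudrates(baudrate_values: Iterable[str]) -> List[int]:
    """Expand numbers and dash-separated ranges into unique standard rates."""
    found = set()
    for value in baudrate_values:
        match = re.fullmatch(r"(\d+)(?:-(\d+))?", value.strip())
        if match is None:
            continue  # ignore values that are not a number or a range
        low = int(match.group(1))
        high = int(match.group(2) or low)
        found.update(rate for rate in STANDARD_BAUDRATES if low <= rate <= high)
    return sorted(found)
```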

std_b_idx(baudrate)[source]

Convert an arbitrary integer to the appropriate std_b index.

Return type:

int

serial_txn(wr_bytes, address)[source]

Performs a serial transaction (writes and then reads).

Parameters:
  • wr_bytes (bytes) -- The bytes to write

  • address (str) -- The serial port string (/dev/ttyS1, COM1, etc)

Return type:

bytearray | None

Returns:

The bytearray that was read, if any

serial_write(wr_bytes, address)[source]

Writes bytes to an open serial port.

Parameters:
  • wr_bytes (bytes) -- The bytes to write

  • address (str) -- The serial port string (/dev/ttyS1, COM1, etc)

Return type:

int

Returns:

The number of bytes written, or -1 if there was an error

serial_read(address)[source]

Reads bytes from an open serial port.

Parameters:

address (str) -- The serial port string (/dev/ttyS1, COM1, etc)

Return type:

bytearray | None

Returns:

The bytearray that was read, including an empty array if nothing was read but there were no errors, or None if there were errors

pretty_hex_bytes(b)[source]
Return type:

str

handle_scan_serial_exception(port, ex)[source]

Handle pyserial’s serial.SerialException instances.

Return type:

bool

Returns:

True if the exception is regular, False if it’s related to the port being in use by another program or the port not existing on the host.

8.6.6.7. CIP

Scapy packets for the Common Industrial Protocol (CIP), used by Rockwell Allen Bradley PLCs and other devices.

CIP is encapsulated by ENIP. In other words, CIP messages are carried as the payload of ENIP packets.

Authors

  • Casey Glatter

  • Patrica Schulz

  • Mark Woodard

class ByteLenField(name, default, count_of=None, length_of=None)[source]

Field representing the length of the payload of the packet in one byte.

class AbCIP(_pkt, /, *, cmServiceCode=82, cmReqPathSize=2, cmReqPath=537273345, timeout=1690, messageRequestSize=6, serviceCode=1, lengthReqPath=2, reqPath=536945665, routePathSize=1, reserved=0, routeSegment=1, routeAddr=0)[source]
fields_desc: List[AnyField] = [<ByteEnumField (AbCIP).cmServiceCode>, <ByteField (AbCIP).cmReqPathSize>, <IntField (AbCIP).cmReqPath>, <ShortField (AbCIP).timeout>, <LEShortField (AbCIP).messageRequestSize>, <ByteEnumField (AbCIP).serviceCode>, <ByteField (AbCIP).lengthReqPath>, <IntField (AbCIP).reqPath>, <ByteField (AbCIP).routePathSize>, <ByteField (AbCIP).reserved>, <ByteField (AbCIP).routeSegment>, <ByteField (AbCIP).routeAddr>]
_name = 'CIP'
aliastypes = [<class 'peat.protocols.cip.cip_packets.AbCIP'>, <class 'scapy.packet.Packet'>]
class CIP(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]

Common Industrial Protocol (CIP) packet.

fields_desc: List[AnyField] = [<LEIntField (CIP).handle>, <LEShortField (CIP).timeout>, <LEShortField (CIP).itemCount>, <LEShortField (CIP).dataItemType_1>, <LEShortField (CIP).dataItemLength_1>, <LEShortField (CIP).dataItemType_2>, <LEFieldLenField (CIP).dataItemLength_2>, <ByteEnumField (CIP).service>, <ByteLenField (CIP).requestPathSize>, <StrLenField (CIP).requestPath>, <StrField (CIP).data>]
_name = 'CIP Packet'
aliastypes = [<class 'peat.protocols.cip.cip_packets.CIP'>, <class 'scapy.packet.Packet'>]
class CCM(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]

CIP Connection Manager (CCM) packet.

fields_desc: List[AnyField] = [<LEShortField (CCM).timeoutTicks>, <LEShortField (CCM).messageSize>, <ByteEnumField (CCM).service>, <ByteLenField (CCM).requestPathSize>, <StrLenField (CCM).requestPath>, <StrField (CCM).data>]
_name = 'CCM'
aliastypes = [<class 'peat.protocols.cip.cip_packets.CCM'>, <class 'scapy.packet.Packet'>]

8.6.6.8. ENIP

class EnipDriver(enip_socket, cpu_slot=0, rpi=5000, timeout=44818, backplane=1, cid_ot=b'\\x00\\x00\\x00\\x00', cid_to=b"'\\x04\\x19q", csn=b'\\x8f\\x00', vid=b'M\\x00', vsn=b'qnL\\x0c')[source]

Ethernet/IP (ENIP) protocol handler.

open()[source]

Connect and open a session with the device.

Raises:

EnipCommError -- exception occurred while opening the session

Return type:

bool

close()[source]

Close the session with the device and the underlying socket.

Return type:

bool

Returns:

If the close was successful

Raises:

EnipCommError -- Exception occurred while unregistering the session

send(msg)[source]

Sends a message through the ENIP socket.

Return type:

int

Returns:

Number of bytes sent

Raises:

EnipCommError -- if the send failed

recv()[source]

Receives a message from the ENIP socket.

Return type:

bytes

Returns:

The message received as bytes

Raises:

EnipCommError -- if the receive failed

_get_sequence()[source]

Increment and return the sequence number used with connected messages.

Return type:

int

Returns:

The current sequence number

static _build_common_packet_format(message_type, message, addr_type, addr_data=None, timeout=10)[source]

Builds and returns a common message.

Check Volume 2 (page 2.22) of CIP specification for reference.

Return type:

bytes

Returns:

The built message

send_nop()[source]

Send a NOP to the target. No reply is expected.

A NOP provides a way for either an originator or target to determine if the TCP connection is still open.

Raises:

EnipCommError -- if the message send failed

Return type:

None

list_identity()[source]

ListIdentity command to locate and identify potential targets.

Raises:

EnipCommError -- if the message send failed

Return type:

ENIPListIdentityResponse

list_services()[source]

Returns list of service names returned by ListServices command.

Return type:

list[str]

_get_services()[source]
Return type:

ENIPListServicesResponse

list_interfaces()[source]

Returns list of interfaces returned by ListInterfaces command.

Return type:

list[str]

_get_interfaces()[source]
Return type:

ENIPListServicesResponse

send_rr_data(message)[source]

SendRRData transfers an encapsulated request/reply packet between the originator and target.

Parameters:

message (bytes) -- The message to be sent to the target

Returns:

A tuple where item 0 is a boolean indicating whether the reply is valid, and item 1 is the target CID T->O value from the response.

Return type:

tuple

Raises:

EnipCommError -- if the message send failed

send_unit_data(message)[source]

SendUnitData sends encapsulated connected messages.

Parameters:

message (bytes) -- The message to be sent to the target

Return type:

bytes

Returns:

The reply data

Raises:

EnipCommError -- if the message send failed

send_connected_command(service, path, cmd_data)[source]

Sends a connected command to the device.

Parameters:
  • service (int) -- One byte value indicating the request service

  • path (bytes) -- The path to the specified instance (including path length)

  • cmd_data (bytes) -- Additional command data

Return type:

bytes

Returns:

The reply data

Raises:

EnipCommError -- if the message send failed

register_session()[source]

Register a new session with the communication partner.

Return type:

int

Returns:

The session number

Raises:

EnipCommError -- if the registration message send failed

unregister_session()[source]

Unregister a connection.

Raises:

EnipCommError -- if the unregister message send failed

Return type:

None

forward_open()[source]

CIP implementation of the forward open message.

Refer to ODVA documentation Volume 1 3-5.5.2

Return type:

bool

Returns:

False if there was any error in the reply message

Raises:

EnipCommError -- No session was registered before calling forward open

forward_close()[source]

CIP implementation of the forward close message.

Each connection opened with the forward open message needs to be closed. Refer to ODVA documentation Volume 1 3-5.5.3

Return type:

bool

Returns:

False if there was any error in the reply message

Raises:

EnipCommError -- No session was registered before calling forward close

class EnipSocket(ip, port, timeout=5.0)[source]

Ethernet/IP (ENIP) socket.

Authors

  • Christopher Goes

connect()[source]

Connect to ENIP socket.

Return type:

bool

Returns:

If connection was successful

Raises:

EnipCommError -- if the socket timed out during connection

close()[source]

Close the Python socket and log files (if debugging).

Return type:

None

send(message)[source]

Send an ENIP message.

Return type:

int

Returns:

Number of bytes sent

Raises:

EnipCommError -- Exception occurred while sending the message

receive()[source]

Receive and unpack an ENIP response.

Return type:

bytes

Returns:

The ENIP response as bytes

Raises:

EnipCommError -- Exception occurred while receiving the response

_close_log_files()[source]
Return type:

None

_log_protocol_msg(msg, direction)[source]

Logs protocol messages to two files: formatted bytes and raw bytes.

Return type:

None

static _format_bytes_msg(message, info='')[source]
Return type:

str

Scapy packets for the ENIP protocol used by the Allen Bradley PLC.

Authors

  • Casey Glatter

  • Christopher Goes

  • Patrica Schulz

  • Mark Woodard

class LEShortLenField(name, default, count_of=None, length_of=None)[source]

A len field in a 2-byte integer.

class EncapData(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<LEIntField (EncapData).handle>, <LEShortField (EncapData).timeout>, <LEShortField (EncapData).itemCount>, <LEShortField (EncapData).addrItemType>, <LEShortField (EncapData).addrLengthData>, <LEShortField (EncapData).dataItemType>, <LEFieldLenField (EncapData).lengthData>, <StrLenField (EncapData).data>]
_name = 'EncapData for ENIP'
aliastypes = [<class 'peat.protocols.enip.enip_packets.EncapData'>, <class 'scapy.packet.Packet'>]
class GetAttributesAllResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<LEShortEnumField (GetAttributesAllResponse).vendor>, <scapy.fields.MayEnd object>, <LEShortField (GetAttributesAllResponse).productCode>, <ByteField (GetAttributesAllResponse).MajorFirmwareVersion>, <ByteField (GetAttributesAllResponse).MinorFirmwareVersion>, <LEShortField (GetAttributesAllResponse).status>, <LEIntField (GetAttributesAllResponse).serialNumber>, <ByteField (GetAttributesAllResponse).productNameLength>, <scapy.fields.MayEnd object>, <XByteField (GetAttributesAllResponse).state>]
_name = 'GetAttributesAll Response'
_overload_fields = {<class 'peat.protocols.enip.enip_packets.ENIPListIdentityResponse'>: {}}
aliastypes = [<class 'peat.protocols.enip.enip_packets.GetAttributesAllResponse'>, <class 'scapy.packet.Packet'>]
class ENIPListIdentityResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<LEShortField (ENIPListIdentityResponse).itemCount>, <LEShortEnumField (ENIPListIdentityResponse).typeID>, <LEShortField (ENIPListIdentityResponse).length>, <LEShortField (ENIPListIdentityResponse).encapsulationVersion>, <LEShortField (ENIPListIdentityResponse).sinFamily>, <ShortField (ENIPListIdentityResponse).sinPort>, <IPField (ENIPListIdentityResponse).sinAddr>, <LELongField (ENIPListIdentityResponse).sinZero>, <LEShortEnumField (ENIPListIdentityResponse).vendor>, <LEShortEnumField (ENIPListIdentityResponse).productType>, <LEShortField (ENIPListIdentityResponse).productCode>, <ByteField (ENIPListIdentityResponse).MajorFirmwareVersion>, <ByteField (ENIPListIdentityResponse).MinorFirmwareVersion>, <LEShortField (ENIPListIdentityResponse).status>, <LEIntField (ENIPListIdentityResponse).serialNumber>, <ByteField (ENIPListIdentityResponse).productNameLength>, <StrLenField (ENIPListIdentityResponse).productName>, <XByteField (ENIPListIdentityResponse).state>]
_name = 'ENIP ListIdentity Response'
aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListIdentityResponse'>, <class 'scapy.packet.Packet'>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'peat.protocols.enip.enip_packets.GetAttributesAllResponse'>)]
class ENIPListInterfacesResponseItems(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<LEShortEnumField (ENIPListInterfacesResponseItems).typeID>, <FieldLenField (ENIPListInterfacesResponseItems).itemLength>, <StrLenField (ENIPListInterfacesResponseItems).itemData>]
_name = 'ENIPListInterfacesResponseItems'
aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListInterfacesResponseItems'>, <class 'scapy.packet.Packet'>]
class ENIPListInterfacesResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<FieldLenField (ENIPListInterfacesResponse).itemCount>, <PacketField (ENIPListInterfacesResponse).listItems>]
_name = 'ENIPListInterfacesResponse'
aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListInterfacesResponse'>, <class 'scapy.packet.Packet'>]
class ENIPListServicesResponseItems(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<LEShortEnumField (ENIPListServicesResponseItems).typeID>, <LEShortField (ENIPListServicesResponseItems).length>, <LEShortField (ENIPListServicesResponseItems).encapsulationVersion>, <LEShortField (ENIPListServicesResponseItems).sinFamily>, <StrFixedLenField (ENIPListServicesResponseItems).serviceName>]
_name = 'ENIP ListServices Response Items'
aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListServicesResponseItems'>, <class 'scapy.packet.Packet'>]
class ENIPListServicesResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<FieldLenField (ENIPListServicesResponse).itemCount>, <PacketField (ENIPListServicesResponse).listItems>]
_name = 'ENIP ListServices Response'
aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListServicesResponse'>, <class 'scapy.packet.Packet'>]
class ENIPRegisterSession(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<LEShortField (ENIPRegisterSession).protocolVersion>, <LEShortField (ENIPRegisterSession).optionsRegisterSession>]
_name = 'ENIP RegisterSession'
aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPRegisterSession'>, <class 'scapy.packet.Packet'>]
class _ENIPBuilder[source]
post_build(packet, payload)[source]
class ENIP(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]

EtherNet/IP packet encapsulation.

If “data” isn’t set, it will be automatically set based on commandCode.

The header is 24 bytes fixed length.

It can be used with TCP. Maybe UDP as well?

fields_desc: List[AnyField] = [<LEShortEnumField (ENIP).commandCode>, <LEShortLenField (ENIP).dataLength>, <LEIntField (ENIP).sessionHandle>, <LEIntEnumField (ENIP).status>, <LongField (ENIP).senderContext>, <LEIntField (ENIP).options>, <scapy.fields.MultipleTypeField object>]
_name = 'EtherNet/IP Request'
aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIP'>, <class 'scapy.packet.Packet'>]
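The 24-byte fixed header noted in the ENIP class description can be illustrated with the standard struct module instead of Scapy. This is a sketch, not PEAT's packet builder. build_register_session is a hypothetical helper and the sender context is treated as eight opaque bytes. The layout follows the order of the fields listed in fields_desc above.

```python
import struct

# commandCode (2), dataLength (2), sessionHandle (4), status (4),
# senderContext (8), options (4): 24 bytes in total.
ENIP_HEADER = struct.Struct("<HHII8sI")

REGISTER_SESSION = 0x0065  # EtherNet/IP RegisterSession command code


def build_register_session() -> bytes:
    """Build a RegisterSession request: fixed header plus 4 bytes of data."""
    data = struct.pack("<HH", 1, 0)  # protocol version 1, option flags 0
    header = ENIP_HEADER.pack(
        REGISTER_SESSION,  # commandCode
        len(data),         # dataLength: size of the data after the header
        0,                 # sessionHandle: assigned by the target in its reply
        0,                 # status
        b"\x00" * 8,       # senderContext
        0,                 # options
    )
    return header + data
```

The dataLength field counts only the bytes after the header, so it is 4 for this request even though the full message is 28 bytes.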

8.6.6.9. PCCC

class PCCCoverCIP(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<ByteField (PCCCoverCIP).serviceCode>, <ByteField (PCCCoverCIP).lengthReqPath>, <IntField (PCCCoverCIP).reqPath>, <ByteField (PCCCoverCIP).lengthReqID>, <ShortField (PCCCoverCIP).vendorID>, <IntField (PCCCoverCIP).serialNum>, <ByteField (PCCCoverCIP).pcccCmd>, <ByteField (PCCCoverCIP).sts>, <ShortField (PCCCoverCIP).tnsw>, <ByteField (PCCCoverCIP).pcccFnc>, <LEIntField (PCCCoverCIP).plcFwPhysAddr>, <ByteField (PCCCoverCIP).readSize>]
aliastypes = [<class 'peat.protocols.pccc.pccc_functions.PCCCoverCIP'>, <class 'scapy.packet.Packet'>]
generate_pccc_packet(pccc_cip_data)[source]
Return type:

ENIP

8.6.7. Utilities/common functionality

convert_to_snake_case(camel_string)[source]

Converts a string from CamelCase to snake_case.

Return type:

str
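
A minimal sketch of one common way to do this conversion with regular expressions. The function name to_snake_case is hypothetical, and PEAT's own implementation may treat edge cases (acronyms, digits) differently.

```python
import re


def to_snake_case(camel_string: str) -> str:
    """Hypothetical stand-in for convert_to_snake_case()."""
    # Insert an underscore before an uppercase letter that starts a new word
    partial = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", camel_string)
    # Split a lowercase letter or digit from a following uppercase letter
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", partial).lower()


print(to_snake_case("DeviceData"))
print(to_snake_case("sessionHandle"))
```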

clean_replace(to_clean, replace_with, chars)[source]
Return type:

str

clean_empty_dirs(to_clean)[source]

Recursively remove all empty directories on the path.

Return type:

None

fmt_duration(seconds)[source]

Format a time duration into a human-readable string.

Wrapper for humanfriendly.format_timespan().

Return type:

str

fmt_size(size_in_bytes)[source]

Convert an integer size into a human-readable string.

Wrapper for humanfriendly.format_size().

Return type:

str

sort(obj)[source]

Sort a dictionary. Returns a new dict (it doesn’t sort in-place).

Return type:

dict

short_pth_str(path, max_parts=4)[source]

Generate a string of a subset of a path.

Parameters:
  • path (Path) -- Path to convert

  • max_parts (int) -- Number of parts of the path to show, including the base. For example, a file path with max_parts=4 will show the filename and 3 parent directories. If the number of parts is less than max_parts then the entire path is shown.

Return type:

str

Returns:

Shortened file path string
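
The max_parts behavior can be sketched with pathlib alone. The name shorten_path is hypothetical, and PurePosixPath is used only so the output is the same on every platform; the real function may format the result differently.

```python
from pathlib import PurePosixPath


def shorten_path(path: PurePosixPath, max_parts: int = 4) -> str:
    """Hypothetical stand-in for short_pth_str()."""
    parts = path.parts
    # Paths that are already short enough are returned unchanged
    if len(parts) <= max_parts:
        return str(path)
    # Keep the filename plus (max_parts - 1) parent directories
    return str(PurePosixPath(*parts[-max_parts:]))


print(shorten_path(PurePosixPath("/a/b/c/d/e/file.txt")))
print(shorten_path(PurePosixPath("/tmp/file.txt")))
```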

move_item(obj, new_position, item)[source]

Move an item in a list to a new position in the list.

Return type:

None

move_file(src, dest)[source]

Move a file, handling duplicates and/or destination directory creation.

If src is a file and dest is a directory, then dest will be changed to be the name of the src file with the original dest as the parent, following the behavior of the cp command.

Return type:

Path

Returns:

The updated path of the file

merge(d1, d2, no_copy=False)[source]

Merges two dictionaries.

Values in d1 take priority over d2. If one of the dictionaries is None, then the other is returned.

Return type:

dict

parse_date(raw_time, year_first=None)[source]

Parse a raw string into a datetime object or ISO 8601 date string.

This leverages dateutil’s fuzzy parser, which can handle a wide array of input time formats.

Warning

This function does not do any special timezone handling. If a timezone is presented, then it’s included in the datetime object’s tzinfo, otherwise it’s timezone-unaware. Ensure you properly convert to UTC!

Note

To get an ISO 8601-formatted string, call .isoformat() on the returned datetime object. This will result in a string in the following format: <YYYY-MM-DD>T<HH:MM:SS.mmmmmm>

Parameters:
  • raw_time (str) -- time string to parse, in almost any format (within reason)

  • iso_format -- If an ISO 8601 format string should be returned instead of a datetime object

  • year_first (bool | None) -- Whether to interpret the first value in an ambiguous 3-integer date (e.g. 01/05/09) as the year. Passed straight to dateutil.parser.parse().

Return type:

datetime | None

Returns:

datetime.datetime object, or None if the parse fails.
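
The timezone warning above can be illustrated with the standard library alone. This sketch does not call parse_date(); it builds datetime objects directly, and the helper name to_utc is hypothetical. Treating a timezone-unaware value as already being in UTC is an assumption made for this example only.

```python
from datetime import datetime, timedelta, timezone


def to_utc(parsed: datetime) -> datetime:
    """Hypothetical helper: normalize a parsed datetime to UTC."""
    if parsed.tzinfo is None:
        # Assumption for this sketch: naive values are already in UTC
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


aware = datetime(2022, 9, 13, 15, 4, 33, tzinfo=timezone(timedelta(hours=-6)))
naive = datetime(2022, 9, 13, 15, 4, 33)
print(to_utc(aware).isoformat())
print(to_utc(naive).isoformat())
```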

get_formatted_platform_str()[source]

Record information about the current system and environment.

Return type:

str

get_debug_string()[source]
Return type:

str

get_resource(package, file)[source]

Unpack and return the path to a resource included with PEAT.

Some examples of these are schemas and definitions for TC6 XML.

The resources are unpacked into a temporary directory. These temporary files are cleaned up when PEAT exits.

Details: https://importlib-resources.readthedocs.io/en/latest/migration.html

Parameters:
  • package (str) -- where the resource is located (use __package__ for this)

  • file (str) -- name of the resource to get

Return type:

str

Returns:

Absolute path to the resource on the filesystem

collect_files(path, sub_dirs=True)[source]

Convert a path into a list of absolute file path strings.

If path is a file, then the returned list will only contain that file.

Parameters:
  • path (Path) -- path to a file or a directory to search

  • sub_dirs (bool) -- if sub-directories of a directory path should be searched

Return type:

list[str]

Returns:

Absolute file paths of all files in the path, in the order they were listed by the OS (no sorting)

copy_file(src_path, dst_path, overwrite=False)[source]

Copy and/or rename a file.

Return type:

None

check_file(file, ext=None)[source]

Checks if a path exists and is valid, and returns the Path to it.

Parameters:
  • file (Path | str) -- Path to validate

  • ext (list | str | None) -- Expected file extension(s). If the file doesn’t have any of the extension ...

Return type:

str | Path | None

Returns:

Absolute Path of the file

fix_file_owners(to_fix)[source]

Change the owner of files and directories to the actual user running PEAT instead of root.

Return type:

bool

file_perms_to_octal(mode_string)[source]

“rwxrwxr--” => “0774”

Return type:

str
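
The mapping shown above ("rwxrwxr--" to "0774") can be sketched as follows. The name perms_to_octal is hypothetical, and this version assumes a plain 9-character string: any character other than a dash counts as a set bit, so special bits such as setuid or sticky are not modeled.

```python
def perms_to_octal(mode_string: str) -> str:
    """Hypothetical stand-in for file_perms_to_octal()."""
    if len(mode_string) != 9:
        raise ValueError(f"expected 9 characters, got {len(mode_string)}")
    digits = []
    for start in range(0, 9, 3):
        triplet = mode_string[start:start + 3]
        # r=4, w=2, x=1; a dash contributes nothing
        value = sum(
            weight
            for char, weight in zip(triplet, (4, 2, 1))
            if char != "-"
        )
        digits.append(str(value))
    return "0" + "".join(digits)


print(perms_to_octal("rwxrwxr--"))
```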

save_results_summary(data, results_type, log_debug=False)[source]

Helper function to write JSON results to a file if the path is configured.

Return type:

None

write_temp_file(data, filename)[source]

Write arbitrary data to a file in the PEAT temporary directory.

Note

The temporary directory is configured via the TEMP_DIR configuration option (config.TEMP_DIR). If this is None, then this function is a no-op (it does nothing).

Return type:

Path | None

Returns:

The Path of the file the data was written to, or None if there was an error or TEMP_DIR is disabled.

write_file(data, file, overwrite_existing=False, format_json=True, merge_existing=False)[source]

Write data to a file.

Warning

File names MUST be valid for ALL platforms, including Windows! If validation fails, then the name will be sanitized and changed, and therefore will differ from the original name passed to this function!

Note

If the file extension is .json, then the data will be saved as canonical JSON. Data will be processed into types that are JSON-compatible prior to conversion. Standard Python types and objects are fine. bytes are decoded as UTF-8.

Note

If the containing directory/directories don’t exist, they will be created before the file is written (same behavior as mkdir -p).

Parameters:
  • data (str | bytes | bytearray | Container | Iterable) -- Content to write to the file

  • file (Path) -- Path of file to write. This should be an absolute path; if it is not, it will default to the configured OUT_DIR for PEAT. Any directories on the path that don’t exist will be created automatically.

  • overwrite_existing (bool) -- If existing files should be overwritten (instead of writing to a new file with a numeric extension)

  • format_json (bool) -- If JSON data should be formatted with 4 space indentation

  • merge_existing (bool) -- If the file already exists and is JSON, then read the data from the existing file, merge the new data with it, then overwrite the file with the merged data.

Return type:

bool

Returns:

If the write was successful

dup_path(file)[source]

Handle duplicate files by adding a ‘.<num>’ to the name.

Return type:

Path

calc_hash(source, hash_type='md5')[source]

Calculate the hash of a file, bytes, or str.

Parameters:
  • source (str | bytes | Path) -- Data or filepath to hash

  • hash_type (str) -- Hash algorithm to use. Can be any algorithm in hashlib.

Return type:

str

Returns:

The generated hash as an uppercase string
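
A minimal sketch of the behavior described above, using hashlib directly. The name hash_source is hypothetical, and hashing str input as UTF-8 text is an assumption; the real function may treat strings differently (for example, as file paths).

```python
import hashlib
from pathlib import Path


def hash_source(source, hash_type: str = "md5") -> str:
    """Hypothetical stand-in for calc_hash()."""
    if isinstance(source, Path):
        # Path input: hash the contents of the file
        data = source.read_bytes()
    elif isinstance(source, str):
        # Assumption: str input is hashed as UTF-8 text
        data = source.encode("utf-8")
    else:
        data = bytes(source)
    # hashlib.new() accepts any algorithm name that hashlib supports
    return hashlib.new(hash_type, data).hexdigest().upper()


print(hash_source("hello"))
print(hash_source(b"hello", "sha256"))
```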

gen_hashes(source, hash_algorithms=None)[source]

Generate hashes of text, bytes, or the contents of a file.

Return type:

dict[str, str]

Returns:

The generated hashes as a dict keyed by hash name: {'md5': '...', 'sha1': '...', ...}

utc_now()[source]

This simple helper function ensures a proper timezone-aware UTC-timezone datetime.datetime object is returned.

Further reading: https://blog.miguelgrinberg.com/post/it-s-time-for-a-change-datetime-utcnow-is-now-deprecated

Return type:

datetime
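
For reference, a timezone-aware UTC timestamp can be produced with the standard library as sketched below. The name utc_now_sketch is hypothetical; this is the commonly recommended replacement for the deprecated datetime.utcnow(), and is assumed rather than confirmed to match what utc_now() does internally.

```python
from datetime import datetime, timezone


def utc_now_sketch() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(timezone.utc)


now = utc_now_sketch()
print(now.isoformat())
```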

time_now()[source]

Get the current time.

Retrieve the current time in the format specified in consts.

Returns:

The current time

Return type:

str

are_we_superuser()[source]

Checks if PEAT is running with root privileges (effective user ID/euid of 0) or Administrator on Windows.

Return type:

bool

rgetattr(obj, attr, *args)[source]

Recursive version of the builtin getattr().

Return type:

Any

rsetattr(obj, attr, val)[source]

Recursive version of the builtin setattr().

Return type:

Any
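
A sketch of how recursive attribute access is commonly implemented, assuming attr is a dot-separated path such as "data.name". The function names with the _sketch suffix and the example objects are hypothetical.

```python
import functools
from types import SimpleNamespace


def rgetattr_sketch(obj, attr, *args):
    """Resolve a dotted attribute path such as 'data.name'."""
    def _getattr(current, name):
        # *args carries the optional default, as with the builtin getattr()
        return getattr(current, name, *args)
    return functools.reduce(_getattr, [obj] + attr.split("."))


def rsetattr_sketch(obj, attr, val):
    """Set the last attribute in a dotted path on its parent object."""
    parent_path, _, name = attr.rpartition(".")
    parent = rgetattr_sketch(obj, parent_path) if parent_path else obj
    return setattr(parent, name, val)


device = SimpleNamespace(data=SimpleNamespace(name="relay_1"))
rsetattr_sketch(device, "data.name", "relay_2")
print(rgetattr_sketch(device, "data.name"))
```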

deep_get(dictionary, keys, default=None)[source]

Safely read the value of a deeply nested key from nested dict objects.

>>> from peat.utils import deep_get
>>> x = {'1': {'2': {'3': 'hi'}}}
>>> deep_get(x, "1.2.3")
'hi'
Parameters:
  • dictionary (dict) -- Dictionary to search

  • keys (str) -- dot-separated string of keys to query, e.g. "1.2.3" to query a dict like {'1': {'2': {'3': 'hi'}}}

  • default -- Default value to return if the query fails, similar to dict.get()

Return type:

Any

Returns:

The value of the object, or the value of default if the query failed (this is None if unset).

is_ip(to_check)[source]

Check if a string is an IPv4 or IPv6 address.

Return type:

bool
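
A check like this can be sketched with the standard ipaddress module. The name looks_like_ip is hypothetical, and the real is_ip() may accept or reject slightly different inputs.

```python
import ipaddress


def looks_like_ip(to_check: str) -> bool:
    """Hypothetical stand-in for is_ip()."""
    try:
        # ip_address() accepts both IPv4 and IPv6 address strings
        ipaddress.ip_address(to_check)
    except ValueError:
        return False
    return True


print(looks_like_ip("192.0.2.20"), looks_like_ip("relay_1"))
```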

is_email(to_check)[source]

Check if a string is an email address.

Return type:

bool

is_mac(to_check)[source]

Check if a string is a MAC address.

Return type:

bool

8.6.8. Data Utilities

class DeepChainMap(*maps)[source]

Variant of collections.ChainMap that supports edits of nested dict objects.

In PEAT, this is used for providing nested sets of device and protocol options (configurations) with the ability to modify the underlying sources (e.g. a set of global runtime defaults) and still preserve the order of precedence and transparent overriding (e.g. options configured at runtime for a specific device will still override the global defaults, even though the global defaults were also modified at runtime).

Nested objects can override keys at various levels without overriding the parent structure. This is best explained via examples.

>>> from peat.data.data_utils import DeepChainMap
>>> layer1 = {}
>>> layer2 = {"key": 9999}
>>> layer3 = {"deep_object": {"deep_key": "The Deep"}}
>>> deep_map = DeepChainMap(layer1, layer2, layer3)
>>> deep_map["key"]
9999
>>> layer1["key"] = -1111
>>> deep_map["key"]
-1111
>>> layer2["key"]
9999
>>> deep_map["deep_object"]["deep_key"]
'The Deep'
>>> layer1["deep_object"] = {"another_key": "another_value"}
>>> deep_map["deep_object"]["deep_key"]
'The Deep'
>>> deep_map["deep_object"]["another_key"]
'another_value'
to_dict(to_convert=None)[source]

Create a copy of the object as a normal dict.

Return type:

dict

lookup_by_str(container, value, lookup)[source]

String of attribute to search for, e.g. "ip" to look up interfaces using the Interface.ip attribute on the value.

Return type:

int | None

find_position(obj, key, value)[source]

Find if and where an object with a given value is in a list.

Return type:

int | None

match_all(obj_list, value)[source]

Search the list for objects where all values in value match.

Return type:

int | None

strip_empty_and_private(obj, strip_empty=True, strip_private=True)[source]

Recursively removes empty values and keys starting with _.

Return type:

dict
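
The recursion can be sketched as below. The name strip_sketch is hypothetical, and the definition of "empty" (None or a zero-length str, list, dict, tuple, or set) is an assumption; values such as 0 and False are deliberately kept. Like the warning on strip_key(), this sketch does not descend into lists.

```python
def strip_sketch(obj: dict, strip_empty: bool = True, strip_private: bool = True) -> dict:
    """Hypothetical stand-in for strip_empty_and_private()."""
    cleaned = {}
    for key, value in obj.items():
        # Private keys are those starting with an underscore
        if strip_private and isinstance(key, str) and key.startswith("_"):
            continue
        if isinstance(value, dict):
            value = strip_sketch(value, strip_empty, strip_private)
        # Assumption: "empty" means None or a zero-length container/string
        is_empty = value is None or (
            isinstance(value, (str, list, dict, tuple, set)) and len(value) == 0
        )
        if strip_empty and is_empty:
            continue
        cleaned[key] = value
    return cleaned


raw = {"ip": "192.0.2.20", "_cache": 1, "name": "", "extra": {"_x": 1, "port": 0, "note": None}}
print(strip_sketch(raw))
```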

strip_key(obj, bad_key)[source]

Recursively removes all matching keys from a dict.

Warning

This will NOT strip values out of a list of dict!

Return type:

dict

only_include_keys(obj, allowed_keys)[source]

Filters any keys that don’t match the allowed list of keys.

Return type:

dict

compare_dicts(d1, d2, keys)[source]
Return type:

bool

dedupe_model_list(current)[source]

Deduplicates a list of BaseModel objects while preserving the original order.

Models that are a subset of another (containing some of its keys and values) will be merged together and their values combined.

Warning

This function is expensive to call, ~O(n^2 log n) algorithm (in)efficiency. Do not call more than absolutely needed!

Parameters:

current (list[BaseModel]) -- list of models to deduplicate

Return type:

list[BaseModel]

Returns:

List of deduplicated items

none_aware_attrgetter(attrs)[source]

Variant of operator.attrgetter() that handles values that may be None.

Return type:

Callable

sort_model_list(model_list)[source]

In-place sort of a list of models.

The attribute _sort_by_fields on the first model in the list is used to sort the models.

Raises:

PeatError -- invalid type in list, or _sort_by_fields is undefined on the model being sorted

Return type:

None

merge_models(dest, source)[source]

Copy values from one model to another.

Return type:

None

8.6.9. Logging Utils

class ElasticLogSink(server_url, index='vedar-logs')[source]
emit(message)[source]
generate_log_dict(record)[source]

Generate a dict with detailed metadata about the log event, for use in Elasticsearch and in JSON-formatted logs.

This information gets used in record["extra"]["json_log_string"]

terminal_formatter(record)[source]
file_formatter(record)[source]
use_colors()[source]

If colorized terminal output should be used.

Disable colored terminal output if NO_COLOR environment variable is set, per the Command Line Interface Guidelines (CLIG).

config.NO_COLOR is True if user has disabled via:

  • CLI arg --no-color

  • YAML config no_color: true

  • Environment variable PEAT_NO_COLOR

Return type:

bool

patch_json_string(record)[source]
setup_logging(file=None, json_file=None, debug_info_file=None)[source]

Configures the logging interface used by everything for output. The third-party package loguru is used for logging.

Verbosity can be configured via config.VERBOSE. Terminal output can be disabled via config.QUIET.

Parameters:
  • file (Path | None) -- File path to write human-readable logs. If a Path object is provided, it is used as an absolute path. If None, standard log file output will be disabled.

  • json_file (Path | None) -- Store JSON formatted logs to a file, in JSON Lines (jsonl) format. These share the same format as Elasticsearch logging, and can be used to reconstruct the vedar-logs Elasticsearch index. If None, JSON logging will be disabled.

  • debug_info_file (Path | None) -- Text file with system info, configuration values, dumps of scapy internal state, and other info that may be helpful for debugging or troubleshooting customer issues.

Return type:

None

Print the logo and Run ID to stderr, colorized if colors are enabled.

Return type:

None

8.6.10. Settings

8.6.10.1. SettingsManager

_env_var_constructor(loader, node)[source]

Implements a custom YAML tag for loading optional environment variables.

If the environment variable is set, returns the value of it. Otherwise, returns None.

Example usage in the YAML configuration:

key: !ENV 'MY_APP_KEY'
_join_var_constructor(loader, node)[source]

Implements a custom YAML tag for concatenating other tags in the document to strings.

This allows for a much more DRY (Don’t Repeat Yourself) configuration file.

class SettingsManager(label, env_prefix, init_env=True)[source]

Stores and manages configuration values from multiple sources.

Typical usage is to subclass this class and configure the possible variables and default values as class attributes, much like dataclasses.

Warning

All class attributes MUST have a type! Otherwise, they will be skipped over and not appear in the list of defaults. This is due to the implementation being a hack on top of __annotations__.

Order of precedence for configurations

  • Runtime changes (example: config.DEBUG = 2), including CLI arguments

  • Environment variables (example: export PEAT_DEBUG=2)

  • Configuration file (YAML or JSON)

  • Default values set in subclasses of this class (example: DEBUG: int = 0)

Precedence is managed by a collections.ChainMap.
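
The order of precedence above can be demonstrated with a plain collections.ChainMap. The layer names runtime_configs, env_configs, and file_configs follow the names used elsewhere in this section; the defaults dict and the example values are hypothetical, and this is not the SettingsManager implementation itself.

```python
from collections import ChainMap

defaults = {"DEBUG": 0, "VERBOSE": False}
file_configs = {"DEBUG": 1}
env_configs = {}
runtime_configs = {}

# Earlier maps win, matching the order of precedence listed above
settings = ChainMap(runtime_configs, env_configs, file_configs, defaults)
from_file = settings["DEBUG"]

env_configs["DEBUG"] = 2       # e.g. export PEAT_DEBUG=2
from_env = settings["DEBUG"]

runtime_configs["DEBUG"] = 3   # e.g. config.DEBUG = 3
from_runtime = settings["DEBUG"]

print(from_file, from_env, from_runtime, settings["VERBOSE"])
```

Because ChainMap holds references to the underlying dicts, changes to any layer are visible immediately without rebuilding the map.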

Parameters:
  • label (str) -- The type of information being stored

  • env_prefix (str) -- Prefix to use for environment variables

  • init_env (bool) -- Load values from environment variables during object initialization

_load_values(conf, load_to, key_prefix='')[source]

Read and set configuration values from an input dictionary.

Note

Any values that are None are skipped and will NOT be loaded

Parameters:
  • conf (dict[str, Any]) -- Configuration to load

  • load_to (str) -- Where in the Settings object the loaded configuration should be stored. Valid options are: runtime_configs, env_configs, and file_configs.

  • key_prefix (str) -- Optional value to prepend to the keys being looked up. Example use case is for loading environment variables prefixed with PEAT_, where conf=dict(os.environ).

Return type:

None

load_from_dict(conf)[source]

Update runtime configuration values from a dictionary.

Parameters:

conf (dict[str, Any]) -- Configuration values to load

Raises:

AttributeError -- If a configuration option in conf is not already defined on the class

Return type:

None

load_from_environment(env_prefix=None)[source]

Update configuration values from environment variables.

Parameters:

env_prefix (str | None) -- String prefixing the environment variable names, e.g. PEAT_ to load variables such as PEAT_DEBUG into DEBUG. If None, then this is set to self.env_prefix.

Raises:

AttributeError -- If a configuration option in the environment is not already defined on the class

Return type:

None

load_from_file(file)[source]

Load stored values from a YAML or JSON file.

Note that these settings can be overridden by environment variables or values set at runtime.

Parameters:

file (Path) -- Path to a YAML or JSON file to load settings from

Return type:

bool

Returns:

If the load was successful

Raises:

AttributeError -- If a configuration option loaded from the file is not already defined on the class

save_to_file(outdir, save_yaml=True, save_json=True)[source]

Save the currently stored values to YAML and JSON files.

Parameters:
  • outdir (Path) -- Directory path to save the files to

  • save_yaml (bool) -- set to False to disable YAML file saving

  • save_json (bool) -- if settings should be saved as JSON

Return type:

None

export()[source]

Current values in a deterministic format that can be exported.

Return type:

dict[str, Any]

Returns:

JSON-serializable dict with uppercase keys, sorted by key (technically in Unicode code point order rather than strictly alphabetically).

yaml()[source]

Export the current settings as YAML text.

Return type:

str

json()[source]

Export the current settings as JSON text.

Return type:

str

json_dict(include_none_vals=False)[source]

Convert the current settings to a JSON dictionary.

Return type:

dict[str, Any]

Returns:

The current setting values as a JSON-serializable dict with uppercase keys.

get_serialized_value(item)[source]

Get a configuration value in a JSON-serializable format.

Parameters:

item (str) -- Case-sensitive name of the attribute to get

Return type:

Any

Returns:

The configuration value in a JSON-serializable format

Raises:

KeyError -- If the attribute named by item doesn’t exist

static _serialize_value(value)[source]
Return type:

Any

typecast(key, value)[source]

Convert and store a value as the appropriate Python data type.

Store the variable as the appropriate Python data type, such as bool, int, float, str, Path, list, etc. This converts "0.5" to 0.5, "/home/" to Path("/home/"), etc.

If the type is properly annotated (e.g. VAR: str = 'stuff'), then we use the annotation, otherwise try to infer the type from the default value. However, the backup method does not work if the default value is None.

Parameters:
  • key (str) -- Case-sensitive name of the value (what attribute will be changed)

  • value (Any) -- The raw value to typecast (e.g. a string from an environment variable)

Return type:

Any

Returns:

The typecasted value as a valid Python datatype matching the annotation

Raises:

KeyError -- If the attribute named by key doesn’t exist

non_default(key)[source]

If an item was sourced by a non-default method (env, file, runtime).

Parameters:

key (str) -- Name of the item to check

Return type:

bool

Returns:

If the item has a value that overrides the default value. Note that this method will also return False if the key isn’t valid.

is_default_value(key)[source]

If an item’s current value matches the default value.

Parameters:

key (str) -- Name of the item to check

Return type:

bool

Returns:

If the item has a value that matches the default value

Raises:

AttributeError -- If the attribute named by key doesn’t exist

fixup_dirs(new_parent, dir_name, override_all=False)[source]
Return type:

None

8.6.11. Constants

TIMEZONE: str = 'Etc/UTC'

Timezone of the local system

Type:

str

BS4_PARSER: str = 'lxml'

The beautifulsoup4 parser to use

Type:

str

WINDOWS: Final[bool] = False

If the local system is a Windows system (this is FALSE if in WSL)

Type:

bool

POSIX: Final[bool] = True

If the local system is a POSIX system (Linux, OSX, BSD, etc.)

Type:

bool

LINUX: Final[bool] = True

If the local system is Linux-based

Type:

bool

WSL: Final[bool] = False

If the local system is a Windows Subsystem for Linux (WSL) environment

Type:

bool

TIME_FMT: Final[str] = '%Y-%m-%d_%H-%M-%S'

Format to use for timestamp strings

Type:

str

LOG_TIME_FMT: Final[str] = '%Y-%m-%d %H:%M:%S'

TIME_FMT without an underscore, and a colon instead of a dash for H/M/S

Type:

str
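
The two format strings above can be exercised with datetime.strftime(). The constants are copied here so the example runs without importing PEAT, and the timestamp is an arbitrary example value.

```python
from datetime import datetime

# Copied from the constants documented above
TIME_FMT = "%Y-%m-%d_%H-%M-%S"
LOG_TIME_FMT = "%Y-%m-%d %H:%M:%S"

moment = datetime(2022, 9, 13, 15, 4, 33)
file_stamp = moment.strftime(TIME_FMT)      # safe for use in filenames
log_stamp = moment.strftime(LOG_TIME_FMT)   # easier to read in log lines

print(file_stamp)
print(log_stamp)
```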

START_TIME_UTC: Final[datetime] = datetime.datetime(2026, 2, 7, 5, 32, 29, 141572, tzinfo=datetime.timezone.utc)

UTC time of when PEAT was imported that can be used for time stamping

Type:

datetime

START_TIME_LOCAL: Final[datetime] = datetime.datetime(2026, 2, 7, 5, 32, 29, 141572, tzinfo=datetime.timezone(datetime.timedelta(0), 'UTC'))

Local time of when PEAT was imported that can be used for time stamping

Type:

datetime

START_TIME: Final[str] = '2026-02-07_05-32-29'

Formatted local time (TIME_FMT) of when PEAT was imported, e.g. "2022-09-13_15-04-33"

Type:

str

RUN_ID: Final[int] = 177044234927

Unique “ID” to track all artifacts associated with a single run of PEAT. The Run ID is a 12-digit integer (current UTC time + random 2-digit integer). Note: previously, the Run ID was 10 digits (changed 02/20/2020).

Type:

int

IANA_IP_PROTOS: Final[dict[int, str]] = {0: 'hopopts', 1: 'icmp', 2: 'igmp', 3: 'ggp', 4: 'ipv4', 5: 'st', 6: 'tcp', 7: 'cbt', 8: 'egp', 9: 'igp', 12: 'pup', 17: 'udp', 22: 'idp', 27: 'rdp', 41: 'ipv6', 43: 'routing', 44: 'fragment', 50: 'esp', 51: 'ah', 58: 'icmpv6', 59: 'none', 60: 'dstopts', 77: 'nd', 78: 'iclfxbm', 103: 'pim', 113: 'pgm', 115: 'l2tp', 132: 'sctp', 255: 'raw', 256: 'max'}

Resolves an IANA IP protocol ID to the name of the protocol

exception PeatError[source]

Generic class for any error raised by PEAT.

exception ParseError[source]

Parsing errors.

exception CommError[source]

Communication errors.

exception DeviceError[source]

Errors related to a device or DeviceModule module.

convert(value)[source]

Recursively convert values into JSON-friendly standard Python types.

Note

set objects are sorted after being converted to a list. The order of a Python set is not defined and therefore may vary between runs. Sorting helps alleviate that somewhat.

Note

This is located in consts.py so it can be used in locations that are not safe to import utils.py from, such as settings_manager.py.

Parameters:

value (Any) -- Value to convert

Return type:

str | bool | int | float | list | dict | None

Returns:

The converted value

get_platform_info()[source]

Collect information about the system PEAT is running on.

Return type:

dict[str, str | int | bool]

sanitize_filepath(path)[source]
Return type:

str

sanitize_filename(filename)[source]

Validate and sanitize filenames to work on all platforms, notably Windows.

This replaces any invalid characters with _.

Return type:

str

gen_random_dev_id()[source]

Generate a randomized device ID.

Format: unknown-dev-<run-id>-<random-integer>

Return type:

str

lower_dict(to_lower, children=True)[source]

Convert keys of a dict and its immediate children dicts to lowercase. Dict keys must be strings.

Parameters:

children (bool) -- if values that are dicts should have their keys converted as well

Return type:

dict[str, Any]

str_to_bool(val)[source]

Convert a string representation of truth to True or False.

True values are: ‘y’, ‘yes’, ‘t’, ‘true’, ‘on’, ‘1’, ‘enable’, ‘enabled’, ‘up’

False values are: ‘n’, ‘no’, ‘f’, ‘false’, ‘off’, ‘0’, ‘disable’, ‘disabled’, ‘down’

Parameters:

val (str) -- String to evaluate

Return type:

bool

Returns:

The boolean representation of the string

Raises:

ValueError -- val is anything other than a boolean value
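
The truth tables above translate directly into a lookup. The name str_to_bool_sketch is hypothetical, and ignoring case and surrounding whitespace is an assumption about how the real function normalizes its input.

```python
TRUE_VALUES = {"y", "yes", "t", "true", "on", "1", "enable", "enabled", "up"}
FALSE_VALUES = {"n", "no", "f", "false", "off", "0", "disable", "disabled", "down"}


def str_to_bool_sketch(val: str) -> bool:
    """Hypothetical stand-in for str_to_bool()."""
    # Assumption: matching ignores case and surrounding whitespace
    normalized = val.strip().lower()
    if normalized in TRUE_VALUES:
        return True
    if normalized in FALSE_VALUES:
        return False
    raise ValueError(f"invalid truth value: {val!r}")


print(str_to_bool_sketch("enabled"), str_to_bool_sketch("down"))
```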

SYSINFO: Final[dict[str, str | int | bool]] = {'arch': '64bit', 'cli_arguments': '-j 4 -b html docs _docs_html', 'cli_exe': '/home/runner/work/PEAT/PEAT/.venv/bin/sphinx-build', 'containerized': False, 'cpu': 'x86_64', 'hostname': 'runnervmwffz4', 'os_family': 'ubuntu', 'os_full': 'Linux 6.11.0-1018-azure (Ubuntu 24.04.3 LTS)', 'os_name': 'Linux', 'os_rel': '6.11.0-1018-azure', 'os_ver': '24.04.3', 'pid': 3026, 'podman': False, 'ppid': 3019, 'python_exe': '/home/runner/work/PEAT/PEAT/.venv/bin/python', 'python_impl': 'CPython', 'python_version': '3.12.12', 'run_id': 177044234927, 'start_time': '2026-02-07 05:32:29', 'tcpdump': '/usr/bin/tcpdump', 'timezone': 'Etc/UTC', 'username': 'runner'}

Information about the system PEAT is running on from get_platform_info()

Type:

dict

8.6.12. CLI internals

run_peat(args, start_time)[source]

CLI main (note: the entrypoint that calls this is in __main__.py).

Return type:

None

oneshot_main(args)[source]

Main logic when running a regular PEAT command, e.g. peat scan.

This is distinct from the other current and future capabilities, such as monitoring or the PEAT HTTP server. “oneshot” refers to the “run and done” (in “one shot”) and non-persistent nature of the traditional PEAT CLI commands.

Return type:

bool

export_device_data(args)[source]

Export data from all devices in the datastore to files and/or Elasticsearch.

Return type:

bool

get_targets(args)[source]

Collect targets and module names from a file or CLI argument.

Return type:

tuple[list[str], Literal['unicast_ip', 'broadcast_ip', 'serial'], list[str]]

parse_scan_summary(summary)[source]

Extract targets, communication method, and PEAT modules from a scan summary.

Return type:

tuple[list[str], Literal['unicast_ip', 'broadcast_ip', 'serial'], list[str]]

read_host_file(host_file)[source]

Parse a scan summary from a file or STDIN into a scan summary dict.

Return type:

dict[str, Any] | None

write_readme()[source]

Generate a README describing the output from PEAT.

This file gets written in ./peat_results/ (or whatever is configured for OUT_DIR).

Return type:

bool

Returns:

If the file was written successfully (or if the file already exists)

8.6.12.1. cli_args

Commandline argument parsing and help. Used by cli_main.py.

build_argument_parser(version='0.0.0')[source]

Builds the argparse parser for parsing CLI commands and arguments.

The ordering of how the arguments are added to the parsers matters. It looks wacky in the code, but it leads to cleaner output for the user.

Return type:

ArgumentParser

parse_peat_arguments(version='ERROR')[source]

Parses command line arguments.

Return type:

Namespace

add_list_module_args(parser_or_group)[source]

Hack to add the module listing arguments to parsers with mutually-exclusive groups that require one argument (scan/pull/push) and normal parsers (parse/pillage/heat).

This also adds the --examples and --all-examples arguments.

Return type:

None

8.6.13. Command Parsers

Parse and/or process the output of various Linux commands and files, such as arp -a, /proc/meminfo, /var/log/messages, etc.

References for /proc (aka, “procfs”):

class NixParserBase[source]

Base class for *nix file and command parsers (Linux, VxWorks, etc.).

file: PurePosixPath | None = None
paths: list[PurePosixPath] = []
command: str = ''
commands: list[str] = []
classmethod parse_and_process(to_parse, dev)[source]

Parse the data, then process it into the device data model.

Return type:

bool

classmethod type()[source]
Return type:

str

classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class VarLogMessagesParser[source]

Parse messages from /var/log/messages.

file: PurePosixPath | None = PurePosixPath('/var/log/messages')
MESSAGE_REGEX: str = '(?P<timestamp>\\w+[ \\t]+\\d+[ \\t]+\\d{2}:\\d{2}:\\d{2})[ \\t]+(?P<hostname>\\S+)[ \\t]+(?P<logger>\\S+)\\.(?P<level>\\S+)[ \\t]+(?P<process>[^:]+): (?P<message>.*)'
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

list[dict[str, str]]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None
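
To show what MESSAGE_REGEX captures, the sketch below applies the pattern to a single line with the standard re module. The pattern is copied from the attribute above (written as raw strings); the sample log line is hypothetical and not taken from a real device.

```python
import re

# Same pattern as VarLogMessagesParser.MESSAGE_REGEX, split for readability
MESSAGE_REGEX = (
    r"(?P<timestamp>\w+[ \t]+\d+[ \t]+\d{2}:\d{2}:\d{2})[ \t]+"
    r"(?P<hostname>\S+)[ \t]+(?P<logger>\S+)\.(?P<level>\S+)[ \t]+"
    r"(?P<process>[^:]+): (?P<message>.*)"
)

# Hypothetical syslog line in the "<facility>.<level>" style
line = "Jan  1 00:00:12 relay1 user.info kernel: Booting Linux"
match = re.match(MESSAGE_REGEX, line)
fields = match.groupdict() if match else {}
print(fields)
```

Note that the timestamp carries no year, which is why a separate source for the current year is needed when timestamping these entries.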

class ProcCmdlineParser[source]

Parse output of /proc/cmdline (the kernel’s startup command line arguments). Parse returns a dict with the arguments as key-value pairs.

file: PurePosixPath | None = PurePosixPath('/proc/cmdline')
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

dict[str, bool | str]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None
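
A rough sketch of how a kernel command line could be turned into the dict[str, bool | str] shape documented above. The name parse_cmdline and the sample input are hypothetical. Recording bare flags as True is an assumption suggested by the return type, and quoted values containing spaces are not handled.

```python
def parse_cmdline(text: str) -> dict:
    """Hypothetical stand-in for ProcCmdlineParser.parse()."""
    args = {}
    for token in text.split():
        key, sep, value = token.partition("=")
        # Bare flags such as "ro" have no value, so record them as True
        args[key] = value if sep else True
    return args


cmdline = parse_cmdline("console=ttyS0,115200 root=/dev/mmcblk0p2 ro quiet\n")
print(cmdline)
```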

class ProcCpuinfoParser[source]

Parse output of /proc/cpuinfo and return dict with the formatted data.

file: PurePosixPath | None = PurePosixPath('/proc/cpuinfo')
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

dict[str, list | str]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class ProcMeminfoParser[source]

Parse output of /proc/meminfo and return dict with the formatted data, with integer values in bytes.

file: PurePosixPath | None = PurePosixPath('/proc/meminfo')
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

dict[str, int]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None
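
The conversion to bytes can be sketched as follows. The name parse_meminfo and the sample text are hypothetical; the sketch assumes that the "kB" unit in /proc/meminfo means 1024 bytes and that rows without a unit are plain counts.

```python
def parse_meminfo(text: str) -> dict[str, int]:
    """Hypothetical stand-in for ProcMeminfoParser.parse()."""
    results = {}
    for line in text.splitlines():
        key, sep, rest = line.partition(":")
        fields = rest.split()
        if not sep or not fields:
            continue
        value = int(fields[0])
        # Values marked "kB" are converted to bytes (1 kB = 1024 bytes here);
        # unitless rows are kept as-is
        if len(fields) > 1 and fields[1].lower() == "kb":
            value *= 1024
        results[key.strip()] = value
    return results


sample = "MemTotal:       16384 kB\nMemFree:         2048 kB\nHugePages_Total:       0\n"
meminfo = parse_meminfo(sample)
print(meminfo)
```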

class ProcModulesParser[source]

Parse output of /proc/modules and return a list of the module names.

file: PurePosixPath | None = PurePosixPath('/proc/modules')
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

list[str]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class ProcUptimeParser[source]

Parse /proc/uptime and return the timedelta for how long the system has been up for.

Process sets dev.uptime.

file: PurePosixPath | None = PurePosixPath('/proc/uptime')
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

timedelta

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None
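
A minimal sketch of the parse step, assuming the usual /proc/uptime layout of two numbers where the first is the number of seconds since boot. The name parse_uptime and the sample values are hypothetical.

```python
from datetime import timedelta


def parse_uptime(text: str) -> timedelta:
    """Hypothetical stand-in for ProcUptimeParser.parse()."""
    # First field: seconds since boot; second field: aggregate idle seconds
    uptime_seconds = float(text.split()[0])
    return timedelta(seconds=uptime_seconds)


uptime = parse_uptime("90061.00 170000.00\n")
print(uptime)
```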

class ProcNetDevParser[source]

Parse and process /proc/net/dev

file: PurePosixPath | None = PurePosixPath('/proc/net/dev')
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

dict[str, dict[str, int]]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class EtcPasswdParser[source]

Parse /etc/passwd and return the extracted user data.

Process adds the users to dev.users in the device data model.

file: PurePosixPath | None = PurePosixPath('/etc/passwd')
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

list[str]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class DateParser[source]

Parse output of the date command.

This provides timezone information and a baseline for the current year, which is used when timestamping logs from sources such as /var/log/messages.

command: str = 'date'
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

datetime | None

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class EnvParser[source]

Parse the output of the env command.

Environment variables for the current shell session.
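
The env command prints one NAME=value pair per line. A minimal sketch of building the dict[str, str] result, using a hypothetical function name and assuming that values do not span multiple lines:

```python
def parse_env(text: str) -> dict[str, str]:
    """Split each NAME=value line at the first '=' (hypothetical helper)."""
    env: dict[str, str] = {}
    for line in text.splitlines():
        name, sep, value = line.partition("=")
        if sep and name:
            env[name] = value  # the value may itself contain '='
    return env


env = parse_env("PATH=/bin:/sbin\nOPTS=a=b\nEMPTY=\n")
```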

command: str = 'env'
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

dict[str, str]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class IfconfigParser[source]

Parse the output of the ifconfig -a command.

Shows all network interfaces. This command is usually available on older Linux distributions that install the net-tools package by default, as well as on BusyBox systems.

command: str = 'ifconfig -a'
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

dict[str, dict]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class ArpParser[source]

Parse and process the output of the arp -a command.

Shows the ARP table, which lists all known network devices.
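
A minimal sketch of extracting entries from arp -a output, assuming the common Linux/BusyBox line layout "host (ip) at mac [ether] on interface". The regular expression, function name, and dict result are assumptions for illustration; parse() itself is documented to return list[str].

```python
import re

# Assumed layout: "? (192.0.2.1) at 00:11:22:33:44:55 [ether] on eth0"
ARP_LINE = re.compile(
    r"^(?P<host>\S+) \((?P<ip>[^)]+)\) at (?P<mac>[0-9A-Fa-f:]{17})\b.* on (?P<interface>\S+)$"
)


def parse_arp(text: str) -> list[dict[str, str]]:
    """Return one dict per resolved ARP entry; incomplete entries are skipped."""
    entries = []
    for line in text.splitlines():
        match = ARP_LINE.match(line.strip())
        if match:
            entries.append(match.groupdict())
    return entries


entries = parse_arp(
    "? (192.0.2.1) at 00:11:22:33:44:55 [ether] on eth0\n"
    "? (192.0.2.9) at <incomplete> on eth0\n"
)
```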

command: str = 'arp -a'
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

list[str]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class SshdConfigParser[source]

Parse and process /etc/ssh/sshd_config.

Clean up the sshd_config, keeping only lines that contain configuration and excluding empty lines and comments.
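
The cleanup described above can be sketched as follows. The function name is hypothetical, and stripping surrounding whitespace from each kept line is an assumption.

```python
def clean_sshd_config(text: str) -> list[str]:
    """Keep only configuration lines, dropping blank lines and '#' comments."""
    cleaned = []
    for line in text.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#"):
            cleaned.append(stripped)
    return cleaned


config_lines = clean_sshd_config(
    "# Example config\n\nPort 22\n  PermitRootLogin no\n#PasswordAuthentication yes\n"
)
```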

file: PurePosixPath | None = PurePosixPath('/etc/ssh/sshd_config')
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

list[str]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class HostnameParser[source]

Set dev.hostname to the output of the hostname command.

command: str = 'hostname'
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

str

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class LsRecursiveParser[source]

Recursive ls of the file system directories listed in the command below. This assumes BusyBox’s ls output; other systems’ output may differ.

Command: ls -lenAR /etc /boot /var/log /root /sysopt /sbin /pkg /bin /common /opt /lib

Args:

  • l: long listing format (one entry per line)

  • e: full date and time

  • n: numeric UIDs and GIDs instead of names

  • A: include files that start with ., but exclude the . and .. entries

  • R: recurse
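
With R, ls prints a "path:" header before each directory's entries. The sketch below only groups the raw entry lines by directory. It does not split the entry fields, since their exact layout depends on the BusyBox version. The function name and the sample lines are assumptions for illustration.

```python
def group_ls_recursive(text: str) -> dict[str, list[str]]:
    """Group raw ls -R entry lines under the directory header that precedes them."""
    listing: dict[str, list[str]] = {}
    current = None
    for line in text.splitlines():
        if not line.strip():
            continue  # blank lines separate directory sections
        if line.startswith("/") and line.endswith(":"):
            current = line[:-1]  # header such as "/etc/ssh:"
            listing[current] = []
        elif current is not None and not line.startswith("total "):
            listing[current].append(line)
    return listing


# Assumed sample output; real entry lines may be formatted differently.
sample = (
    "/etc:\n"
    "total 8\n"
    "-rw-r--r--    1 0        0             120 Tue Mar  5 14:02:11 2024 passwd\n"
    "\n"
    "/etc/ssh:\n"
    "total 4\n"
    "-rw-------    1 0        0            3200 Tue Mar  5 14:02:11 2024 sshd_config\n"
)
grouped = group_ls_recursive(sample)
```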

command: str = 'ls -lenAR /etc /boot /var/log /root /sysopt /sbin /pkg /bin /common /opt /lib'
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

list[dict]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

class NetstatSocketsVxWorksParser[source]

Parse output of “netstat -anP” command on VxWorks.

Args:

  • a: more sockets

  • n: numeric addresses instead of resolved hostnames

  • P: show the TID (task ID) that owns the socket

command: str = 'netstat -anP'
classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

list[str]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

convert_filename(to_convert)[source]

Convert a command string or file path into a string that can be used as a filename when saving to disk.

Return type:

str
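
One way such a conversion could work is sketched below. This is not the behavior of convert_filename, whose exact rules are not documented here; the name sketch_convert_filename, the use of underscores, and the set of allowed characters are all assumptions.

```python
import re


def sketch_convert_filename(to_convert: str) -> str:
    """Turn a command string or file path into a filesystem-friendly name."""
    name = to_convert.strip().strip("/")
    # Assumption: runs of path separators and whitespace become one underscore.
    name = re.sub(r"[\\/\s]+", "_", name)
    # Assumption: anything outside a conservative character set is dropped.
    name = re.sub(r"[^A-Za-z0-9._-]", "", name)
    return name or "unnamed"


from_path = sketch_convert_filename("/proc/net/dev")
from_command = sketch_convert_filename("ifconfig -a")
```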