8.6. General APIs¶
Functions used by device module implementations and internally by PEAT.
8.6.1. Datastore¶
- class Datastore[source]¶
Registry of
DeviceDatainstances.-
objects:
list[DeviceData] = None¶ DeviceDatainstances in this datastore.
-
global_options:
dict[str,Any] = None¶ Options that will be applied and used by all devices in this datastore. Changes to the values in this
dictwill be reflected in the options of all devices in this datastore.
- create(ident, ident_type)[source]¶
Create a new
DeviceDatainstance, add it to the datastore, and return it.- Return type:
- get(ident, ident_type='ip')[source]¶
Get a
DeviceDatainstance from the datastore.Warning
If you want to search for an existing device and fail if one isn’t found, then use
search()instead.Examples ofdatastore.get()¶>>> from pprint import pprint >>> from peat import SCEPTRE, datastore >>> device = datastore.get("192.0.2.20") >>> device.ip '192.0.2.20' >>> device = datastore.get("192.0.2.123") >>> SCEPTRE.pull(device) >>> pprint(device.export()) >>> parsed_name = "relay_1" # Name extracted from the pulled config >>> device = datastore.get(parsed_name, "name") >>> device.data.name == parsed_name True
- Parameters:
- Return type:
- Returns:
The
DeviceDataobject found or a new object if there wasn’t a device found that matched the arguments.
- search(ident, ident_type)[source]¶
Search for the device with a given identifier.
Example: During passive scanning a device was initialized with a MAC which gets resolved to a IP. Later during active scanning, we want to add information to the device. We can look it up by it’s IP, even if it was originally added to the datastore using it’s MAC.
- Parameters:
- Return type:
- Returns:
The
DeviceDatafound orNoneif the search failed
- remove(to_remove)[source]¶
Remove a device from the datastore.
- Parameters:
to_remove (
DeviceData) --DeviceDatainstance to remove- Return type:
- Returns:
If the device was successfully found and removed
- prune_inactive()[source]¶
Remove inactive devices from the datastore (
dev._is_active == False).- Return type:
- deduplicate(prune_inactive=True)[source]¶
Cleanup duplicate devices in the datastore.
Duplicates are only merged if they have the same IP, MAC or serial port.
Any duplicates found are merged into a single
DeviceDataobject, and the additional copies are removed from the list of objects.
- property verified: list[DeviceData]¶
Devices that have been verified (
dev._is_verified == True).
- property device_options: DeepChainMap¶
Get global options with module defaults and injects applied.
This is a hack, to be sure, but refactoring will take more time than it’s worth.
-
objects:
- datastore = <peat.data.store.Datastore object>¶
Global singleton for managing
DeviceDatainstances and making them available throughout PEAT.
8.6.2. High-level APIs¶
8.6.2.1. Scan API¶
- portscan(dev, methods, finish_on_first_success=False, full_check_snmp=False)[source]¶
Check service status on a host using a provided set of methods.
- Parameters:
dev (
DeviceData) -- Device to scanmethods (
list[tuple[IPMethod,type[DeviceModule]]]) -- List of methods to use.DeviceModuleclasses can be provided and any methods associated with them will be added to the list of methods to use.finish_on_first_success (
bool) -- Return immediately once a method succeeds and skip the remaining methods.full_check_snmp (
bool) -- Check SNMP using full protocol messages. If False, simple TCP SYN-RSTs are used.
- Return type:
- unicast_ip_scan(hosts, device_types=None)[source]¶
Scan a single host directly (“unicast” messages).
- Parameters:
hosts (
list[str]) -- Hosts to scan, as a list of dotted-decimal addresses, hostnames, subnets (CIDR notation), address ranges, and/oripaddressobjects (IPv4Address/IPv4Network)device_types (
list[str|type[DeviceModule]] |None) -- Module names (strings) orDeviceModuleclasses (objects) to use for identification
- Return type:
tuple[dict[str,bool],list[type[DeviceModule]],list[str]] |None- Returns:
- check_host_unicast_ip(ip, modules)[source]¶
Identifies an unknown device using unicast IP communication.
TCP SYN checks are used if ARP/ICMP scanning is unavailable on the host PEAT is running on (if
state.raw_socket_capableis False).- Parameters:
ip (
str) -- IPv4 address of the devicemodules (
list[type[DeviceModule]]) --DeviceModuleclasses to use for identification
- Return type:
- Returns:
True if successfully identified, False if identification failed
- Raises:
DeviceError -- Critical error occurred during identification
- broadcast_scan(targets, device_types=None)[source]¶
Discover devices via network broadcasts.
Warning
Layer 2 broadcast scanning is not currently supported (as of December 2021)
If the module supports discovery via Layer 2 (e.g. broadcast MAC address or network interface) and/or Layer 3 broadcasts (192.168.0.255, etc.). Multicast is also considered to be a form of broadcast from PEAT’s perspective.
# These should work for "peat pull" as well as scan peat scan -d clx -b eno2np1 peat scan -d clx -b 192.168.0.255 peat scan -d clx -b 192.168.0.0/24 peat scan -d clx -b 192.168.0.255/24 peat scan -d clx -b 192.168.0.255/24 192.168.0.0/24 peat scan -d clx -b eno2np1 192.168.0.0/24 examples/broadcast_targets.txt
- Parameters:
targets (
list[str]) -- Targets for broadcast scanning. These can be IPv4 subnets, IPv4 broadcast addresses, network interfaces, or paths to text files containing targets (one target per line).device_types (
list[str|type[DeviceModule]] |None) -- Module names (strings) orDeviceModuleclasses (objects) to use for identification
- Return type:
tuple[dict[str,bool],list[type[DeviceModule]],list[str]] |None- Returns:
- serial_scan(serial_ports, device_types=None)[source]¶
Scan serial ports for devices.
- Parameters:
serial_ports (
list[str]) -- Serial ports to scan, as a list of integers or ranges.device_types (
list[str|type[DeviceModule]] |None) -- Module names (strings) orDeviceModuleclasses (objects) to use for identification
- Return type:
tuple[dict[str,bool],list[type[DeviceModule]],list[str]] |None- Returns:
- check_host_serial(port, modules)[source]¶
Identifies an unknown device using serial communication.
- Parameters:
port (
str) -- Serial port to checkmodules (
list[type[DeviceModule]]) --DeviceModuleclasses to use for identification
- Return type:
- Returns:
True if successfully identified, False if identification failed
- Raises:
DeviceError -- Critical error occurred during identification
- run_identify(scanning_function, addresses, modules)[source]¶
Generic function to execute scanning functions in parallel.
This works with IP and serial scanning functions.
- Parameters:
scanning_function (
Callable) -- Identify function object to callmodules (
list[type[DeviceModule]]) --DeviceModuleclasses to use for identification
- Return type:
- Returns:
The result of identification for each host as a
dict, with keys being host strings and values booleans indicating the result of the identification.
- scan(scan_targets, scan_type, device_types=None)[source]¶
Scan IP networks and hosts and/or serial ports for OT devices.
- Parameters:
scan_targets (
list[str]) -- What to scan, such as network hosts or serial portsscan_type (
Literal['unicast_ip','broadcast_ip','serial']) --Communication type of the targets. Allowed values:
unicast_ipbroadcast_ipserial
device_types (
list[str|type[DeviceModule]] |None) -- mixed device type strings, alias strings orDeviceModuleclasses to scan using. IfNone, all currently importedDeviceModulemodules are used.
- Return type:
- Returns:
Scan summary as a
dict, orNoneif an error occurred
8.6.2.2. Pull API¶
- pull(targets, comm_type, device_types)[source]¶
Pull from devices.
- Parameters:
targets (
list[str]) -- Devices to pull from, such as network hosts or serial portscomm_type (
Literal['unicast_ip','broadcast_ip','serial']) --Method of communication for the pull. Allowed values:
unicast_ipbroadcast_ipserial
device_types (
list[str]) -- Names of device modules or module aliases to use
- Return type:
- Returns:
Pull summary as a
dict, orNoneif an error occurred
8.6.2.3. Push API¶
- push(targets, comm_type, device_types, input_source, push_type, skip_scan=False)[source]¶
Push (upload) configuration or firmware to devices.
Note
By default all targets are scanned and verified, and only the devices that are successfully verified are pushed to. This enables this function to be used without requiring a scan to be done and ensuring pushes are not performed to invalid devices. This behavior can be disabled by setting
PUSH_SKIP_SCANto True or passing--push-skip-scanon the CLI.- Parameters:
targets (
list[str]) -- Devices to push to, such as network hosts or serial portscomm_type (
Literal['unicast_ip','broadcast_ip','serial']) --Method of communication for the push. Allowed values:
unicast_ipbroadcast_ipserial
device_types (
list[str]) -- Names of device modules or module aliases to use. If scanning is disabled this should be a single device type.input_source (
Path|str) -- Path of the file to push, as a stringpush_type (
Literal['config','firmware']) -- Type of push being performed. Valid push types are “config” and “firmware”.skip_scan (
bool) -- If device verification (scanning) should be skipped. NOTE: this currently only applies to unicast_ip devices.
- Return type:
- Returns:
If the push was successful
- Raises:
PeatError -- If the push failed due to an issue with configuration or arguments, such as invalid device types or push type.
8.6.2.4. Parse API¶
- parse(filepaths, device_types=None, sub_dirs=True)[source]¶
Find and parse device and/or project files.
- Parameters:
filepaths (
str|Path|list[str|Path]) -- File or directory paths to parse, or"-"to read from standard input (stdin).device_types (
list[type[DeviceModule] |str] |None) -- names, aliases, or classes of PEAT device modules to use. IfNone, all currently imported modules are used.sub_dirs (
bool) -- If sub-directories of a directory path should be searched
- Return type:
- Returns:
Pull summary as a
dict, orNoneif an error occurred
8.6.2.5. Pillage API¶
- validate_image_mounting()[source]¶
Ensure all the requirements needed to mount an image are satisfied.
In order to mount an image and search, certain conditions must be met:
PEAT must be run as
rootPEAT must be run on a Linux system and not on Windows Subsystem for Linux (WSL)
kmodpyPython package must be installedqemu-nbdmust be installed and available in the system PATH
- Return type:
- Returns:
If the requirements for mounting are met
8.6.3. Network Discovery¶
Discovery of hosts on a Ethernet network (commonly called “scanning”).
Raw Sockets
Several functions require the use of raw sockets. On Linux, these require
either root permissions, or the cap_net_raw capability applied to the
Python interpreter. On Windows, the program must be running with Administrator
permissions, usually from an elevated command prompt.
Further reading about getting raw socket permissions on Linux
- check_host(ip, timeout=1.0, icmp_fallback_tcp_syn=None)[source]¶
Checks if a network host is online.
Note
Requests to
127.0.0.1will always returnTrue- Parameters:
ip (
str) -- IPv4 address of hosttimeout (
float) -- Number of seconds to wait for a responseicmp_fallback_tcp_syn (
bool|None) -- In the case of a ICMP failure, fallback to attempting a TCP SYN RST to check if the host is online. The edge case is this: if we are able to use raw sockets and the host is NOT in a local subnet, then ICMP requests will be used. Certain devices (such as the SEL RTAC) and many firewalls drop ICMP requests. We still want to check if the host is online, therefore we use TCP as a fallback. If the device is sensitive to TCP RST’s, then this argument should be set toFalse. The TCP port used is configured by theSYN_PORTPEAT configuration option (e.g.export PEAT_SYN_PORT=80).
- Return type:
- Returns:
If the host is online
- check_host_arp(ip, timeout=1.0)[source]¶
Check if a host is online using ARP
who-hasrequests.Note
This function requires the ability to use raw sockets.
Example
tcpdumpoutput of lookups for192.0.2.200and192.0.2.201:ARP, Request who-has 192.0.2.200 tell 192.0.2.20, length 28 ARP, Request who-has 192.0.2.201 tell 192.0.2.20, length 28 ARP, Reply 192.0.2.200 is-at 00:00:00:00:00:00, length 46 ARP, Reply 192.0.2.201 is-at 00:00:00:00:00:01, length 46
- check_host_icmp(ip, timeout=1.0)[source]¶
Check if a host is online using an ICMP request (
ping).Note
This function requires the ability to use raw sockets.
Pings can be accomplished without raw sockets via the
pingcommand. However, calling a system command is extremely costly, and would result in dramatically increased scanning times. Therefore, the use of raw sockets is preferred to ensure the check finishes in a reasonable amount of time.
- check_host_syn_sweep(ip, ports, timeout=1.0)[source]¶
Checks if a host is online using TCP SYN requests to a range of ports.
TCP SYN requests are sent to the specified ports, and if the device responds in any way, it is considered to be “online”.
8.6.4. Address Parsing¶
- address_to_pathname(address)[source]¶
Converts a address or other host identifier to a suitable filepath name.
- Return type:
- expand_commas_and_clean_strings(host_list)[source]¶
Expand strings with ‘,’ characters into separate items, convert bytes to str, and remove items that are only whitespace or empty strings.
- Return type:
list[str|bytes|IPv4Address|IPv4Network|Path]
- expand_filenames_to_hosts(host_list)[source]¶
Read host data from any hosts that are filenames, and remove them from the list.
- Return type:
list[str|bytes|IPv4Address|IPv4Network]
- hosts_to_ips(host_list)[source]¶
Converts a list of mixed host strings to a list of unique IPv4 addresses.
The mixed host strings can consist of IPv4 addresses in “dotted-decimal” format, IPv4 subnets in CIDR notation, Nmap-style IPv4 address ranges, Nmap-style IPv4 network ranges, combinations of ranges, hostnames, and domain names (e.g. a FQDN).
If a file or set of files is specified, they will be read and the hosts will be added to the list of hosts to parse. Host strings in files can be space, tab, or newline-separated. Basically, PEAT will call
.split()on whatever is in the file. JSON files will be loaded as JSON and treated as an array of strings.Hostnames and domain names will be resolved into IPv4 addresses, and duplicate, malformed, or invalid addresses will be removed.
Examples
localhost(Hostname)docs.python.org(Domain name)192.0.2.23(Standard dotted-decimal IPv4 address)192.0.2.0/24(CIDR-specified subnet)192.0.2.20-40(Nmap-style host range)192.0.2-3.0(Nmap-style network range)192.0.2-9.14-17(Combination of network and host ranges)172-192.16-30.80-90.12-14(Multiple combinations)targets.txt(Text file with hosts)hosts.json(JSON file with array of hosts)192.0.2.20,192.0.2.30(Comma-separated hosts)
Warning
Valid addresses that are not generally used, such as multicast or reserved address spaces, will result in warnings emitted to logging and will still be returned in the list of results.
- Parameters:
host_list (
list[str|bytes|IPv4Address|IPv4Network]) -- Mixed host strings to convert. These can include IPv4 address strings (dotted-decimal notation), hostnames, subnets (CIDR notation), Nmap-style host address ranges, and/oripaddressobjects (IPv4Address/IPv4Network).- Return type:
- Returns:
List of unique dotted-decimal IPv4 address strings
- hosts_to_objs(host_list)[source]¶
Converts a list of mixed host strings into
ipaddressobjects.- Parameters:
host_list (
list[str|bytes|IPv4Address|IPv4Network]) -- Mixed host strings to convert (refer tohosts_to_ips()for details)- Return type:
list[_BaseV4]- Returns:
List of
ipaddressobjects (IPv4AddressandIPv4Network)
- host_string_to_objs(host_string, strict_network=True)[source]¶
Converts a mixed host string into
ipaddressobject(s).Examples converting strings to IPv4Address objects¶>>> from pprint import pprint >>> from peat.protocols.addresses import host_string_to_objs >>> pprint(host_string_to_objs("192.168.2-3.142-144")) {IPv4Address('192.168.2.142'), IPv4Address('192.168.2.143'), IPv4Address('192.168.2.144'), IPv4Address('192.168.3.142'), IPv4Address('192.168.3.143'), IPv4Address('192.168.3.144')} >>> pprint(host_string_to_objs("192.168.3.140-142")) {IPv4Address('192.168.3.140'), IPv4Address('192.168.3.141'), IPv4Address('192.168.3.142')} >>> host_string_to_objs("172.16.0.0/30") IPv4Network('172.16.0.0/30') >>> host_string_to_objs("localhost") IPv4Address('127.0.0.1') >>> host_string_to_objs(b"192.168.3.1") IPv4Address('192.168.3.1')
Warning
The IP range parsing isn’t robust and can match bogus values like
999.353.23-35.22or`000-000.999-999.-1.0- Parameters:
host_string (
str|bytes) -- Host string to convert (refer tohosts_to_ips()for details)strict_network (
bool) -- If network parsing is strict, in other words host bits being set in a network address will result in an error if this isTrue.
- Return type:
_BaseV4|set[IPv4Address]- Returns:
Either a single instance or
setofipaddressobjects (IPv4AddressandIPv4Network)
- ip_in_local_subnet(ip)[source]¶
Checks if a IP is in any of the locally connected subnets.
Note
This function does NOT check that a host actually exists! It only asserts that an address mathematically falls into the range of local subnets connected to the system running PEAT.
- Parameters:
ip (
str|IPv4Address) -- IPv4 address string to check- Return type:
- Returns:
If the address is in a locally connected network
- network_is_local(net)[source]¶
Checks if a network address space is a subset of a local subnet.
This checks to see if the network fits partially into or is equal to any of the subnets connected to the local system.
- Parameters:
net (
IPv4Network) -- IPv4 network to check- Return type:
- Returns:
If the network address space fits in a local subnet
8.6.5. Common networking functions¶
- class Vendor(manuf, manuf_long, comment)¶
- comment¶
Alias for field number 2
- manuf¶
Alias for field number 0
- manuf_long¶
Alias for field number 1
- scapy_human_field(obj, field)[source]¶
Returns “human-friendly” version of a Packet field.
- Return type:
- mac_to_vendor(mac)[source]¶
Lookup the vendor using the OUI of a MAC address.
To update the manuf file bundled with peat:
# On Linux pdm run update-manuf-linux # On Windows pdm run update-manuf-windows
Examples:
>>> from peat.protocols.common import mac_to_vendor >>> mac_to_vendor("00:30:A7:00:00:01") Vendor(manuf='SchweitzerEn', manuf_long='Schweitzer Engineering', comment=None) >>> mac_to_vendor("00:16:3E:00:00:01") Vendor(manuf='Xensource', manuf_long='Xensource, Inc.', comment=None) >>> mac_to_vendor("B4:B1:5A:00:00:01") Vendor(manuf='SiemensEnerg', manuf_long='Siemens AG Energy Management Division', comment=None) >>> mac_to_vendor("a4:4c:c8:00:00:01") Vendor(manuf='Dell', manuf_long='Dell Inc.', comment=None) >>> mac_to_vendor("f8:59:71:00:00:01") Vendor(manuf='Intel', manuf_long='Intel Corporate', comment=None) >>> mac_to_vendor("00:50:56:00:00:01") Vendor(manuf='VMware', manuf_long='VMware, Inc.', comment=None)
- mac_to_vendor_string(mac)[source]¶
Resolve a MAC address to a vendor string for use with “mac_vendor” field in data model. Simplified wrapper around
mac_to_vendor().This will use long-form vendor name if resolved, and fallback to short form if there is no long form.
- mac_to_ip(mac)[source]¶
Lookup the IPv4 address for a MAC address.
On Linux (and OSX), the local ARP cache is searched (
/proc/net/arp). On Windows,arp.exeis used.
- ip_to_mac(ip)[source]¶
Resolve the MAC address for a IPv4 address.
On Linux (and OSX), the local ARP cache is searched (
/proc/net/arp). On Windows, Scapy is used, witharp.exeused as a fallback.Warning
On Windows, this may result in a ARP request being made on the local subnet if the PEAT configuration option
RESOLVE_MACis True.
8.6.6. Communication Protocols¶
8.6.6.1. IP¶
- check_tcp_port(ip, port, timeout=None, reset=False, syn_sweep=False)[source]¶
Check if a TCP port is open on the given IP address.
By default, this is essentially a bargain-basement
nmap -sT(“Connect scan”). It will finalize connections using TCP FIN, which can sometimes result in issues with devices continuing to send data. Therefore, I recommended enabling reset, which closes connections with TCP RST instead of FIN.In other words (from Stack Overflow FIN vs RST):
- FIN says: “I finished talking to you, but I’ll still listen to
everything you have to say until you say that you’re done.”
- RST says: “There is no conversation. I won’t say anything and
I won’t listen to anything you say.”
Further reading
nmap connect scan: https://nmap.org/book/scan-methods-connect-scan.html
SO_LINGER: https://www.nybek.com/blog/2015/03/05/cross-platform-testing-of-so_linger/
SO_LINGER Python: https://stackoverflow.com/a/6440364
FIN vs RST: https://stackoverflow.com/a/13050021
Warning
VMware VM interfaces will cause this function to return
Truefor most ports!- Parameters:
ip (
str) -- IPv4 address of the host to checkport (
int) -- TCP port number to checktimeout (
float|None) -- Number of seconds to wait for a responsereset (
bool) -- Close the connection with [ACK, RST] instead of a [FIN] by setting SO_LINGER timeout to 0. This is generally useful to prevent issues with devices that will respond with data and cause issues (like errno 11 “Resource temporarily unavailable”). This is similar tonmap -sS(“SYN scan”).syn_sweep (
bool) -- If connection refused (code 111) should be considered “open”. Commonly used when checking if a host is online and responding. This also changes the amount of logging output to accommodate SYN sweep scanning, which generates a lot of errors.
- Return type:
- Returns:
If the port is open, or the host is responding (if
syn_sweepis set)
- check_udp_service(ip, service, port=None, timeout=1.0)[source]¶
Check if a specific UDP service is listening.
- fingerprint(ip, port, timeout, payload, finger_func)[source]¶
Fingerprints (verifies) a UDP device.
The provided socket, ip, port, and byte payload are used in a discovery packet to determine if the device is eligible to be fingerprinted. If so, it then uses the provided fingerprint function object to verify and return details about the device.
- send_discovery_packet(sock, ip, port, payload)[source]¶
Send initial hello packet used to fingerprint the device.
8.6.6.2. FTP¶
- class FTP(ip, port=21, timeout=5.0)[source]¶
Generic wrapper for File Transfer Protocol (FTP) functionality.
- property ftp: FTP¶
ftplib.FTPinstance used for interacting with the server.
- login(user='', passwd='')[source]¶
Login to the FTP server.
Note
This function should be called only once for each instance, per the documentation for
ftplib.
- download_binary(filename, save_path=None, save_to_file=True)[source]¶
Download a binary file from the FTP server.
- Parameters:
filename (
str) -- Name or path of the file to download. This path must be valid on the server. Relative paths generally work best.save_path (
Path|None) -- Path on the local system to save the file to. If not specified then it’s saved to the device’s standard output directory and filename.save_to_file (
bool) -- If the data should be automatically written to a file as defined bysave_path
- Return type:
- Returns:
The binary file data as
bytes
- download_text(filename, save_path=None, save_to_file=True)[source]¶
Download a text file from the FTP server.
- Parameters:
filename (
str) -- Name or path of the file to download. This path must be valid on the server. Relative paths generally work best.save_path (
Path|None) -- Path on the local system to save the file to. If not specified then it’s saved to the device’s standard output directory and filename.save_to_file (
bool) -- If the data should be automatically written to a file as defined bysave_path
- Return type:
- Returns:
Text file data as a
str
- dir(directory=None)[source]¶
List files on the FTP server, including file metadata (
dircommand).This returns two objects:
List of filenames
- List of dicts with detailed information about each file,
including type of file, modification time, and size.
- rdir(directory=None, _paths_done=None)[source]¶
Recursively lists files on the server and parses metadata about those files (the
dircommand).This calls recursively calls
dir()on any directories, and returns the consolidated output of the calls. Refer to that method’s docstring for further details about the returned data.
- download_files(local_dir, files)[source]¶
Download files from a device to a directory on the local system.
The file structure of the local files will match that on the device, if possible.
Warning
On Windows, there are restrictions on characters allowed in paths, so the paths may vary on that platform.
- nlst_files(directory=None)[source]¶
List files in a directory on the device using NLST.
This is a wrapper around
nlst().
- list_command(directory)[source]¶
Get list of files with file type, permission, and timestamp information.
This uses the LIST FTP command directly, instead of
nlstordir.
- process_vxworks_ftp_welcome(welcome, dev=None)[source]¶
Extract the VxWorks version from the FTP welcome message.
- Parameters:
welcome (
str) -- FTP welcome message string to parsedev (
Optional[DeviceData]) -- DeviceData object to annotate with extracted information IfNone, no information will be annotated, the version will be returned and nothing else will happen.
- Return type:
- Returns:
The version number as a string, or
Noneif the parse failed.
8.6.6.3. Telnet¶
- class Telnet(ip, port=23, timeout=5.0)[source]¶
Telnet functionality for interacting with devices.
Added features:
Improved error handling
Improved cleanup of connections on exit, even if exceptions happen
Logging messages
Records all commands and responses and saves to a file
Simpler function calls
- property comm: Telnet¶
Python Telnet instance used for interacting with the device.
- read(delay=None, strip_whitespace=True)[source]¶
Read all data currently in the telnet response buffer.
This is a stateful operation, so calling this method again will not result in the same information. The results are saved in the
all_outputclass attribute for future access.
- read_until(until, delay=0.15, timeout=None, strip_whitespace=True)[source]¶
Read the telnet response buffer until the specified string is reached.
This is a stateful operation, so calling this method again will not result in the same information. The results are saved in the all_output class attribute for future access.
- Parameters:
- Return type:
- Returns:
Decoded data read from the telnet receive stream
8.6.6.4. HTTP¶
- class HTTP(ip, port=80, timeout=5.0, dev=None, protocol='')[source]¶
Basic set of reusable HTTP functionality.
- property session: Session¶
- _save_response_to_file(response, page, url, dev)[source]¶
Save raw text data from response to disk, even if bad status code.
- get(page='', protocol='', url='', use_cache=True, params=None, auth=None, allow_errors=False, dev=None, timeout=None, **kwargs)[source]¶
Perform a HTTP
GETrequest and return the response.Warning
Results of queries for an identical URL are cached by default for a single run of PEAT. If your tool is querying the status or looking for changes within a single run of PEAT, then set
use_cachetoFalse.Note
The response object will have three additional attributes:
request_timestamp,response_timestamp,file_path.- Parameters:
page (
str) -- URL path of the page to getprotocol (
Literal['http','https','']) -- Name of the protocol to use If empty string (default), the HTTP instance’s “protocol” will be used, if set. Otherwise, it will default to “http”.url (
str) -- URL to use instead of the auto-constructed oneuse_cache (
bool) -- If the internal page cache should be used.params (
dict|None) -- Additional HTTP parameters to include in the requestauth -- Authentication to use for the request (refer to Requests docs)
dev (
DeviceData|None) -- DeviceData object to save files totimeout (
float|None) -- Timeout for the query. IfNone, the default timeout for this class instance is used instead.kwargs -- Additional keyword arguments that will be passed directly to
Requests.get()
- Return type:
Response|None- Returns:
The response object, or
Noneif the request failed. The response object will have three additional attributes:request_timestamp,response_timestamp,file_path.
- post(url, timeout=None, dev=None, use_cache=False, **kwargs)[source]¶
Perform a HTTP
POSTrequest and return the response.Note
The response object will have three additional attributes:
request_timestamp,response_timestamp,file_path.- Parameters:
url (
str) -- URL to use for the requesttimeout (
float|None) -- Timeout for the query. IfNone, the default timeout for this class instance is used instead.dev (
DeviceData|None) -- DeviceData object to save files touse_cache (
bool) -- If the internal page cache should be usedkwargs -- Additional keyword arguments that will be passed directly to
Requests.post()
- Return type:
Response|None- Returns:
The response object, or
Noneif the request failed. The response object will have three additional attributes:request_timestamp,response_timestamp,file_path.
- decode_ssl_certificate(source)[source]¶
Decode a raw SSL certificate retrieved from a server into a raw
dict.
- parse_decoded_ssl_certificate(decoded, raw)[source]¶
Parse a decoded SSL certificate into Elastic Common Schema (ECS) format usable with the x509 data model (
X509).
- static gen_soup(text)[source]¶
Generate a BeautifulSoup instance from the text using the efficient
lxmllibrary if it’s available orhtml.parserotherwise.- Return type:
BeautifulSoup- Returns:
- A
bs4.BeautifulSoupinstance with the parser set to the value of
peat.consts.BS4_PARSER
- A
8.6.6.5. SNMP¶
Simple Network Management Protocol (SNMP) functionality for PEAT.
Under the hood, PEAT uses the lovely open-source PySNMP package (NOTE: as of May 2024, we now use a fork).
The code in this file is essentially a wrapper around
pysnmp with error handling, logging, and other additions.
1.3.6.1. = iso.org.dod.internet = Any network device.
Building custom MIBs
Use mibdump.py or build-pysnmp-mib (included with PySNMP)
to build a MIB in a format usable by PySNMP for use with PEAT.
NOTE: once the MIBs are generated, make sure they’re included in the PyInstaller
spec file that’s used to build the PEAT executable, in distribution/peat.spec.
Example 1:
mibdump.py --generate-mib-texts SIEMENS-SMI. This should generate a PySNMP module for theSIEMENS-SMI.mibfile in~/.pysnmp/mibs.Example 2:
build-pysnmp-mib -o SipOptical.py SipOptical.mib
References/further reading:
Authors:
Christopher Goes
- class SNMP(ip, port=161, timeout=1.0, community='public', snmp_version=1, mib_paths=None)[source]¶
Generic wrapper for Simple Network Management Protocol (SNMP) functionality.
- get(identity, single_query=True, query_limit=0, walk_whole_mib=False)[source]¶
Gets the value(s) of a SNMP object.
The SNMP object is referenced by passing a
tupleto theidentity``argument. This is then passed directly to PySNMP. Additionally, a custom MIB definition can be used by providing an absolute file path to the ``mib_pathargument.The object identity can be one of the following:
OID
MIB name (this will enumerate the entire MIB)
MIB name + object name
MIB name + object name + instance number (for an enumeration/table)
Any of the above + a custom MIB definition file to use
Examples:
from pathlib import Path from peat.protocols.snmp import SNMP snmp = SNMP(ip="192.0.2.1") # OID snmp.get("1.3.6.1.2.1.1.1.0") # "Walk" all values out of a MIB snmp.get("SNMPv2-MIB", single_query=False) # MIB + Object snmp.get(("SNMPv2-MIB", "sysDescr")) # MIB + Object + Instance snmp.get(("SNMPv2-MIB", "sysDescr", 0)) # Custom MIB definition file with a Path object (absolute path) snmp_with_mib = SNMP(ip="192.0.2.1", mib_paths=[Path("/path/myMib.py")]) snmp_with_mib.get(("MyMib", "myObj")) # Custom MIB definition file with a raw string (absolute path) snmp_with_mib = SNMP(ip="192.0.2.1", mib_paths=["/path/myMib.py"]) snmp_with_mib.get(("MyMib", "myObj"))
- Parameters:
identity (
str|tuple) -- Tuple used to reference the SNMP object. Refer to function documentation above for details.single_query (
bool) -- IfTrue, the OID will be queried once withgetCmd. Otherwise, it will be iterated over usingnextCmduntil there is no more data (for OIDs with “sub values”, e.g. tables or enumerations).query_limit (
int) -- Ifsingle_queryisFalse, limit the iteration to this number of queries and stop. This is cheap workaround if you don’t have a MIB.walk_whole_mib (
bool) -- If all values in a MIB should be walked. This setslexicographicMode=Truein PySNMPnextCmd.
- Return type:
- Returns:
- Raises:
ValueError -- If a critical error occurred, such as a invalid type being passed
- verify(identity, to_find)[source]¶
Checks if a string is in the response data for an SNMP query.
- Parameters:
- Return type:
- Returns:
If the verification succeeded
- Raises:
ValueError -- If a critical error occurred, such as a invalid type being passed
- snmp_walk(ip, mib_name, mib_src, community='public', timeout=0.5, port=161, snmp_version=1)[source]¶
Walks an SNMP MIB and returns a dictionary of names and values
- Parameters:
ip (
str) -- IPv4 address of the SNMP devicemib_name (
str) -- Name of the mib to referencemib_src (
str|Path) -- Filesystem path to the MIB file, which must be a PySNMP module (optional)timeout (
float) -- Number of seconds to wait for a responsecommunity (
str) -- SNMP Community string to useport (
int) -- Port the device’s SNMP server is listening onsnmp_version (
int) -- Version of SNMP to use (1 = v1 | 2 = v2c)
- Return type:
- Returns:
A dictionary of names and values. An empty
dictis returned if there was an error.
8.6.6.6. Serial¶
- find_serial_ports(filter_list=None)[source]¶
Find host serial ports enumerated by the operating system. Filter by specified list, if any.
- open_serial_port(address, baudrate, timeout)[source]¶
Open the specified serial port.
Since the code where the connection should be closed may not be able to communicate with the code where it needs to be opened, just be robust about closing and re-opening the connection.
- close_serial_port(address)[source]¶
Close the specified serial port. Handle closing an un-opened port gracefully.
- port_nums_to_addresses(port_values)[source]¶
Converts a list of mixed port value strings to a list of unique serial port address strings.
>>> import platform >>> from peat.protocols.serial import port_nums_to_addresses >>> platform.system() 'Linux' >>> port_nums_to_addresses(["0-1", "0", "1-1"]) ['/dev/ttyS0', '/dev/ttyS1', '/dev/ttyUSB0', '/dev/ttyUSB1'] >>> port_nums_to_addresses(["/dev/ttyS0", "/dev/ttyUSB0"]) ['/dev/ttyS0', '/dev/ttyUSB0']
- platform_port_fmt(num)[source]¶
Formats an integer into a platform-specific serial port address string.
- parse_baudrates(baudrate_values)[source]¶
Converts a list of mixed baud rate value strings to a list of standardized baud rates with duplicates removed.
>>> from peat.protocols.serial import parse_baudrates >>> parse_baudrates(["9600"]) [9600] >>> parse_baudrates(["9600-115200"]) [9600, 19200, 38400, 57600, 115200] >>> parse_baudrates(["9600-115200", "57600"]) [9600, 19200, 38400, 57600, 115200]
- std_b_idx(baudrate)[source]¶
Convert an arbitrary integer to the appropriate
std_bindex.- Return type:
8.6.6.7. CIP¶
Scapy packets for the Common Industrial Protocol (CIP) protocol, used by Rockwell Allen Bradley PLCs and other devices.
CIP is encapsulated by the ENIP protocol. In other words, CIP is a subset of ENIP.
Authors
Casey Glatter
Patrica Schulz
Mark Woodard
- class ByteLenField(name, default, count_of=None, length_of=None)[source]¶
Field representing the length of the payload of the packet in one byte.
- class AbCIP(_pkt, /, *, cmServiceCode=82, cmReqPathSize=2, cmReqPath=537273345, timeout=1690, messageRequestSize=6, serviceCode=1, lengthReqPath=2, reqPath=536945665, routePathSize=1, reserved=0, routeSegment=1, routeAddr=0)[source]¶
- fields_desc: List[AnyField] = [<ByteEnumField (AbCIP).cmServiceCode>, <ByteField (AbCIP).cmReqPathSize>, <IntField (AbCIP).cmReqPath>, <ShortField (AbCIP).timeout>, <LEShortField (AbCIP).messageRequestSize>, <ByteEnumField (AbCIP).serviceCode>, <ByteField (AbCIP).lengthReqPath>, <IntField (AbCIP).reqPath>, <ByteField (AbCIP).routePathSize>, <ByteField (AbCIP).reserved>, <ByteField (AbCIP).routeSegment>, <ByteField (AbCIP).routeAddr>]¶
- _name = 'CIP'¶
- aliastypes = [<class 'peat.protocols.cip.cip_packets.AbCIP'>, <class 'scapy.packet.Packet'>]¶
- class CIP(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
Common Industrial Protocol (CIP) packet.
- fields_desc: List[AnyField] = [<LEIntField (CIP).handle>, <LEShortField (CIP).timeout>, <LEShortField (CIP).itemCount>, <LEShortField (CIP).dataItemType_1>, <LEShortField (CIP).dataItemLength_1>, <LEShortField (CIP).dataItemType_2>, <LEFieldLenField (CIP).dataItemLength_2>, <ByteEnumField (CIP).service>, <ByteLenField (CIP).requestPathSize>, <StrLenField (CIP).requestPath>, <StrField (CIP).data>]¶
- _name = 'CIP Packet'¶
- aliastypes = [<class 'peat.protocols.cip.cip_packets.CIP'>, <class 'scapy.packet.Packet'>]¶
- class CCM(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
CIP Connection Manager (CCM) packet.
- fields_desc: List[AnyField] = [<LEShortField (CCM).timeoutTicks>, <LEShortField (CCM).messageSize>, <ByteEnumField (CCM).service>, <ByteLenField (CCM).requestPathSize>, <StrLenField (CCM).requestPath>, <StrField (CCM).data>]¶
- _name = 'CCM'¶
- aliastypes = [<class 'peat.protocols.cip.cip_packets.CCM'>, <class 'scapy.packet.Packet'>]¶
8.6.6.8. ENIP¶
- class EnipDriver(enip_socket, cpu_slot=0, rpi=5000, timeout=44818, backplane=1, cid_ot=b'\\x00\\x00\\x00\\x00', cid_to=b"'\\x04\\x19q", csn=b'\\x8f\\x00', vid=b'M\\x00', vsn=b'qnL\\x0c')[source]¶
Ethernet/IP (ENIP) protocol handler.
- open()[source]¶
Connect and open a session with the device.
- Raises:
EnipCommError -- exception occurred while opening the session
- Return type:
- close()[source]¶
Close the session with the device and the underlying socket.
- Return type:
- Returns:
If the close was successful
- Raises:
EnipCommError -- Exception occurred while unregistering the session
- send(msg)[source]¶
Sends a message through the ENIP socket.
- Return type:
- Returns:
Number of bytes sent
- Raises:
EnipCommError -- if the send failed
- recv()[source]¶
Receives a message from the ENIP socket.
- Return type:
- Returns:
The message received as bytes
- Raises:
EnipCommError -- if the received failed
- _get_sequence()[source]¶
Increment and return the sequence number used with connected messages.
- Return type:
- Returns:
The current sequence number
- static _build_common_packet_format(message_type, message, addr_type, addr_data=None, timeout=10)[source]¶
Builds and returns a common message.
Check Volume 2 (page 2.22) of CIP specification for reference.
- Return type:
- Returns:
The built message
- send_nop()[source]¶
Send a NOP to the target, gets no reply.
A NOP provides a way for either an originator or target to determine if the TCP connection is still open.
- Raises:
EnipCommError -- if the message send failed
- Return type:
- list_identity()[source]¶
ListIdentity command to locate and identify potential target.
- Raises:
EnipCommError -- if the message send failed
- Return type:
- send_rr_data(message)[source]¶
SendRRData transfer an encapsulated request/reply packet between the originator and target.
- register_session()[source]¶
Register a new session with the communication partner.
- Return type:
- Returns:
The session number
- Raises:
EnipCommError -- if the registration message send failed
- unregister_session()[source]¶
Unregister a connection.
- Raises:
EnipCommError -- if the unregister message send failed
- Return type:
- forward_open()[source]¶
CIP implementation of the forward open message.
Refer to ODVA documentation Volume 1 3-5.5.2
- class EnipSocket(ip, port, timeout=5.0)[source]¶
Ethernet/IP (ENIP) socket.
Authors
Christopher Goes
- connect()[source]¶
Connect to ENIP socket.
- Return type:
- Returns:
If connection was successful
- Raises:
EnipCommError -- if the socket timed out during connection
- send(message)[source]¶
Send a ENIP message.
- Return type:
- Returns:
Number of bytes sent
- Raises:
EnipCommError -- Exception occured while sending the message
- receive()[source]¶
Receive and unpack an ENIP response.
- Return type:
- Returns:
The ENIP response as bytes
- Raises:
EnipCommError -- Exception occured while recieving the response
Scapy packets for the ENIP protocol used by the Allen Bradley PLC.
Authors
Casey Glatter
Christopher Goes
Patrica Schulz
Mark Woodard
- class LEShortLenField(name, default, count_of=None, length_of=None)[source]¶
A len field in a 2-byte integer.
- class EncapData(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
- fields_desc: List[AnyField] = [<LEIntField (EncapData).handle>, <LEShortField (EncapData).timeout>, <LEShortField (EncapData).itemCount>, <LEShortField (EncapData).addrItemType>, <LEShortField (EncapData).addrLengthData>, <LEShortField (EncapData).dataItemType>, <LEFieldLenField (EncapData).lengthData>, <StrLenField (EncapData).data>]¶
- _name = 'EncapData for ENIP'¶
- aliastypes = [<class 'peat.protocols.enip.enip_packets.EncapData'>, <class 'scapy.packet.Packet'>]¶
- class GetAttributesAllResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
- fields_desc: List[AnyField] = [<LEShortEnumField (GetAttributesAllResponse).vendor>, <scapy.fields.MayEnd object>, <LEShortField (GetAttributesAllResponse).productCode>, <ByteField (GetAttributesAllResponse).MajorFirmwareVersion>, <ByteField (GetAttributesAllResponse).MinorFirmwareVersion>, <LEShortField (GetAttributesAllResponse).status>, <LEIntField (GetAttributesAllResponse).serialNumber>, <ByteField (GetAttributesAllResponse).productNameLength>, <scapy.fields.MayEnd object>, <XByteField (GetAttributesAllResponse).state>]¶
- _name = 'GetAttributesAll Response'¶
- _overload_fields = {<class 'peat.protocols.enip.enip_packets.ENIPListIdentityResponse'>: {}}¶
- aliastypes = [<class 'peat.protocols.enip.enip_packets.GetAttributesAllResponse'>, <class 'scapy.packet.Packet'>]¶
- class ENIPListIdentityResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
- fields_desc: List[AnyField] = [<LEShortField (ENIPListIdentityResponse).itemCount>, <LEShortEnumField (ENIPListIdentityResponse).typeID>, <LEShortField (ENIPListIdentityResponse).length>, <LEShortField (ENIPListIdentityResponse).encapsulationVersion>, <LEShortField (ENIPListIdentityResponse).sinFamily>, <ShortField (ENIPListIdentityResponse).sinPort>, <IPField (ENIPListIdentityResponse).sinAddr>, <LELongField (ENIPListIdentityResponse).sinZero>, <LEShortEnumField (ENIPListIdentityResponse).vendor>, <LEShortEnumField (ENIPListIdentityResponse).productType>, <LEShortField (ENIPListIdentityResponse).productCode>, <ByteField (ENIPListIdentityResponse).MajorFirmwareVersion>, <ByteField (ENIPListIdentityResponse).MinorFirmwareVersion>, <LEShortField (ENIPListIdentityResponse).status>, <LEIntField (ENIPListIdentityResponse).serialNumber>, <ByteField (ENIPListIdentityResponse).productNameLength>, <StrLenField (ENIPListIdentityResponse).productName>, <XByteField (ENIPListIdentityResponse).state>]¶
- _name = 'ENIP ListIdentity Response'¶
- aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListIdentityResponse'>, <class 'scapy.packet.Packet'>]¶
- payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'peat.protocols.enip.enip_packets.GetAttributesAllResponse'>)]¶
- class ENIPListInterfacesResponseItems(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
- fields_desc: List[AnyField] = [<LEShortEnumField (ENIPListInterfacesResponseItems).typeID>, <FieldLenField (ENIPListInterfacesResponseItems).itemLength>, <StrLenField (ENIPListInterfacesResponseItems).itemData>]¶
- _name = 'ENIPListInterfacesResponseItems'¶
- aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListInterfacesResponseItems'>, <class 'scapy.packet.Packet'>]¶
- class ENIPListInterfacesResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
- fields_desc: List[AnyField] = [<FieldLenField (ENIPListInterfacesResponse).itemCount>, <PacketField (ENIPListInterfacesResponse).listItems>]¶
- _name = 'ENIPListInterfacesResponse'¶
- aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListInterfacesResponse'>, <class 'scapy.packet.Packet'>]¶
- class ENIPListServicesResponseItems(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
- fields_desc: List[AnyField] = [<LEShortEnumField (ENIPListServicesResponseItems).typeID>, <LEShortField (ENIPListServicesResponseItems).length>, <LEShortField (ENIPListServicesResponseItems).encapsulationVersion>, <LEShortField (ENIPListServicesResponseItems).sinFamily>, <StrFixedLenField (ENIPListServicesResponseItems).serviceName>]¶
- _name = 'ENIP ListServices Response Items'¶
- aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListServicesResponseItems'>, <class 'scapy.packet.Packet'>]¶
- class ENIPListServicesResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
- fields_desc: List[AnyField] = [<FieldLenField (ENIPListServicesResponse).itemCount>, <PacketField (ENIPListServicesResponse).listItems>]¶
- _name = 'ENIP ListServices Response'¶
- aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPListServicesResponse'>, <class 'scapy.packet.Packet'>]¶
- class ENIPRegisterSession(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
- fields_desc: List[AnyField] = [<LEShortField (ENIPRegisterSession).protocolVersion>, <LEShortField (ENIPRegisterSession).optionsRegisterSession>]¶
- _name = 'ENIP RegisterSession'¶
- aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIPRegisterSession'>, <class 'scapy.packet.Packet'>]¶
- class ENIP(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
EtherNet/IP packet encapsulation.
If “data” isn’t set, it will be automatically set based on commandCode.
The header is 24 bytes fixed length.
It can be used with TCP. Maybe UDP as well?
- fields_desc: List[AnyField] = [<LEShortEnumField (ENIP).commandCode>, <LEShortLenField (ENIP).dataLength>, <LEIntField (ENIP).sessionHandle>, <LEIntEnumField (ENIP).status>, <LongField (ENIP).senderContext>, <LEIntField (ENIP).options>, <scapy.fields.MultipleTypeField object>]¶
- _name = 'EtherNet/IP Request'¶
- aliastypes = [<class 'peat.protocols.enip.enip_packets.ENIP'>, <class 'scapy.packet.Packet'>]¶
8.6.6.9. PCCC¶
- class PCCCoverCIP(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]¶
- fields_desc: List[AnyField] = [<ByteField (PCCCoverCIP).serviceCode>, <ByteField (PCCCoverCIP).lengthReqPath>, <IntField (PCCCoverCIP).reqPath>, <ByteField (PCCCoverCIP).lengthReqID>, <ShortField (PCCCoverCIP).vendorID>, <IntField (PCCCoverCIP).serialNum>, <ByteField (PCCCoverCIP).pcccCmd>, <ByteField (PCCCoverCIP).sts>, <ShortField (PCCCoverCIP).tnsw>, <ByteField (PCCCoverCIP).pcccFnc>, <LEIntField (PCCCoverCIP).plcFwPhysAddr>, <ByteField (PCCCoverCIP).readSize>]¶
- aliastypes = [<class 'peat.protocols.pccc.pccc_functions.PCCCoverCIP'>, <class 'scapy.packet.Packet'>]¶
8.6.7. Utilities/common functionality¶
- convert_to_snake_case(camel_string)[source]¶
Converts a string from CamelCase to snake_case.
- Return type:
- clean_empty_dirs(to_clean)[source]¶
Recursively remove all empty directories on the path.
- Return type:
- fmt_duration(seconds)[source]¶
Format a time duration into a human-readable string.
Wrapper for
humanfriendly.format_timespan().- Return type:
- fmt_size(size_in_bytes)[source]¶
Convert a integer size into a human-readable string.
Wrapper for
humanfriendly.format_size().- Return type:
- short_pth_str(path, max_parts=4)[source]¶
Generate a string of a subset of a path.
- Parameters:
- Return type:
- Returns:
Shortened file path string
- move_item(obj, new_position, item)[source]¶
Move an item in a list to a new position in the list.
- Return type:
- move_file(src, dest)[source]¶
Move a file, handling duplicates and/or destination directory creation.
If src is a file and dest is a directory, then dest will be changed to be the name of the src file with the original dest as the parent, following the behavior of the
cpcommand.- Return type:
- Returns:
The updated path of the file
- merge(d1, d2, no_copy=False)[source]¶
Merges two dictionaries.
Values in d1 take priority over d2. If one of the dictionaries is
None, then the other is returned.- Return type:
- parse_date(raw_time, year_first=None)[source]¶
Parse a raw string into a datetime object or ISO 8601 date string.
This leverages dateutil’s fuzzy parser, which can handle a wide array of input time formats.
Warning
This function does not do any special timezone handling. If a timezone is presented, then it’s included in the datetime object’s
tzinfo, otherwise it’s timezone-unaware. Ensure you properly convert to UTC!Note
To get a ISO 8601-formated string, call
.isoformat()on the returned datetime object. This will result in a string in the following format:<YYYY-MM-DD>T<HH:MM:SS.mmmmmm>- Parameters:
raw_time (
str) -- time string to parse, in almost any format (within reason)iso_format -- If the a ISO 8601 format string should be returned instead of a datetime object
year_first (
bool|None) -- Whether to interpret the first value in an ambiguous 3-integer date (e.g. 01/05/09) as the year. Passed straight to dateutil.parse().
- Return type:
- Returns:
datetime.datetimeobject, orNoneif the parse fails.
- get_formatted_platform_str()[source]¶
Record information about the current system and environment.
- Return type:
- get_resource(package, file)[source]¶
Unpack and return the path to a resource included with PEAT.
Some examples of these are schemas and definitions for TC6 XML.
The resources are unpacked into a temporary directory. These temporary files are cleaned up when PEAT exits.
Details: https://importlib-resources.readthedocs.io/en/latest/migration.html
- collect_files(path, sub_dirs=True)[source]¶
Convert a path into a list of absolute file path strings.
If path is a file, then the returned list will only contain that file.
- check_file(file, ext=None)[source]¶
Checks if a path exists and is valid, and returns the
Pathto it.
- fix_file_owners(to_fix)[source]¶
Change owner of file(s) and directorie(s) to actual user running PEAT instead of root.
- Return type:
- save_results_summary(data, results_type, log_debug=False)[source]¶
Helper function to write JSON results to a file if the path is configured.
- Return type:
- write_temp_file(data, filename)[source]¶
Write arbitrary data to a file in the PEAT temporary directory.
Note
The temporary directory is configured via the
TEMP_DIRconfiguration option (config.TEMP_DIR). If this isNone, then this function is a No-OP (it does nothing).
- write_file(data, file, overwrite_existing=False, format_json=True, merge_existing=False)[source]¶
Write data to a file.
Warning
File names MUST be valid for ALL platforms, including Windows! If validation fails, then the name will be sanitized and changed, and therefore will differ from the original name passed to this function!
Note
If the file extension is
.json, then the data will saved as canonical JSON. Data will be processed into types that are JSON-compatible prior to conversion. Standard Python types and objects are fine.bytesare decoded as UTF-8.Note
If the containing directory/directories don’t exist, they will be created before the file is written (same behavior as
mkdir -p).- Parameters:
data (
str|bytes|bytearray|Container|Iterable) -- Content to write to the filefile (
Path) -- Path of file to write. This path should be an absolute path. Any directories on the path that don’t exist will be created automatically. If not, it will default to the configured OUT_DIR for PEAT.overwrite_existing (
bool) -- If existing files should be overwritten (instead of writing to a new file with a numeric extension)format_json (
bool) -- If JSON data should be formatted with 4 space indentationmerge_existing (
bool) -- If the file already exists and is JSON, then read the data from the existing file, merge the new data with it, then overwrite the file with the merged data.
- Return type:
- Returns:
If the write was successful
- gen_hashes(source, hash_algorithms=None)[source]¶
Generate hashes of text,
bytes, or the contents of a file.- Parameters:
- Return type:
- Returns:
- The generated hashes as a
dictkeyed by hash name:
{'md5': '...', 'sha1': '...', ...}
- The generated hashes as a
- utc_now()[source]¶
This simple helper function ensures a proper timezone-aware UTC-timezone
datetime.datetimeobject is returned.Further reading: https://blog.miguelgrinberg.com/post/it-s-time-for-a-change-datetime-utcnow-is-now-deprecated
- Return type:
- time_now()[source]¶
Get the current time.
Retrieve the current time in the format specified in consts.
- Returns:
The current time
- Return type:
- are_we_superuser()[source]¶
Checks if PEAT is running with root privileges (effective user ID/euid of 0) or Administrator on Windows.
- Return type:
- deep_get(dictionary, keys, default=None)[source]¶
Safely read the value of a deeply nested key from nested
dictobjects.>>> from peat.utils import deep_get >>> x = {'1': {'2': {'3': 'hi'}}} >>> deep_get(x, "1.2.3") 'hi'
- Parameters:
dictionary (
dict) -- Dictionary to searchkeys (
str) -- dot-separated string of keys to query, e.g."1.2.3"to query a dict like{'1': {'2': {'3': 'hi'}}}default -- Default value to return if the query fails, similar to
dict.get()
- Return type:
- Returns:
The value of the object, or the value of
defaultif the query failed (this isNoneif unset).
8.6.8. Data Utilities¶
- class DeepChainMap(*maps)[source]¶
Variant of
collections.ChainMapthat supports edits of nesteddictobjects.In PEAT, this is used for providing nested sets of device and protocol options (configurations) with the ability the modify the underlying sources (e.g. a set of global runtime defaults) and still preserve the order of precedence and transparent overriding (e.g. options configured at runtime for a specific device will still override the global defaults, even though the global defaults were also modified at runtime).
Nested objects can override keys at various levels without overriding the parent structure. This is best explained via examples.
>>> from peat.data.data_utils import DeepChainMap >>> layer1 = {} >>> layer2 = {"key": 9999} >>> layer3 = {"deep_object": {"deep_key": "The Deep"}} >>> deep_map = DeepChainMap(layer1, layer2, layer3) >>> deep_map["key"] 9999 >>> layer1["key"] = -1111 >>> deep_map["key"] -1111 >>> layer2["key"] 9999 >>> deep_map["deep_object"]["deep_key"] 'The Deep' >>> layer1["deep_object"] = {"another_key": "another_value"} >>> deep_map["deep_object"]["deep_key"] 'The Deep' >>> deep_map["deep_object"]["another_key"] 'another_value'
- lookup_by_str(container, value, lookup)[source]¶
String of attribute to search for, e.g.
"ip"to lookup interfaces usingInterface.ipattribute on the value.
- find_position(obj, key, value)[source]¶
Find if and where an object with a given value is in a
list.
- strip_empty_and_private(obj, strip_empty=True, strip_private=True)[source]¶
Recursively removes empty values and keys starting with
_.- Return type:
- only_include_keys(obj, allowed_keys)[source]¶
Filters any keys that don’t match the allowed list of keys.
- Return type:
- dedupe_model_list(current)[source]¶
Deduplicates a
listofBaseModelobjects while preserving the original order.Models that are a subset of another (contains some keys and values) will be merged together and their values combined.
Warning
This function is expensive to call, ~O(n^2 log n) algorithm (in)efficiency. Do not call more than absolutely needed!
- none_aware_attrgetter(attrs)[source]¶
Variant of
operator.attrgetter()that handles values that may beNone.- Return type:
8.6.9. Logging Utils¶
- generate_log_dict(record)[source]¶
Generate a dict with detailed metadata about the log event, for use in Elasticsearch and in JSON-formatted logs.
This information gets used in record[“extra”][“json_log_string”]
- use_colors()[source]¶
If colorized terminal output should be used.
Disable colored terminal output if
NO_COLORenvironment variable is set, per the Command Line Interface Guidelines (CLIG). :rtype:boolconfig.NO_COLORis True if user has disabled via:CLI arg
--no-colorYAML config
no_color: trueEnvironment variable
PEAT_NO_COLOR
- setup_logging(file=None, json_file=None, debug_info_file=None)[source]¶
Configures the logging interface used by everything for output. The third-party package
loguruis used for logging.Verbosity can be configured via
config.VERBOSE. Terminal output can be disabled viaconfig.QUIET.- Parameters:
file (
Path|None) -- File path to write human-readable logs. If aPathobject is provided, it is used as an absolute path. IfNone, standard log file output will be disabled.json_file (
Path|None) -- Store JSON formatted logs to a file, in JSON Lines (jsonl) format. These share the same format as Elasticsearch logging, and can be used to reconstruct thevedar-logsElasticsearch index. IfNone, JSON logging will be disabled.debug_info_file (
Path|None) -- Text file with system info, configuration values, dumps of scapy internal state, and other info that may be helpful for debugging or troubleshooting customer issues.
- Return type:
8.6.10. Settings¶
8.6.10.1. SettingsManager¶
- _env_var_constructor(loader, node)[source]¶
Implements a custom YAML tag for loading optional environment variables.
If the environment variable is set, returns the value of it. Otherwise, returns
None.Example usage in the YAML configuration:
key: !ENV 'MY_APP_KEY'
- _join_var_constructor(loader, node)[source]¶
Implements a custom YAML tag for concatenating other tags in the document to strings.
This allows for a much more DRY (Don’t Repeat Yourself) configuration file.
- class SettingsManager(label, env_prefix, init_env=True)[source]¶
Stores and manages configuration values from multiple sources.
Typical usage is to subclass this class and configure the possible variables and default values as class attributes, much like
dataclasses.Warning
All class attributes MUST have a type! Otherwise, they will be skipped over and not appear in the list of defaults. This is due to the implementation being a hack on top of
__annotations__.Order of precedence for configurations
Runtime changes (example:
config.DEBUG = 2), including CLI argumentsEnvironment variables (example:
export PEAT_DEBUG=2)Configuration file (YAML or JSON)
Default values set in subclasses of this class (example:
DEBUG: int = 0)
Precedence is managed by a
collections.ChainMap.- Parameters:
- _load_values(conf, load_to, key_prefix='')[source]¶
Read and set configuration values from a input dictionary.
Note
Any values that are
Noneare skipped and will NOT be loaded- Parameters:
load_to (
str) -- Where in the Settings object the loaded configuration should be stored. Valid options are:runtime_configs,env_configs, andfile_configs.key_prefix (
str) -- Optional value to prepend to the keys being looked up. Example use case is for loading environment variables prefixed withPEAT_, whereconfig=dict(os.environ).
- Return type:
- load_from_dict(conf)[source]¶
Update runtime configuration values from a dictionary.
- Parameters:
- Raises:
AttributeError -- If a configuration option in
confis not already defined on the class- Return type:
- load_from_environment(env_prefix=None)[source]¶
Update configuration values from environment variables.
- Parameters:
env_prefix (
str|None) -- String prefixing the environment variable names, e.g.PEAT_to load variables such asPEAT_DEBUGintoDEBUG. IfNone, then this is set toself.env_prefix.- Raises:
AttributeError -- If a configuration option in the environment is not already defined on the class
- Return type:
- load_from_file(file)[source]¶
Load stored values from a YAML or JSON file.
Note that these settings can be overridden by environment variables or values set at runtime.
- Parameters:
file (
Path) -- Path to a YAML or JSON file to load settings from- Return type:
- Returns:
If the load was successful
- Raises:
AttributeError -- If a configuration option loaded from the file is not already defined on the class
- save_to_file(outdir, save_yaml=True, save_json=True)[source]¶
Save the currently stored values to YAML and JSON files.
- typecast(key, value)[source]¶
Convert and store a value as the appropriate Python data type.
Store the variable as the appropriate Python data type, such as bool, int, float, str, Path, list, etc. This converts “0.5” to 0.5, “/home/” to Path(“/home/”), etc.
If the type is properly annotated (e.g. VAR: str = ‘stuff’), then we use the annotation, otherwise try to infer the type from the default value. However, the backup method does not work if the default value is
None.- Parameters:
- Return type:
- Returns:
The typecasted value as a valid Python datatype matching the annotation
- Raises:
KeyError -- If the attribute named by
keydoesn’t exist
- is_default_value(key)[source]¶
If an item’s current value matches the default value.
- Parameters:
key (
str) -- Name of the item to check- Return type:
- Returns:
If the item has a value that matches the default value
- Raises:
AttributeError -- If the attribute named by
keydoesn’t exist
8.6.11. Constants¶
- WINDOWS: Final[bool] = False¶
If the local system is a Windows system (this is FALSE if in WSL)
- Type:
- WSL: Final[bool] = False¶
If the local system is a Windows Subsystem for Linux (WSL) environment
- Type:
- LOG_TIME_FMT: Final[str] = '%Y-%m-%d %H:%M:%S'¶
TIME_FMT without an underscore, and a colon instead of a dash for H/M/S
- Type:
- START_TIME_UTC: Final[datetime] = datetime.datetime(2026, 2, 7, 5, 32, 29, 141572, tzinfo=datetime.timezone.utc)¶
UTC time of when PEAT was imported that can be used for time stamping
- Type:
datetime
- START_TIME_LOCAL: Final[datetime] = datetime.datetime(2026, 2, 7, 5, 32, 29, 141572, tzinfo=datetime.timezone(datetime.timedelta(0), 'UTC'))¶
Local time of when PEAT was imported that can be used for time stamping
- Type:
datetime
- START_TIME: Final[str] = '2026-02-07_05-32-29'¶
Formatted local time (TIME_FMT) of when PEAT was imported (“2022-09-13_15-04-33”)
- Type:
- RUN_ID: Final[int] = 177044234927¶
Unique “ID” to track all artifacts associated with a single run of PEAT RUN_ID := 12-digit integer (current UTC time + random 2-digit integer) NOTE: previously, the Run ID was 10 digits (changed 02/20/2020)
- Type:
- IANA_IP_PROTOS: Final[dict[int, str]] = {0: 'hopopts', 1: 'icmp', 2: 'igmp', 3: 'ggp', 4: 'ipv4', 5: 'st', 6: 'tcp', 7: 'cbt', 8: 'egp', 9: 'igp', 12: 'pup', 17: 'udp', 22: 'idp', 27: 'rdp', 41: 'ipv6', 43: 'routing', 44: 'fragment', 50: 'esp', 51: 'ah', 58: 'icmpv6', 59: 'none', 60: 'dstopts', 77: 'nd', 78: 'iclfxbm', 103: 'pim', 113: 'pgm', 115: 'l2tp', 132: 'sctp', 255: 'raw', 256: 'max'}¶
Resolves a IANA IP protocol ID to the name of the protocol
- exception DeviceError[source]¶
Errors related to a device or
DeviceModulemodule.
- convert(value)[source]¶
Recursively convert values into JSON-friendly standard Python types.
Note
setobjects are sorted after being converted to alist. The order of a Pythonsetis not defined and therefore may vary between runs. Sorting helps alleviate that somewhat.Note
This is located in consts.py so it can be used in locations that are not safe to import
utils.pyfrom, such assettings_manager.py.
- sanitize_filename(filename)[source]¶
Validate and sanitize filenames to work on all platforms, notably Windows.
This replaces any invalid characters with
_.- Return type:
- gen_random_dev_id()[source]¶
Generate a randomized device ID.
Format:
unknown-dev-<run-id>-<random-integer>- Return type:
- lower_dict(to_lower, children=True)[source]¶
Convert keys of a dict and it’s immediate children dicts to lowercase. Dict keys must be strings.
- str_to_bool(val)[source]¶
Convert a string representation of truth to
TrueorFalse.True values are: ‘y’, ‘yes’, ‘t’, ‘true’, ‘on’, ‘1’, ‘enable’, ‘enabled’, ‘up’
False values are: ‘n’, ‘no’, ‘f’, ‘false’, ‘off’, ‘0’, ‘disable’, ‘disabled’, ‘down’
- Parameters:
val (
str) -- String to evaluate- Return type:
- Returns:
The boolean representation of the string
- Raises:
ValueError -- val is anything other than a boolean value
- SYSINFO: Final[dict[str, str | int | bool]] = {'arch': '64bit', 'cli_arguments': '-j 4 -b html docs _docs_html', 'cli_exe': '/home/runner/work/PEAT/PEAT/.venv/bin/sphinx-build', 'containerized': False, 'cpu': 'x86_64', 'hostname': 'runnervmwffz4', 'os_family': 'ubuntu', 'os_full': 'Linux 6.11.0-1018-azure (Ubuntu 24.04.3 LTS)', 'os_name': 'Linux', 'os_rel': '6.11.0-1018-azure', 'os_ver': '24.04.3', 'pid': 3026, 'podman': False, 'ppid': 3019, 'python_exe': '/home/runner/work/PEAT/PEAT/.venv/bin/python', 'python_impl': 'CPython', 'python_version': '3.12.12', 'run_id': 177044234927, 'start_time': '2026-02-07 05:32:29', 'tcpdump': '/usr/bin/tcpdump', 'timezone': 'Etc/UTC', 'username': 'runner'}¶
Information about the system PEAT is running on from
get_platform_info()- Type:
8.6.12. CLI internals¶
- run_peat(args, start_time)[source]¶
CLI main (note: the entrypoint that calls this is in
__main__.py).- Return type:
- oneshot_main(args)[source]¶
Main logic when running a regular PEAT command, e.g.
peat scan.This is distinct from the other current and future capabilities, such as monitoring or the PEAT HTTP server. “oneshot” refers to the “run and done” (in “one shot”) and non-persistent nature of the traditional PEAT CLI commands.
- Return type:
- export_device_data(args)[source]¶
Export data from all devices in the datastore to files and/or Elasticsearch.
- Return type:
- parse_scan_summary(summary)[source]¶
Extract targets, communication method, and PEAT modules from a scan summary.
- read_host_file(host_file)[source]¶
Parse a scan summary from a file or STDIN into a scan summary dict.
- write_readme()[source]¶
Generate a README describing the output from PEAT.
This file gets written in
./peat_results/(or whatever is configured forOUT_DIR).- Return type:
- Returns:
If the file was written successfully (or if the file already exists)
8.6.12.1. cli_args¶
Commandline argument parsing and help. Used by cli_main.py.
- build_argument_parser(version='0.0.0')[source]¶
Builds the argparse parser for parsing CLI commands and arguments.
The ordering of how the arguments are added to the parsers matters. It looks wacky in the code, but it leads to cleaner output for the user.
- Return type:
8.6.13. Command Parsers¶
Parse and/or process the output of various Linux commands and files,
such as arp -a, /proc/meminfo, /var/log/messages, etc.
References for /proc (aka, “procfs”):
https://www.kernel.org/doc/html/latest/filesystems/proc.html
https://tldp.org/LDP/Linux-Filesystem-Hierarchy/html/proc.html
- class NixParserBase[source]¶
Base class for nix file and command parsers (Linux, VxWorks, etc.).
-
file:
PurePosixPath|None= None¶
-
paths:
list[PurePosixPath] = []¶
- classmethod parse_and_process(to_parse, dev)[source]¶
Parse the data, then process it into the device data model.
- Return type:
-
file:
- class VarLogMessagesParser[source]¶
Parse messages from
/var/log/messages.-
file:
PurePosixPath|None= PurePosixPath('/var/log/messages')¶
-
MESSAGE_REGEX:
str= '(?P<timestamp>\\w+[ \\t]+\\d+[ \\t]+\\d{2}:\\d{2}:\\d{2})[ \\t]+(?P<hostname>\\S+)[ \\t]+(?P<logger>\\S+)\\.(?P<level>\\S+)[ \\t]+(?P<process>[^:]+): (?P<message>.*)'¶
-
file:
- class ProcCmdlineParser[source]¶
Parse output of
/proc/cmdline(the kernel’s startup command line arguments). Parses returns dict with the arguments as key-value pairs.-
file:
PurePosixPath|None= PurePosixPath('/proc/cmdline')¶
-
file:
- class ProcCpuinfoParser[source]¶
Parse output of
/proc/cpuinfoand return dict with the formatted data.-
file:
PurePosixPath|None= PurePosixPath('/proc/cpuinfo')¶
-
file:
- class ProcMeminfoParser[source]¶
Parse output of
/proc/meminfoand return dict with the formatted data, with integer values in bytes.-
file:
PurePosixPath|None= PurePosixPath('/proc/meminfo')¶
-
file:
- class ProcModulesParser[source]¶
Parse output of
/proc/modulesand return a list of the module names.-
file:
PurePosixPath|None= PurePosixPath('/proc/modules')¶
-
file:
- class ProcUptimeParser[source]¶
Parse
/proc/uptimeand return the timedelta for how long the system has been up for.Process sets
dev.uptime.-
file:
PurePosixPath|None= PurePosixPath('/proc/uptime')¶
-
file:
- class ProcNetDevParser[source]¶
Parse and process
/proc/net/dev-
file:
PurePosixPath|None= PurePosixPath('/proc/net/dev')¶
-
file:
- class EtcPasswdParser[source]¶
Parse
/etc/passwdand return the extracted user data.Process adds the users to
dev.usersin the device data model.-
file:
PurePosixPath|None= PurePosixPath('/etc/passwd')¶
-
file:
- class DateParser[source]¶
Parse output of the
datecommand.This gets timezone information, as well as baseline for what year it is for the purposes of timestamping logs from sources such as
/var/log/messages.
- class EnvParser[source]¶
Parse the output of the
envcommand.Environment variables for current shell session
- class IfconfigParser[source]¶
Parse output of
ifconfig -acommand.Shows all network interfaces. This is usually seen on older Linux distributions that install the
net-utilspackage by default, as well as BusyBox systems.
- class ArpParser[source]¶
Parse and process output of
arp -acommand.ARP table, shows all known network devices.
- class SshdConfigParser[source]¶
Parse and process
/etc/ssh/sshd_config.Cleanup the sshd_config to just lines with configs, excluding empty lines and comments.
-
file:
PurePosixPath|None= PurePosixPath('/etc/ssh/sshd_config')¶
-
file:
- class HostnameParser[source]¶
Set dev.hostname to the output of the
hostnamecommand
- class LsRecursiveParser[source]¶
Recursive ls of full file system. This assumes BusyBox’s
lsoutput. Other system’s output may differ.Command:
ls -lenAR /etc /boot /var/log /root /sysopt /sbin /pkg /bin /common /opt /libArgs:
l: one column output
e: full date and time
n: numeric UIDs and GIDs instead of names
A: include files that start with
.and exclude the.and..“files”.R: recurse
- class NetstatSocketsVxWorksParser[source]¶
Parse output of “netstat -anP” command on VxWorks.
-a: more sockets -n: numeric names instead of hostnames resolved -P: show the TID (task ID) that owns the socket