8.2. Module developer guide¶
See also
8.2.1. Introduction¶
Each OT device that PEAT supports is implemented as a PEAT device “module”, which is a Python class that encapsulates the functionality for the device. The PEAT device module API is class-based, and provides an interface for interaction with specific devices. Subclasses of DeviceModule are known as “PEAT modules” and provide a set of methods to interact with a device. A module consists of a directory in /peat/modules/ containing a subclass of DeviceModule with the “public” functions for that device model, various code files implementing the functionality the class provides, and any resources the module needs (e.g. SNMP MIBs, XML specs, special binaries, etc.).
The core logic of the module is implemented by subclassing DeviceModule and overriding (implementing) the appropriate methods, pull_project() or _parse(). The methods (functions) of DeviceModule take an instance of DeviceData as an argument. The DeviceData instance contains information about a specific device being interacted with and manages the data and state for that device. In other words, DeviceModule is the implementation of methods to interact with a type of device (e.g. a Rockwell Allen-Bradley ControlLogix PLC), and DeviceData stores and manages data from a specific physical device (e.g. the ControlLogix on a factory floor you’re doing a forensic pull from).
DeviceData manages device data and state and is the implementation of the PEAT “data model”. It contains a set of defined structures for storing device information and state, such as config, logic, firmware, status, and other information or artifacts. Module configuration is stored in options, which is simply a dictionary of key-value pairs with configurations for the module, such as credentials or ports to use for services (e.g. FTP). If no options are given, then the defaults for that device are used.
When you do a scan with the API, it returns a dict DeviceData objects. Each DeviceData object contains information about the device, while the corresponding DeviceModule class provides methods to perform actions on that DeviceData object, such as pull_project(), parse(), etc.
The design and architecture is explained in more detail in Design Documents.
8.2.2. The module API¶
The device module model is powered by the PEAT Module API (ModuleManager). The system serves two purposes:
Runtime lookup of imported modules by a alias and/or filter
Runtime import of 3rd-party modules (via directory, file, or passed as a Python object)
Using the API, it is possible to lookup a module at runtime using a wide variety of identifiers, such as vendor, device class (e.g. “PLC” or “RTU”), brand, or aliases added by the device module. The lookup also enables filtering of devices based on attribute, such as if it supports broadcasts or has a special attribute that may not be a part of the base DeviceModule class.
The API also supports the runtime import of additional external “3rd-party” device modules that implement the DeviceModule class. They can be imported as a folder containing the module or as a Python class object. This enables scenarios such as customer-created modules, as well as making PEAT more extensible in general.
Note
The bundled “executable” version of PEAT needs to know what Python libraries are being used by your module. If you are using a library module (e.g. import csv) that isn’t already used elsewhere in PEAT you will need to tell PyInstaller about it. Edit distribution/peat.spec and add the module name(s) to the hidden_imports list. This is the cause of the error "Failed to import mypackage.mymodule: No module named 'csv'.
8.2.3. Example of writing an PEAT module¶
A module is a Python file containing a Python class that implements the desired PEAT interfaces, such as the data model and parsing. The steps are:
Create the module boilerplate code (including creating a subclass of
DeviceModule)Implement a parser for the vendor output format, overriding
_parse()Store the results in an instance of
DeviceData, following the format as documented in the Data model
The best example of parsing is the SCEPTRE PEAT module (peat.modules.sandia.sceptre_fcd.SCEPTRE), in peat/modules/sandia/sceptre_fcd.py. Look at the _parse() and parse_config() implementations for examples of how parsing is structured and the results are formatted and stored.
8.2.3.1. Walkthrough¶
Note
The source code for classes and functions in this documentation is available by clicking the source to the right of the documentation for the class or function. This can save some time if you don’t have the source code handy.
Note
Refer to the API documentation for details about global variables (e.g. config.OUT_DIR and state.elastic), constants (e.g. consts.START_TIME_UTC), and exception classes.
Note
Refer to Configuration and state deep dive for a detailed explanation and discussion on PEAT’s global configuration and state, including how to add new variables
This guide will walk you though the creation of a PEAT device module. You should be relatively proficient in Python and understand classes and inheritance. You will create the module for the fictional tool, “Awesome Tool”. Awesome Tool is a command-line program that pulls information from Programmable Logic Controllers (PLCs) over a network. If it existed, you would use it by running awesome-tool, and get the results of the tool from awesome_output.json. The finished example AwesomeTool module (awesome_module.py), an example input file (awesome_output.json), and example PEAT output from running the module (example_peat_results/) are in examples/example_peat_module.
Input data the module will process:
{
"hostIp": "192.168.0.20",
"hostName": "SomeDevice",
"osName": "Ubuntu 19.10",
"networkInterfaces": [
{
"ipAddress": "192.168.0.20",
"subnetMask": "255.255.255.0",
"ipGateway": "192.168.0.1",
"telnetEnabled": "yes",
"telnetPort": 21
}
]
}
To begin, create a new file awesome_module.py and open it in your preferred text editor or development environment. Then, create the boilerplate:
"""
Example PEAT module for a fictional device.
To test this example, run an HTTP server locally:
python3 -m http.server 8090 --directory examples/example_peat_module/
peat scan -d AwesomeTool -I examples/example_peat_module/awesome_module.py -i localhost
Authors
- Christopher Goes
"""
import json
from datetime import UTC
from pathlib import Path
from peat import (
DeviceData,
DeviceModule,
Interface,
IPMethod,
Service,
datastore,
utils,
)
from peat.protocols import HTTP
class AwesomeTool(DeviceModule):
"""
Example of a implementation of a PEAT device module.
This class implements the PEAT API by overriding the necessary attributes
and methods from the DeviceModule base class. The attributes tell PEAT what the
module is, how it's configured, and what it's inputs and outputs are.
The methods are the core functionality of the module, such as discovering
devices on a network, or in this case parsing output from a fictional tool.
"""
# What type of device the module is for (e.g. a PLC)
device_type = "PLC" # This populates "host.type"
# Company/organization that manufactures the device(s)
# id: short form, e.g. "SEL"
# name: long form, e.g. "Schweitzer Engineering Laboratories"
vendor_id = "ACME" # This populates "host.vendor.id"
vendor_name = "ACME, Inc." # This populates "host.vendor.name"
# The name and/or file extensions this module is able to parse.
# Standard file globs are accepted, e.g. "*.txt" or "*awesome*.json",
# as well as literal strings ("awesome_output.json").
filename_patterns = ["awesome_output.json"]
# "aliases" makes the module usable with different device arguments, e.g.
# "-d middleware" to refer to this module and any others with an alias
# of "middleware". These aliases are **OPTIONAL**, and don't have to be
# defined.
#
# Example: peat parse -d middleware -I awesome_module.py -- awesome_output.json
module_aliases = ["awesome", "middleware"]
# Configuration options for the module. These can be set in the PEAT
# config YAML file, either globally (all devices) or on a per-host
# level (e.g. a particular device with a known IP).
default_options = {
"awesometool": {
"option1": True,
"another_option": "beep-boop",
"pull_methods": [
"http",
],
}
}
@classmethod
def _pull(cls, dev: DeviceData) -> bool:
# Track if the pull was successful overall.
# This example only uses one protocol, but in other modules with
# multiple protocols, some of them may succeed while others may fail.
# If any of the methods fail, then the pull should return false.
http_successful = False
# Check if HTTP is enabled in the "awesometool.pull_methods" option.
# This example module only has one method (http), but other modules
# have multiple, hence why this option is a list and not a string.
if "http" not in dev.options["awesometool"]["pull_methods"]:
cls.log.warning(
f"Skipping method 'http' for pull from {dev.ip}: "
f"'http' not listed in 'awesometool.pull_methods' option"
)
# Check if the HTTP port is closed. If it's not, then run the
# normal verification check, and if it's successful, use HTTP
elif dev.service_status({"protocol": "http"}) == "closed":
cls.log.warning(f"Failed to pull HTTP on {dev.ip}: HTTP port is closed")
# If device hasn't been verified yet (scan wasn't called), then run
# the verify function for the protocol. If the verify is successful,
# then this will fall through to the final "else" statement and result
# in HTTP being pulled.
# NOTE: dev._is_verified is set by PEAT during scanning
elif not dev._is_verified and not cls._verify_http_unicast(dev):
cls.log.warning(f"Failed to pull HTTP on {dev.ip}: HTTP verification method failed")
elif cls.pull_http(dev):
http_successful = True
return http_successful
@classmethod
def _parse(cls, file: Path, dev: DeviceData | None = None) -> DeviceData | None:
"""
Implementing DeviceModule._parse() tells PEAT how to parse
files. The input to "peat parse" (e.g. a file or piped input)
is passed to _parse() as a pathlib.Path object representing
the data as a standard file (refer to the Python documentation
for details about pathlib.Path). This file can be binary or text,
depending on what the module is expecting.
In this example, the data is assumed to be JSON text, so it's read using
pathlib.Path.read_text() and parsed using Python's JSON library.
JSON is used here to focus on the usage of the API and not the parsing.
In the real world, device data is rarely this clean and generally
requires some amount of analysis work to figure out how to extract
useful information. This sort of work makes up the vast majority
of the time spent developing a PEAT module.
Usage:
peat parse -d AwesomeTool -I awesome_module.py -- awesome_output.json
"""
# Read the JSON config data from the file
# NOTE: Explicitly specifying "utf-8" encoding avoids Windows issues
raw_data = file.read_text(encoding="utf-8")
# Convert the raw JSON text to a Python dictionary ("dict")
data = json.loads(raw_data)
# Create a DeviceData object using the IP address read from the file
# This object stores data associated with a particular device.
# The "datastore" is a global registry of these objects that enables
# information sharing between disparate parts of PEAT and ensures
# duplicates are not created.
#
# The datastore will return a existing object if there is one for
# this IP, otherwise it will create and return a new object.
#
# A DeviceData instance *may* be passed to _parse(). This enables usage
# of parse as part of a pull, e.g. pull artifacts from a device, then call
# parse() on the artifacts to parse them, simplifying the code.
if not dev:
dev = datastore.get(data["hostIp"], "ip") # type: DeviceData
# Populate basic attributes. Refer to the Data Model documentation
# and the DeviceData class for details on the available fields.
dev.name = data["hostName"] # host.name
dev.os.full = data["osName"] # host.os.full
dev.os.name = data["osName"].split(" ")[0] # host.os.name
dev.os.version = data["osName"].split(" ")[1] # host.os.version
# Populate information about network interfaces
for interface in data["networkInterfaces"]:
iface_object = Interface(
type="ethernet",
ip=interface["ipAddress"],
subnet_mask=interface["subnetMask"],
gateway=interface["ipGateway"],
)
# "store()" is a method to insert complex information into the
# data model, such as files or network interfaces. Refer to the
# data model documentation for further examples.
dev.store("interface", iface_object)
# An alternative method is to directly append the object.
# If duplicates aren't a concern, this is the faster method.
# dev.interface.append(iface_object)
if interface.get("telnetEnabled"):
# "service" represents a typical network service running
# on the device, such as a Telnet or FTP server.
# You add information about the service to the Service() model,
# then pass it to store() to insert it into PEAT's data model.
# The specific fields are described in the data model documentation.
service = Service(
protocol="telnet",
port=interface["telnetPort"],
enabled=bool(interface["telnetEnabled"] == "yes"),
transport="tcp",
)
# interface_lookup associates this service with an existing Interface object
# If this isn't needed, then the call would be "dev.store("service", service)"
dev.store(
key="service",
value=service,
interface_lookup={"ip": iface_object.ip},
)
# _parse() must ALWAYS return a DeviceData object
# If you wish to save off any intermediate values, either write them
# to a file using 'dev.write_file()' or store them in 'dev.extra'.
return dev
@classmethod
def _verify_http_unicast(cls, dev: DeviceData) -> bool:
"""
Verify the device is a ACME PLC by retrieving an
HTTP page and parsing it's contents.
"""
cls.log.debug(f"Checking {dev.ip} using HTTP")
port = dev.options["http"]["port"] # Port for this protocol
timeout = dev.options["http"]["timeout"] # Timeout for this protocol
try:
# Use of a "with" statement ensures the HTTP connection is closed
# when this function returns or an error occurs.
with HTTP(dev.ip, port, timeout) as http:
# This is a simplified example. Normally the page result
# would be parsed to extract useful information and
# annotated to the device object before returning. While
# simply pulling the page is usually enough to fingerprint
# a device, PEAT's philosophy is to extract as much useful
# information as possible from any data acquired, even if
# it may get that information again later via different
# methods (e.g. HTTP parsing, then SNMP queries).
response = http.get("example_device_page.html")
# If the device responded with an error (e.g. 404 page not found)
# then the response is None and verification fails.
if not response:
return False
data = response.json()
# Extract values from the response and add to the data model.
# ".pop()" is used to remove the values as they're read.
# This allows the remaining data to be added to "extra"
# without adding duplicates.
dev.name = data.pop("deviceName")
dev.description.model = data.pop("deviceModel")
dev.geo.timezone = data.pop("timezone")
dev.related.ip.update(data.pop("connectedIps"))
# Times in PEAT are datetime objects normalized to UTC timezone
# Note that PEAT provides a robust set of well-tested utilities
# to handle common tasks such as parsing timestamps. Use them!
# (Refer to the Internal API documentation for further details)
ts = utils.parse_date(data.pop("timeStarted"))
dev.start_time = ts.astimezone(tz=UTC)
# Set the "extra" field to leftover data
dev.extra = data
cls.log.debug(f"Successfully verified {dev.ip} using HTTP")
return True
except Exception as err:
# PEAT's philosophy to error handling is "proceed as far as we
# can without sacrificing safety and stability of the network".
# In this case, while the pull from this device failed, other
# devices in the PEAT run may still succeed, so the exception
# is logged and verification returns False.
cls.log.warning(
f"Failed to verify {dev.ip} via HTTP due to an unhandled exception: {err}"
)
return False
@classmethod
def pull_http(cls, dev: DeviceData) -> bool:
cls.log.info(f"Pulling HTTP from {dev.ip}")
with HTTP(
ip=dev.ip,
port=dev.options["http"]["port"], # Port for this protocol
timeout=dev.options["http"]["timeout"], # Timeout for this protocol
) as http:
response = http.get("awesome_output.json")
if not response:
# The level at which you log a failure is up to you.
# It could be a warning or an error, depending on how
# critical the protocol is to recovering an adequate
# amount of data from the device.
cls.log.error(f"HTTP pull failed for {dev.ip}")
return False
try:
data = response.json()
path = dev.write_file(data, "awesome_output.json")
cls.parse(path, dev)
except Exception as ex:
cls.log.warning(f"HTTP pull failed for {dev.ip}: {ex}")
return False
cls.log.info(f"Finished pulling HTTP from {dev.ip}")
return True
# Identification methods are injected (added to the class) after the class
# is defined. These methods do not have to be defined on the class, and can
# be functions defined elsewhere or imported from another library.
#
# Identification methods MUST accept a DeviceData instance as the first
# positional argument, and MUST return a bool indicating success or failure.
#
# Refer to the DeviceModule API documentation for further details on identify methods.
#
# To test this example, run an HTTP server locally:
# python3 -m http.server 8090 --directory examples/example_peat_module/
# peat scan -d AwesomeTool -I examples/example_peat_module/awesome_module.py -i localhost
AwesomeTool.ip_methods = [
IPMethod(
# Name of the method, this can be whatever you want
name="awesome_scrape_http_page",
# Set the description to the Python docstring of the identify function
# This can also be defined as a string.
description=str(AwesomeTool._verify_http_unicast.__doc__).strip(),
# unicast_ip or broadcast_ip (if your method sends broadcast packets)
type="unicast_ip",
# Callback function to perform fingerprinting.
# Note the lack of parenthesis '()'. This is the function object, not
# a call to the function. Functions are first-class objects in Python.
identify_function=AwesomeTool._verify_http_unicast,
# How reliable the method is, in general, rated on scale of 1-10
reliability=8,
# Name of the application protocol
protocol="http",
# Transport protocol, 'tcp', 'udp', or 'other'
transport="tcp",
# The standard port used by this protocol.
# For example, Telnet would be 23.
default_port=8090,
)
]
# Where any serial methods would go. This is not required unless you're
# interacting with a device over a serial link. Look at the serial-supporting
# PEAT modules for examples of this, such as SELRelay.
AwesomeTool.serial_methods = []
# "__all__" is a Python-ism that reduces "namespace pollution"
__all__ = ["AwesomeTool"]
8.2.3.2. Using the created module¶
Since this is a fictional example and awesome-tool does not exist, assume it has been run and generated awesome_output.json (this can be found in examples/example_peat_module/).
How to run the AwesomeTool module:
# "-d AwesomeTool" : the "AwesomeTool" module you created (this matches the name of the Python class)
# "-I awesome_module.py" : import the module code so it's usable by PEAT
# "-- ./awesome_output.json" : the file to parse
peat parse -d AwesomeTool -I awesome_module.py -- ./awesome_output.json
Example terminal output from running the included example module:
$ pdm run peat parse --no-logo -d AwesomeTool -I ./examples/example_peat_module/awesome_module.py -- ./examples/example_peat_module/awesome_output.json
17:01:21.039 INFO log_utils Log file: peat_results/parse_default-config_2024-06-26_17-01-20_171942128019/logs/peat.log
17:01:21.040 INFO peat.init Run directory: parse_default-config_2024-06-26_17-01-20_171942128019
17:01:21.041 INFO parse_api Parsing 1 filepaths
17:01:21.042 INFO parse_api Parsing AwesomeTool file '/home/cegoes/peat/examples/example_peat_module/awesome_output.json'
17:01:21.050 INFO utils Saved parse summary to peat_results/parse_default-config_2024-06-26_17-01-20_171942128019/summaries/parse-summary.json
17:01:21.050 INFO parse_api Completed parsing of 1 files in 0.01 seconds
17:01:21.051 INFO peat.cli_main Finished run in 0.02 seconds at 2024-06-26 17:01:21.051042+00:00 UTC
The results are in device-data-full.json:
{
"description": {
"vendor": {
"id": "ACME",
"name": "ACME, Inc."
}
},
"id": "192.168.0.20",
"ip": "192.168.0.20",
"name": "SomeDevice",
"type": "PLC",
"os": {
"full": "Ubuntu 19.10",
"name": "Ubuntu",
"version": "19.10"
},
"interface": [
{
"type": "ethernet",
"ip": "192.168.0.20",
"subnet_mask": "255.255.255.0",
"gateway": "192.168.0.1",
"services": [
{
"enabled": true,
"port": 21,
"protocol": "telnet",
"transport": "tcp"
}
]
}
],
"service": [
{
"enabled": true,
"port": 21,
"protocol": "telnet",
"transport": "tcp"
}
],
"related": {
"ip": [
"192.168.0.1",
"192.168.0.20"
],
"ports": [
21
],
"protocols": [
"telnet"
]
}
}
8.2.4. Adding a module to PEAT¶
Contributing a new module to be included in PEAT’s codebase.
Create a vendor directory for the device in
peat/modules/, and potentially a sub-directory if the device will have a lot of source modules (in other words, more than one.pyfile). Note: if the vendor already exists, just use the existing directory.Example: the SELRelay device module is in
peat/modules/sel/, and the ION module is inpeat/modules/schneider/ion/
Create the
DeviceModuleclass in a.pyfile with the name of the class in lower-case, e.g.peat/modules/<vendor>/<device>.py(see Example of writing an PEAT module)Example: the ION module is in
peat/modules/schneider/ion/ion.py
Add the module import to the
__init__.pyfiles for each package and sub-package. This will need to be done in each nested directory, for example the ION module will need to be imported inpeat/modules/__init__.py,peat/modules/schneider/__init__.py, andpeat/modules/schneider/ion/__init__.py. Example of an import:# peat/modules/schneider/ion/__init__.py from .ion import ION # peat/modules/schneider/__init__.py from .ion import ION # peat/modules/__init__.py from peat.modules.schneider import ION
Add command line usage examples to the docstrings for the relevant commands in
peat/cli_args.py, for example if your module supports scanning and pulling then you should add examples toscan_examplesandpull_examplesinpeat/cli_args.py.(Optional) If there are any device-specific configuration options needed (like a special username or whatever that needs to be configurable by the user), then do the following:
Add a
default_optionsclass attribute to your module and populate it with options for your module. Options specific to your module should be nested under a key with the name of your module, e.g."sel"for options specific to SEL devices, and general options like protocols should go under a separate key. For example:class SELRelay(DeviceModule): default_options = {211 "ymodem": { "baudrate": 57600, }, "web": { "user": "", "pass": "", }, "sel": { # If the relay should be restarted after the push completes "restart_after_push": False } }
Add the options to the example YAML config file
examples/peat-config.yaml, including default values and detailed descriptions of the options and how they should be used. This will serve as documentation for the options. It’s also the only way for users to know how to configure your module, so don’t be afraid to go into detail!
Add the device information (vendor, model, known tested, firmware, etc.) to the table of supported devices in Introduction section of the PEAT documentation
Write a basic set of tests for your module (refer to the existing tests as well as the Test documentation)
(Optional) Create a dedicated documentation page with details on the device in the documentation. A few examples of this are
sel.rstandsiemens.rst.
8.2.5. Development tips and tricks¶
When developing a module, a common practice is to create a Python file that directly invokes the module and desired methods instead of running the normal PEAT entrypoint. This has a number of advantages, including faster startup and execution time, less extraneous output, ability to attach a debugger, fine-grained control over what methods get executed, ability to drop into an interpreter prompt (the “Read-Eval-Print-Loop”, aka the “REPL”) for live development, and the ability to use Jupyter notebooks.
Create a file mymodule_testing.py with the following code:
from pathlib import Path
from peat import SELRelay, initialize_peat
initialize_peat({'VERBOSE': True, 'DEBUG': 1})
input_path = Path('examples/devices/sel/sel_351/set_all.txt')
dev = SELRelay.parse(input_path)
print(dev.export())
Then, invoke it:
# Run and exit
pdm run python mymodule_testing.py
# Drop into an interpreter for inspecting variables and doing live development
# Note the "-i" argument to python
pdm run python -i mymodule_testing.py
>>> dev
>>> some_other_dev = SELRelay.parse(Path('some_other_file.txt'))
>>> import datastore
>>> pull_dev = datastore.get("192.0.2.22")
>>> SELRelay.pull_project(pull_dev)
>>> pull_dev.firmware.version
8.2.5.1. Debugging from the CLI¶
There are two debugging-specific command line arguments:
--pdb(--launch-debugger): this will begin the run, then break into the Python Debugger (pdb). This is useful for inspecting program state, and stepping through the program.--repl(--launch-interpreter): begin the PEAT run, then break out into the Python interactive interpreter (aka the “Read-Eval-Print-Loop” or “REPL”). This is useful for getting access to the Python API via the executable distribution, checking program state, or testing a hypothesis about how stuff should function in specific conditions.