8.5. Device module implementations

Code documentation for the DeviceModule implementations included with PEAT.

Note

The source code for documented classes and functions is available by clicking the source to the right of the documentation for the class or function.

8.5.1. Class diagram

8.5.2. Rockwell Automation

8.5.2.1. Allen-Bradley ControlLogix PLC

PEAT module for the Allen-Bradley ControlLogix device.

Listening services (EN2TR)

  • FTP (TCP 21) (if enabled in config)

  • HTTP (TCP 80)

  • SNMP (UDP 161)

  • ENIP (UDP 2222)

  • CIP (UDP 44818 and TCP 44818)

Listening services (EWEB)

  • FTP (TCP 21) (if enabled in config)

  • HTTP (TCP 80)

  • SNMP (UDP 161)

  • ENIP (UDP 2222)

  • CIP (UDP 44818 and TCP 44818)

Authors

  • Christopher Goes

  • Mark Woodard

class ControlLogix[source]

Allen-Bradley ControlLogix devices.

Supported communication modules: EN2T/D, EWEB, EN2TR, EN2TR/C, L8 CPU

device_type: str = 'PLC'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Rockwell'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Rockwell Automation/Allen-Bradley'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'ControlLogix'

Device brand. Elasticsearch field: host.description.brand.

model: str = '1756'

Device’s default model (if not known). Elasticsearch field: host.description.model.

default_options: dict[str, Any] = {'ftp': {'pass': '', 'user': 'Administrator'}, 'rockwell': {'pull_methods': ['cip', 'ftp', 'http', 'snmp']}, 'web': {'pass': '', 'user': 'Administrator'}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

annotate_fields: dict[str, Any] = {'os.name': 'VxWorks', 'os.vendor.id': 'WindRiver', 'os.vendor.name': 'Wind River Systems'}

Fields that will be annotated (populated) by default for most operations, such as scan pull, parse, etc. Examples include known OS versions or hardware architecture. These fields will populated ONLY IF they are already unset on the device being annotated. Format is the path to the field to populate, e.g. os.name, os.vendor.id, etc.

module_aliases: list[str] = ['clx', 'allen-bradley']

Alternative names for looking up the module, e.g. for the -d CLI option. Aliases that can be used to refer to the device via the PEAT’s Module API (peat.module_manager).

classmethod update_dev(dev)[source]

Update the device’s data with metadata, inferences, and lookups.

Note

Data values are only changed if they’re unset. In other words, existing values will NOT be overwritten.

What’s populated:

  • Basic Module attributes, e.g. cls.vendor_name => dev.description.vendor.name.

  • Any Module-defined fields in cls.annotate_fields, if present.

  • Calls populate_fields(), which populates fields such as adding description values, network interfaces, and other values. This call will also implicitly lookup MAC addresses, IP addresses, and/or hostnames, unless disabled with the appropriate PEAT global configuration options.

Parameters:

dev (DeviceData) -- DeviceData instance to annotate.

Return type:

None

classmethod pull_ftp(dev)[source]

Pull files from a EWEB communication module via FTP.

Files pulled

  • *.eds

  • ReadMe.txt

  • Anything else on the device

Return type:

bool

Returns:

If the pull was successful

classmethod pull_snmp(dev)[source]

Pull data via SNMP from EWEB/EN2T communication modules.

Return type:

bool

Returns:

If the pull was successful

classmethod pull_http(dev)[source]

Pull device metadata, memory, syslog, and other data from a EN2T communication module via HTTP.

Note

Raw data pulled via each HTTP method is saved to a JSON file in the device results directory with a label of raw-<method>, where <method> is the method name, e.g. raw-home.

Return type:

bool

Returns:

If the pull was successful

classmethod pull_logic(dev)[source]

Pull raw process logic from the device via CIP.

By default, any slots (modules) that aren’t “Adapter” or “I/O” type are queried for logic.

Parameters:

dev (DeviceData) -- device data to use for storage and caching

Return type:

dict[str, dict[str, dict]]

Returns:

Logic value dict

classmethod parse_logic(dev, logic_values)[source]

Parse logic pulled from the device via CIP.

Return type:

str

ip_methods: list[IPMethod] = [IPMethod(name='ControlLogix FTP', description='Verify via FTP for devices with EWEB communication modules.', identify_function=<bound method ControlLogix._verify_ftp of <class 'peat.modules.rockwell.controllogix.ControlLogix'>>, reliability=8, protocol='ftp', transport='tcp', type='unicast_ip', default_port=21), IPMethod(name='ControlLogix HTTP page scraping', description='Verify via HTTP for devices with EN2T/EN2TR communication modules.', identify_function=<bound method ControlLogix._verify_http of <class 'peat.modules.rockwell.controllogix.ControlLogix'>>, reliability=8, protocol='http', transport='tcp', type='unicast_ip', default_port=80), IPMethod(name='ControlLogix SNMP sysDescr', description='Verify via SNMP for devices with EN2T/EN2TR communication modules by\n        querying for SNMP :term:`OID` ``1.3.6.1.2.1.1.1.0`` (``sysDescr``) and\n        checking the value.', identify_function=<bound method ControlLogix._verify_snmp of <class 'peat.modules.rockwell.controllogix.ControlLogix'>>, reliability=6, protocol='snmp', transport='udp', type='unicast_ip', default_port=161), IPMethod(name='ControlLogix CIP ListIdentity unicast', description='Verify via a unicast :term:`CIP` ListIdentity packet.', identify_function=<bound method ControlLogix._verify_cip_unicast of <class 'peat.modules.rockwell.controllogix.ControlLogix'>>, reliability=9, protocol='cip', transport='udp', type='unicast_ip', default_port=44818), IPMethod(name='ControlLogix CIP ListIdentity broadcast', description='Send a :term:`CIP` broadcast packet to broadcast IP and\n        wait for responses from devices.', identify_function=<bound method ControlLogix._verify_cip_broadcast of <class 'peat.modules.rockwell.controllogix.ControlLogix'>>, reliability=9, protocol='cip', transport='udp', type='broadcast_ip', default_port=44818)]

Methods for identifying devices via IP or Ethernet.

8.5.2.1.1. Rockwell Scanning

Methods for enumerating Allen-Bradley ControlLogix devices on a IP network.

Authors

  • Casey Glatter

  • Christopher Goes

  • Patricia Schulz

broadcast_scan(ip, port=44818, timeout=5.0)[source]

Scan by sending a broadcast packet and waiting for responses from devices.

Parameters:
  • ip (str) -- IP to broadcast using

  • port (int) -- Port to send broadcast to

  • timeout (float) -- Time to wait for responses, in seconds

Return type:

list[dict[str, dict | int | str]]

Returns:

List of basic device descriptions

fingerprint_device(sock)[source]

Listen for response from a device, verify it is a Allen-Bradley, then determine basic metadata about it.

Return type:

dict[str, dict | int | str] | None

enumerate_device_modules(ip, port=44818, chassis_size=8, timeout=5.0)[source]

Enumerate ControlLogix device modules.

Parameters:
  • ip (str) -- IPv4 address of device

  • port (int) -- TCP port to use for enumeration

  • chassis_size (int) -- Number of modules on the device

  • timeout (float) -- Number of seconds to wait before timing out

Return type:

dict[int, dict[str, int | str]]

Returns:

Metadata of any modules discovered during enumeration, keyed by slot number

extract_info_response(info)[source]

Extract metadata from response to ListIdentity or GetAttributesAll.

Return type:

dict[str, int | str]

annotate_slots(existing_module, new_module)[source]

Annotate the slots of a module with another module’s info.

Return type:

None

8.5.2.1.2. Rockwell Communications

ControlLogix Common Industrial Protocol (CIP) implementation.

Originally based on code from Agostino Ruscito’s pycomm library and heavily modified.

Authors

  • Craig Buchanan

  • Christopher Goes

class ClxCIP(ip, port, timeout=5.0, cpu_slot=0)[source]

Common Industrial Protocol (CIP) implementation for Allen-Bradley ControlLogix devices.

open()[source]

Prepares the driver for operation.

Return type:

bool

close()[source]

Cleans up the driver when finished.

Return type:

bool

get_all_data()[source]
Return type:

dict[str, dict]

get_instance_list(class_path)[source]

Returns the list of instance ids for a given class path.

Parameters:

class_path (tuple | tuple[int, Any]) -- The class to query

Return type:

list

Returns:

[<instance_id>,..]

get_attributes(instance_path, attribute_list=None)[source]

Returns attributes for a single instance at a given instance path.

Parameters:
  • instance_path (tuple | tuple[int, Any]) -- The instance to query

  • attribute_list (list[int] | None) -- The instance attributes to query (default: all attributes)

Return type:

dict

Returns:

{<attribute_id>: <attribute_value>,..}

get_attributes_multi(class_path, attribute_list=None, instance_list=None)[source]

Returns attributes for multiple instances at a given class path.

Example: attr = self.get_attributes_multi((CLASS_CODE['X Object'],))

Parameters:
  • class_path (tuple | tuple[int, Any]) -- The class path to query

  • attribute_list (list | None) -- The instance attributes to query (default: all attributes)

  • instance_list (list | None) -- The instances to query (default: all instances at class path)

Return type:

dict[int, dict]

Returns:

{<instance_id>: {<attribute_id>: <attribute_value>,..},..}

get_template_data(path=())[source]

Returns a tuple (template_attributes, template_tags) for all template instances at the given path.

Parameters:

path (tuple) -- Path to the instance containing the template class

Return type:

tuple[dict[int, dict], dict]

Returns:

(attr, tags)

  • attr = {<tem_id>:{<attr_id>:<attr_val>,..},..}

  • tags = {<tem_id>:[tag_data],..}

get_io_module_data(path=())[source]

Returns a tuple (io_module_attributes, io_module_tags) for all io_module instances at a given path.

Parameters:

path (tuple) -- Path to the instance containing the io module class

Return type:

tuple[dict[int, dict], dict]

Returns:

(attr, tags)

  • attr = {<iom_id>:{<attr_id>:<attr_val>,..},..}

  • tags = {<iom_id>:[tag_data],..}

get_program_data(path=())[source]

Returns a tuple (program_attributes, program_symbol_attributes, program_routine_attributes, program_routine_tags) for all program instances at a given path.

Parameters:

path (tuple) -- Path to the instance containing the program class

Return type:

tuple[dict, dict, dict, dict]

Returns:

(attr, sym_attr, rout_attr, rout_tags)

  • attr = {<prog_id>:{<attr_id>:<attr_value>,..},..}

  • sym_attr = {<prog_id>:{<sym_id>:{<attr_id>:<attr_val>,..},..},..}

  • rout_attr = {<prog_id>:{<rout_id>:{<attr_id>:<attr_val>,..},..},..}

  • rout_tags = {<prog_id>:{<rout_id>:[tag_data],..},..}

get_routine_data(path=())[source]

Returns a tuple (routine_attributes, routine_tags) for all routine instances at a given path.

Parameters:

path (tuple | tuple[int, Any]) -- Path to the instance containing the routine class

Return type:

tuple[dict[int, dict], dict]

Returns:

(attr, tags)

  • attr = {<rout_id>: {<attr_id>: <attr_value>,..},..}

  • tags = {<rout_id>: [tag_data],..}

get_map_data(path=())[source]

Returns a tuple (map_attributes, map_cxn_attributes) for all map instances at a given path.

Parameters:

path (tuple | tuple[int, Any]) -- Path to the instance containing the map class

Return type:

tuple[dict, dict]

Returns:

(attr, cxn_attr)

  • attr = {<map_id>: {<attr_id>: <attr_val>,..},..}

  • cxn_attr = {<map_id>: {<cxn_id>: {<attr_id>: <attr_val>,..},..},..}

get_unknown6e_data(path=())[source]

Returns a tuple (unknown6e_attributes, unknown6e_tags) for all unknown6e instances at a given path.

Parameters:

path (tuple) -- Path to the instance containing the unknown6e class

Return type:

tuple[dict[int, dict], dict]

Returns:

(attr, tags)

  • attr = {<u6e_id>: {<attr_id>: <attr_val>,..},..},

  • tags = {<u6e_id>: [tag_data],..}

get_instance_list_tag_buffer(class_path)[source]

Returns a data buffer representing a list of all instance IDs of a specified class path.

Parameters:

class_path (tuple | tuple[int, Any]) -- The class to query

Return type:

list

Returns:

Data buffer representing a list of all instance IDs of a specified class path

get_attributes_tag_buffer(instance_path, attribute_list)[source]

Returns a data buffer representing the specified attributes of an instance at a specified path.

Parameters:
  • instance_path (tuple | tuple[int, Any]) -- The instance to query

  • attribute_list (list[int]) -- Attributes to query

Return type:

list[int | bytes]

Returns:

Data buffer (list(attribute_data)) representing the specified attributes of an instance

read_tag(instance_path, tag_offset=0)[source]

Return a data buffer representing the tag data of an instance at a specified path (at a specified offset).

Parameters:
  • instance_path (tuple | tuple[int, Any]) -- The instance to query

  • tag_offset (int) -- The tag offset to start reading

Return type:

list[int]

Returns:

Data buffer representing the tag data of an instance

read_tag_fragmented(instance_path, tag_offset=0)[source]

Return a data buffer representing the tag data of an instance at a specified path (at a specified offset).

Parameters:
  • instance_path (tuple | tuple[int, Any]) -- The instance to query

  • tag_offset (int) -- The tag offset to start reading

Return type:

list[int]

Returns:

Data buffer representing the tag data of an instance

read_template(instance_path, size, offset=0)[source]

Return a data buffer representing the tag data of a template instance at a specified path (at a specified offset).

Parameters:
  • instance_path (tuple | tuple[int, Any]) -- The template instance to query

  • size (int) -- The size of the template instance to query

  • offset (int) -- The tag offset to start reading

Return type:

list

Returns:

Data buffer representing the tag data of a template instance

read_tag_with_size(instance_path, size)[source]

Return a data buffer representing the tag data of an instance at a specified path (with a specified size).

Parameters:
  • instance_path (tuple | tuple[int, Any]) -- The instance to query

  • size (int) -- The size of the instance to query

Return type:

list

Returns:

Data buffer representing the tag data of an instance

get_size_of_type(type_id, template_attributes, type_size_d1=0, type_size_d2=0, type_size_d3=0)[source]

Returns the size of the given type.

Parameters:
  • type_id (int) -- the ID of the type

  • template_attributes (dict) -- template object attributes

  • type_size_d1 (int) -- the size of the first dimension (if array)

  • type_size_d2 (int) -- the size of the second dimension (if array)

  • type_size_d3 (int) -- the size of the third dimension (if array)

Return type:

int

Returns:

Integer value of the size of the type (in bytes)

validate_reply_data(cip_data, service, min_size)[source]

Validates the reply data.

Parameters:
  • cip_data (bytes) -- The CIP data to verify

  • service (int) -- The reply service code

  • min_size (int) -- Used to check if the reply data is too short

Return type:

bool

Returns:

If validation was successful

get_instance_list_data_from_reply(cip_data)[source]

Returns a tuple (instance_list_data, next_instance_id) from get_instance_list reply data.

Parameters:

cip_data (bytes) -- Reply data at the CIP layer

Return type:

tuple[int | bytes, int]

Returns:

(data, next_instance_id)

  • data = [instance_list_data]

  • next_instance_id = instance id to read next if insufficient packet

  • space (-1 if complete or error)

get_attributes_data_from_reply(cip_data)[source]

Returns the tuple (attribute_data, attribute_count) from get_attributes reply data.

Parameters:

cip_data (bytes) -- Reply data at the CIP layer

Return type:

tuple[int | bytes, int]

Returns:

(data, count)

  • data = [instance_attribute_data]

  • count = number of attributes in the reply (-1 if complete or error)

read_tag_data_from_reply(cip_data)[source]

Return the tuple (tag_data, tag_size) from read_tag reply data.

Parameters:

cip_data (bytes) -- Reply data at the CIP layer

Return type:

tuple[int | bytes, int]

Returns:

(data, offset)

  • data = [tag_data]

  • size = size of the data in the reply (-1 if complete or error)

read_tag_fragmented_data_from_reply(cip_data)[source]

Return the tuple (tag_data, tag_size, tag_address) from read_tag_fragmented reply data.

Parameters:

cip_data (bytes) -- Reply data at the CIP layer

Return type:

tuple[int | bytes, int, bytes]

Returns:

(data, size, address)

  • data = [tag_data]

  • size = size of the data in the reply (-1 if complete or error)

  • address = base memory address of the tag in the device

read_tag_with_size_data_from_reply(cip_data)[source]

Return the tag_data from read_tag_with_size reply data.

Parameters:

cip_data (bytes) -- Reply data at the CIP layer

Return type:

int | bytes

Returns:

[tag_data]

path_to_string(path)[source]

Returns a string representing the path from a tuple.

Parameters:

path (tuple | tuple[int, Any]) -- tuple representation of the path

Return type:

bytes

Returns:

Byte string representation of the path (bytes)

parse_get_instance_list(cip_data)[source]

Returns a list of instance ids (int) from raw instance list CIP data.

Parameters:

cip_data (bytes) -- [<instance_list_data>]

Return type:

list[int]

Returns:

[instance_id0, instance_id1, ...]

parse_get_attributes(cip_data, cip_class)[source]

Returns a dictionary of attributes from the raw CIP data and the CIP class id (int).

Parameters:
  • cip_data (bytes) -- [<attributes_data>] (Represented as bytes currently)

  • cip_class (int) -- CIP class of the data

Return type:

dict[int, int]

Returns:

Dict with {<attr_id>:<attr_val>,..}

parse_template(cip_data, member_count)[source]

Parses a template tag and returns a data structure representing the template.

Parameters:
  • cip_data (list) -- A buffer (list of bytes) of the template tag

  • member_count (int) -- The number of members represented by the template

Return type:

dict[str, str | dict]

Returns:

{"Name":<template_name>,
"Structure":{
   <member_offset1>:{
       "Name":<member_name>,
       "Type":<member_type>,
       "Info":<member_info>,
       }, ..
   }
}

class ClxHTTP(*args, **kwargs)[source]

HTTP interface for Allen-Bradley ControlLogix Ethernet communication modules.

Supported communication modules and functions

  • EN2TR: all functions except serverlog
    • memory

    • network

    • home

    • device_identity

    • syslog

    • diagnetwork

    • modules

    • module_list

  • EWEB: all functions except memory, syslog, and device_identity
    • network

    • home

    • serverlog

    • diagnetwork

    • modules

    • module_list

  • L8 CPU (built-in Ethernet): network (no other functions work)
    • network

MEMORY_GROUPS = [{'name': 'watchdog_log', 'page': 'WatchDog', 'search': 'WatchDog Log'}, {'name': 'internal_memory', 'page': 'Internal%20Memory', 'search': 'Internal Memory'}, {'name': 'parameter_area', 'page': 'Parameter%20Area', 'search': 'Parameter Area'}]
HOME = 'home.asp'

Device metadata page.

SERVERLOG = 'serverlog.asp'

HTTP server log page.

DIAGNETWORK = 'diagnetwork.asp'

Network diagnostics page.

CHASSIS_WHO = 'chassisWho.asp'

Page with a list of modules.

UPTIME_RE = re.compile('((?P<years>\\d+) years)?[ ,]*((?P<months>\\d+) months)?[ ,]*((?P<days>\\d+) days)?[ ,]*(?P<hours>\\d{1,2})h:(?P<minutes>\\d{1,2})m:(?P<seconds>\\d{1,2})\\.?(?P<milliseconds>\\d{3})?s', re.IGNORECASE|re.ASCII)
SYSLOG_PAGE_RE = re.compile('rokform/SysListDetail\\?name=Full List&id=(\\d+)&comp=SysLog', re.IGNORECASE|re.ASCII)
get_memory()[source]

Extracts and aggregates memory information from several pages.

Return type:

dict[str, dict[str, str]]

retrieve_memory_page(memory_page)[source]

Parses memory from webpage to the values as a hex string.

Return type:

dict

classmethod parse_memory_page(text)[source]
Return type:

tuple[str, dict[str, str]]

static add_padding(input_data)[source]

Adds padding for memory values, e.g. convert 01 to 00000001.

Return type:

str

classmethod process_memory(dev, memory_regions)[source]

Add data from the parsed memory to a DeviceData object.

Return type:

None

get_network()[source]

Extracts and aggregates network information from several pages.

Return type:

dict[str, dict | list]

retrieve_statistics(name)[source]

Retrieves and parses a page of statistics into a dictionary.

Return type:

dict[str, str]

classmethod parse_statistics(text)[source]

Parse and extract a network statistics from HTML text.

Return type:

dict[str, str]

retrieve_net_table(name)[source]

Retrieves and parses a table from a network information page into a dictionary.

Return type:

list[dict[str, str]]

classmethod parse_net_table(text)[source]

Parse and extract a network data table from HTML text.

Return type:

list[dict[str, str]]

classmethod process_network(dev, info)[source]

Add data from the parsed network information pages to a DeviceData object.

Return type:

None

get_home()[source]

Home page with basic information about device.

Return type:

dict

classmethod parse_home(text)[source]

Parse home.asp HTML data.

Return type:

dict

classmethod process_home(dev, info)[source]

Add data from the parsed home page to a DeviceData object.

Return type:

None

get_device_identity()[source]

Get Device Identity page data.

Return type:

dict

classmethod parse_device_identity(text)[source]

Parse device identity page data.

Return type:

dict

classmethod process_device_identity(dev, info)[source]
Return type:

None

get_syslog()[source]

Get Syslog data (page: Syslog -> Full List).

Return type:

list[dict]

classmethod parse_syslog_page(text)[source]

Parse Syslog data page.

Return type:

list[dict]

classmethod process_syslog(dev, syslog)[source]

Add data from the parsed syslog pages to a DeviceData object.

Return type:

None

get_serverlog(username='Administrator', password='')[source]

Get serverlog data from a EWEB module. :rtype: list[dict]

Note

This page requires authentication. The default credentials are attempted but could differ on other devices. They are configurable via the “web” option in the PEAT configuration file.

classmethod parse_serverlog_page(text)[source]

Parse serverlog data page from a EWEB module.

Return type:

list[dict]

classmethod process_serverlog(dev, serverlog)[source]

Add data parsed EWEB serverlog page to a DeviceData object.

This log contains a history of web page requests to the EWEB module and the source IP of those requests. :rtype: None

  • Possible values for event.category: network, web

  • Possible values for event.kind: event, pipeline_error

  • Possible values for event.type: access, allowed, denied, error, user

get_diagnetwork()[source]

Retrieve and parse network interface settings page (diagnetwork).

Return type:

dict

classmethod parse_diagnetwork(text)[source]

Parse and extract data from diagnetwork.asp HTML page data.

Return type:

dict

classmethod process_diagnetwork(dev, info)[source]

Add data from the parsed diagnetwork page to a DeviceData object.

Return type:

None

get_modules()[source]

Get information about all the modules on the chassis.

Return type:

list[dict]

get_module_list()[source]

Get list of modules on the chassis.

Return type:

list[dict]

classmethod parse_module_list(text)[source]

Parse and extract list of modules from HTML text.

Return type:

list[dict]

retrieve_module(slot)[source]

Information about a specific module in the chassis.

Return type:

dict

classmethod parse_module_page(text)[source]

Parse and extract information about a PLC rack module from HTML text.

Return type:

dict

classmethod process_modules(dev, raw_modules)[source]

Add data from the parsed modules to a DeviceData object.

Return type:

None

get_all(dev)[source]

Retrieves and processes data using all available methods.

Return type:

bool

8.5.2.1.3. Rockwell Parsing

Rockwell Allen-Bradley ControlLogix PLC logic parsing methods.

Authors

  • Craig Buchanan

  • Christopher Goes

parse_logic(logic_dict, driver=None)[source]

Parse memory layout, symbol list, and logic from a attributes dict.

The logic dict should have the following keys

  • template_attributes

  • template_tags

  • symbol_attributes

  • program_attributes

  • program_symbol_attributes

  • program_routine_attributes

  • program_routine_tags

  • map_attributes

  • map_cxn_attributes

Parameters:
  • logic_dict (dict[str, dict]) -- Attributes dict

  • driver (ClxCIP | None) -- CIP driver to use for tag lookups, if needed

Return type:

str

Returns:

The parsed memory layout, symbol list, and logic as a humand-readable string, or an empty string if the parse failed

extract_memory_layout(symbol_attributes, program_attributes, program_routine_attributes, program_symbol_attributes, map_attributes, map_cxn_attributes)[source]
Return type:

list[tuple[int, str]]

memory_layout_to_str(memory_list)[source]

Parses a memory list into a formatted layout of the memory.

Parameters:

memory_list (list[tuple[int, str]]) -- List of Value/Label tuples defining the memory layout

Return type:

str

Returns:

Formatted human-readable layout of the memory.

extract_symbol_list(symbol_attributes, program_symbol_attributes, program_attributes)[source]
Return type:

list[tuple[int, str]]

parse_process_logic(program_routine_tags, program_routine_attributes, symbol_list, template_tags, driver=None)[source]
Return type:

str

ControlLogix ladder logic parsing libraries.

Authors

  • Craig Buchanan

  • Christopher Goes

  • Greg Walkup

decompile_ladder_process_logic(process_logic, symbol_list, template_tags, starting_address=0)[source]

Transforms device bytecode into a string representing the ladder logic.

Parameters:
  • process_logic (list[int]) -- the bytecode representation of the relay ladder routine

  • symbol_list (list[tuple[int, str]])

  • template_tags (dict)

  • starting_address (int) -- the starting address of the relay ladder routine in the device

Return type:

str

Returns:

String representing the source code of a relay ladder routine from the respective bytecode format from the device

disassemble_ladder_process_logic(process_logic, starting_address=0)[source]

Disassembles a buffer (list of bytes) representing the process logic.

Return type:

str

instruction_buffer_to_instruction_list(instruction_buffer)[source]

Converts an instruction buffer to an instruction list.

Parameters:

instruction_buffer (list) -- instruction buffer as little-endian formatted list of bytes

Return type:

list

Returns:

Instruction list as properly formatted list of 32-bit integers

decompile_process_logic_segment(instruction_list, starting_address, indent_level=0)[source]

Decompiles a segment of process logic.

Decompiles a segment of process logic by calling each instruction’s decompile method. Note that each decompile call may consume more than one instruction and may recursively call this function.

Parameters:
  • instruction_list (list[int]) -- The list of instructions to decompile. It is implicitly assumed that if any operation is in the list, the list also contains all of the instructions required by its decompiliation function.

  • starting_address (int) -- The address of the first instruction in the list

  • indent_level (int) -- The indentation level to use

Return type:

str

Returns:

A string representing the decompiliation of the given instructions

disassemble_instruction(instruction)[source]

Obtains the human-readable instruction from the bytecode instruction.

Parameters:

instruction (int) -- uint byte-code instruction

Return type:

str

Returns:

String representing human-readable disassembled instruction

ControlLogix structured text logic parsing libraries.

Authors

  • Craig Buchanan

  • Christopher Goes

exception DisassembleStringLogicError(value)[source]
decompile_string_process_logic(logic_string, template_tags=None, driver=None)[source]

Returns the original source code of the structured text logic from the zlib-compressed string of the disassembled process logic.

Parameters:
  • logic_string (str) -- the zlib decompressed string

  • template_tags (dict | None)

  • driver (ClxCIP | None) -- Driver to use to resolve Symbol objects, if available

Return type:

str

Returns:

The original source code of the structured text

disassemble_string_process_logic(logic_bytes, logic_language)[source]

Returns the disassembled logic of a text-based program as it is stored on the Logix5000 device.

Parameters:
  • logic_bytes (bytes) -- the buffer (byte list) of the logic captured from the Logix5000 device

  • logic_language (int) -- ID of the logic language to parse

Return type:

tuple[str, str]

Returns:

Tuple of a string of the disassembled text-based process logic and a string of the text-based logic

format_logic_string_fbd(logic_string_raw)[source]

Returns the meaningful logic string from the raw string buffer stored on the Logix5000 device (function block diagram).

Parameters:

logic_string_raw (bytes) -- The raw string buffer from the device

Return type:

bytes

Returns:

The meaningful logic string

format_logic_string_sfc(logic_string_raw)[source]

Returns the meaningful logic string from the raw string buffer stored on the Logix5000 device (sequential function chart).

Parameters:

logic_string_raw (bytes) -- The raw string buffer from the device

Return type:

bytes

Returns:

The meaningful logic string

format_logic_string_st(logic_string_raw)[source]

Returns the meaningful logic string from the raw string buffer stored on the Logix5000 device (structured text).

Parameters:

logic_string_raw (bytes) -- The raw string buffer from the device

Return type:

bytes

Returns:

The meaningful logic string

8.5.2.2. Studio5000/RSLogix 5000 L5X file parsing

PEAT module to consume L5X files that have been exported by a Rockwell IDE. This could be RSLogix 50, 500, 5000, Studio 5000, or whatever it is/was called. Only more recent versions of the Rockwell IDE can export/import L5X, and only some of those can process “full L5X” as opposed to sections of the logic file.

The L5X format is documented by Rockwell. While the descriptions of fields is not great, it can be helpful for figuring out what a field does. The reference is available on the PEAT wiki (go to Allen-Bradley section).

Authors

  • John Mulder

  • Jennifer Trasti

  • Christopher Goes

class L5X[source]

Parser for consuming L5X files that have been exported by Rockwell IDE.

device_type: str = 'Project'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Rockwell'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Rockwell Automation/Allen-Bradley'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

filename_patterns: list[str] = ['*.l5x', '*.L5X']

Patterns of files the module is capable of parsing, if any. Patterns are a literal name (*SET_ALL.TXT) or a Unix shell glob (*.rdb), are case-insensitive, and must start with a wildcard character (*). Globs can be anything accepted by glob.

module_aliases: list[str] = ['Logic_L5X', 'L5X_logic', 'l5x_logic', 'Rockwell_l5x']

Alternative names for looking up the module, e.g. for the -d CLI option. Aliases that can be used to refer to the device via the PEAT’s Module API (peat.module_manager).

default_options: dict[str, Any] = {}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

8.5.2.2.1. Parsing

Functionality for pulling some interesting fields out of RSLogix L5X files.

Uses the l5x library for parsing, most functions return data structured for PEAT.

Authors

  • John Mulder

  • Jennifer Trasti

  • Christopher Goes

get_ip_address(project)[source]

Takes a l5x.Project object. Returns a string of the IP communication path.

Return type:

str

get_logic(project)[source]

Return a logic dict created from the project object.

Return type:

dict

get_modules_data(project)[source]

Gets module device data from an L5X project.

Return type:

list[dict]

get_programs(project)[source]

Takes a l5x.Project object. Returns dict of programs where key is the program name and value is a dict of some known interesting attributes of the program.

Return type:

dict

get_tags(project)[source]

Takes a l5x.Project object. Returns dict of tags where key is the tag name and value is a dict of attributes of the tag

Return type:

dict

get_comm_ports(project)[source]
Return type:

list[dict]

get_project_description(project)[source]
Return type:

str

8.5.3. Camlin

8.5.3.1. Totus DGA

class Totus[source]

PEAT module for the Camlin Totus G9 Dissolved Gas Analyzer (DGA).

Listening services

  • SSH (TCP 22)

  • HTTP (TCP 80)

Data collected

  • Metadata

  • Status

  • Network configuration

  • DNP3 registers

  • Modbus registers

Authors

  • Christopher Goes

  • Thomas Byrd

device_type: str = 'DGA'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Camlin'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Camlin Ltd'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'Totus'

Device brand. Elasticsearch field: host.description.brand.

model: str = 'G9'

Device’s default model (if not known). Elasticsearch field: host.description.model.

default_options: dict[str, Any] = {'http': {'pass': '', 'user': ''}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

ip_methods: list[IPMethod] = [IPMethod(name='Totus DGA scrape HTTP homepage', description='Verify the device is a DGA via HTTP.', identify_function=<bound method Totus._verify_http of <class 'peat.modules.camlin.totus.Totus'>>, reliability=6, protocol='http', transport='tcp', type='unicast_ip', default_port=80)]

Methods for identifying devices via IP or Ethernet.

8.5.3.1.1. Totus HTTP

class TotusHTTP(*args, **kwargs)[source]

HTTP interface for the Camlin Totus Dissolved Gas Analyzer (DGA).

DEFAULT_HEADERS: dict = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36', 'X-Requested-With': 'XMLHttpRequest', 'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="90"', 'sec-ch-ua-mobile': '?0'}
property logged_in: bool
login(username, password)[source]
Return type:

bool

get_and_process_all(dev)[source]

Get all data and process any successful retrievals into device data model.

Return type:

bool

Returns:

If at least one method was successful

static process_hardware_info(dev, hw_info)[source]
Return type:

None

static process_timedate(dev, time_info)[source]
Return type:

None

static process_ntp_config(dev, ntp_config)[source]

Add GPS/NTP remotes to set of “related” hosts and IPs.

Return type:

None

static process_ntp_status(dev, ntp_status)[source]

Add GPS/NTP peers to set of “related” hosts and IPs.

Return type:

None

static process_network_configuration(dev, net_config)[source]
Return type:

None

static process_users(dev, users)[source]
Return type:

None

static process_roles(dev, roles)[source]

Permissions allocated to roles.

NOTE: This MUST be called after process_users()!

Return type:

None

static process_system_info(dev, system_info)[source]
Return type:

None

static process_serial_ports(dev, serial_ports)[source]

Serial ports on device.

Return type:

None

static process_ssh_keys(dev, ssh_keys)[source]

Extract usernames, IPs, and/or hostnames from SSH public keys.

Return type:

None

static process_openvpn(dev, openvpn)[source]

Extract hostnames and/or IPs from OpenVPN configs.

Return type:

None

static process_wifihotspot(dev, wifihotspot)[source]
Return type:

None

static process_modbus_interfaces(dev, modbus_interfaces)[source]

Modbus configuration.

Return type:

None

static process_dnp3_channels(dev, dnp3_channels)[source]

DNP3 configuration.

Return type:

None

static parse_modbus_map(modbus_map_html)[source]

Parse raw HTML page with the Modbus register map.

Return type:

list[dict]

static process_modbus_map(dev, mb_data)[source]
Return type:

None

static parse_dnp3_map(dnp3_map_html)[source]

Parse raw HTML page with the DNP3 register map.

Return type:

dict[str, list[dict]]

static process_dnp3_map(dev, dnp3_data)[source]
Return type:

None

8.5.4. Fortinet

8.5.4.1. Fortigate Firewall

PEAT module for Fortinet Fortigate devices. It has been tested with the Fortigate FG100F firewall.

Types of files that can be parsed: - Config files (.conf) - Diagnostic output (“Debug Logs”) - Events from Fortianalyzer - Memory events

Authors

  • Christopher Goes

  • Danyelle Loffredo

  • Juan Dorantes Cardenas

class Fortigate[source]

Fortinet Fortigate firewalls. This module supports the FG100F firewall.

device_type: str = 'Firewall'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Fortinet'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Fortinet, Inc.'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'FortiGate'

Device brand. Elasticsearch field: host.description.brand.

filename_patterns: list[str] = ['*Fortigate*.conf', '*ortigate*.conf', 'FG100F*.log', 'fortianalyzer-event*.log', 'memory-event-*.log', 'sys_config']

Patterns of files the module is capable of parsing, if any. Patterns are a literal name (*SET_ALL.TXT) or a Unix shell glob (*.rdb), are case-insensitive, and must start with a wildcard character (*). Globs can be anything accepted by glob.

can_parse_dir: bool = True

If the module will accept a directory as the source path for parsing. If this is True, a Path like Path("./some_files/") would be a valid target for parsing. Any handling of files in the directory will have to be handled by the module, not PEAT.

module_aliases: list[str] = ['fg100', 'fg']

Alternative names for looking up the module, e.g. for the -d CLI option. Aliases that can be used to refer to the device via the PEAT’s Module API (peat.module_manager).

default_options: dict[str, Any] = {'fortigate': {'log_pull_timeout': 30.0, 'pull_methods': ['ssh', 'https']}, 'ssh': {'pass': '', 'user': ''}, 'web': {'pass': '', 'user': ''}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

classmethod pull_ssh(dev)[source]

Retrieve the sys_config via SCP over SSH from /config/sys_config. :rtype: bool

Warning

SSH pull will only work if admin-scp option is enabled on the device. SSH to device, then run config system global, set admin-scp enable, end, and exit.

classmethod pull_https(dev)[source]

Pull configuration and other data from the relay via HTTPS.

Return type:

bool

ip_methods: list[IPMethod] = [IPMethod(name='Fortigate SSH login', description='None', identify_function=<bound method Fortigate._verify_ssh of <class 'peat.modules.fortinet.fortigate.Fortigate'>>, reliability=6, protocol='ssh', transport='tcp', type='unicast_ip', default_port=22), IPMethod(name='Fortigate web page check', description='None', identify_function=<bound method Fortigate._verify_https of <class 'peat.modules.fortinet.fortigate.Fortigate'>>, reliability=8, protocol='https', transport='tcp', type='unicast_ip', default_port=443)]

Methods for identifying devices via IP or Ethernet.

8.5.5. General Electric

8.5.5.1. GE D25 RTU

class D25Telnet(ip, port=23, timeout=5.0, menu_sleep_seconds=5.0, raw_dir=None)[source]

Implementation of the command interface for the GE D25 RTU over Telnet.

login(username, password)[source]

Logs into D25 telnet with supplied username and password.

Return type:

bool

is_desired_menu(menu)[source]

Checks to see if telnet session is at main menu.

Return type:

bool

select_menu_option(option)[source]

Sends a number, letter, or command corresponding to a menu option.

Return type:

None

navigate_menus(menu_options)[source]

Navigates to desired menu in telnet sessions.

Return type:

bool

get_data_digital_io(io)[source]

Parse data of Digital Input Display submenu.

Return type:

str

get_data_erroruser_logs(remove_time)[source]

Parse data function Error and User logs. Returns up to 20 latest log entries.

Return type:

str

get_internet_stats()[source]

A parse data function for Internet stats.

Return type:

str

class GERTU[source]

PEAT module for the GE D25 RTU.

Listening services

  • Telnet (TCP 23)

Authors

  • Christopher Goes

  • Justin Cox, Idaho National Laboratory (INL)

device_type: str = 'RTU'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'GE'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'General Electric'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'Multilin'

Device brand. Elasticsearch field: host.description.brand.

model: str = 'D25'

Device’s default model (if not known). Elasticsearch field: host.description.model.

supported_models: list[str] = ['D25']

Device models this module supports or is known to work with.

default_options: dict[str, Any] = {'ge': {'menu_sleep_seconds': 5.0}, 'telnet': {'pass': 'Password', 'user': 'User'}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

ip_methods: list[IPMethod] = [IPMethod(name='GE D25 RTU Telnet', description='Verify GE D25 RTU via Telnet by attempting to connect and\n        login to the Telnet user interface.', identify_function=<bound method GERTU._verify_telnet of <class 'peat.modules.ge.ge_rtu.GERTU'>>, reliability=5, protocol='telnet', transport='tcp', type='unicast_ip', default_port=23)]

Methods for identifying devices via IP or Ethernet.

clean_up_data(data, rm_time=True)[source]

Filters and removes garbage data from telnet session.

Return type:

str

parse_digital_io_menu(data, io)[source]

Parse Digital Input and Digital Output display menus.

Return type:

dict

parse_analog_io_menu(data, io)[source]

Parse Analog Input and Analog Output display menus.

Return type:

dict

parse_errorlog_menu(data)[source]

Parse Error Log Menu.

Return type:

dict

parse_userlog_menu(data)[source]

Parse User Log Menu.

Return type:

dict

parse_internet_stats_menu(data)[source]

Get Internet Statistics by parsing internet stat sub-menus.

Return type:

dict

8.5.5.2. GE Relays

class GERelay[source]

PEAT module for GE Multilin Relays.

Listening services

  • HTTP (TCP 80)

Web pages

  • /IEC61850InfoMenu.htm

  • /CustomerSupport.htm

  • /ProcessCardMenu.htm

  • /DeviceInfoMenu.htm
    • /DisplayDump.htm

    • /DNPPoints.htm

    • /HF03DisableStatus.htm

    • /FlexInteger.htm

    • /FlexAnalog.htm

    • /FlexLogicParameters.htm

  • /memoryMap.htm
    • ?0x<HEXADDRESS>

  • /USBStats.htm

  • /FaultReport.htm

  • /RoutingAndArpTable.htm

  • /EventRecorder.htm
    • ?<# of alerts, 0 to show all>

  • /DefaultSettingsDiagnostics.htm

  • /FlexOperandStates.htm

Authors

  • Christopher Goes

  • Daniel Hearn, Idaho National Laboratory (INL)

device_type: str = 'Relay'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'GE'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'General Electric'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'Multilin'

Device brand. Elasticsearch field: host.description.brand.

can_parse_dir: bool = True

If the module will accept a directory as the source path for parsing. If this is True, a Path like Path("./some_files/") would be a valid target for parsing. Any handling of files in the directory will have to be handled by the module, not PEAT.

supported_models: list[str] = ['D30', 'F35', 'N60', 'T60', 'L90']

Device models this module supports or is known to work with.

ip_methods: list[IPMethod] = [IPMethod(name='GE Relay HTTP homepage', description='Verify GERelay by checking for strings in homepage.', identify_function=<bound method GERelay._verify_http of <class 'peat.modules.ge.ge_relay.GERelay'>>, reliability=7, protocol='http', transport='tcp', type='unicast_ip', default_port=80)]

Methods for identifying devices via IP or Ethernet.

parse_ge_html(text, key_value_pairs=False)[source]

HTML parsing for pages scraped from GE devices.

Originally based on code by Ryan Vrecenar in sel_http.py.read_html().

Parameters:
  • text (str) -- the raw HTML text to parse

  • key_value_pairs (bool) -- If a table should be parsed as key-value pairs. This mainly applies to the USBStats page, but it could apply to other pages as well.

Return type:

dict[str, list[dict]]

clean_text(data)[source]

Attempt to remove whitespace and new line characters.

Return type:

str

8.5.6. iDirect

8.5.6.1. iDirect Modem

iDirect X-series modems (X3, X5, X7).

Authors

  • Christopher Goes

class Idirect[source]

iDirect X-series modems (X3, X5, X7).

device_type: str = 'Modem'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'iDirect'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'iDirect, Inc.'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

module_aliases: list[str] = ['idirect-x3', 'idirect-x5', 'idirect-x7']

Alternative names for looking up the module, e.g. for the -d CLI option. Aliases that can be used to refer to the device via the PEAT’s Module API (peat.module_manager).

default_options: dict[str, Any] = {'idirect': {'pull_methods': ['ssh', 'ssl']}, 'ssh': {'creds': [('root', 'iDirect')], 'pass': '', 'user': ''}, 'telnet': {'creds': [], 'pass': '', 'user': ''}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

ip_methods: list[IPMethod] = [IPMethod(name='Idirect SSH login', description='Verify if the given protocol can connect to the iDirect modem.\n\n        !! NOTE: Telnet is not yet implemented !!\n\n        Args:\n            dev: Device specific data and configuration.\n            protocol: Which protocol to use (telnet or ssh).\n\n        Returns:\n            Check if a device is a iDirect modem by logging in.', identify_function=functools.partial(<bound method Idirect._verify_protocol of <class 'peat.modules.idirect.idirect.Idirect'>>, protocol='ssh'), reliability=6, protocol='ssh', transport='tcp', type='unicast_ip', default_port=22), IPMethod(name='Idirect HTTPS SSL certificate', description='Verify a device is a iDirect modem via HTTPS SSL certificate inspection.', identify_function=<bound method Idirect._verify_https_ssl_certificate of <class 'peat.modules.idirect.idirect.Idirect'>>, reliability=9, protocol='https', transport='tcp', type='unicast_ip', default_port=443)]

Methods for identifying devices via IP or Ethernet.

class FalconOptParser[source]

Parse the contents of falcon.opt into a dict. This is usually located in /sysopt/config/sat_router/falcon.opt.

classmethod parse(to_parse)[source]

Parse raw data into a Python data structure, such as a dict or list.

Return type:

dict[str, dict[str, str]]

classmethod process(to_process, dev)[source]

Process parsed data into the device data model.

Return type:

None

8.5.7. Sandia

8.5.7.1. SCEPTRE Field Device

PEAT module for Sandia SCEPTRE virtual field devices.

Sandia SCEPTRE virtual field control devices (vFCDs), including the SCEPTRE Virtual RTU and SCEPTRE Virtual Relay.

SCEPTRE virtual relays and RTUs are running a FTP server, which is used to pull the device configuration XML file as well as the firmware binary image. The XML file is then parsed and the extracted information saved.

Development, testing, and demonstration of this module is possible without a running SCEPTRE instance by setting up a local FTP server using the Python Twisted framework. To do so, run the following command in a terminal:

pip3 install Twisted
sudo -H $(which twistd) -n ftp --auth=anonymous -r examples/devices/sceptre/

Then, in another terminal, run peat:

peat pull -d sceptre -i 127.0.0.1 --assume-online -v -c ./examples/peat-config-sceptre-testing.yaml

That config file sets the following options:

  • ftp.port: 2121

  • ftp.user: anonymous

  • ftp.pass: anonymous

  • sceptre.ftp_testing: true

Listening services

  • FTP (TCP 21)

Data collected

  • Full device configuration file

  • Device firmware binary image

  • Device type (e.g. “Relay” or “RTU”)

  • Device name (e.g. “relay-louie”)

  • Cycle time

  • Logic

Authors

  • Christopher Abate

  • Christopher Goes

class SCEPTRE[source]

SCEPTRE Virtual RTUs and Virtual Relays.

device_type: str = 'RTU'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Sandia'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Sandia National Laboratories'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

model: str = 'SCEPTRE'

Device’s default model (if not known). Elasticsearch field: host.description.model.

brand: str = 'SCEPTRE'

Device brand. Elasticsearch field: host.description.brand.

filename_patterns: list[str] = ['*.xml', 'config.xml']

Patterns of files the module is capable of parsing, if any. Patterns are a literal name (*SET_ALL.TXT) or a Unix shell glob (*.rdb), are case-insensitive, and must start with a wildcard character (*). Globs can be anything accepted by glob.

default_options: dict[str, Any] = {'ftp': {'pass': 'sceptre', 'user': 'sceptre'}, 'sceptre': {'bennu_filename': 'bennu-field-deviced.firmware', 'ftp_testing': False}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

annotate_fields: dict[str, Any] = {'os.name': 'Ubuntu', 'os.vendor.id': 'Canonical', 'os.vendor.name': 'Canonical'}

Fields that will be annotated (populated) by default for most operations, such as scan pull, parse, etc. Examples include known OS versions or hardware architecture. These fields will populated ONLY IF they are already unset on the device being annotated. Format is the path to the field to populate, e.g. os.name, os.vendor.id, etc.

classmethod pull_config(dev)[source]

Retrieves the device configuration via FTP.

The SCEPTRE vFCDs use a single XML file for their configuration, called ‘config.xml’ by default.

Return type:

bool

classmethod parse_config(file, dev=None)[source]

Parse a SCEPTRE field device XML configuration file. :rtype: DeviceData

Note

Examples of configs can be found on the bennu GitHub, in addition to the ones in the PEAT repo.

ip_methods: list[IPMethod] = [IPMethod(name='SCEPTRE FTP', description='Verify it\'s a SCEPTRE Bennu device by logging in with FTP and\n        checking if any file named "bennu" is on the server.', identify_function=<bound method SCEPTRE._verify_ftp of <class 'peat.modules.sandia.sceptre_fcd.SCEPTRE'>>, reliability=9, protocol='ftp', transport='tcp', type='unicast_ip', default_port=21)]

Methods for identifying devices via IP or Ethernet.

8.5.8. Schneider Electric

8.5.8.1. Modicon M340 PLC

The Schneider Electric Modicon M340 PLC.

The Schneider does NOT have separate “logic” and “config” files, just one big blob that we call a “project file”. Therefore, the semantics of “pull/parse config/logic” are slightly skewed in this module.

Services

  • FTP (TCP 21)

  • DHCP client (UDP 68)

  • TFTP (UDP 69)

  • HTTP (TCP 80)

  • HTTPS (TCP 443)

  • SNMP (UDP 161)

  • Modbus/TCP (TCP 502)

  • Teredo (UDP 3544)

Reliable services for scanning: FTP, HTTP/S, SNMP, Modbus/TCP

Authors

  • Christopher Goes

  • Patrica Schulz

  • Mark Woodard

class M340[source]

Schneider Modicon M340 PLC.

device_type: str = 'PLC'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Schneider'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Schneider Electric'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'Modicon'

Device brand. Elasticsearch field: host.description.brand.

model: str = 'M340'

Device’s default model (if not known). Elasticsearch field: host.description.model.

filename_patterns: list[str] = ['*.apx']

Patterns of files the module is capable of parsing, if any. Patterns are a literal name (*SET_ALL.TXT) or a Unix shell glob (*.rdb), are case-insensitive, and must start with a wildcard character (*). Globs can be anything accepted by glob.

default_options: dict[str, Any] = {'ftp': {'creds': [['loader', 'fwdownload']]}, 'm340': {'generate_openplc_project': None, 'use_network_for_config': True}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

annotate_fields: dict[str, Any] = {'os.name': 'VxWorks', 'os.vendor.id': 'WindRiver', 'os.vendor.name': 'Wind River Systems', 'os.version': '7'}

Fields that will be annotated (populated) by default for most operations, such as scan pull, parse, etc. Examples include known OS versions or hardware architecture. These fields will populated ONLY IF they are already unset on the device being annotated. Format is the path to the field to populate, e.g. os.name, os.vendor.id, etc.

classmethod pull_apx_file(ip)[source]

Pull the project file blob from a device.

This is equivalent to the “Station.apx” file extracted from a .STA archive saved using Unity.

Parameters:

ip (str) -- IPv4 address of device to pull from

Return type:

bytes

Returns:

The project file pulled from the device

classmethod parse_logic_to_text(logic_blocks)[source]

Parse device project file blob into a printable string for debugging.

Return type:

str

classmethod dump_project(logic_blob)[source]

Raw dump of blocks for debugging purposes.

Return type:

str

classmethod generate_openplc_project(dev)[source]

Generate a directory that can be opened with the OpenPLC editor.

Return type:

Path | None

<project-name>_openplc_project/

beremiz.xml plc.xml

ip_methods: list[IPMethod] = [IPMethod(name='M340 FTP', description='Check if a device is a M340 via FTP by logging in with default FTP\n        credentials and looking for files with particular names.', identify_function=<bound method M340._verify_ftp of <class 'peat.modules.schneider.m340.m340.M340'>>, reliability=6, protocol='ftp', transport='tcp', type='unicast_ip', default_port=21), IPMethod(name='M340 SNMP sysDescr', description='Check if a device is a M340 by querying SNMP for OID\n        ``1.3.6.1.2.1.1.1.0`` (``sysDescr``) and comparing it\n        against specific strings.', identify_function=<bound method M340._verify_snmp of <class 'peat.modules.schneider.m340.m340.M340'>>, reliability=7, protocol='snmp', transport='udp', type='unicast_ip', default_port=161), IPMethod(name='M340 Modbus/TCP', description='Check if a device is a M340 via Modbus/TCP by sending a :term:`UMAS`\n        identification packet to the device.', identify_function=<bound method M340._verify_modbus of <class 'peat.modules.schneider.m340.m340.M340'>>, reliability=8, protocol='modbus_tcp', transport='tcp', type='unicast_ip', default_port=502)]

Methods for identifying devices via IP or Ethernet.

8.5.8.1.1. M340 parsing

Parsing and extraction of configuration and logic for Schneider Modicon PLCs.

Authors

  • Mark Woodard

  • Christopher Goes

extract_logic_blocks(logic_blob)[source]

Extracts the process logic blocks from a Schneider M340 APX project file blob.

Parameters:

logic_blob (bytes) -- Logic blob pulled from the PLC

Return type:

dict[str, dict | list]

Returns:

Logic blocks extracted from the blob

strip_xml(data)[source]

Strips unnecessary XML data and extracts field elements from a data blob.

Return type:

list[bytes]

chunkify(blob)[source]

Processes a raw APX file and extracts distinct chunks.

There are four types of chunks: APX, 0000, 0100, 0200

APX: Fixed length Project header 0000: Fixed length Unknown purpose 0100: Fixed length Unknown purpose 0200: Variable length Contains data blocks (ST logic, module configs, etc.)

Parameters:

blob (bytes) -- The blob to decompress portions of

Return type:

dict[int, dict[str, int | str | bytes]]

Returns:

The extracted chunks

extract_chunk(data)[source]

Extract and decompress data from a chunk.

Return type:

bytes

extract_variables(chunk)[source]

Extracts variables and their metadata from a chunk.

Return type:

dict[str, dict[str, int | str | bytes]]

extract_initial_values(chunk)[source]

Extracts variable initial values from a chunk.

Two types of variables: coils 01, registers 00. Each entry can set consecutive variables in memory starting from the memory location.

Return type:

dict[int, int]

set_init_vals(var, init)[source]

Sets the initial values of extracted variables using extracted initial values.

Parameters:
  • var (dict[str, dict]) -- Variables to annotate with initial values

  • init (dict) -- Extracted initial values to annotate variables with

Return type:

dict[str, dict]

Returns:

Variables updated to include initial values

parse_config_to_dict(config_blob)[source]

Extracts configuration information from a M340 APX project file blob.

This will return less information than pull_config does, as it does not include information pulled directly over the network. Essentially, this function is the components of pull_config that do not involve network access.

Parameters:

config_blob (bytes) -- Schneider project file

Return type:

dict[str, str | dict]

Returns:

A dictionary containing information parsed from the project file

add_logic_to_tc6(logic_blocks, tc6, sceptre=False)[source]

Adds the M340-specific logic portions to a TC6 instance.

Currently, this is the variables and Structured Text.

Parameters:
  • logic_blocks (dict) -- Logic and variables

  • tc6 (TC6) -- TC6 class instance (NOTE: this will be modified!)

  • sceptre (bool) -- Make the resulting logic compatible with OpenPLC/SCEPTRE PLC

Return type:

None

get_type_string(var_type, sceptre)[source]
Return type:

str

8.5.8.1.2. M340 pulling

Methods to pull from Schneider Modicon M340 PLCs.

This includes process logic, configuration, and firmware.

Authors

  • Christopher Goes

  • Mark Woodard

  • Patrica Schulz

pull_network_config(ip, timeout=1.0, snmp_community='public')[source]

Pull configuration information using SNMP, FTP, and Modbus/TCP.

Parameters:
  • ip (str) -- IP address of the device

  • timeout (float) -- Time to wait for a service to respond, in seconds

  • snmp_community (str) -- SNMPv1 Community String to use

Return type:

dict

Returns:

The device configuration information

8.5.8.1.3. UMAS protocol

Scapy packets for the Schneider-proprietary UMAS protocol.

This protocol is used by the Schneider Modicon M340 PLC.

Authors

  • Patrica Schulz

  • Christopher Goes

class Modbus(_pkt, /, *, transactionID=1, protocolID=0, length=None, unitID=0, functionCode=90, data=b'')[source]
fields_desc: List[AnyField] = [<ShortField (Modbus).transactionID>, <ShortField (Modbus).protocolID>, <FieldLenField (Modbus).length>, <XByteField (Modbus).unitID>, <ByteField (Modbus).functionCode>, <StrLenField (Modbus).data>]
aliastypes = [<class 'peat.modules.schneider.m340.umas_packets.Modbus'>, <class 'scapy.packet.Packet'>]
class UMASQuery(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<XByteField (UMASQuery).connectionCode>, <XByteField (UMASQuery).commandCode>, <ShortField (UMASQuery).num>, <LEShortField (UMASQuery).data>]
aliastypes = [<class 'peat.modules.schneider.m340.umas_packets.UMASQuery'>, <class 'scapy.packet.Packet'>]
class UMASResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<XByteField (UMASResponse).connectionCode>, <scapy.fields.MayEnd object>, <ShortField (UMASResponse).num>, <LEShortField (UMASResponse).dataLen>]
aliastypes = [<class 'peat.modules.schneider.m340.umas_packets.UMASResponse'>, <class 'scapy.packet.Packet'>]
class UMASPoll(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<XByteField (UMASPoll).connectionCode>, <XByteField (UMASPoll).commandCode>]
aliastypes = [<class 'peat.modules.schneider.m340.umas_packets.UMASPoll'>, <class 'scapy.packet.Packet'>]
class UMASConnection(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<XByteField (UMASConnection).connectionCode>, <XByteField (UMASConnection).commandCode>, <LEShortField (UMASConnection).unknown1>, <LEShortField (UMASConnection).unknown2>, <FieldLenField (UMASConnection).length>, <StrLenField (UMASConnection).name>]
aliastypes = [<class 'peat.modules.schneider.m340.umas_packets.UMASConnection'>, <class 'scapy.packet.Packet'>]
class UMASConnectionResponse(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)[source]
fields_desc: List[AnyField] = [<XByteField (UMASConnectionResponse).connectionCode>, <XByteField (UMASConnectionResponse).commandCode>, <XByteField (UMASConnectionResponse).newConnectionCode>]
aliastypes = [<class 'peat.modules.schneider.m340.umas_packets.UMASConnectionResponse'>, <class 'scapy.packet.Packet'>]
start_pull_packet(cid)[source]
Return type:

Modbus

pull_packet(cid, seq)[source]
Return type:

Modbus

stop_pull_packet(cid, seq)[source]
Return type:

Modbus

poll_packet(cid)[source]
Return type:

Modbus

connect_packet()[source]
Return type:

Modbus

send_umas_packet(sock, packet, response_class, tracker=None)[source]

Send a packet and receive the response from a UMAS device.

Parameters:
  • sock (socket) -- The TCP socket to use

  • packet (Packet | bytes) -- The packet to send

  • response_class (type[Packet]) -- The Packet subclass to process the response as

  • tracker (list | None) -- tracks all packets sent, including raw payloads and metadata

Return type:

Packet

Returns:

The data field of the response

update_tracker(direction, packet, payload, sock, tracker)[source]

8.5.8.2. Sage RTU

Schneider Electric Sage RTUs.

AMD x86 CPU. Default creds are Admin/Telvent1! for most authenticated services.

TCP/UDP Open Ports

  • FTP (TCP 21)

  • SSH (TCP 22)

  • Telnet (TCP 23)

  • HTTP (TCP 80)

  • HTTPS (TCP 443)

  • L2TP (UDP 1701)

Authors

  • Aaron Lombrozo

  • Aidan Kollar

  • Christopher Goes

  • James Gallagher

  • Ryan Vrecenar

class Sage[source]

Schneider Electric Sage RTU.

device_type: str = 'RTU'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Schneider'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Schneider Electric'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'Sage'

Device brand. Elasticsearch field: host.description.brand.

model: str = ''

Device’s default model (if not known). Elasticsearch field: host.description.model.

filename_patterns: list[str] = ['*_Config*.tar.gz', '*_config*.tar.gz', '*_Firmware*.tar.gz', '*_firmware*.tar.gz', 'rtusetup.xml', 'ACCESS.XML']

Patterns of files the module is capable of parsing, if any. Patterns are a literal name (*SET_ALL.TXT) or a Unix shell glob (*.rdb), are case-insensitive, and must start with a wildcard character (*). Globs can be anything accepted by glob.

can_parse_dir: bool = True

If the module will accept a directory as the source path for parsing. If this is True, a Path like Path("./some_files/") would be a valid target for parsing. Any handling of files in the directory will have to be handled by the module, not PEAT.

annotate_fields: dict[str, Any] = {'architecture': 'x86', 'os.name': 'VxWorks', 'os.vendor.id': 'WindRiver', 'os.vendor.name': 'Wind River Systems'}

Fields that will be annotated (populated) by default for most operations, such as scan pull, parse, etc. Examples include known OS versions or hardware architecture. These fields will populated ONLY IF they are already unset on the device being annotated. Format is the path to the field to populate, e.g. os.name, os.vendor.id, etc.

default_options: dict[str, Any] = {'ftp': {'pass': 'Telvent1!', 'user': 'Admin'}, 'sage': {'ftp_filesystems': ['/ata0a', '/ramDrv'], 'pull_methods': ['telnet', 'ftp', 'ssl', 'ssh', 'sftp', 'http', 'https'], 'ssh_filepaths': ['/ata0a/scripts/vxworks_start.scp', '/ata0a/scripts/startup.scp']}, 'ssh': {'disabled_algorithms': {'pubkeys': ['rsa-sha2-512', 'rsa-sha2-256']}, 'key_filename': '', 'look_for_keys': False, 'pass': 'Telvent1!', 'passphrase': 'Telvent1!', 'timeout': 10.0, 'user': 'Admin'}, 'telnet': {'pass': 'Telvent1!', 'timeout': 10.0, 'user': 'Admin'}, 'web': {'pass': 'Telvent1!', 'timeout': 30.0, 'user': 'Admin'}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

ip_methods: list[IPMethod] = [IPMethod(name='Sage FTP login', description="Verify via FTP login and check of current directory ('pwd').", identify_function=<bound method Sage._verify_ftp of <class 'peat.modules.schneider.sage.sage.Sage'>>, reliability=7, protocol='ftp', transport='tcp', type='unicast_ip', default_port=21), IPMethod(name='Sage Telnet login', description='Verify if the given protocol can connect to Sage (ssh/telnet for now).\n\n        Args:\n            dev (DeviceData): Device specific data and configuration.\n            protocol (str): Which protocol to use (telnet/ssh).\n\n        Returns:\n            bool: Check if a device is a Sage RTU by logging in.', identify_function=functools.partial(<bound method Sage._verify_protocol of <class 'peat.modules.schneider.sage.sage.Sage'>>, protocol='telnet'), reliability=6, protocol='telnet', transport='tcp', type='unicast_ip', default_port=23), IPMethod(name='Sage SSH login', description='Verify if the given protocol can connect to Sage (ssh/telnet for now).\n\n        Args:\n            dev (DeviceData): Device specific data and configuration.\n            protocol (str): Which protocol to use (telnet/ssh).\n\n        Returns:\n            bool: Check if a device is a Sage RTU by logging in.', identify_function=functools.partial(<bound method Sage._verify_protocol of <class 'peat.modules.schneider.sage.sage.Sage'>>, protocol='ssh'), reliability=6, protocol='ssh', transport='tcp', type='unicast_ip', default_port=22), IPMethod(name='Sage SSL certificate', description='Verify a device is a Sage RTU via SSL certificate inspection.', identify_function=<bound method Sage._verify_https_ssl_certificate of <class 'peat.modules.schneider.sage.sage.Sage'>>, reliability=9, protocol='https', transport='tcp', type='unicast_ip', default_port=443), IPMethod(name='Sage HTTP page scrape', description='Verify a device is a Sage RTU via parsing of the web interface.', identify_function=functools.partial(<bound method Sage._verify_http of <class 'peat.modules.schneider.sage.sage.Sage'>>, protocol='http'), reliability=8, protocol='http', transport='tcp', type='unicast_ip', default_port=80), IPMethod(name='Sage HTTPS page scrape', description='Verify a device is a Sage RTU via parsing of the web interface.', identify_function=functools.partial(<bound method Sage._verify_http of <class 'peat.modules.schneider.sage.sage.Sage'>>, protocol='https'), reliability=8, protocol='https', transport='tcp', type='unicast_ip', default_port=443)]

Methods for identifying devices via IP or Ethernet.

8.5.8.2.1. Sage parsing

Parsing of files from Sage RTUs.

Works with 2400 and 3030M models, and should work with others.

Authors

  • Christopher Goes

  • Ryan Vrecenar

parse_access_xml(f_handle, device_info)[source]

Parse users and access permissions out of ACCESS.XML.

Return type:

None

process_access_xml(dev, info)[source]

Process data extracted from ACCESS.XML into the PEAT device data model.

Return type:

None

parse_rtu_setup(f_handle, device_info)[source]

Parse XML tree from rtusetup.xml.

Parameters:
  • f_handle (IO[bytes]) -- file handler to read data from and parse xml

  • device_info (dict) -- dictionary of keys and values for target device

Return type:

None

process_rtusetup_info(dev, info)[source]

Process data extracted from rtusetup.xml into the PEAT device data model.

Return type:

None

parse_tte(f_handle, device_info)[source]

Parse XML tree from tte.xml to find time trigger ethernet settings.

Parameters:
  • f_handle (IO[bytes]) -- file handler to read data from and parse xml

  • device_info (dict) -- dictionary of keys and values for target device

Return type:

None

parse_time_elements(_time, device_info)[source]

From TIME branch in ElementTree in time, parse out version and date of update for OS.

Return type:

None

parse_gblfrz_elements(gblfrz, device_info)[source]

From GBLFRZ branch in ElementTree in time, parse out version and date of update for OS.

Return type:

None

parse_gps_elements(gps, device_info)[source]

From GPS branch in ElementTree in time, parse out version and date of update for OS.

Return type:

None

parse_time(f_handle, device_info)[source]

Parse xml tree from time.xml to find time, gblfrz, and gps settings.

Parameters:
  • f_handle (IO[bytes]) -- file handler to read data from and parse xml

  • device_info (dict) -- dictionary of keys and values for target device

Return type:

None

bootline_parse(bootline)[source]

Parse useful information out of vxworks bootline string, which is embedded in bootline.xml.

Return type:

dict[str, str]

parse_bootline_xml(f_handle, device_info)[source]

Parse XML tree from bootline.xml to find boot options including default IP address, location of operating system, hostname, username and password.

Parameters:
  • f_handle (IO[bytes]) -- file handler to read data from and parse xml

  • device_info (dict) -- dictionary of keys and values for target device

Return type:

None

process_bootline(dev, data)[source]

Process data extracted from VxWorks bootline into the PEAT device data model.

Return type:

None

parse_ether(f_handle, device_info)[source]

Parse XML tree from ethernet.xml to find route settings and ethernet port settings.

Parameters:
  • f_handle (IO[bytes]) -- file handler to read data from and parse xml

  • device_info (dict) -- dictionary of keys and values for target device

Return type:

None

parse_firewall_rule(firewall_line, firewall_rule)[source]
Return type:

None

parse_firewall(f_handle, device_info)[source]
Return type:

None

parse_isagraf(f_handle, device_info)[source]
Return type:

None

parse_startup_script(f_handle, device_info)[source]
Return type:

None

parse_vxworks_script(f_handle, device_info)[source]
Return type:

None

parse_private_key(f_handle, device_info)[source]
Return type:

None

parse_certificate(f_handle, device_info)[source]
Return type:

None

parse_ike_config(f_handle, device_info)[source]
Return type:

None

parse_ike_ca(f_handle, device_info)[source]
Return type:

None

parse_ike_privkey(f_handle, device_info)[source]
Return type:

None

parse_ike_cert(f_handle, device_info)[source]
Return type:

None

parse_port_comm_settings(proto, port_settings)[source]
Return type:

None

parse_port_proto_settings(proto_name, proto, port_settings)[source]
Return type:

None

parse_port(f_handle, device_info)[source]
Return type:

None

parse_features(f_handle, device_info)[source]
Return type:

None

parse_3835(f_handle, device_info)[source]
Return type:

None

parse_dcana(f_handle, device_info)[source]
Return type:

None

parse_calc(f_handle, device_info)[source]
Return type:

None

parse_rtu_sts(f_handle, device_info)[source]
Return type:

None

parse_timing(f_handle, device_info)[source]
Return type:

None

parse_sfb(f_handle, device_info)[source]
Return type:

None

parse_acc_rate(f_handle, device_info)[source]
Return type:

None

parse_alarming(f_handle, device_info)[source]
Return type:

None

parse_alarms(f_handle, device_info)[source]
Return type:

None

parse_almdev(f_handle, device_info)[source]
Return type:

None

parse_bbdi(f_handle, device_info)[source]
Return type:

None

parse_cbc(f_handle, device_info)[source]
Return type:

None

parse_ast(f_handle, device_info)[source]
Return type:

None

parse_anunctor(f_handle, device_info)[source]
Return type:

None

parse_com_assignments(f_handle, device_info)[source]
Return type:

None

parse_sprcodes(f_handle, device_info)[source]
Return type:

None

parse_leds(f_handle, device_info)[source]
Return type:

None

parse_relays(f_handle, device_info)[source]
Return type:

None

parse_nrgcalc(f_handle, device_info)[source]
Return type:

None

parse_bbrelay(f_handle, device_info)[source]
Return type:

None

parse_btstconf(f_handle, device_info)[source]
Return type:

None

parse_aci(f_handle, device_info)[source]
Return type:

None

parse_aci1250(f_handle, device_info)[source]
Return type:

None

parse_userlog_settings_xml(f_handle, device_info)[source]

Parse XML tree from userlog.xml to find logging settings.

Parameters:
  • f_handle (IO[bytes]) -- file handler to read data from and parse xml

  • device_info (dict) -- dictionary of keys and values for target device

Return type:

None

parse_cmdlog_settings_xml(f_handle, device_info)[source]
Return type:

None

parse_soelog_settings_xml(f_handle, device_info)[source]
Return type:

None

parse_ipcom_syslog(f_handle, device_info)[source]

Parse syslog and syslog.0 files from ipcom/ directory on the Sage.

Return type:

None

process_ipcom_syslog(dev, events)[source]

Process parsed ipcom/syslog events into the PEAT device data model.

Return type:

None

parse_logfiles(f_handle, device_info)[source]

Parse the contents of SYSLOG.LOG, USERLOG.LOG, and soelog.txt.

This creates a list of dicts with structured event information.

Known locations of logfiles: :rtype: None

  • /ata0a/Webfiles/LOGS/SYSLOG.LOG

  • /ata0a/Webfiles/LOGS/USERLOG.LOG

  • /ramDrv/soelog.txt

process_logfile_events(dev, events)[source]
Return type:

None

parse_file(path, f_handle, device_info)[source]

Parse a Sage file using the appropriate parsing function, and update the device_info dict with extracted data from that file.

Return type:

bool

Returns:

Bool indicating if a parsing function was found for the file.

8.5.8.2.2. Sage Telnet

Telnet functions for Schneider Electric Sage RTUs.

Authors

  • Aaron Lombrozo

  • Christopher Goes

class SageTelnet(ip, port=23, timeout=5, dev=None)[source]
ENCODING: str = 'ascii'
PRE_WRITE_SLEEP: float = 0.0
TYPE: str = 'telnet'
login(user='Admin', passwd='Telvent1!')[source]

Login to the device’s telnet interface.

Return type:

bool

disconnect()[source]

Cleanly disconnect from the device.

Return type:

None

8.5.8.2.3. Sage SSH

SSH functions for Schneider Electric Sage RTUs.

This module provides an interface for reading and parsing SSH commands from Sage RTUs.

Note

The key_filename kwarg used in the SSH connection should be the absolute path to the desired key. This key should be in the OPENSSH key format required by paramiko. The conversion from the default format can be done with PuTTYgen, with an example given in the section SSH Key Generation of the documentation referenced below.

Examples

from peat.modules.schneider.sage.sage_ssh import SageSSH

# These kwargs are the common kwargs for the Sage 3030M device.
ssh_kwargs = {
    "passphrase": "Telvent1!",
    "key_filename": "Admin.ppk",
    "look_for_keys": False,
    "disabled_algorithms": {"pubkeys": ["rsa-sha2-512", "rsa-sha2-256"]},
}

sage_con = SageSSH(
    ip="127.0.0.1",
    port=22,
    timeout=5.0,
    username="Admin",
    password="Telvent1!",
    kwargs=ssh_kwargs,
)
sage_con.login()

sage_con.write("ls")
s = sage_con.read()
sage_con.disconnect()

References

To SSH directly:

ssh -oHostKeyAlgorithms=+ssh-rsa Admin@192.0.2.2

Authors

  • Christopher Goes

  • James Gallagher

  • Kevin Cox

class SageSSH(ip, port=22, timeout=5.0, username=None, password=None, dev=None, kwargs=None)[source]

SSH communication for Sage RTUs.

This class maintains the connection state and parsed information from running common commands on the Sage device. See the parent SSH class for more information on expected input when instantiating the SageSSH class.

Parameters:
  • ip (str) -- Which IP address to use.

  • port (int, optional) -- Which port number to use - default is 22.

  • timeout (float, optional) -- Seconds to wait for the string before timing out.

  • dev (DeviceData, optional) -- Device specific data to use in configuration.

  • kwargs (dict[str, any], optional) -- Additional kwargs for Paramiko ssh client.

ENCODING: str = 'ascii'
PRE_WRITE_SLEEP: float = 0.0
POST_WRITE_SLEEP: float = 0.0
TYPE: str = 'ssh'
login(user='Admin', passwd='Telvent1!')[source]

Login to the device’s ssh interface.

Use the given login credentials to log in to the device via ssh. This connection is primarily maintained by the paramiko library.

Parameters:
  • user (str | None) -- Username to login with.

  • passwd (str | None) -- Password to use to login with. This password is also used for the key passphrase if the ssh key is input.

Return type:

bool

disconnect()[source]

Disconnect from the device.

Cleanly disconnect from the device. This function will close the paramiko connection and save all read commands.

Return type:

None

8.5.8.2.4. Sage commands

Sage-specific command query and parsing.

Useful doc for understanding some command outputs:

Authors

  • Aaron Lombrozo

  • Christopher Goes

  • Kevin Cox

class SageCommands[source]

Sage-specific device commands and parsing.

This class maintains the Sage device commands and command parsing. It’s intended to be inherited, and not meant to be used directly.

TYPE: str = ''
query(command)[source]

Query the Sage device with the given command.

This includes a write, followed by a read. The prompt for this interface is ->.

Parameters:

command (str) -- The command to run.

Returns:

The Sage device response.

Return type:

str

cmd_query(command)[source]

Query the Sage device in the ‘cmd’ interface state.

The prompt for this interface is [vxWorks]#. This state is entered into by running the cmd command.

Parameters:

command (str) -- The command to run.

Returns:

The sage device response.

Return type:

str

get_tasks()[source]

Get tasks by running checkStack command, save the output to a file, and parse the output and return as a dict.

Return type:

dict[str, Any]

query_update_task_entry(tasks)[source]

Query appropriate tasks to get the full entry point alias. The response is added to the input tasks dict (modified in-place).

Parameters:

tasks (dict[str, Any]) -- Dictionary containing tasks.

Return type:

None

query_read_memory_from_tasks(tasks)[source]

Query and update the read memory from the given task mapping. The memory read duration is added to understand length of time for response.

The tasks dict is modified in-place.

Parameters:

tasks (dict[str, Any]) -- Dictionary containing tasks.

Return type:

None

parse_ti_response(ti_response, tid_dict)[source]

Parse the Sage device ti command response.

Parse result of ti query to update the ENTRY point, or mark for later deletion if no longer found.

Parameters:
  • ti_response (str) -- Sage device ti command response

  • tid_dict (dict[str, Any]) -- Sage device TID mappings

Return type:

bool

Returns:

True if the task was found, False if not.

parse_check_stack_response(check_stack_response)[source]

Parse the checkStack command output to create the initial task dictionary.

Parameters:

check_stack_response (str) -- Sage output for checkStack command

Returns:

Parsed tasks from checkStack

Return type:

tasks

mark_duplicate_tasks(tasks)[source]

Mark the tasks as duplicate if duplicates exist.

Some commands are duplicates. No need to re-read the same memory. Mark duplicates with TID references to skip and fill in later.

This mutates the tasks dict in-place.

Parameters:

tasks (dict[str, Any]) -- Parsed tasks from checkStack

Return type:

None

find_duplicate_memory_reads(tasks)[source]

Fill in duplicate (shared) memory queries that were previously skipped.

This mutates the tasks dict in-place.

Parameters:

tasks (dict[str, Any]) -- Parsed tasks from checkStack

Return type:

None

convert_memory_reads_to_hex_strings(tasks)[source]

Convert memory reads into hex strings and bytes.

This adds values to tasks in-place, and does NOT make a copy.

Parameters:

tasks (dict[str, Any]) -- Parsed tasks from checkStack

Return type:

None

convert_memory_read_to_hex_string(d_response)[source]

Convert the stored memory reads to hex string.

Parameters:

d_response (str) -- Stored memory read.

Returns:

Memory reads converted to hex string.

Return type:

str

save_memory_reads_to_disk(dev, tasks)[source]

Save the stored task memory reads to files on disk.

There are several formats each read is saved to, in separate sub-directories under the memory_reads/ directory.

These are:

  • ascii/*.txt : ASCII strings from the hexdump output

  • hexdump/*.txt : Hexdump-format output from VxWorks “ti” command

  • hex/*.hex: Lowercase Hexadecimal string

  • binary/*.bin: binary format. This is the hex values

    converted to raw binary data and written to a file.

Parameters:
  • dev (DeviceData) -- DeviceData object to use for saving

  • tasks (dict[str, Any]) -- Parsed tasks from checkStack

Return type:

None

parse_vxworks_version(data)[source]

Parse the output of the version command in the VxWorks “C” interpreter, and extract the version of VxWorks.

Extracted information:

  • vxworks_version

  • wind_version

  • timestamp

  • raw_bootline

  • ata

  • host

  • ethernet_ip

  • ethernet_subnet_mask

  • host_ip

  • gateway_ip

  • flags

  • target_name

  • other

Parameters:

data (str) -- Command output to parse

Return type:

dict[str, str]

Returns:

Extracted information as a dictionary, or an empty dict if the parse failed.

process_vxworks_version(data, dev)[source]

Process data from parsed output of VxWorks version command into the PEAT device data model.

Parameters:

data (dict[str, str]) -- version data parsed by parse_vxworks_version()

Return type:

None

parse_sysvar_list(data)[source]

Parse output of sysvar list command on the Sage.

Return type:

dict[str, str]

process_sysvar_list(variables, dev)[source]

Process output of sysvar list command on the Sage.

Return type:

None

parse_user_list(data)[source]

Parse output of user list command on the Sage.

Return type:

list[dict[str, str]]

process_user_list(users, dev)[source]

Process output of user list command on the Sage.

Return type:

None

parse_ipf(data)[source]
Return type:

dict

8.5.9. Schweitzer Engineering Laboratories (SEL)

8.5.9.1. SEL Relays

Core functionality for interrogating SEL relays.

Supported relay models (not an exhaustive list)

  • 300G (Generator Relay)

  • 311C (Transmission Protection System)

  • 311L (Line Current Differential Protection and Automation System)

  • 351 (Protection System)

  • 351S (Protection Relay)

  • 387 (Current Differential and Overcurrent Relay)

  • 411L (Advanced Line Differential Protection, Automation, and Control System)

  • 451 (Protection Automation Control)

  • 487E (Transformer Protection Relay)

  • 587Z (High-Impedance Differential Relay)

  • 700G (Generator Protection Relay)

  • 710 (Motor Protection Relay)

  • 751 (Feeder Protection Relay)

  • 2032 (Communications Processor)

  • 2411 (Programmable Automation Controller)

Note

700G FTP server port check will cause errno 11, “Resource temporarily unavailable”. We fix this by resetting the connection after an ack, which is exactly what nmap does. While it’s not as “kind”, it’s probably less likely to trigger an issue than hitting a device’s application code and exiting in a non-standard way for the protocol.

Note

PEAT is explicitly NOT checking FTP during identify.

  • The common case is that relays are configured with Telnet+other things It is highly unusual for a relay to have FTP enabled but not Telnet.

  • FTP requires login - Many other device types listen on FTP (network load) - login bans after 3-5 attempts by default - we’re essentially brute forcing the device

  • FTP login triggers a software alarm

Note

On most relays the FTP password is set by default to the Level 2 password. If the L2 password changes, so does the FTP password, unless configured otherwise.

Warning

Software alarms will trigger the alarm contact on the relay! Software alarms are generated when elevating to level 2 in Telnet/Serial (“2ac”), logging into FTP, and possibly elevating to 2ac in the web interface (untested).

Listening network services (services available vary by device)

  • FTP (TCP 21) (Not enabled by default)

  • Telnet (TCP 23) (usually relays)

  • HTTP (TCP 80) (some devices)

  • HTTPS (TCP 443) (switches, some other devices)

  • SNMP (UDP 161) (switches, some other devices)

Known Tested FIDs. Not exhaustive, PEAT has been run on many others not documented here. Also, not all functionality is working or has been tested for these FIDs. Some may be parsing only, others may only be scan, etc.

  • SEL-311C-2-R508-V0-Z104101-D20150219

  • SEL-311L-7-R502-V0-Z106006-D20141106

  • SEL-351-5-R510-V0-Z103103-D20110429

  • SEL-351S-7-R516-V2-Z106105-D20190111

  • SEL-411L-1-R124-V0-Z015003-D20190130

  • SEL-451-5-R322-V0-Z025013-D20180630

  • SEL-487E-3-R317-V1-Z110102-D20190211

  • SEL-700G-R200-V0-Z006003-D20180629

  • SEL-710-R411-V0-Z007004-D20170623

  • SEL-751-R201-V1-Z007003-D20180921

  • SEL-751-R300-V3-Z008004-D20210104

  • SEL-2032-R115-V1-Z003001-D20151028

Authors

  • Christopher Abate

  • Christopher Goes

  • George Thompson

  • Jordan Henry

  • Rachel Glockenmeier

  • Taegan Williams

class SELRelay[source]

Shared functionality for interrogating SEL relays.

device_type: str = 'Relay'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'SEL'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Schweitzer Engineering Laboratories'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'SEL'

Device brand. Elasticsearch field: host.description.brand.

filename_patterns: list[str] = ['*.rdb', '*SET_ALL.TXT', '*SET_ALL.txt', '*set_all.txt', '*CFG.TXT', '*CFG.txt', '*cfg.txt', '*SER.TXT', '*SER.txt', '*ser.txt', '*HISTORY.TXT', '*HISTORY.txt', '*history.txt', '*.CID', '*.cid']

Patterns of files the module is capable of parsing, if any. Patterns are a literal name (*SET_ALL.TXT) or a Unix shell glob (*.rdb), are case-insensitive, and must start with a wildcard character (*). Globs can be anything accepted by glob.

supported_models: list[str] = ['300g', '311c', '311l', '351', '351s', '387', '411l', '451', '487e', '587', '587z', '700g', '710', '751', '2032', '2411']

Device models this module supports or is known to work with.

module_aliases: list[str] = ['sel-300g', 'sel-311c', 'sel-311l', 'sel-351', 'sel-351s', 'sel-387', 'sel-411l', 'sel-451', 'sel-487e', 'sel-587', 'sel-587z', 'sel-700g', 'sel-710', 'sel-751', 'sel-2032', 'sel-2411']

Alternative names for looking up the module, e.g. for the -d CLI option. Aliases that can be used to refer to the device via the PEAT’s Module API (peat.module_manager).

default_options: dict[str, Any] = {'ftp': {'creds': [('FTPUSER', 'TAIL'), ('2AC', 'TAIL'), ('FTP', 'TAIL'), ('anonymous', 'anonymous'), ('anonymous', 'anonymous@')], 'pass': '', 'pull_delay': 0.5, 'user': ''}, 'sel': {'allow_telnet_file_download': True, 'attempt_more_commands': False, 'creds': {'2ac': 'TAIL', 'acc': 'OTTER', 'bac': 'EDITH', 'cal': 'CLARKE'}, 'force_serial_pull': False, 'force_telnet_file_download': False, 'force_ymodem': False, 'handle_download_errors': True, 'never_download_dirs': [], 'never_download_files': [], 'old_ftp': False, 'only_download_dirs': [], 'only_download_files': [], 'pull_methods': ['http', 'ftp', 'telnet'], 'restart_after_push': False}, 'web': {'pass': '', 'user': ''}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

classmethod pull_more_commands(dev, comms)[source]

Attempt to get more info via terminal commands (get_sta, show_eth, show_status, etc.).

If any of the commands are successful, this returns true.

Return type:

bool

classmethod pull_telnet(dev, pull_files=True)[source]

Pull and parse configuration file(s) from the relay via Telnet.

Return type:

bool

classmethod pull_serial(dev)[source]

Pull and parse configuration file(s) from the relay via serial.

Return type:

bool

classmethod pull_ftp(dev)[source]

Pull and parse configuration file(s) from the relay via FTP.

Return type:

bool

classmethod pull_http(dev, protocol='http')[source]

Pull configuration and other data from the relay via HTTP.

Return type:

bool

classmethod restart_relay_telnet(dev)[source]

Execute a system restart on the SEL relay via Telnet.

Return type:

bool

ip_methods: list[IPMethod] = [IPMethod(name='SEL Relay Telnet login', description='Verify a device is a SEL Relay via Telnet commands.', identify_function=<bound method SELRelay._verify_telnet of <class 'peat.modules.sel.sel_relay.SELRelay'>>, reliability=6, protocol='telnet', transport='tcp', type='unicast_ip', default_port=23, port_function=functools.partial(<function sel_port_check>, protocol='telnet')), IPMethod(name='SEL Relay HTTP login', description='Verify a device is a SEL Relay via the HTTP web interface.', identify_function=functools.partial(<bound method SELRelay._verify_http of <class 'peat.modules.sel.sel_relay.SELRelay'>>, protocol='http'), reliability=8, protocol='http', transport='tcp', type='unicast_ip', default_port=80, port_function=functools.partial(<function sel_port_check>, protocol='http')), IPMethod(name='SEL Relay HTTPS login', description='Verify a device is a SEL Relay via the HTTP web interface.', identify_function=functools.partial(<bound method SELRelay._verify_http of <class 'peat.modules.sel.sel_relay.SELRelay'>>, protocol='https'), reliability=7, protocol='https', transport='tcp', type='unicast_ip', default_port=443, port_function=functools.partial(<function sel_port_check>, protocol='https'))]

Methods for identifying devices via IP or Ethernet.

serial_methods: list[SerialMethod] = [SerialMethod(name='SEL Relay serial', description='Verify a device is a SEL Relay via commands sent over a serial connection.', identify_function=<bound method SELRelay._verify_serial of <class 'peat.modules.sel.sel_relay.SELRelay'>>, reliability=5, type='direct')]

Methods for identifying devices via a serial connection.

sel_port_check(dev, protocol)[source]

lambda function set the TCP RST flag for SEL scanning.

Return type:

bool

8.5.9.2. SEL RTAC

Core functionality for interrogating a SEL Real-Time Automation Controller (RTAC).

Note

The 3530 RTAC (and possibly others ) disable ping (ICMP) responses by default. If using PEAT against an RTAC on another subnet or VLAN and the online check fails, try again with the --assume-online flag set. Another option is to force TCP SYN checks to be used with port 443.

SEL RTACs store data in a PostgreSQL database, but it can be exported as XML. Each “page” of the RTAC configuration is its own XML file. Instead of importing all of the XML files separately, they should be put into a tarball. PEAT will then decompress it to the file system, read each file into memory, sort based on file type, and finally delete the files from the file system. PEAT can then parse the configuration based on what’s in-memory.

To test Postgres (connecting to the “3530” database table): psql -h <ip> -U <username> -d 3530

Note: Nginx is the web server used by the SEL RTAC devices.

Listening network services

  • HTTP (TCP 80) (This redirects to 443 HTTPS)

  • HTTPS (TCP 443)

  • PostgreSQL (TCP 5432)

Supported models

  • SEL-3530

  • SEL-3530-4

  • SEL-3350

Authors

  • Ryan Vrecenar

  • Christopher Goes

class SELRTAC[source]

SEL Real-Time Automation Controller (RTAC).

device_type: str = 'RTAC'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'SEL'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Schweitzer Engineering Laboratories'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'SEL'

Device brand. Elasticsearch field: host.description.brand.

can_parse_dir: bool = True

If the module will accept a directory as the source path for parsing. If this is True, a Path like Path("./some_files/") would be a valid target for parsing. Any handling of files in the directory will have to be handled by the module, not PEAT.

filename_patterns: list[str] = ['*accesspointrouter*.tar.gz', '*AccessPoints*.tar.gz', 'devices.tar.gz', '*rtacexport*.tar.gz', '*rtac*.tar*', '*rtac*.tgz', 'Tag Processor.xml', 'SystemTags.xml', 'Main Controller.xml', 'Contact I_O.xml']

Patterns of files the module is capable of parsing, if any. Patterns are a literal name (*SET_ALL.TXT) or a Unix shell glob (*.rdb), are case-insensitive, and must start with a wildcard character (*). Globs can be anything accepted by glob.

supported_models: list[str] = ['3530', '3530-4', '3350']

Device models this module supports or is known to work with.

module_aliases: list[str] = ['sel-3530', 'sel-3530-4', 'sel-3350']

Alternative names for looking up the module, e.g. for the -d CLI option. Aliases that can be used to refer to the device via the PEAT’s Module API (peat.module_manager).

default_options: dict[str, Any] = {'postgres': {'pass': '', 'passwords': ['admin', 'rtac'], 'user': '', 'users': ['Admin', 'admin', 'administrator', 'rtac']}, 'sel': {'pull_http': True, 'pull_postgres': True, 'rtac_monitor_count': 3, 'rtac_monitor_enable': False, 'rtac_monitor_pause_for': 4.0}, 'web': {'pass': '', 'passwords': ['admin', 'rtac'], 'user': '', 'users': ['Admin', 'admin', 'administrator']}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

classmethod chk_transient_path(path)[source]

Check if path is defined in transient keys.

Return type:

bool

classmethod pull_config(dev)[source]

Pull and parse configuration from an RTAC.

Return type:

None

classmethod execute(cursor, statement, args=None)[source]

Wrapper code for database execute.

Return type:

None

ip_methods: list[IPMethod] = [IPMethod(name='SEL RTAC login HTTP', description='Verify a device is a SEL RTAC via the HTTP web interface.', identify_function=functools.partial(<bound method SELRTAC._verify_http of <class 'peat.modules.sel.sel_rtac.SELRTAC'>>, protocol='http'), reliability=6, protocol='http', transport='tcp', type='unicast_ip', default_port=80), IPMethod(name='SEL RTAC login HTTPS', description='Verify a device is a SEL RTAC via the HTTP web interface.', identify_function=functools.partial(<bound method SELRTAC._verify_http of <class 'peat.modules.sel.sel_rtac.SELRTAC'>>, protocol='https'), reliability=8, protocol='https', transport='tcp', type='unicast_ip', default_port=443), IPMethod(name='SEL RTAC SSL certificate', description='Verify a device is a SEL RTAC via inspection of the Common Name\n        attribute of a SSL certificate.', identify_function=<bound method SELRTAC._verify_https_ssl_certificate of <class 'peat.modules.sel.sel_rtac.SELRTAC'>>, reliability=9, protocol='https', transport='tcp', type='unicast_ip', default_port=443), IPMethod(name='SEL RTAC login PostgreSQL', description='Verify a device is a SEL RTAC via a PostgreSQL database connection.', identify_function=<bound method SELRTAC._verify_postgres of <class 'peat.modules.sel.sel_rtac.SELRTAC'>>, reliability=5, protocol='postgres', transport='tcp', type='unicast_ip', default_port=5432)]

Methods for identifying devices via IP or Ethernet.

8.5.9.3. SEL 3620 Gateway

SEL-3620 Ethernet Security Gateway.

Note: “This device accepts HTTPS connections only and will reject all HTTP connection requests. ” Authors

  • Amanda Gonzales

  • James Gallagher

class SEL3620[source]

SEL-3620 Ethernet Security Gateway.

device_type: str = 'Gateway'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'SEL'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Schweitzer Engineering Laboratories'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'SEL'

Device brand. Elasticsearch field: host.description.brand.

module_aliases: list[str] = ['sel-3620', '3620']

Alternative names for looking up the module, e.g. for the -d CLI option. Aliases that can be used to refer to the device via the PEAT’s Module API (peat.module_manager).

default_options: dict[str, Any] = {'web': {'pass': '', 'user': '', 'users': ['admin', 'Admin', 'administrator']}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

ip_methods: list[IPMethod] = [IPMethod(name='sel_3620_web_fingerprint', description='Verify a device is a SEL-3620 Gateway via the HTTPS web interface.', identify_function=<bound method SEL3620._verify_http of <class 'peat.modules.sel.sel_3620.SEL3620'>>, reliability=6, protocol='https', transport='tcp', type='unicast_ip', default_port=443)]

Methods for identifying devices via IP or Ethernet.

8.5.9.4. Internal APIs

8.5.9.4.1. SELHTTP

Retrieve and extract data via HTTP from SEL devices.

Authors

  • Aidan Kollar

  • Amanda Gonzales

  • Christopher Goes

  • Idaho National Lab

  • Jason Hall

  • Jessica Robinson

  • Ryan Vrecenar

class SELHTTP(*args, **kwargs)[source]

HTTP scraper for SEL devices.

Models supported: 3530 RTAC and 351, 351S and 451 relays.

disconnect()[source]
Return type:

None

login(user='ACC', passwd='OTTER', protocol='http')[source]

Attempt to login to a relay’s HTTP server and get a session.

If the login succeeds, the credentials are stored in the successful_creds class variable and True is returned.

Warning

This method is NOT to be used for SEL RTACs. Instead, use login_rtac()

Parameters:
  • user (str) -- Username to use for login

  • passwd (str) -- Password to use for login

  • protocol (Literal['http', 'https']) -- HTTP protocol to use (‘http’ or ‘https’)

Return type:

bool

Returns:

If the login was successful

read_page(cmd, param=None)[source]

Creates URL from given cmd and parameter and extracts text data from that page.

Parameters:
  • cmd (str) -- command string for webpage url

  • param (str | None) -- command parameters for webpage url

Return type:

str

Returns:

The relevant page data needed for parsing

Raises:

PeatError -- if the page read fails. This may be because PEAT can’t determine the correct command URL format for the device or if another error occurs, such as a communication error.

parse_page_data(content)[source]

Parse raw HTML page data and extract the contents of the pre section.

The “pre” tag contains rax HTML text. For the SEL relays, this usually contains the output of the command run “under the hood”, e.g. “sho p1”.

Return type:

str

login_rtac(user='ACC', passwd='OTTER', protocol='https')[source]

Login to the HTTP interface for the SEL RTAC. This works for 3530, 3530-4, and 3350.

Note: Nginx is the web server used by the SEL RTAC devices.

Return type:

bool

static dump_htmltable(html_page, config)[source]
Return type:

None

get_index(dev)[source]

Get the data from the gateway dashboard.

Return type:

dict

view_dashboard(dev)[source]

Get the data from the RTAC ‘dashboard’ (device_info.sel).

Return type:

dict

view_usagepolicy(dev)[source]

Get the data from the RTAC ‘usagepolicy’ (customize.sel).

Return type:

dict

view_filesondevice(dev)[source]

Get data about files on a RTAC (file_repository.sel).

Return type:

dict

download_http_files(dev)[source]

Take file listing from view_filesondevice() and download from web server

Return type:

dict

view_features(dev)[source]

Get data about features on a RTAC (licensed_features.sel).

Return type:

dict

view_accounts(dev)[source]

Get data about user accounts on a RTAC (user_table.sel).

Return type:

dict

view_user_roles(dev)[source]

Get data about user roles on a RTAC (list_user_roles.sel).

Return type:

dict

view_syslog(dev)[source]

Get the system log (SOE, Sequence of Events) from a RTAC (soe.csv).

Return type:

dict

view_ip_tables(dev)[source]

View diagnostics and network data from a RTAC (services_rpt.cev).

Return type:

dict

Data includes: network state, firewall, route table,

exeguard, prp diagnostic, arp table.

view_ldap(dev)[source]

Get LDAP data from a RTAC (ldap_server_table.sel).

Return type:

dict

rtac_delete_logs()[source]

Clear log data from a RTAC (system_restore.sel).

Return type:

None

rtac_log_out()[source]

Log out of the RTAC (logout.sel).

Return type:

None

rtac_reboot()[source]

Restart RTAC (reboot_device.cev).

Return type:

None

get_device_features(dev)[source]

Get basic information about a relay.

ver command page.

Data :rtype: bool

  • Device model

  • FID

  • BFID

  • CID

  • Checksum (firmware)

  • Firmware info

  • Part number

  • Serial number

  • Various additional information about capabilities and boards

get_status(dev)[source]

Get device status information from a relay.

Return type:

bool

Returns:

Data extracted from the status page. Refer to parse_status_output() for details on the data extracted from this page.

get_communications(dev)[source]

Get communication interface status and current state from a relay.

NOTE: this is not available on 351S or 351.

Return type:

bool

get_port_settings(dev)[source]

Get communication port settings from a relay.

Return type:

bool

get_front_panel_settings(dev)[source]

Get front panel settings from a relay.

Return type:

bool

get_sequential_events(dev)[source]

Get Sequential Event Recorder (SER) log data from a relay.

This method has been tested against SEL 351, 351S, and 451 relays.

Return type:

bool

get_historical_events(dev)[source]

Get historical event log from a relay.

Tested on SEL 351S and 451 relays. May work on the 351. :rtype: bool

Warning

Older versions of the 351 may not support this (failed on a device with copyright year 2009, your mileage may vary on newer devices).

get_meter_automation(dev)[source]

Get meter automation data from a relay.

Values: dict of {"AMV<int>": <float>}

Return type:

bool

get_meter_protection(dev)[source]

Get meter protection data from a relay.

Values: dict of {"PMV<int>": <float>}

Return type:

bool

get_meter_energy(dev)[source]

Get meter energy data from a relay.

Return type:

bool

get_output_data(dev)[source]

Get output data (main_board, etc.) from a relay.

NOTE: this doesn’t work on SEL-351 or 351S (or possibly just older software).

Return type:

bool

get_logic_data(dev)[source]

Get logic settings and control equations from a relay.

This is the output of the “sho l” command, e.g. “sho l”, “sho l 1” (logic group 1), etc. “sho l” (no argument) shows the logic for the “active” logic group.

There should be 6 logic groups. Other relays may have more or less.

This should work on 351S.

The 451 calls this “Protection”, and structure is far more complicated.

Return type:

bool

login_3620(user='admin', passwd='Admin123!')[source]

Login to the HTTPS web interface for the SEL-3620 Gateway.

Return type:

bool

gateway_accounts(dev)[source]

Get user accounts from gateway (Users.sel).

Return type:

dict

gateway_ldap(dev)[source]

Get LDAP settings (LDAP.sel).

Return type:

dict

gateway_radius(dev)[source]

Get RADIUS settings (RADIUS.sel).

Return type:

dict

gateway_local_groups(dev)[source]

Get local groups (LocalGroups.sel).

Return type:

dict

gateway_network_settings(dev)[source]

Get network settings (NetworkSettings.sel).

Return type:

dict

gateway_static_routes(dev)[source]

Get static route configuration (StaticRoutes.sel).

Return type:

dict

gateway_firewall(dev)[source]

Get firewall settings (Firewall.sel).

Return type:

dict

gateway_nat(dev)[source]

Get NAT settings (NAT.sel).

Return type:

dict

gateway_hosts(dev)[source]

Get host configuration (Hosts.sel).

Return type:

dict

gateway_snmp(dev)[source]

Get SNMP settings (SNMP.sel).

Return type:

dict

gateway_port_settings(dev)[source]

Get serial port settings (SerialPortSettings.sel).

Return type:

dict

gateway_port_profiles(dev)[source]

Get serial port profiles (SerialPortProfiles.sel).

Return type:

dict

gateway_port_mappings(dev)[source]

Get port mappings (PortMappings.sel).

Return type:

dict

gateway_ipsec(dev)[source]

Get IPSec connections (IPsec.sel).

Return type:

dict

gateway_macsec(dev)[source]

Get MACSec connections (MACsec.sel).

Return type:

dict

gateway_allowed_clients(dev)[source]

Get Allowed Clients (AllowedClients.sel).

Return type:

dict

gateway_sshkey(dev)[source]

Get SSH Host Keys (SSH_Host_Key.sel).

Return type:

dict

gateway_password_management(dev)[source]

Get password management messages (PasswordManagement.sel).

Return type:

dict

gateway_system_logs(dev)[source]

Get system logs (SysLogReport.sel).

Return type:

dict

gateway_diagnostics(dev)[source]

Get diagnostic data from SEL gateway (Diagnostics.sel).

Return type:

dict

gateway_proxy_reports(dev)[source]

Get proxy reports from ProxyReports.sel.

Return type:

dict

gateway_cert_export(dev)[source]

Export X509 certificates from (x509.sel).

Return type:

dict

querystr()[source]
Return type:

str

parse_simple_table(table_start, table_end, page_data)[source]
Return type:

dict

parse_simple_list(list_start, list_end, page_data)[source]
Return type:

list[str]

parse_multiple_lists(page_data)[source]
Return type:

dict

parse_simple_line(param_name, page_data)[source]
Return type:

str

normalize_keys(obj)[source]

Convert dictionary keys to lowercase underscore-separated.

Return type:

dict

parse_comments(obj)[source]

Split value strings with comments into the value and the comment.

Return type:

dict

read_html(content)[source]

Replace functionality of the Pandas HTML parser.

Return type:

dict[str, str]

clean_text(data)[source]

Attempt to remove whitespace and new line characters.

Return type:

str

8.5.9.4.2. SELTelnet

class SELTelnet(ip, port=23, timeout=5.0)[source]

Telnet wrapper for SEL relays.

This is a transport implementation of SELAscii. Refer to SELAscii for functions/commands to run.

Example usage of SELTelnet with a SEL 451 on the PEAT testing rack
>>> from peat.modules.sel.sel_telnet import SELTelnet
>>> tn = SELTelnet('192.0.2.123')  
>>> tn.elevate(1)  
True
>>> tn.list_files()  
['CFG.TXT', 'CFG.XML', 'EVENTS', 'REPORTS', 'SETTINGS', 'SWCFG.ZIP', 'SYNCHROPHASORS']
>>> tn.disconnect()  
property comm: Telnet

Underlying communication protocol class instance used for interacting with the device (e.g. Telnet).

test_connection()[source]

Test connection to the device.

Return type:

bool

disconnect()[source]

Cleanly disconnect from the device.

Return type:

None

read(delay=None, strip_whitespace=True)[source]
Return type:

str

read_until(until, strip_whitespace=True)[source]
Return type:

str

can_download_files()[source]

If file downloads are possible via Telnet.

Return type:

bool

download_binary(filename, save_to_file=True)[source]

Download a file from the device.

Warning

File downloads require level 1 (acc) permissions on all devices we’ve seen. Ensure you have elevated the login level with elevate() before calling this method!

Note

Older devices, notably the 451 and 2032 on the PEAT rack, don’t support the file show command and therefore CANNOT download files via Telnet.

Note

The name download_binary is used for compatibility with code that calls the same method on instances of FTP. The interfaces are functionally identical.

Parameters:
  • filename (str) -- Case-insensitive name of the file to download or the directory and file pair, separated by a space.

  • save_to_file (bool) -- If the data should be automatically written to a file in the output directory for the device matching self.address.

Return type:

str | None

Returns:

The contents of the file, or None if there was an error

Raises:

DeviceError -- Automatic privilege elevation failed

8.5.9.4.3. SELSerial

class SELSerial(serial_port, baudrate=9600, timeout=5.0, force_ymodem=False)[source]

Serial functionality for SEL relays over a RS-232 serial link.

This is a transport implementation of SELAscii. Refer to SELAscii for functions/commands to run.

The class can be used either for one-off pulls or establishing a longer-lived connection for continual polling/monitoring.

YMODEM is required for relays that don’t have the “file show” command. PySerial doesn’t support YMODEM, and the Python modem package hasn’t been updated in over 11 years, so we’re forced to subprocess and call the rz/sz commands from the lrzsz project. This is not available on Windows, but is available on Linux (sudo apt install lrzsz) and MacOS (sudo brew install lrzsz).

  • show supported: 700G, 351S, 351

  • read: 451, 2032 (Requires Ymodem to transfer the file)

PRE_WRITE_SLEEP = 1.0
property comm: Serial

Underlying communication protocol class instance used for interacting with the device (e.g. Telnet).

property supports_show: bool

If the device supports the file show command. This allows transfer of files without requiring the use of YMODEM.

property sz_path: str

Absolute file path to the sz executable.

References

property rz_path: str

Absolute file path to the rz executable.

References

test_connection()[source]

Test connection to the device.

Return type:

bool

disconnect()[source]

Cleanly disconnect from the device.

Return type:

None

read(delay=None, strip_whitespace=True)[source]
Return type:

str

read_until(until, strip_whitespace=True)[source]
Return type:

str

write(command)[source]

Send a command to the device.

Parameters:

command (bytes | str | int) -- Command to send (this will be automatically encoded)

Return type:

None

ymodem_read_file(file_id)[source]

Read a file from the relay over YMODEM.

Warning

Requires the rz command from the lrzsz library.

Thus uses the file read <filename> command on the relay, and rz program from the lrzsz package on the client.

Parameters:

file_id (str) -- file to read. This can be a filename, e.g. “CFG.TXT”, or a path to a file, e.g. “SETTINGS SET_A1.TXT” or “SETTINGS/SET_A1.TXT”. Basically, anything that’s accepted by the file read command on the relay.

Return type:

bytes | None

ymodem_push_configs(config_files, configs_dir)[source]

Upload (push) configuration files to the device over serial.

Warning

Requires the sz command from the lrzsz library.

Parameters:
  • config_files (list[str]) -- Names of configuration files to upload

  • configs_dir (Path) -- Directory with configuration files to upload

Return type:

bool

Returns:

If the push was successful

Raises:
  • CommError -- sz command not found

  • CalledProcessError -- sz command execution failed

download_binary(filename, save_to_file=True)[source]
Return type:

str | bytes | None

8.5.9.4.4. SELASCII

class SELAscii(address, timeout=5.0)[source]

Commands and parsers for the SEL ASCII command interface.

DATA_REGEX = re.compile('\\x02([^\\x03]*)\\x03', re.ASCII)
ENCODING = 'ascii'
PRE_WRITE_SLEEP = 0.15
POST_WRITE_SLEEP = 0.1
READ_DELAY = 0.15
LINE_TERMINATOR = '\r\n'
DEFAULT_CREDS = {'2ac': 'TAIL', 'acc': 'OTTER', 'bac': 'EDITH', 'cal': 'CLARKE'}
abstract property comm

Underlying communication protocol class instance used for interacting with the device (e.g. Telnet).

abstract test_connection()[source]

Test connection to the device.

Return type:

bool

abstract disconnect()[source]

Cleanly disconnect from the device.

Return type:

None

abstract read(delay=None, strip_whitespace=True)[source]
Return type:

str

write(command)[source]

Send a command to the device.

Parameters:

command (bytes | str | int) -- Command to send (this will be automatically encoded)

Return type:

None

read_lines(exclude_command=True, read_until_str=False)[source]

Read raw data and process it into a clean trimmed list.

Parameters:
  • exclude_command (bool) -- Exclude the first item in the list, which is generally the command that was executed

  • read_until_str (bool) -- read until ASCII “0x0003” character is encountered, which according to SEL’s documentation indicates the end of output from a command.

Return type:

list[str]

exec_read(command, return_lines=True, added_delay=None, read_until_prompt=False)[source]

Execute a command and return the results.

Parameters:
  • command (str) -- The command to execute, e.g. “date” or “id”

  • return_lines (bool) -- If the return value should be a list of cleaned and trimmed strings. If False, then the raw output is returned unmodified.

  • added_delay (float | None) -- Add a sleep between executing the command and reading the reply. Needed for commands that take a while to execute on the device.

  • read_until_prompt (bool) -- Dynamic way of waiting for large outputs or latency

Return type:

str | list[str]

extract(raw_data)[source]

Extract the actual data payload from raw command output.

Return type:

str

elevate(level, creds=None)[source]

Elevates user privileges on the device.

Warning

This does not handle non-standard privilege levels, such as bac, that are not the standard three levels for SEL (acc/1, 2ac/2, cal/3).

Parameters:
  • level (int) --

    Privilege level to elevate to, as a int. Available levels:

    • 1 : The acc level

    • 2 : The 2ac level

    • 3 : The cal level (aka the calibration level)

  • creds (dict | None) -- Login credentials to use for each level, as a dict. If not specified or None, SEL’s default creds are used. Example: {"acc": "l1pass", "2ac": "l2pass", "cal": "calpass"}

Return type:

bool

Returns:

If the privilege elevation was successful

get_active_group()[source]

Determine the currently active settings group on a relay (gro command)

Return type:

str

Returns:

The active settings group

change_active_group(new_group)[source]

Change the active setting group via the gro command.

Note

If the currently active group matches the group to change to then the group change command will NOT be executed.

Warning

This method will NOT validate if the group number being changed to is valid for this particular relay!

Parameters:

new_group (str) -- Group to change to. Examples: "6", "1"

Return type:

None

restart_device()[source]

Execute a system restart on the SEL relay (STA C command)

Warning

This will clear all relay self-test warnings and failures

Return type:

bool

Returns:

If the restart initiation was successful

get_sta()[source]

sta command (basic device information and status).

Warning

This function currently does not work reliably on all devices

Return type:

dict[str, datetime | str]

Returns:

Data extracted from the command output. Refer to parse_status_output() for details on the data extracted from the command.

get_id()[source]

id command.

Typical information pulled by id :rtype: dict[str, str]

  • FID

  • BFID

  • CID

  • Device name (DEVID)

  • Part number (PARTNO)

  • Serial number (SERIALNO)

  • Hardware configuration (?) (CONFIG)

  • Special

  • IED Name (iedName)

  • Type

  • Configuration version (configVersion)

parse_exit_info(raw)[source]

Extract info returned when we disconnect cleanly (run exit).

Info keys :rtype: dict[str, datetime | str]

  • time_source (Optional)

  • RID

  • device_time

  • TID

Info includes the RID, TID, and current device time. It may also include the time source identifier.

list_files(directory=None)[source]

List files in a directory on the device.

Note

File listing requires level 1 (acc) permissions on all devices we’ve seen. The privilege level will be automatically escalated when this function is called, if needed.

Parameters:

directory (str | None) -- Case-insensitive name of directory to list the contents of. If None or empty string, the current directory is listed.

Return type:

list[str]

Returns:

List of names of files in the directory.

Raises:

DeviceError -- Automatic privilege elevation failed

show_bre()[source]

bre command. Provides the Breaker monitor report.

Return type:

list[str]

show_mac()[source]

mac command.

Return type:

dict[str, str]

show_eth()[source]

eth command.

Return type:

list[str]

get_device_time()[source]

Combines output of date and time commands.

On the SEL-451 and some other models, the output may also contain timezone information. This information will be returned as part of the datetime.datetime object.

Return type:

datetime | None

show_his()[source]

his command.

Return type:

list[str]

show_status()[source]

sta s command.

Known devices that DO NOT support this command: :rtype: list[str]

  • SEL-2032

  • SEL-351

  • SEL-351S

show_ser()[source]

ser command. Sequential Event Recorder (SER).

NOTE: this can take a while if the SER has a lot of entries.

Return type:

list[str]

show_sum()[source]

sum command.

Potentially not present on: :rtype: list[str]

  • SEL-2032

  • SEL-351

show_eve()[source]

eve command. Event reports.

Return type:

list[str]

8.5.9.4.5. General comms functions

clean_filenames(filenames)[source]

Some file listings contain embedded null bytes.

Return type:

list[str]

download_files(files, comms, save_dir, handle_download_errors=True)[source]

Download a selected list of files from a relay in a comms-agnostic manner.

{
    "filename": {
        "name": filename,
        "data": file_data,
        "raw_path": path,
        "device_path": PurePosixPath(...),
        "local_path": Path(...),
    },
    ...
}
Return type:

dict[str, dict[str, bytes | str | Path]]

filter_names(paths, only=None, never=None)[source]
Return type:

list[str]

pull_files(dev, comms)[source]

Recursively download all files from a relay in a comms-agnostic manner.

Parameters:
  • dev (DeviceData) -- The device to pull from

  • comms (FTP | SELTelnet | SELSerial) -- Communication method to use, as a class instance. It must have the methods list_files() and download_binary.

Return type:

dict[str, dict[str, bytes | str]]

Returns:

The files downloaded, keyed by filename, with value being a dict including the file data, the device path, and the local path (if it was saved locally).

populate_file_listing(dev, comms)[source]
Return type:

None

8.5.9.4.6. Relay Parsing

parse_rdb(rdb_data)[source]

Extract usable configuration from a SEL QuickSet database (*.rdb file).

Parse the binary data contained in a rdb project file and convert it to a string that is the same format used in SET_ALL.TXT.

Parameters:

rdb_data (bytes) -- Binary data from an .rdb project file

Return type:

str

Returns:

Device configuration information parsed from the file, in the SET_ALL.TXT format

extract_sections(raw_text)[source]

Convert raw text of a config file into a dict with the text lines for each section.

Return type:

dict[str, list[str]]

parse_ini_style(lines)[source]

INI-style syntax with key-value pairs.

Return type:

dict[str, str]

extract_csv_lines(lines)[source]
Return type:

list[list[str]]

parse_csv_style(raw_lines, section_name)[source]

Parse lines of sections with CSV-style syntax. It’s an odd and inconsistent format, but Python’s CSV parser is apparently able to handle it with ablomb, so we use that instead of trying to write some crazy regex ourselves. A good example of weird formatting is from the SEL-451, look at the SET_ALL files in tests/ and examples/.

Return type:

list[str] | dict[str, str]

parse_sections(raw_sections)[source]
Return type:

dict[str, list[str] | dict[str, str]]

parse_config_data(config_text)[source]

Convert config file sections into a Python dict structure.

SEL config files structure is like a combination of INI and CSV file formats. Sections are in INI style, while variables may be in key=value structure or in a comma-separated CSV structure.

Every config file starts with a [INFO] section, so this section ends up duplicated in every config file.

SET_ALL.TXT contains all of the data from all of the config files.

Example 1:

[L3]
TR,"OC+51PT+51GT+81D1T+LB3+50P1*SH0","","","",""
TRCOMM,"0","","","",""
TRSOTF,"0","","","",""

Example 2:

[INFO]
RELAYTYPE=0351
FID=SEL-351-5-R510-V0-Z103103-D20110429
BFID=SLBT-3CF1-R102-V0-Z100100-D20091207
PARTNO=035152B3A11XX1
Parameters:

config_file_data -- complete contents of a config file, including all sections

Return type:

dict[str, list[str] | dict[str, str]]

Returns:

dict representation of the config file values. Top level keys in the dict are config sections, e.g. "INFO" or "P1". Nested keys are individual values in those sections, e.g. "RELAYTYPE" OR "TRCOMM"

parse_cfg_txt(cfg_txt, dev)[source]

Parse contents of CFG.TXT, including the [INFO] and [CLASSES] sections.

Parameters:
Return type:

dict[str, list[str] | dict[str, str]]

Returns:

Raw configs extracted from CFG.TXT

parse_set_all(set_all_data, dev=None)[source]

Parse the data contained in SET_ALL.TXT.

Parameters:
  • set_all_data (bytes | str) -- Data from a SET_ALL config pulled from an relay or extracted from a RDB project file

  • dev (DeviceData | None) -- DeviceData instance to add data to

Return type:

tuple[dict[str, dict], DeviceData]

Returns:

Tuple with the device configuration information parsed from the file and the DeviceData object generated from the file data (or passed as argument to this function)

process_info_classes(configs, dev)[source]

Process the [INFO] and [CLASSES] sections into the PEAT data model.

Return type:

None

create_dev_from_configs(configs)[source]

Generate a DeviceData object using the best available identification for a device (IP, name, etc.).

Return type:

DeviceData

process_info_into_dev(info, dev)[source]

Put data from the info dict into the PEAT device data model.

Return type:

None

process_fid(id_string, dev)[source]

Get data from the FID string and put it into the PEAT device data model.

Return type:

None

parse_ipaddr(settings)[source]

Parse IPADDR section out of a port config (e.g., P5).

IP/ethernet interfaces. Usually port 5 (P5), though it could potentially be other interfaces.

Return type:

Interface

process_port_settings(port_id, settings, dev)[source]

Process parsed port settings into the device data model.

Return type:

None

process_network_configuration(configs, dev)[source]

Process relay network information into the data model.

Return type:

None

parse_ids(configs)[source]

Extract and parse various relay IDs from a set of configs.

These include:

  • RID (Relay ID)

  • SID (Station ID)

  • TID (Terminal ID)

Parameters:

configs (dict[str, dict]) -- parsed SET_ALL as a dict

Return type:

dict[str, str | dict[str, str]]

Returns:

Parsed relay IDs, including cleaned values, raw values, and the relay IDs for each settings group (e.g. the ID for group 1).

Example of returned data from parse_ids
{
    "relay_id_by_group": {"1": "GENERATOR", "2": "GENERATOR"},
    "station_id_by_group": {},
    "terminal_id_by_group": {"1": "TERMINAL", "2": "TERMINAL"},
    "relay_id": "generator",
    "station_id": "",
    "terminal_id": "terminal",
    "raw_rid": "GENERATOR",
    "raw_sid": "",
    "raw_tid": "TERMINAL",
}

parse_and_process_modbus(configs, dev)[source]

Put Modbus registers into the data model as Register objects.

Parameters:
  • configs (dict[str, dict]) -- parsed SET_ALL as a dict

  • dev (DeviceData) -- device object to add the data to

Return type:

None

parse_and_process_dnp3(configs, dev)[source]

Put DNP3 registers and inferred IO-points into the data model as Register and IO objects.

Parameters:
  • configs (dict[str, dict]) -- parsed SET_ALL as a dict

  • dev (DeviceData) -- device object to add the data to

Return type:

None

parse_protection_schemes(configs, device_type='')[source]
Return type:

dict[str, str | list]

parse_logic_section(configs, logic_keys, logic_type)[source]
Return type:

dict[str, dict[str, str]]

extract_fid(data)[source]
Return type:

str

extract_bfid(data)[source]
Return type:

str

extract_cid(data)[source]
Return type:

str

extract_part_number(data)[source]
Return type:

str

extract_serial_number(data)[source]
Return type:

str

extract_selboot_checksum(data)[source]
Return type:

str

extract_string(data, regex, flags=None)[source]

Extract a string from arbitrary data.

Return type:

str

parse_fid(id_string)[source]

Parse the FID and BFID strings into a dict with the relay model and firmware information.

  • Model: “SEL-351”

  • Variant (??): “5” (some devices may not have this portion)

  • Release: “R510” (major release, generally new or changed features)

  • Point release: “V0” (minor release, usually fixes or minor changes)

  • Settings version: “Z103103” (indicates what version of SEL ACCSELERATOR software to use)

  • Firmware release date: “D20110429” (2011-04-29, or April 29, 2011)

Parameters:

id_string (str) -- Firmware ID string to parse, e.g. FID or BFID

Return type:

dict[str, str]

Returns:

Information extracted from the firmware ID

  • id (the full FID/BFID string)

  • version (“V0”)

  • revision (“R322”)

  • settings_version (“Z025013”)

  • release_date (datetime object)

  • (FID-only) model (“451”)

  • (FID-only) product (“SEL-451-5”)

parse_status_output(lines)[source]

Parse status command output.

This data can be retrieved via Telnet or Serial with the sta command or scraped from the HTTP web interface device status page.

The data extracted will vary by device model. This can then be processed by process_info_into_dev().

Data that may be extracted:

  • device_name: Device name ("STATION A")

  • device_identifier: Device identifier ("Relay 1")

  • device_time: Current date and time on the device

    (datetime instance)

  • FID: FID string

  • CID: CID string ("F029")

  • (TODO) Temperature

  • (TODO) Self-test results for components (RAM, ROM, FPGA, etc.)

  • (TODO) Voltages and currents

Parameters:

lines (list[str]) -- Extracted status info lines

Return type:

dict

Returns:

Data extracted from the command output

parse_ver_output(data)[source]
Return type:

dict[str, str]

event_data_present(data)[source]
Return type:

bool

parse_and_process_events(data, dataset, dev)[source]
Return type:

tuple[list[dict], dict]

parse_ser(raw_lines)[source]

Parse output of SER.TXT, the ser command, or the ser web page.

Examples of SER data:

  • SER.TXT pulled via FTP or Telnet

  • Output from ‘ser’ command (via Telnet or serial)

  • “ser” page in the web interface

Return type:

tuple[list[dict], dict]

Returns:

Events and status info. Events will be empty if there are no events. Status info will be empty if no front matter was present.

parse_history(raw_lines)[source]

Parse output of HISTORY.TXT.

Return type:

tuple[list[dict], dict]

Returns:

Events and status info. Events will be empty if there are no events. Status info will be empty if no front matter was present.

extract_csv_dicts(lines)[source]
Return type:

tuple[dict[str, str], list[dict[str, str]]]

parse_cser(lines)[source]

Extract and parse events and device info from CSER.TXT.

Return type:

tuple[list[dict], dict]

Returns:

Events and device info (FID, CID, and serial number).

parse_chistory(lines)[source]

Extract and parse events and device info from CHISTORY.TXT.

Return type:

tuple[list[dict], dict]

Returns:

Events and device info (just the FID generally with CHISTORY).

process_events(raw_events, dev, dataset)[source]

Process events into the data model.

Parameters:
  • raw_events (list[dict]) -- list of events extracted from an event file (SER, CSER, HISTORY, CHISTORY)

  • dev (DeviceData) -- device object to add events to

  • dataset (str) -- Dataset of the events, e.g. ser, CSER.TXT, HISTORY.TXT

Return type:

None

extract_cid_content(data)[source]
Return type:

str

process_cid_file(data, filepath, dev)[source]

.CID files are zlib-compressed data (zlib-header 0x7801 - No Compression/low). The only known CID file currently is SET_61850.CID, which contains XML data.

This function decompresses the zlib data and saves the decoded text to a new file.

Return type:

str

is_enabled(conf)[source]
Return type:

bool

value_is_enabled(value)[source]
Return type:

bool

clean_id(id_str)[source]
Return type:

str

split_lines(val)[source]
Return type:

list[str]

8.5.9.4.7. RTAC Parsing

SEL RTAC parsing functions.

convert_to_elastic(_input)[source]

Cast input to inferred type (int, bool, None) for elasticDB.

Return type:

None | int | bool | str | float

parse_accesspointrouters(router, device_info)[source]
Return type:

None

parse_devices(device, device_info)[source]
Return type:

None

parse_settings(start_element, terms)[source]
Return type:

dict

parse_device_io_tags(start_element, page_name)[source]
Return type:

dict

parse_device_coils(start_element, page_name)[source]
Return type:

dict

parse_device_discrete_inputs(start_element, page_name)[source]
Return type:

dict

parse_device_modbus_registers(start_element, page_name)[source]
Return type:

dict

parse_maincontroller(maincontroller, device_info)[source]
Return type:

None

parse_contact_ios(contact, device_info)[source]
Return type:

None

tagname_from_ET(row, identifier)[source]
Return type:

str

parse_tagprocessor(tag, device_info)[source]
Return type:

None

parse_systemtags(tag, device_info)[source]
Return type:

None

parse_pous(pou, device_info)[source]

Offline parse programming logic.

Return type:

None

8.5.10. Siemens

8.5.10.1. SIPROTEC 7SJ6x relays

The Siemens SIPROTEC 4 Relay.

Services

  • WebMonitor (TCP 80 and TCP 443)

  • IEC61850 (TCP 102)

  • SNMP (UDP 161)

  • Unknown service (UDP 43690)

  • DIGSI (UDP 50000)

  • WebMonitor comms (UDP 56797)

There is a web interface located at port 80 that uses Java applets for some features (use Internet Explorer and Java version 6).

Authors

  • Christopher Goes

  • Michelle Cabahug

class Siprotec[source]

Siemens SIPROTEC 4 Relay.

device_type: str = 'Relay'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Siemens'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Siemens AG'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'Siprotec'

Device brand. Elasticsearch field: host.description.brand.

model: str = '7SJ6x'

Device’s default model (if not known). Elasticsearch field: host.description.model.

ip_methods: list[IPMethod] = [IPMethod(name='Siprotec WebMonitor HTTP', description='Query Siprotec webmonitor service for particular files via HTTP or HTTPS.', identify_function=functools.partial(<bound method Siprotec._get_webmonitor_data of <class 'peat.modules.siemens.siprotec.Siprotec'>>, protocol='http'), reliability=7, protocol='http', transport='tcp', type='unicast_ip', default_port=80), IPMethod(name='Siprotec WebMonitor HTTP', description='Query Siprotec webmonitor service for particular files via HTTP or HTTPS.', identify_function=functools.partial(<bound method Siprotec._get_webmonitor_data of <class 'peat.modules.siemens.siprotec.Siprotec'>>, protocol='https'), reliability=7, protocol='https', transport='tcp', type='unicast_ip', default_port=443), IPMethod(name='Siprotec SNMP sysDescr', description='Check if a device is a Siprotec by querying SNMP for OID\n        ``1.3.6.1.2.1.1.1.0`` (``sysDescr``) and comparing the\n        output against the string ``SIPROTEC``.', identify_function=<bound method Siprotec._verify_snmp of <class 'peat.modules.siemens.siprotec.Siprotec'>>, reliability=8, protocol='snmp', transport='udp', type='unicast_ip', default_port=161)]

Methods for identifying devices via IP or Ethernet.

8.5.11. Woodward

8.5.11.1. MicroNet Plus

Interact with Woodward MicroNet Plus control systems.

Listening network services

  • FTP (TCP 21)

Authors

  • Christopher Goes

  • Patrica Schulz

  • Jessica Robinson

class MicroNet[source]

Woodward MicroNet Plus.

device_type: str = 'Control System'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Woodward'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Woodward'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'MicroNet'

Device brand. Elasticsearch field: host.description.brand.

model: str = 'Plus'

Device’s default model (if not known). Elasticsearch field: host.description.model.

default_options: dict[str, Any] = {'ftp': {'pass': 'ServiceUser', 'timeout': 10, 'user': 'ServiceUser'}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

classmethod is_file(filename, ftp)[source]

Check if a given file is a file or directory

Return type:

bool

classmethod pull_all_files(dev, directory, output_dir, ftp)[source]

Pull all files available recursively

Parameters:
  • directory (str) -- the next directory to investigate (current directory when this function is called)

  • output_dir (Path) -- the last path (path to direcrtory), which will be added on to each function call

classmethod get_gap_info(dev)[source]

Determine most recent GAP file used from dev.event, Get file info on that GAP file, store in Logic

Return type:

None

ip_methods: list[IPMethod] = [IPMethod(name='Micronet FTP login', description='Verify that this device is a Micronet.\n        The text in the pulled file Registry.txt\n        should contain the string "Micronet".', identify_function=<bound method MicroNet._verify_ftp of <class 'peat.modules.woodward.micronet.MicroNet'>>, reliability=8, protocol='ftp', transport='tcp', type='unicast_ip', default_port=21)]

Methods for identifying devices via IP or Ethernet.

8.5.11.2. 2301E Generator Controller

Woodward 2301e Speed Controller.

Services

  • Servlink (Woodward proprietary protocol) on RS232 and RS422

  • Modbus (optional) on RS232 and RS422

Authors

  • Peter Shurtz

class WDW2301E[source]

Woodward 2301e Speed Controller.

device_type: str = 'Controller'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Woodward'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Woodward, Inc'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'WDW'

Device brand. Elasticsearch field: host.description.brand.

model: str = '2301E'

Device’s default model (if not known). Elasticsearch field: host.description.model.

filename_patterns: list[str] = ['*.wset']

Patterns of files the module is capable of parsing, if any. Patterns are a literal name (*SET_ALL.TXT) or a Unix shell glob (*.rdb), are case-insensitive, and must start with a wildcard character (*). Globs can be anything accepted by glob.

serial_methods: list[SerialMethod] = [SerialMethod(name='wdw2301e_servlink_serial', description='Check if a device is a 2301E via the Woodward-proprietary Servlink\n        protocol over a serial connection.', identify_function=<bound method WDW2301E._verify_serial of <class 'peat.modules.woodward.wdw_2301e.WDW2301E'>>, reliability=3, type='direct')]

Methods for identifying devices via a serial connection.

8.5.11.3. easYgen 3500XT Genset Controller

easYgen 3500XT pulling and parsing functionality,

Pull is done using FTP and ServLink/TCP. Most of config pulled from Servlink, a variant of Modbus-TCP, over port 666. Pulling mimics the Woodward Toolkit storage which saves config into wset file.

Authors

  • Christopher Goes

  • Ryan Vrecenar

class Easygen3500XT[source]
device_type: str = 'Genset Controller'

Type of device, e.g “PLC”, “Relay”, “RTU”, “RTAC”, etc. Elasticsearch field: host.type.

vendor_id: str = 'Woodward'

Short-form vendor name. Elasticsearch field: host.description.vendor.id.

vendor_name: str = 'Woodward, Inc'

Long-form vendor name. Elasticsearch field: host.description.vendor.name.

brand: str = 'easYgen'

Device brand. Elasticsearch field: host.description.brand.

model: str = '3500XT'

Device’s default model (if not known). Elasticsearch field: host.description.model.

filename_patterns: list[str] = ['*.wset', '*.tc']

Patterns of files the module is capable of parsing, if any. Patterns are a literal name (*SET_ALL.TXT) or a Unix shell glob (*.rdb), are case-insensitive, and must start with a wildcard character (*). Globs can be anything accepted by glob.

default_options: dict[str, Any] = {'ftp': {'pass': 'CL0001', 'user': 'CL01'}, 'woodward': {'pull_methods': ['servlink_tcp', 'ftp']}}

Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials.

ip_methods: list[IPMethod] = [IPMethod(name='Woodward easYgen FTP', description="Verify via FTP login and check of current directory ('pwd').", identify_function=<bound method Easygen3500XT._verify_ftp of <class 'peat.modules.woodward.easygen_3500xt.Easygen3500XT'>>, reliability=5, protocol='ftp', transport='tcp', type='unicast_ip', default_port=21), IPMethod(name='Woodward Servlink/TCP', description='Verify if a device is a easyYgen 3500 via the\n        Woodward-proprietary Servlink/TCP protocol.', identify_function=<bound method Easygen3500XT._verify_servlink of <class 'peat.modules.woodward.easygen_3500xt.Easygen3500XT'>>, reliability=7, protocol='servlink_tcp', transport='tcp', type='unicast_ip', default_port=666, port_function=functools.partial(<function easygen_port_check>, port=666)), IPMethod(name='Woodward Servlink/TCP alt port 667', description='Verify if a device is a easyYgen 3500 via the\n        Woodward-proprietary Servlink/TCP protocol.', identify_function=functools.partial(<bound method Easygen3500XT._verify_servlink of <class 'peat.modules.woodward.easygen_3500xt.Easygen3500XT'>>, port=667), reliability=7, protocol='servlink_tcp', transport='tcp', type='unicast_ip', default_port=667, port_function=functools.partial(<function easygen_port_check>, port=667))]

Methods for identifying devices via IP or Ethernet.

serial_methods: list[SerialMethod] = [SerialMethod(name='Servlink serial verification', description='Check if a device is a easYgen 3500XT via the Woodward-proprietary Servlink\n        protocol over a serial connection.', identify_function=<bound method Easygen3500XT._verify_serial of <class 'peat.modules.woodward.easygen_3500xt.Easygen3500XT'>>, reliability=6, type='direct')]

Methods for identifying devices via a serial connection.

easygen_port_check(dev, port)[source]

Function to set the TCP RST flag for Servlink/TCP scanning.

Return type:

bool

8.5.12. Windows

8.5.12.1. WindowsCE

PEAT module for Windows CE embedded devices.

Usage:

peat parse -d WindowsCE -- pillage-system_info.json

Supported devices:

  • Siemens TP700 Comfort HMI Panel

  • Rockwell PanelView Plus 7 Standard HMI

8.5.12.2. Internal APIs

8.5.12.2.1. ServLink protocol

Woodward proprietary Servlink serial protocol

SVL TCP

  • ERROR RESPONSE: 0xE10000

  • BAD/EMPTY RESPONSE: 0x0100020001

Authors

  • Peter Shurtz

svl_socket_txn(ip, wr_bytes)[source]
Return type:

bytes | None

svl_socket_connected(ip)[source]
Return type:

bool

svl_socket_initialized(ip)[source]
Return type:

bool

svl_socket_connect(ip, port=None, timeout=None, delim=None, bufsize=None)[source]
Return type:

socket | None

svl_socket_disconnect(ip)[source]
Return type:

None

8.5.12.2.2. MicroNet parsing

get_info_firmware(dev, filepath, timestamp=None)[source]

Extract general info from firmware file. :rtype: bool

  • Hashes

  • System architecture

  • OS version

get_info_bootrom(dev, filepath, timestamp=None)[source]

Extract general info from bootrom file.

Return type:

bool

parse_log(dev, filepath)[source]

Parse Log.txt.

Return type:

bool

get_ip_addrs(dev, events)[source]

Get IP addresses from the log file and store them in data manager.

Return type:

None

get_gap_info(dev, filename, path, timestamp)[source]

Get info on gap file *gaprev*.out.

Return type:

bool

create_artifact(dev, filepath, timestamp)[source]

Get hashes of each file pulled and store each as dict with fields filename, md5, sha256.

Each of these is stored in dev.extra["file_artifacts"].

Return type:

None