"""
Core functionality for interrogating SEL relays.
Supported relay models (not an exhaustive list)
- 300G (Generator Relay)
- 311C (Transmission Protection System)
- 311L (Line Current Differential Protection and Automation System)
- 351 (Protection System)
- 351S (Protection Relay)
- 387 (Current Differential and Overcurrent Relay)
- 411L (Advanced Line Differential Protection, Automation, and Control System)
- 451 (Protection Automation Control)
- 487E (Transformer Protection Relay)
- 587Z (High-Impedance Differential Relay)
- 700G (Generator Protection Relay)
- 710 (Motor Protection Relay)
- 751 (Feeder Protection Relay)
- 2032 (Communications Processor)
- 2411 (Programmable Automation Controller)
.. note::
700G FTP server port check will cause errno 11,
"Resource temporarily unavailable". We fix this
by resetting the connection after an ack, which
is exactly what nmap does. While it's not as "kind",
it's probably less likely to trigger an issue than
hitting a device's application code and exiting in
a non-standard way for the protocol.
.. note::
PEAT is explicitly NOT checking FTP during identify.
- The common case is that relays are configured with Telnet+other things
It is highly unusual for a relay to have FTP enabled but not Telnet.
- FTP requires login
- Many other device types listen on FTP (network load)
- login bans after 3-5 attempts by default
- we're essentially brute forcing the device
- FTP login triggers a software alarm
.. note::
On most relays the FTP password is set by default to
the Level 2 password. If the L2 password changes, so
does the FTP password, unless configured otherwise.
.. warning::
Software alarms will trigger the alarm contact on the relay! Software alarms
are generated when elevating to level 2 in Telnet/Serial ("2ac"), logging
into FTP, and possibly elevating to 2ac in the web interface (untested).
Listening network services (services available vary by device)
- FTP (TCP 21) (Not enabled by default)
- Telnet (TCP 23) (usually relays)
- HTTP (TCP 80) (some devices)
- HTTPS (TCP 443) (switches, some other devices)
- SNMP (UDP 161) (switches, some other devices)
Known Tested FIDs.
Not exhaustive, PEAT has been run on many others not documented here.
Also, not all functionality is working or has been tested for these FIDs.
Some may be parsing only, others may only be scan, etc.
- SEL-311C-2-R508-V0-Z104101-D20150219
- SEL-311L-7-R502-V0-Z106006-D20141106
- SEL-351-5-R510-V0-Z103103-D20110429
- SEL-351S-7-R516-V2-Z106105-D20190111
- SEL-411L-1-R124-V0-Z015003-D20190130
- SEL-451-5-R322-V0-Z025013-D20180630
- SEL-487E-3-R317-V1-Z110102-D20190211
- SEL-700G-R200-V0-Z006003-D20180629
- SEL-710-R411-V0-Z007004-D20170623
- SEL-751-R201-V1-Z007003-D20180921
- SEL-751-R300-V3-Z008004-D20210104
- SEL-2032-R115-V1-Z003001-D20151028
Authors
- Christopher Abate
- Christopher Goes
- George Thompson
- Jordan Henry
- Rachel Glockenmeier
- Taegan Williams
"""
import functools
import time
import timeit
from copy import deepcopy
from pathlib import Path
from pprint import pformat
from typing import Literal
import olefile
import serial
from peat import (
CommError,
DeviceData,
DeviceError,
DeviceModule,
Interface,
IPMethod,
SerialMethod,
Service,
consts,
datastore,
state,
utils,
)
from peat.protocols import FTP, check_tcp_port
from .relay_parse import (
event_data_present,
parse_and_process_events,
parse_cfg_txt,
parse_rdb,
parse_set_all,
process_cid_file,
process_info_into_dev,
)
from .sel_comms import populate_file_listing, pull_files
from .sel_http import SELHTTP
from .sel_serial import SELSerial
from .sel_telnet import SELTelnet
# TODO: auto-construct SET_ALL.TXT from individual files for parse,
# e.g. a directory of SET_* files.
# TODO: support parse input files:
# CFG.XML
# VEC_D.TXT, VEC_E.TXT
# *.CEV
# TODO: calculate uptime based on time since last "Relay powered up" event
# AND no power off event.
# Use: dev.retrieve("event", {...})
# TODO: get BADPASS and other auth events from TAR.TXT
# Refer to section 7.35 in the SEL351S manual.
# ACCESS: Asserts while any user is logged in at Access Level B or higher
# Row SALARM ACCESS ALRMOUT * HALARMA HALARMP HALARML HALARM
# 98 0 0 1 0 0 0 0 0
# Row * * PASNVAL ACCESSP GRPSW SETCHG CHGPASS BADPASS
# TODO: .CEV file parsing
# https://github.com/engineerjoe440/pycev
# https://pycev.readthedocs.io/en/latest/
# Associate with DNP3/Modbus registers?
# Create event entries?
# record.fid => Get FID information
# record.trigger_time
# record.frequency
# record.*_channel_ids (analog, digital, status)
# record.settings
# record.group
# record.*_count
# TODO: improve logic/settings parsing: https://github.com/danyill/sel-settings-terminal/
[docs]
class SELRelay(DeviceModule):
"""
Shared functionality for interrogating SEL relays.
"""
device_type = "Relay"
vendor_id = "SEL"
vendor_name = "Schweitzer Engineering Laboratories"
brand = "SEL"
filename_patterns = [
"*.rdb",
"*SET_ALL.TXT",
"*SET_ALL.txt",
"*set_all.txt",
"*CFG.TXT",
"*CFG.txt",
"*cfg.txt",
# SER and CSER
"*SER.TXT",
"*SER.txt",
"*ser.txt",
# HISTORY and CHISTORY
"*HISTORY.TXT",
"*HISTORY.txt",
"*history.txt",
# *.CID (e.g. SET_61850.CID)
"*.CID",
"*.cid",
]
# These are what's known to work. Others may work as well
supported_models = [
"300g",
"311c",
"311l",
"351",
"351s",
"387",
"411l",
"451",
"487e",
"587",
"587z",
"700g",
"710",
"751",
"2032",
"2411",
]
# Add aliases for specific models, e.g. "sel-300g"
module_aliases = [f"sel-{x}" for x in supported_models]
# For descriptions of options, refer to examples/peat-config.yaml
default_options = {
"ftp": {
"user": "",
"pass": "",
"creds": [
# NOTE: default lockout is 5 attempts
("FTPUSER", "TAIL"), # 351S, 700G, 710, 751
("2AC", "TAIL"), # 451, 411L, 487E (uses L2 password)
("FTP", "TAIL"), # 351
# ("ACC", "OTTER"), # 451, 411L, 487E (uses L1 password)
# If anonymous access is enabled in settings on some devices
("anonymous", "anonymous"),
("anonymous", "anonymous@"),
# ("2ac", "TAIL"),
# ("FTP", "OTTER"),
# ("BAC", "OTTER"),
# ("acc", "OTTER"),
# ("bac", "OTTER"),
],
"pull_delay": 0.5,
},
"web": {
"user": "",
"pass": "",
},
"sel": {
"pull_methods": [
"http",
"ftp",
"telnet",
],
"attempt_more_commands": False,
"allow_telnet_file_download": True,
"force_telnet_file_download": False,
"force_serial_pull": False,
"force_ymodem": False,
"only_download_files": [], # ["SET_6.TXT"]
"only_download_dirs": [], # ["SETTINGS", "EVENTS"]
"never_download_files": [], # ["CFG.XML", "SWCFG.ZIP"]
"never_download_dirs": [], # ["EVENTS", "HMI"]
"restart_after_push": False,
"old_ftp": False,
"handle_download_errors": True,
"creds": {
"acc": "OTTER",
# bac: "BREAKER" access level (present on 351S and others)
"bac": "EDITH",
"2ac": "TAIL",
# SEL-451 'cal' default: "Sel-1"
# "Sel-1" also seen on: SEL-451-5-R324-V1-Z027013-D20201009
"cal": "CLARKE",
},
},
}
# 57600 used by ymodem and by 351 by default
sel_fallback_baudrates = [9600, 57600, 19200]
@classmethod
def _verify_serial(cls, dev: DeviceData) -> bool:
"""
Verify a device is a SEL Relay via commands sent over a serial connection.
"""
baudrates = dev.options["baudrates"] # type: list[int]
if not baudrates:
baudrates = cls.sel_fallback_baudrates
# If baudrate is specified in config, then use it and don't enumerate all baudrates
if dev.options["serial"]["baudrate"]:
baudrates = [dev.options["serial"]["baudrate"]]
timeout = dev.options["serial"]["timeout"] # type: float
cls.log.debug(f"Verifying Serial for {dev.serial_port} (timeout: {timeout})")
for baudrate in baudrates:
try:
# TODO: don't use with statement, rely on atexit for cleanup on exit
with SELSerial(
serial_port=dev.serial_port,
baudrate=baudrate,
timeout=timeout,
) as ser:
if not ser.test_connection():
cls.log.debug(
f"Serial connection test failed on "
f"{dev.serial_port} with baud {baudrate}"
)
continue
# Mark serial port as active
dev._is_active = True
if not cls._selascii_get_id(dev, ser):
cls.log.warning(f"Baudrate {baudrate} didn't work for {dev.serial_port}")
continue
iface = Interface(
type="rs_232",
serial_port=dev.serial_port,
baudrate=baudrate,
parity="none",
stop_bits=1,
flow_control="none",
)
dev.store("interface", iface, lookup="serial_port")
if not dev.options["serial"]["baudrate"]:
if not dev._runtime_options.get("serial"):
dev._runtime_options["serial"] = {}
dev._runtime_options["serial"]["baudrate"] = baudrate
if not cls._selascii_verify_post_process(dev, ser):
return False
if not dev.id:
dev.id = dev.serial_port
return True
except serial.SerialException as ex:
# This is needed on Windows to handle several cases
raise ex
except Exception as ex:
cls.log.warning(
f"Failed verify of serial port {dev.serial_port} at baudrate {baudrate}: {ex}"
)
continue
return False
@classmethod
def _verify_telnet(cls, dev: DeviceData) -> bool:
"""
Verify a device is a SEL Relay via Telnet commands.
"""
port = dev.options["telnet"]["port"] # type: int
timeout = dev.options["telnet"]["timeout"] # type: float
cls.log.debug(f"Verifying Telnet for {dev.ip}:{port} (timeout: {timeout})")
try:
with SELTelnet(dev.ip, port, timeout) as tn:
if not cls._selascii_get_id(dev, tn):
return False
if not cls._selascii_verify_post_process(dev, tn):
return False
return True
except Exception as ex:
cls.log.trace(f"Telnet verify failed for {dev.ip} due to exception: {ex}")
return False
@classmethod
def _selascii_get_id(cls, dev: DeviceData, comm: SELTelnet | SELSerial) -> bool:
id_info = comm.get_id()
if not id_info:
cls.log.debug(f"{comm.type} verify failed for {comm.address}: no 'id' data")
return False
fid = id_info.get("FID")
if not fid:
cls.log.debug(
f"{comm.type} verify failed for {comm.address}: "
f"no FID in 'id' command output"
f"\nRaw output: {id_info}"
)
return False
# Process info from the "id" command, including the FID
process_info_into_dev(id_info, dev)
return True
@classmethod
def _selascii_verify_post_process(cls, dev: DeviceData, comm: SELTelnet | SELSerial) -> bool:
# Close the connection cleanly
try:
comm.disconnect()
except Exception as ex:
cls.log.warning(f"Unclean disconnect during {comm.type} verify: {ex}")
# Attempt to get RID, TID, and current time from
# data the relay dumps when "quit" command is run
# TODO: output is sometimes shown on login (also after certain commands)
# Make a generic parser
# Check if in output after login (do a read())
# Check if in output after exit
exit_info = {}
if "exit" in comm.all_output[-1] or "quit" in comm.all_output[-1]:
try:
exit_info.update(comm.parse_exit_info(comm.all_output[-1]))
except Exception as ex:
cls.log.error(f"Failed to parse {comm.type} exit info: {ex}")
if exit_info:
# list so it gets appended if multiple exits occur
dev.write_file([exit_info], "raw-exit-info.json", merge_existing=True)
# Set device name to the Relay ID or Terminal ID if present
if not dev.name:
for id_candidate in ["RID", "TID"]:
if exit_info.get(id_candidate):
dev.name = exit_info[id_candidate]
break
process_info_into_dev(exit_info, dev)
if not dev.name and dev.extra.get("iedName"):
dev.name = dev.extra["iedName"]
if dev.description.model:
cls._check_model(dev.description.model, dev.ip)
cls.log.info(
f"Verified {comm.type} for {comm.address}!"
f"\nModel: {dev.description.model}"
f"\nName: {dev.name}"
f"\nFID: {dev.firmware.id}"
)
return True
@classmethod
def _verify_http(cls, dev: DeviceData, protocol: Literal["http", "https"] = "http") -> bool:
"""
Verify a device is a SEL Relay via the HTTP web interface.
"""
port = dev.options[protocol]["port"]
timeout = dev.options[protocol]["timeout"]
cls.log.debug(
f"Verifying Relay HTTP for {dev.ip}:{port} using {protocol} (timeout: {timeout})"
)
session = SELHTTP(dev.ip, port, timeout)
logged_in = False
if dev._cache.get("verified_web_user") and dev._cache.get("verified_web_pass"):
logged_in = session.login(
dev._cache["verified_web_user"],
dev._cache["verified_web_pass"],
protocol,
)
else:
if dev.options["web"]["user"] and dev.options["web"]["pass"]:
creds = {dev.options["web"]["user"]: dev.options["web"]["pass"]}
else:
creds = dev.options["sel"]["creds"]
for username, password in creds.items():
cls.log.debug(
f"Attempting SEL Relay HTTP login to {dev.ip} with user '{username}'"
)
logged_in = session.login(username, password, protocol)
if logged_in:
dev._cache["verified_web_user"] = username
dev._cache["verified_web_pass"] = password
dev.related.user.add(username)
break
if logged_in:
# Pull info about device including the model number
if not session.get_device_features(dev):
cls.log.warning(
f"Failed to pull additional HTTP info from "
f"{dev.ip}:{port} after successful login"
)
if dev.description.model:
cls._check_model(dev.description.model, dev.ip)
# Cache the session using this protocol
if not dev._cache.get("web_session"):
dev._cache["web_session"] = session
dev._cache["web_protocol"] = protocol
else:
session.disconnect()
cls.log.info(f"HTTP verification successful for {dev.ip}:{port}")
return True
session.disconnect()
cls.log.debug(f"Relay HTTP verification failed for {dev.ip}:{port}")
return False
@classmethod
def _check_model(cls, model: str, dev_id: str) -> bool:
"""
Emit a warning if the model is not supported by PEAT.
Return false if not supported, return true if it is.
"""
if model.lower() not in cls.supported_models:
cls.log.warning(
f"{dev_id} is a SEL device, however the model '{model}' is not "
f"familiar to PEAT and has not been tested, so your mileage "
f"may vary. Please report this signature to the PEAT team! "
f"(peat@sandia.gov)"
)
return False
return True
@classmethod
def pull_configs(cls, dev: DeviceData, comms: FTP | SELTelnet | SELSerial) -> bool:
all_files = pull_files(dev, comms)
if not all_files:
return False
successful = True
# Parse Sequential Event Recorder (SER) events
events = []
if all_files.get("SER.TXT") and all_files["SER.TXT"]["data"].strip():
events = parse_and_process_events(all_files["SER.TXT"]["data"], "SER.TXT", dev)[0]
# Fallback to parsing CSER if SER parsing fails
if not events and all_files.get("CSER.TXT") and all_files["CSER.TXT"]["data"].strip():
parse_and_process_events(all_files["CSER.TXT"]["data"], "CSER.TXT", dev)
# Parse CFG.TXT, if it's present
if (
all_files.get("CFG.TXT")
and all_files["CFG.TXT"]["data"].strip()
and all_files["CFG.TXT"]["data"].strip() != "=>"
):
try:
parse_cfg_txt(all_files["CFG.TXT"]["data"], dev)
except Exception as ex:
cls.log.warning(f"Failed to parse CFG.TXT pulled from {dev.address}: {ex}")
successful = False
if all_files.get("SET_ALL.TXT") and all_files["SET_ALL.TXT"]["data"].strip():
set_all = all_files["SET_ALL.TXT"]
# Path on the device (PurePosixPath)
dev.logic.file.path = set_all["device_path"]
# Path locally where PEAT is running (Path)
if set_all["local_path"]:
dev.logic.file.local_path = set_all["local_path"]
# Populate the "file" fields with the downloaded file
dev.populate_fields()
parse_res = cls.parse_config(set_all["data"], dev)
if not parse_res:
cls.log.error(f"Failed to parse SET_ALL.TXT pulled from {dev.address}")
successful = False
cls.update_dev(dev) # Populate any fields that are unset
else:
cls.log.warning(
f"SET_ALL.TXT was not pulled from {dev.address} and so logic "
f"parsing was skipped. The file may not exist on this device model."
)
for filename, file_info in all_files.items():
if filename.upper().endswith(".CID") and file_info["data"]:
process_cid_file(
data=file_info["data"], filepath=file_info["device_path"], dev=dev
)
return successful
[docs]
@classmethod
def pull_more_commands(cls, dev: DeviceData, comms: SELTelnet | SELSerial) -> bool:
"""
Attempt to get more info via terminal commands
(get_sta, show_eth, show_status, etc.).
If any of the commands are successful, this returns true.
"""
comms.model = dev.description.model # leak some hints
comms.elevate(1, dev.options["sel"]["creds"])
# NOTE: the SEL-2032 is no longer supported by SEL,
# so how PEAT accesses it shouldn't need to change.
if dev.description.model == "2032":
comms.POST_WRITE_SLEEP = 4.0
commands = {
"ser": comms.show_ser, # list[str] (parse with parsing funcs)
"sta": comms.get_sta, # dict[str, Union[datetime, str]], with FID and other info
"dnpmap": functools.partial(comms.exec_read, "dnpmap", added_delay=2.0),
# "modmap": functools.partial(comms.exec_read, "modmap", added_delay=2.0),
"who": functools.partial(comms.exec_read, "who", added_delay=3.0),
"status": functools.partial(comms.exec_read, "status", added_delay=4.0),
"card_17": functools.partial(comms.exec_read, "card 17", added_delay=4.0),
"card_18": functools.partial(comms.exec_read, "card 18", added_delay=4.0),
}
else:
commands = {
# History commands first
# TODO: ser has timeout issues on large SER logs
"ser": comms.show_ser, # list[str] (parse with parsing funcs)
"his": comms.show_his, # list[str] (parse with parsing funcs)
"sta": comms.get_sta, # dict[str, Union[datetime, str]], with FID and other info
"device_time": comms.get_device_time, # datetime
"active_group": comms.get_active_group,
# TODO: fix parsing code in SELAscii.show_mac()
"mac": comms.show_mac, # dict[str, str]
"eth": comms.show_eth, # list[str] (this is pretty raw)
"bre": comms.show_bre, # list[str]
"eve": comms.show_eve, # list[str]
"sum": comms.show_sum, # list[str]
"status": comms.show_status, # list[str]
}
# required by most models for SER and HIS
# if elevation fails, continue anyway
comms.elevate(2, dev.options["sel"]["creds"])
cls.log.info(f"Attempting to run {len(commands)} SEL terminal commands on {dev.address}")
results = {}
for cmd, func in commands.items():
try:
raw_res = func()
if raw_res:
# save timestamp of when each command was run
results[cmd] = {
"cmd": cmd,
"func": repr(func),
"timestamp": utils.utc_now(),
"result": raw_res,
}
else:
cls.log.warning(f"No output from '{cmd}' function on {dev.address}")
except Exception as ex:
cls.log.warning(f"Failed to run function '{cmd}' on {dev.address}: {ex}")
if not results:
cls.log.error(f"No commands were successful on {dev.address}")
return False
else:
cls.log.info(
f"{len(results)} commands successful on {dev.address} "
f"(total attempted: {len(commands)})"
)
# Dump all of the commands to disk for debugging or investigation
dev.write_file(results, "additional-command-outputs.json")
# Parse the results
try:
if results.get("sta"):
process_info_into_dev(results.pop("sta")["result"], dev=dev)
if results.get("device_time"):
dev.extra["device_time"] = results.pop("device_time")["result"]
if results.get("ser"):
ser_data = "\n".join(results.pop("ser")["result"])
parse_and_process_events(ser_data, dataset="SER", dev=dev)
if results.get("his"):
his_data = "\n".join(results.pop("his")["result"])
parse_and_process_events(his_data, dataset="HIS", dev=dev)
if results.get("mac"):
for mac_name, mac_value in results.pop("mac")["result"].items():
dev.related.mac.add(mac_value)
dev.extra[mac_name] = mac_value
if results.get("active_group"):
active_group = results.pop("active_group")["result"]
dev.extra["active_settings_group"] = active_group
# if results.get("eth"):
# pass # TODO
# if results.get("bre"):
# pass # TODO
# if results.get("eve"):
# pass # TODO
# if results.get("sum"):
# pass # TODO
# if results.get("status"):
# pass # TODO
except Exception:
cls.log.exception(f"Unexpected error processing command results from {dev.address}")
dev.extra["additional_command_outputs"] = results
return False
# Only add to Extra what wasn't parsed or used
if results:
dev.extra["additional_command_outputs"] = results
return True
[docs]
@classmethod
def pull_telnet(cls, dev: DeviceData, pull_files: bool = True) -> bool:
"""
Pull and parse configuration file(s) from the relay via Telnet.
"""
with SELTelnet(
ip=dev.ip,
port=dev.options["telnet"]["port"],
timeout=dev.options["telnet"]["timeout"],
) as tn:
if not tn.test_connection():
cls.log.error(f"Failed Telnet pull from {dev.ip}: failed to connect")
return False
if not tn.elevate(1, dev.options["sel"]["creds"]):
cls.log.error(f"Failed Telnet pull from {dev.ip}: login failed")
return False
dev.related.user.update(tn.successful_usernames)
successful = True
# Get more info via terminal commands (get_sta, show_eth, show_status, etc.)
if dev.options["sel"]["attempt_more_commands"]:
# At least one command should succeed
if not cls.pull_more_commands(dev=dev, comms=tn):
cls.log.error("Failed to pull more Telnet commands")
successful = False
if pull_files:
if not cls.pull_configs(dev, tn):
cls.log.warning(f"There were issues downloading files via Telnet for {dev.ip}")
successful = False
else:
cls.log.debug(f"Skipping file pull via Telnet for {dev.ip}")
return successful
[docs]
@classmethod
def pull_serial(cls, dev: DeviceData) -> bool:
"""
Pull and parse configuration file(s) from the relay via serial.
"""
# TODO: cache the serial object between verify and pull,
# to avoid closing and reopening connection for no reason
if not dev.options["serial"]["baudrate"]:
if not cls._verify_serial(dev):
cls.log.error(
f"Failed pull from {dev.serial_port}: _verify_serial() failed "
f"(verify was run because no baudrate was specified for the "
f"device in the PEAT YAML configuration file)"
)
return False
with SELSerial(
serial_port=dev.serial_port,
baudrate=dev.options["serial"]["baudrate"],
timeout=dev.options["timeout"],
force_ymodem=dev.options["sel"]["force_ymodem"],
) as ser:
if not ser.test_connection():
cls.log.error(f"Failed pull from {dev.serial_port}: connection failed")
return False
if not ser.elevate(1, dev.options["sel"]["creds"]):
cls.log.error(f"Failed pull from {dev.serial_port}: login failed")
return False
dev.related.user.update(ser.successful_usernames)
if dev.options["sel"]["force_ymodem"]:
# YMODEM only works on Linux or OSX (or any system with rz/sz commands)
if consts.WINDOWS:
raise DeviceError(
f"YMODEM pulls do not work on Windows "
f"(sel.force_ymodem was configured for {dev.serial_port})"
)
cls.log.warning(f"Forcing use of YMODEM for serial pull from {dev.serial_port}")
successful = True
if not cls.pull_configs(dev, ser):
cls.log.warning(
f"There were issues downloading files via Serial for {dev.serial_port}"
)
successful = False
# Get more info via terminal commands (get_sta, show_eth, show_status, etc.)
if dev.options["sel"]["attempt_more_commands"]:
# At least one command should succeed
if not cls.pull_more_commands(dev=dev, comms=ser):
cls.log.error("Failed to pull more serial commands")
successful = False
return successful
[docs]
@classmethod
def pull_ftp(cls, dev: DeviceData) -> bool:
"""
Pull and parse configuration file(s) from the relay via FTP.
"""
if not cls._setup_ftp(dev):
cls.log.error(f"FTP pull failed for {dev.ip}: failed setup")
return False
try:
with FTP(
ip=dev.ip,
port=dev.options["ftp"]["port"],
timeout=dev.options["ftp"]["timeout"],
) as ftp:
username = dev.options["ftp"]["user"]
if not ftp.login(username, dev.options["ftp"]["pass"]):
cls.log.error(
f"Failed to pull config from {dev.ip}: FTP login "
f"failed (user: '{username}')"
)
return False
dev.related.user.add(username)
delay = dev.options["ftp"]["pull_delay"]
cls.log.info(
f"FTP logged in to {dev.ip} as '{username}', sleeping for "
f"{delay} seconds before running 'getwelcome()'..."
)
time.sleep(delay)
if not ftp.getwelcome():
cls.log.error(
f"Failed to pull FTP from {dev.ip}: "
f"'getwelcome' failed after login succeeded"
)
return False
# Download files
if not cls.pull_configs(dev, ftp):
cls.log.error(f"Failed to pull FTP from {dev.ip}: no files were downloaded")
return False
return True
except CommError as ex:
cls.log.warning(f"Failed FTP pull from {dev.ip}: connection failed")
cls.log.trace(f"Exception: {ex}")
except Exception as ex:
cls.log.warning(f"Failed FTP pull from {dev.ip} due to an unhandled exception: {ex}")
return False
[docs]
@classmethod
def pull_http(cls, dev: DeviceData, protocol: Literal["http", "https"] = "http") -> bool:
"""
Pull configuration and other data from the relay via HTTP.
"""
port = dev.options[protocol]["port"]
timeout = dev.options[protocol]["timeout"]
cls.log.info(f"Pulling data via HTTP from {dev.ip}:{port} (timeout: {timeout})")
if not dev._cache.get("web_session"):
cls.log.debug(
f"No web session cached in pull_http() for "
f"{dev.ip}:{port}, creating new session..."
)
dev._cache["web_session"] = SELHTTP(dev.ip, port, timeout)
dev._cache["web_protocol"] = protocol
else:
cls.log.debug(f"Using existing web session for {dev.ip}:{port}")
session = dev._cache["web_session"]
if not session.relay_logged_in:
cls.log.debug(f"Session not logged in, logging in to {dev.ip}:{port}")
username = dev._cache.get("verified_web_user")
if not username:
username = dev.options["web"]["user"]
if not username:
username = "ACC"
password = dev._cache.get("verified_web_pass")
if not password:
password = dev.options["web"]["pass"]
if not password:
password = "OTTER"
if not session.login(username, password, protocol):
cls.log.error(
f"Failed to login to web interface on {dev.ip}:{port} with user '{username}'"
)
session.disconnect()
if dev._cache.get("web_session"):
del dev._cache["web_session"]
if dev._cache.get("web_protocol"):
del dev._cache["web_protocol"]
return False
dev.related.user.add(username)
dev._cache["verified_web_user"] = username
dev._cache["verified_web_pass"] = password
web_methods = [
session.get_status,
session.get_communications,
session.get_port_settings,
session.get_front_panel_settings,
session.get_sequential_events,
session.get_historical_events,
session.get_meter_automation,
session.get_meter_protection,
session.get_meter_energy,
session.get_output_data,
]
# This gets called during verify, no reason to call it again.
# However, if it HASN'T been called yet, then it needs to be
# called BEFORE the other methods. Thus, insert(0).
if not dev._is_verified:
web_methods.insert(0, session.get_device_features)
cls.log.info(
f"Beginning web pull from {dev.ip}:{port} using {len(web_methods)} web methods"
)
# Note the number of successful methods somewhere
dev._cache["num_successful_methods"] = 0
was_successful = True
for method in web_methods:
time.sleep(0.2) # attempt to avoid overloading web server
cls.log.info(f"Running '{method.__name__}' for {dev.ip}:{port}")
try:
method_result = method(dev) # type: bool
if not method_result:
cls.log.warning(
f"No data from HTTP method '{method.__name__}' on {dev.ip}:{port}"
)
else:
dev._cache["num_successful_methods"] += 1
except Exception:
cls.log.exception(f"'{method.__name__}' failed on {dev.ip}:{port}")
was_successful = False
continue
cls.log.info(
f"Finished pulling data via web interface from {dev.ip}:{port} "
f"({dev._cache['num_successful_methods']} methods were "
f"successful out of {len(web_methods)} methods attempted)"
)
return was_successful
@classmethod
def _pull(cls, dev: DeviceData) -> bool:
"""
Pull and parse all configuration files and data from the SEL relay.
"""
# If the user hasn't specified a protocol to use, there's a
# serial port configured for the host, and the type of operation
# is serial, then do a serial pull.
if dev.options["sel"]["force_serial_pull"] or (
dev.serial_port and state.comm_type == "serial"
):
return cls.pull_serial(dev)
if not dev.options["sel"]["pull_methods"]:
cls.log.error(f"The 'sel.pull_methods' option is empty or null for {dev.ip}")
return False
for method in dev.options["sel"]["pull_methods"]:
if method not in cls.default_options["sel"]["pull_methods"]:
cls.log.error(
f"Invalid 'sel.pull_methods' method '{method}' for {dev.ip}, it must "
f"be one of {cls.default_options['sel']['pull_methods']}"
)
return False
pull_results = {} # type: dict[str, bool]
files_pulled = False
for method in dev.options["sel"]["pull_methods"]:
if dev.service_status({"protocol": method}) == "closed":
cls.log.warning(f"Failed to pull {method} on {dev.ip}: {method} port is closed")
continue
# TODO: handle case where dev._is_verified is False,
# e.g. if pull() is called directly without a scan.
if method == "telnet":
pull_files = False
if (
not files_pulled and dev.options["sel"]["allow_telnet_file_download"]
) or dev.options["sel"]["force_telnet_file_download"]:
pull_files = True
pull_results[method] = cls.pull_telnet(dev, pull_files=pull_files)
if pull_results[method] and pull_files:
files_pulled = True
elif method == "ftp":
pull_results[method] = cls.pull_ftp(dev)
if pull_results[method]:
files_pulled = True
else:
web_protocol = None
if dev._cache.get("web_protocol"):
web_protocol = dev._cache["web_protocol"]
else:
for proto in ["http", "https"]:
s = dev.retrieve("service", {"protocol": proto})
if s and (s.enabled or s.status == "open"):
web_protocol = proto
if web_protocol:
if not dev._cache.get("web_protocol"):
dev._cache["web_protocol"] = web_protocol
try:
# pull using the open protocol (http or https)
pull_results[method] = cls.pull_http(dev, web_protocol)
except Exception:
cls.log.exception(
f"Web pull via '{web_protocol}' of {dev.ip} "
f"failed due to unhandled exception"
)
pull_results[method] = False
else:
cls.log.warning(
f"Failed to determine HTTP/HTTPS protocol for {dev.ip},"
f"falling back to plain 'http'."
)
pull_results[method] = cls.pull_http(dev, "http")
if not pull_results[method]:
cls.log.warning(f"{method} pull failed for {dev.ip}")
else:
cls.log.debug(f"{method} pull successful for {dev.ip}")
return all(m for m in pull_results.values())
@classmethod
def _push(
cls,
dev: DeviceData,
to_push: str | bytes | Path,
push_type: consts.PushType,
) -> bool:
"""
Reconfigure the SEL device by writing config files.
.. warning::
Modifying the port configuration (SET_P*) will kill any connections
to that port. This can affect the port PEAT is using to push configs!
.. note::
Filenames must be capitalized, e.g. SET_ALL.TXT or SET_1.TXT
Args:
dev: Device to push to
to_push: Path to relay configuration files to upload (SET_*.TXT)
push_type: this should be 'config'
Returns:
If the push was successful
"""
if push_type != "config":
cls.log.critical(f"Unsupported push type {push_type}, expected 'config'")
return False
if not isinstance(to_push, Path):
cls.log.error(
f"Expected path for push, got type '{type(to_push)}'. "
f"Ensure you specify a path to a config file or directory "
f"with configs to upload, not raw data."
)
return False
# Ensure we can connect via FTP
# TODO: before pushing, read FTP user out of config
# just do: dev.lookup("service", {"protocol": "ftp"}).user
if not cls._setup_ftp(dev):
cls.log.error(f"FTP push failed for {dev.ip}: failed setup")
return False
# TODO: Ability to push using a SET_ALL to reconstruct files, e.g.
# peat push -d selrelay -i 192.0.2.123 -- SET_ALL.TXT
# NOTE: I don't think we can push the actual SET_ALL.TXT,
# may return a permission denied error.
# TODO: support pushing multiple configs at once, e.g.
# peat push -d selrelay -i 192.0.2.123 -- SET_1.TXT SET_6.TXT
# This will require modifying the push API and CLI args a bit
# to support multiple filenames as arguments.
# TODO: rebuild connection on failures, e.g. timeout mid-push
# TODO: if SET_P* file(s) have a different IP, they need to be
# done last, otherwise the subsequent configs will fail.
# sort config update order so SET_P* files are last, with
# ethernet ports at the end.
# TODO: if SET_P in config name, then prepare ftp connection
# to reconnect/rebuild, and update the IP used to connect
# if it was changed. set a internal PEAT flag indicating
# a IP change occurred during the same run.
# TODO: option to pull current configs and only push those whose hash changed
# peat push -d selrelay -i 192.0.2.123 -- SET_1.TXT
cls.log.info(f"Reading config(s) from {to_push}")
if to_push.is_dir():
config_files = sorted(to_push.glob("SET_*.TXT"))
if "SET_ALL.TXT" in (x.name for x in config_files):
cls.log.warning(f"Ignoring SET_ALL.TXT in config files for push to {dev.ip}.")
config_files = [x for x in config_files if x.name != "SET_ALL.TXT"]
if not config_files:
cls.log.error(f"Push failed: couldn't find any configs in {to_push.as_posix()}")
return False
cls.log.debug(f"Using {len(config_files)} config files from dir {to_push}")
elif to_push.is_file():
# TODO: support pushing *.rdb files (just extract the set_all), e.g.
# peat push -d selrelay -i 192.0.2.123 -- sel_351.rdb
# We'll need to implement splitting SET_ALL into separate config files
# before implementing this.
config_files = [to_push]
cls.log.debug(f"Push: using config file {to_push.as_posix()}")
else:
cls.log.critical(f"Invalid file for push to {dev.ip}: {to_push}")
return False
cls.log.trace(f"config_files: {config_files}")
try:
with FTP(dev.ip, dev.options["ftp"]["port"], 120.0) as relay:
if not relay.login(
user=dev.options["ftp"]["user"], passwd=dev.options["ftp"]["pass"]
):
cls.log.error(f"Failed to push config to {dev.ip}: FTP login failed")
return False
delay = dev.options["ftp"]["pull_delay"]
cls.log.debug(
f"FTP logged in to {dev.ip}, sleeping for {delay} "
f"seconds before running getwelcome()..."
)
time.sleep(delay)
if not relay.getwelcome():
cls.log.error(
f"Failed to push config to {dev.ip}: "
f"getwelcome failed after login succeeded"
)
return False
cls.log.debug(
f"Sleeping for {delay} seconds before pushing configs to {dev.ip}..."
)
time.sleep(delay)
# Find where the configs are
# According to the documentation, all 700-series relays
# put their configs in the root directory. However, this
# is not always the case, as we've seen a directory structure
# on a 700G with the fancy front panel display.
if not dev.extra.get("file_listing") or not dev.extra.get(
"settings_root_directory"
):
populate_file_listing(dev, relay)
# Push only the files on the relay already and in
# the order the relay displays them
settings_root = dev.extra["settings_root_directory"]
on_relay = [x.upper() for x in dev.extra["file_listing"][settings_root]]
# Compare local file names with those on the device
local_names = [x.name for x in config_files]
local_not_on_relay = [x for x in local_names if x not in on_relay]
if local_not_on_relay:
cls.log.warning(
f"{len(local_not_on_relay)} files being pushed are not "
f"currently present on the relay (they're new files). "
f"Ensure you are pushing the right files to the "
f"appropriate device! List of files: "
f"{local_not_on_relay}"
)
on_relay_not_local = [x for x in on_relay if x not in local_names]
if on_relay_not_local:
cls.log.debug(
f"{len(on_relay_not_local)} files exist on the relay, "
f"but are not present in the configs being pushed. This "
f"is expected if you're pushing a subset of configs "
f"(e.g. just SET_1.TXT). List of files: "
f"{on_relay_not_local}"
)
# If the settings are in a "SETTINGS" directory, then change
# directories before transferring configs ("cd SETTINGS")
if settings_root == "SETTINGS":
cls.log.info("Changing directory to 'SETTINGS' before pushing configs")
relay.cd("/SETTINGS")
file_delay = 2
cls.log.info(
f"Preparing to transfer {len(config_files)} config "
f"files to {dev.ip}. This will take at least "
f"{file_delay * (len(config_files) + 1)} seconds."
)
transfer_start = timeit.default_timer()
# Transfer all configs except communication port configs
for conf_file in config_files:
if "SET_P" not in conf_file.name:
cls.log.info(
f"Transferring config '{conf_file.name}' ({conf_file.as_posix()})"
)
with conf_file.open("rb") as f:
relay.upload_text(conf_file.name, f)
cls.log.info(f"{conf_file.name} config sent")
cls.log.debug(f"Sleeping for {file_delay} seconds...")
time.sleep(file_delay)
# Transfer configuration port configs
for conf_file in config_files:
if "SET_P" in conf_file.name:
cls.log.info(
f"Transferring communication port config "
f"'{conf_file.name}' ({conf_file.as_posix()})"
)
with conf_file.open("rb") as f:
relay.upload_text(conf_file.name, f)
cls.log.info(f"{conf_file.name} communication port config sent")
cls.log.debug(f"Sleeping for {file_delay} seconds...")
time.sleep(file_delay)
transfer_time = timeit.default_timer() - transfer_start
cls.log.info(
f"Completed transfer of {len(config_files)} config "
f"files to {dev.ip} in {utils.fmt_duration(transfer_time)}"
)
# Prevent disconnection errors from skipping restart
try:
relay.disconnect()
except Exception:
pass
cls.log.info(f"Completed configuration of relay {dev.ip} via FTP")
# Reboot the device if 'sel.restart_after_push' is true
if dev.options["sel"]["restart_after_push"]:
pre_delay = 5
cls.log.info(f"Waiting {pre_delay} seconds before restarting {dev.ip}")
time.sleep(pre_delay)
return cls.restart_relay_telnet(dev)
else:
cls.log.info(
"Note: the relay was NOT restarted, as most config changes "
"don't require a reboot to take effect. If you need to restart "
"the relay, re-run the push with 'sel.restart_after_push' set to true."
)
return True
except CommError as ex:
cls.log.debug(f"Failed to push config to {dev.ip}: connection failed")
cls.log.trace(f"Exception: {ex}")
return False
except Exception as ex:
cls.log.warning(
f"Failed to push config to {dev.ip} due to an unhandled exception: {ex}"
)
return False
@classmethod
def _setup_ftp(cls, dev: DeviceData) -> bool:
cls.log.trace2(f"_setup_ftp() for {dev.ip}")
if dev.options["ftp"].get("user") and dev.options["ftp"].get("pass"):
return True
port = dev.options["ftp"]["port"]
if not dev.retrieve("service", {"port": port}):
if not check_tcp_port(dev.ip, port, reset=True):
cls.log.error(f"Failed FTP setup: TCP port {port} is not open on {dev.ip}")
return False
else:
svc = Service(port=port, transport="tcp", status="open")
dev.store("service", svc, lookup="port")
# TODO: document this behavior
# formally codify this as "default fallback behavior"
# based on what we know from manuals and experience
# with different models. if creds are known, then
# they should be specified in the PEAT config YAML.
if not dev._runtime_options.get("ftp"):
dev._runtime_options["ftp"] = deepcopy(cls.default_options["ftp"])
elif not dev._runtime_options["ftp"].get("creds"):
dev._runtime_options["ftp"]["creds"] = deepcopy(cls.default_options["ftp"]["creds"])
# Set informed defaults if login credentials aren't manually specified
if dev.options["ftp"]["creds"] == cls.default_options["ftp"]["creds"]:
cls.log.trace(f"FTP creds for {dev.ip} are: INFORMED DEFAULTS")
if dev.description.model in ["351S", "700G", "710", "751"]:
utils.move_item(dev._runtime_options["ftp"]["creds"], 0, ("FTPUSER", "TAIL"))
elif dev.description.model in ["451", "411L", "487E"]:
utils.move_item(dev._runtime_options["ftp"]["creds"], 0, ("2AC", "TAIL"))
dev._runtime_options["ftp"]["creds"].insert(1, ("ACC", "OTTER"))
elif dev.description.model in ["351"]:
utils.move_item(dev._runtime_options["ftp"]["creds"], 0, ("FTP", "TAIL"))
elif dev.description.model in ["2032"]:
utils.move_item(dev._runtime_options["ftp"]["creds"], 0, ("2AC", "TAIL"))
else:
cls.log.trace(f"FTP creds for {dev.ip} are: USER PROVIDED")
timeout = dev.options["ftp"]["timeout"]
delay = dev.options["ftp"]["pull_delay"]
attempts = 0
for creds in dev.options["ftp"]["creds"]:
attempts += 1
try:
with FTP(dev.ip, port, timeout) as relay:
if not relay.login(creds[0], creds[1]):
cls.log.trace(f"FTP login creds {creds} failed for {dev.ip}")
continue
cls.log.trace(
f"FTP logged in to {dev.ip}, sleeping for {delay} "
f"seconds before running getwelcome()..."
)
time.sleep(delay)
if not relay.getwelcome():
cls.log.warning(
f"FTP setup failed for {dev.ip}: "
f"getwelcome failed after login succeeded"
)
return False
cls.log.debug(f"FTP login succeeded on {dev.ip} after {attempts} attempt(s)")
dev._runtime_options["ftp"]["user"] = creds[0]
dev._runtime_options["ftp"]["pass"] = creds[1]
dev.related.user.add(creds[0])
ftp_svc = Service(
protocol="ftp", port=port, status="verified", transport="tcp"
)
dev.store("service", ftp_svc, lookup=["protocol", "port"])
return True
except CommError as ex:
cls.log.debug(f"FTP setup failed for {dev.ip}: connection failed")
cls.log.trace(f"Exception: {ex}")
return False
except Exception as ex:
cls.log.warning(
f"FTP verification failed for {dev.ip} due to an unhandled exception: {ex}"
)
return False
cls.log.debug(
f"FTP setup failed for {dev.ip}: no credentials were valid ({attempts} attempts)"
)
return False
@classmethod
def _parse(cls, file: Path, dev: DeviceData | None = None) -> DeviceData | None:
# "file" can be one of the following:
# - a SET_ALL.TXT file
# - a *.rdb file
# - CFG.TXT, SER.TXT, CSER.TXT, *.CID, and others
#
# Some examples are in peat/tests/modules/sel/data_files/
raw_data = file.read_bytes()
# CFG.TXT, SER.TXT, CSER.TXT, HISTORY.TXT, CHISTORY.TXT, *.CID
f_name = file.name.lower()
if "cfg" in f_name or "ser" in f_name or "history" in f_name or f_name.endswith(".cid"):
if not dev:
dev = datastore.get(f"selrelay_{file.stem.lower()}", "id")
if f_name.endswith(".cid"):
cid_extracted = process_cid_file(raw_data, file, dev)
if not cid_extracted:
return None
elif "ser" in f_name or "history" in f_name:
if not event_data_present(raw_data):
cls.log.warning(f"No event data in {file.name}")
return dev
events, info = parse_and_process_events(raw_data, file.name, dev)
if not events and not info:
return None
else:
parse_cfg_txt(raw_data, dev)
cls.update_dev(dev) # Populate any fields that are unset
return dev
# NOTE: isOleFile will interpret the argument as a filename string
# if the length of the data is smaller than 1536 bytes. To avoid this,
# we always pass it a file pointer.
with file.open("rb") as fp:
is_rdb = olefile.isOleFile(fp)
# *.rdb SEL project files use a file structure called
# Compound File Binary (CFB) Format or Microsoft OLE2 File.
# isOleFile() checks the file for the magic number at the
# start of the header to determine if it is the correct format.
temp_path = None
if is_rdb:
cls.log.debug("Input data is in CFB format, parsing as a RDB project")
to_parse = parse_rdb(raw_data) # *.rdb
# Save the extracted SET_ALL data from the rdb to a temporary file
# This is helpful if debugging an issue with parsing of rdb files
temp_path = utils.write_temp_file(to_parse, "extracted_SET_ALL.txt")
else:
cls.log.debug("Input data is text, parsing as a non-RDB config file")
to_parse = raw_data.decode("utf-8") # SET_ALL.TXT
dev = cls.parse_config(set_all=to_parse, dev=dev)
if not dev:
return None
if is_rdb:
dev.write_file(raw_data, "raw-rdb-config.rdb")
if temp_path and temp_path.exists():
utils.move_file(temp_path, dev.get_out_dir())
dev.logic.file.local_path = file
cls.update_dev(dev) # Populate any fields that are unset
return dev
@classmethod
def parse_config(cls, set_all: str, dev: DeviceData | None = None) -> DeviceData | None:
try:
parsed, dev = parse_set_all(set_all_data=set_all, dev=dev)
except Exception:
cls.log.exception("Failed config parsing due to a unhandled exception")
return None
dev.write_file(parsed, "parsed-config.json")
if dev.description.model:
cls._check_model(dev.description.model, dev.description.model)
# OS information (this is inferred based on fact it's an SEL relay)
dev.os.family = "rtos"
dev.os.name = "ThreadX"
dev.os.vendor.name = "Express Logic"
# Set the device name if not already set. Key used varies
# depending on how it's configured and the device model.
for id_key in ["relay_id", "terminal_id", "station_id"]:
id_val = parsed.get(id_key)
if id_val:
dev.extra[id_key] = id_val # Store in extra
if not dev.name: # Set the name if not set
dev.name = parsed[id_key]
# Only store the logic portions of the config as the "logic"
logic_keys = ["close_logic", "output_logic", "protection_schemes", "trip_logic"]
raw_logic = {}
formatted_logic = {}
for k in logic_keys:
if k in parsed:
raw_logic[k] = parsed[k]
if k == "protection_schemes":
formatted_logic[k] = parsed[k]
else:
formatted_logic[k] = {
group: ",".join(f"{k}={v}" for k, v in logic.items())
for group, logic in parsed[k].items()
}
else:
cls.log.warning(f"No logic key {k} in config, not storing in logic...")
formatted_logic = pformat(formatted_logic)
# Save the configuration and logic to the data model and a file
dev.logic.original = set_all
dev.logic.parsed = formatted_logic
dev.write_file(raw_logic, "raw-logic.json")
dev.write_file(formatted_logic, "formatted-logic.txt")
dev.write_file(set_all, "raw-setall-config.txt")
return dev
[docs]
@classmethod
def restart_relay_telnet(cls, dev: DeviceData) -> bool:
"""
Execute a system restart on the SEL relay via Telnet.
"""
cls.log.info(f"Restarting device {dev.ip} via Telnet...")
with SELTelnet(
ip=dev.ip,
port=dev.options["telnet"]["port"],
timeout=dev.options["telnet"]["timeout"],
) as tn:
if not tn.elevate(2, dev.options["sel"]["creds"]):
cls.log.error(f"Failed to restart {dev.ip} via Telnet: login failed")
return False
dev.related.user.update(tn.successful_usernames)
if tn.restart_device():
cls.log.info(f"Successfully restarted device {dev.ip} via Telnet")
return True
cls.log.error(f"Failed to restart device {dev.ip} via Telnet")
return False
[docs]
def sel_port_check(dev: DeviceData, protocol: str) -> bool:
"""lambda function set the TCP RST flag for SEL scanning."""
return check_tcp_port(dev.ip, dev.options[protocol]["port"], reset=True)
SELRelay.ip_methods = [
# NOTE: We are explicitly NOT port checking FTP. See the note in the
# docstring at the top of this file.
IPMethod(
name="SEL Relay Telnet login",
description=str(SELRelay._verify_telnet.__doc__).strip(),
type="unicast_ip",
identify_function=SELRelay._verify_telnet,
reliability=6,
protocol="telnet",
transport="tcp",
default_port=23,
port_function=functools.partial(sel_port_check, protocol="telnet"),
),
IPMethod(
name="SEL Relay HTTP login",
description=str(SELRelay._verify_http.__doc__).strip(),
type="unicast_ip",
identify_function=functools.partial(SELRelay._verify_http, protocol="http"),
# 1 higher than HTTPS's reliability so we prefer HTTP over HTTPS
# for older or less robust devices, like the relays.
reliability=8,
protocol="http",
transport="tcp",
default_port=80,
port_function=functools.partial(sel_port_check, protocol="http"),
),
# NOTE: the SEL-2730M switch exclusively uses HTTPS and doesn't allow plain HTTP
IPMethod(
name="SEL Relay HTTPS login",
description=str(SELRelay._verify_http.__doc__).strip(),
type="unicast_ip",
identify_function=functools.partial(SELRelay._verify_http, protocol="https"),
reliability=7,
protocol="https",
transport="tcp",
default_port=443,
port_function=functools.partial(sel_port_check, protocol="https"),
),
]
SELRelay.serial_methods = [
SerialMethod(
name="SEL Relay serial",
description=str(SELRelay._verify_serial.__doc__).strip(),
type="direct",
identify_function=SELRelay._verify_serial,
reliability=5,
)
]
__all__ = ["SELRelay"]