"""
PEAT module for Sandia SCEPTRE virtual field devices.
Sandia SCEPTRE virtual field control devices (vFCDs), including the
SCEPTRE Virtual RTU and SCEPTRE Virtual Relay.
SCEPTRE virtual relays and RTUs are running a FTP server, which is used to
pull the device configuration XML file as well as the firmware binary image.
The XML file is then parsed and the extracted information saved.
Development, testing, and demonstration of this module is possible without a
running SCEPTRE instance by setting up a local FTP server using the Python Twisted
framework. To do so, run the following command in a terminal:
.. code-block:: bash
pip3 install Twisted
sudo -H $(which twistd) -n ftp --auth=anonymous -r examples/devices/sceptre/
Then, in another terminal, run peat:
.. code-block:: bash
peat pull -d sceptre -i 127.0.0.1 --assume-online -v -c ./examples/peat-config-sceptre-testing.yaml
That config file sets the following options:
- ftp.port: 2121
- ftp.user: anonymous
- ftp.pass: anonymous
- sceptre.ftp_testing: true
Listening services
- FTP (TCP 21)
Data collected
- Full device configuration file
- Device firmware binary image
- Device type (e.g. "Relay" or "RTU")
- Device name (e.g. "relay-louie")
- Cycle time
- Logic
Authors
- Christopher Abate
- Christopher Goes
""" # noqa: E501
from pathlib import Path, PurePosixPath
from random import randint
from xml.etree.ElementTree import Element, fromstring
from peat import (
IO,
DeviceData,
DeviceModule,
Interface,
IPMethod,
Register,
Service,
Tag,
config,
datastore,
)
from peat.protocols import FTP
[docs]
class SCEPTRE(DeviceModule):
"""
SCEPTRE Virtual RTUs and Virtual Relays.
"""
device_type = "RTU"
vendor_id = "Sandia"
vendor_name = "Sandia National Laboratories"
model = "SCEPTRE"
brand = "SCEPTRE"
filename_patterns = ["*.xml", "config.xml"]
default_options = {
"ftp": {
"user": "sceptre",
"pass": "sceptre",
},
"sceptre": {
"ftp_testing": False,
"bennu_filename": "bennu-field-deviced.firmware",
},
}
annotate_fields = {
"os.name": "Ubuntu",
"os.vendor.name": "Canonical",
"os.vendor.id": "Canonical",
}
@classmethod
def _verify_ftp(cls, dev: DeviceData) -> bool:
"""
Verify it's a SCEPTRE Bennu device by logging in with FTP and
checking if any file named "bennu" is on the server.
"""
port = dev.options["ftp"]["port"]
timeout = dev.options["ftp"]["timeout"]
ftp_testing = dev.options["sceptre"]["ftp_testing"]
# Workaround hack for edge cases where fields get overwritten by other modules
username = dev.options["ftp"].get("user")
password = dev.options["ftp"].get("pass")
if not username:
username = cls.default_options["ftp"]["user"]
if not password:
password = cls.default_options["ftp"]["pass"]
failed = ""
file_list = []
try:
with FTP(dev.ip, port, timeout) as ftp:
ftp.ftp.getwelcome()
if not ftp.login(username, password):
failed = "login failed"
elif ftp_testing:
dir_result = ftp.dir()
if not dir_result:
failed = "(FTP TESTING) file listing failed ('dir' command)"
elif "bennu" not in str(dir_result):
failed = "(FTP TESTING) bennu firmware not in file listing"
else:
file_list = dir_result[0]
elif not ftp_testing and "bennu" not in str(ftp.nlst()):
nlst_result = ftp.nlst()
if not nlst_result:
failed = "file listing failed ('nlst' command)"
elif "bennu" not in str(nlst_result):
failed = "bennu firmware not in file listing"
else:
file_list = nlst_result
except Exception as ex:
failed = str(ex)
if failed:
cls.log.debug(f"Failed to verify FTP for {dev.ip}: {failed}")
return False
if file_list:
dev._cache["file_list"] = file_list
dev.related.files.update(file_list)
for file in file_list:
if file.startswith("bennu-field-deviced") or ".firmware" in file:
if not dev._runtime_options.get("sceptre"):
dev._runtime_options["sceptre"] = {}
dev._runtime_options["sceptre"]["bennu_filename"] = file
break
if not dev.options["ftp"].get("user"):
dev._options["ftp"]["user"] = username
if not dev.options["ftp"].get("pass"):
dev._options["ftp"]["pass"] = password
dev.related.user.add(username)
svc = Service(protocol="ftp", port=port, transport="tcp", status="verified")
dev.store("service", svc, interface_lookup={"ip": dev.ip})
return True
@classmethod
def _download_ftp(
cls, dev: DeviceData, check_for: str, extension: str
) -> tuple[bytes | None, Path | None]:
try:
with FTP(
ip=dev.ip,
port=dev.options["ftp"]["port"],
timeout=dev.options["ftp"]["timeout"],
) as ftp:
username = dev.options["ftp"]["user"]
password = dev.options["ftp"]["pass"]
if not ftp.login(username, password):
return None, None
dev.related.user.add(username)
filename = ftp.find_file(check_for, extension, "/")
if not filename:
return None, None
data = ftp.download_binary(filename, save_to_file=False)
downloaded_path = None
if filename.startswith("/"):
filename = filename[1:]
if config.DEVICE_DIR:
downloaded_path = dev.write_file(
data=data,
filename=filename,
out_dir=dev.get_sub_dir("ftp_files"),
)
return data, downloaded_path
except Exception as ex:
cls.log.debug(f"Failed to get FTP file {check_for} from {dev.ip}: {ex}")
return None, None
@classmethod
def _pull(cls, dev: DeviceData) -> bool:
result = True
if not cls.pull_firmware(dev):
result = False
if not cls.pull_config(dev):
result = False
return result
[docs]
@classmethod
def pull_config(cls, dev: DeviceData) -> bool:
"""
Retrieves the device configuration via FTP.
The SCEPTRE vFCDs use a single XML file for their configuration,
called 'config.xml' by default.
"""
data, file_path = cls._download_ftp(dev, "config", ".xml")
if data is None:
cls.log.error(f"Failed to pull config from {dev.ip}")
return False
try:
# If file output is disabled there won't be a file on disk
if file_path:
cls.parse_config(file_path, dev)
dev.related.files.add(file_path.name)
else:
cls.parse_config(data, dev)
dev.related.files.add("config.xml")
except Exception as err:
cls.log.exception(f"Failed to parse SCEPTRE config: {err}")
return False
return True
@classmethod
def pull_firmware(cls, dev: DeviceData) -> bytes:
filename = dev.options["sceptre"]["bennu_filename"]
data, file_path = cls._download_ftp(dev, filename, "")
if data is None:
cls.log.error(f"Failed to pull firmware from {dev.ip}")
return b""
dev.firmware.original = data
# If file output is disabled there won't be a file on disk
if file_path:
dev.firmware.file.local_path = file_path
else:
dev.firmware.file.device = dev.get_id()
dev.firmware.file.name = filename
dev.firmware.file.directory = "/"
dev.firmware.file.path = PurePosixPath("/", filename)
dev.related.files.add(filename)
dev.populate_fields()
return data
@classmethod
def _upload_ftp(cls, dev: DeviceData, filename: str, content: str | bytes) -> bool:
try:
with FTP(
ip=dev.ip,
port=dev.options["ftp"]["port"],
timeout=dev.options["ftp"]["timeout"],
) as ftp:
username = dev.options["ftp"]["user"]
password = dev.options["ftp"]["pass"]
if not ftp.login(username, password):
return False
dev.related.user.add(username)
if isinstance(content, str):
ftp.upload_text(filename, content)
elif isinstance(content, bytes):
ftp.upload_binary(filename, content)
else:
cls.log.error(f"Cannot upload data with type '{type(content).__name__}'")
except Exception as err:
cls.log.debug(f"Failed to upload file {filename} to {dev.ip}: {err}")
return False
return True
@classmethod
def _parse(cls, file: Path, dev: DeviceData | None = None) -> DeviceData:
return cls.parse_config(file=file, dev=dev)
[docs]
@classmethod
def parse_config(cls, file: Path | bytes | str, dev: DeviceData | None = None) -> DeviceData:
"""
Parse a SCEPTRE field device XML configuration file.
.. note::
Examples of configs can be found on the
`bennu GitHub <https://github.com/sandialabs/sceptre-bennu/tree/main/data/configs>`__,
in addition to the ones in the PEAT repo.
"""
# Read the config data from the file
if isinstance(file, Path):
raw_config = file.read_text(encoding="utf-8")
elif isinstance(file, bytes):
raw_config = file.decode()
else:
raw_config = file
# Get the root section and basic information
root = fromstring(raw_config).find("field-device")
name = _ele(root, "name")
if not dev:
if not name:
name = f"UNKNOWN_{randint(0, 9999)}"
dev = datastore.get(name, "name")
if name and not dev.name:
dev.name = name
# Minor hack to determine device type by the name of the device,
# which is a common practice in SCEPTRE topologies
if dev.name:
if "rtu" in dev.name.lower():
dev.type = "RTU"
elif "relay" in dev.name.lower():
dev.type = "Relay"
elif "fep" in dev.name.lower():
dev.type = "FEP"
cycle_time = _ele(root, "cycle-time")
if cycle_time:
dev.extra["cycle_time"] = int(cycle_time)
# Parse I/O tags
tags_element = root.find("tags")
if tags_element is not None:
for tag_element in list(tags_element):
tag = Tag()
for key in ["name", "io", "type"]:
if _ele(tag_element, key):
setattr(tag, key, _ele(tag_element, key))
dev.store("tag", tag)
else:
cls.log.warning(f"No tags found in config for {dev.get_id()}")
# Parse network communication modules (DNP3, Modbus/TCP, etc.)
comms = root.find("comms")
if comms is not None and len(comms):
comm_mods = list(comms)
# If client and server, then device is probably a FEP
if (
not dev.type
and any("client" in e.tag for e in comm_mods)
and any("server" in e.tag for e in comm_mods)
):
dev.type = "FEP"
for mod in comm_mods:
extract_interface_info(mod, dev) # Comm interface data
# Parse protocol tags
section_names = {
"analog-input",
"binary-input",
"binary-output",
"coil",
"discrete-input",
"input-register",
}
for child in list(mod):
if child.tag in section_names:
reg = Register(
protocol="_".join(mod.tag.split("-")[:-1]),
read_write="read_write",
measurement_type=child.tag.split("-")[0],
address=_ele(child, "address"),
tag=_ele(child, "tag"),
)
dev.store("registers", reg)
# Sort registers for determinism
# TODO: improve sort order, current version is wacky
dev.registers.sort()
else:
cls.log.error(f"No 'comms' element found in config for {dev.get_id()}")
# I/O modules (connected to the SCEPTRE simulation)
# "input-module" and "output-module" are legacy from older version of bennu
for module_name in ["input", "output", "input-module", "output-module"]:
module_element = root.find(module_name)
if module_element is None:
continue
for child in list(module_element):
if child.tag in ["analog", "binary"]:
name = ""
desc = ""
# Legacy format
d_ele = child.find("device")
if d_ele is not None:
# Update existing tag descriptions with SCEPTRE I/O info
# Example: "Tank1 - level_setpoint - simulink"
d_info = [_ele(d_ele, k) for k in ["name", "field", "provider"]]
name = _ele(d_ele, "name")
desc = " - ".join(d for d in d_info if d)
# Current format
elif _ele(child, "name"):
name = _ele(child, "name")
io_id = _ele(child, "id")
if not io_id:
cls.log.warning(f"No 'id' for element: {child}")
continue
if desc:
# Add description to existing tag
tag = Tag(description=desc)
dev.store("tag", tag, lookup={"io": io_id})
io_obj = IO(
id=io_id,
name=name,
type=child.tag,
direction=module_name.split("-")[0],
description=desc,
)
dev.store("io", io_obj, lookup="id")
# Sort tags for determinism
if dev.tag:
# TODO: improve sort order, current version is wacky
dev.tag.sort()
# Extract the device logic, if it exists
logic = _ele(root, "logic")
# Legacy name "logic-module", with subsection of "logic"
if not logic:
logic_module = root.find("logic-module")
if logic_module is not None:
logic = _ele(logic_module, "logic")
if logic:
if not dev.type:
cls.log.debug("Logic is present, device is probably a Relay")
dev.type = "Relay"
parsed_logic = "\n".join(x.strip() for x in logic.splitlines() if x)
dev.logic.original = logic
dev.logic.parsed = parsed_logic
if isinstance(file, Path):
dev.logic.file.local_path = file
else:
dev.logic.file.device = dev.get_id()
dev.logic.file.name = "config.xml"
dev.populate_fields()
elif not dev.type:
cls.log.debug("No logic present, device is probably a RTU")
dev.type = "RTU"
cls.update_dev(dev)
return dev
def extract_interface_info(com: Element, dev: DeviceData) -> None:
# TODO: merge interface info for devices with client and server
# modbus, modbus-tcp, bacnet, dnp3, sunspec-tcp
protocol = "_".join(com.tag.split("-")[:-1])
conn_type = com.tag.split("-")[-1] # client, server
iface = Interface(application=protocol, enabled=True)
svc = Service(protocol=protocol, role=conn_type, enabled=True)
if _ele(com, "address") is not None:
svc.protocol_id = _ele(com, "address")
if _ele(com, "event-logging"):
svc.extra["event_logging"] = _ele(com, "event-logging")
if _ele(com, "instance"):
svc.extra["instance"] = _ele(com, "instance")
if _ele(com, "scan-rate"):
svc.extra["scan-rate"] = _ele(com, "scan-rate")
# Get related IPs from command interface endpoint
if _ele(com, "command-interface"):
cmd_if = _parse_endpoint(_ele(com, "command-interface"))
if cmd_if.get("ip"):
dev.related.ip.add(cmd_if["ip"])
if cmd_if.get("multicast_ip"):
dev.related.ip.add(cmd_if["multicast_ip"])
# TODO: build links to other devices via <x-connection> child tags
# <modbus-client>
# <modbus-connection>
# <endpoint>tcp://127.0.0.1:5502</endpoint>
for child in list(com):
if "connection" in child.tag:
# for now, just add the IPs to related.ip
# want to expand on this in the future with richer linkages
if _ele(child, "endpoint"):
conn_ep = _parse_endpoint(_ele(child, "endpoint"))
if conn_ep.get("ip"):
dev.related.ip.add(conn_ep["ip"])
if conn_ep.get("multicast_ip"):
dev.related.ip.add(conn_ep["multicast_ip"])
endpoint = _ele(com, "endpoint")
ip = _ele(com, "ip")
if endpoint:
parsed_endpoint = _parse_endpoint(endpoint)
if parsed_endpoint.get("ip"):
iface.ip = parsed_endpoint["ip"]
dev.related.ip.add(parsed_endpoint["ip"])
if parsed_endpoint.get("multicast_ip"):
svc.extra["multicast_ip"] = parsed_endpoint["multicast_ip"]
dev.related.ip.add(svc.extra["multicast_ip"])
if parsed_endpoint.get("transport"):
svc.transport = parsed_endpoint["transport"]
if parsed_endpoint.get("port"):
svc.port = parsed_endpoint["port"]
if parsed_endpoint.get("serial_port"):
iface.serial_port = parsed_endpoint["serial_port"]
if parsed_endpoint.get("type"):
iface.type = parsed_endpoint["type"]
if not iface.serial_port:
iface.type = "ethernet"
# Legacy format
elif ip:
# <port>20000</port>
# <ip>192.0.2.3</ip>
iface.ip = ip
iface.type = "ethernet"
svc.transport = "tcp" if protocol != "bacnet" else "udp"
port = _ele(com, "port")
if port:
svc.port = int(port)
if svc.port:
dev.related.ports.add(svc.port)
if svc.protocol:
dev.related.protocols.add(svc.protocol)
dev.store("interface", iface, lookup=["ip", "serial_port", "application"])
dev.store("service", svc, interface_lookup={"application": protocol})
def _parse_endpoint(endpoint: str) -> dict:
if not endpoint:
return {}
data = {} # type: dict[str, Union[str, int]]
if ";" in endpoint:
# <endpoint>udp://172.16.1.2;239.0.0.1:40000</endpoint>
transport, ip, port = endpoint.split(":")
data["ip"] = ip.split(";")[0].replace("//", "")
data["transport"] = transport
data["port"] = int(port)
data["multicast_ip"] = ip.split(";")[1]
elif "://" in endpoint:
# <endpoint>tcp://172.16.1.2:5555</endpoint>
# <endpoint>tcp://192.0.2.2:20000</endpoint>
# <endpoint>udp://127.0.0.1:47808</endpoint>
# <endpoint>tcp://127.0.0.1:20000</endpoint>
transport, ip, port = endpoint.split(":")
data["ip"] = ip.replace("//", "")
data["transport"] = transport
data["port"] = int(port)
elif "/" in endpoint: # Serial
# <endpoint>/tmp/ttyS0</endpoint>
# <endpoint>/dev/ttySerial1</endpoint>
data["serial_port"] = endpoint
data["type"] = "serial"
elif ":" in endpoint: # Not sure if I've seen, just a fallback
data["ip"], port = endpoint.split(":")
data["port"] = int(port)
else: # Legacy, I think
# <endpoint>172.16.254.254</endpoint>
data["ip"] = endpoint
return data
def _ele(root: Element, to_find: str) -> str | None:
found = root.find(to_find)
if found is not None:
return found.text.strip()
return None
SCEPTRE.ip_methods = [
IPMethod(
name="SCEPTRE FTP",
description=str(SCEPTRE._verify_ftp.__doc__).strip(),
type="unicast_ip",
identify_function=SCEPTRE._verify_ftp,
reliability=9,
protocol="ftp",
transport="tcp",
default_port=21,
)
]
__all__ = ["SCEPTRE"]