"""
Core functionality for interrogating a SEL Real-Time Automation Controller (RTAC).
.. note::
    The 3530 RTAC (and possibly others) disables ping (ICMP) responses by
    default. If using PEAT against an RTAC on another subnet or VLAN and the
    online check fails, try again with the ``--assume-online`` flag set.
    Another option is to force TCP SYN checks against port 443.
SEL RTACs store data in a PostgreSQL database, but it can be exported as XML.
Each "page" of the RTAC configuration is its own XML file.
Instead of importing each of the XML files separately, bundle them into a
tarball. PEAT will decompress it to the file system, read each file into
memory, sort the files by type, and delete them from the file system. PEAT
then parses the configuration from what is in memory.
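For example, a tarball of the exported pages could be created like so (the
archive name is illustrative; any name matching ``filename_patterns`` works):
``tar czf rtacexport.tar.gz *.xml``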
To test Postgres connectivity (connecting to the "3530" database):
``psql -h <ip> -U <username> -d 3530``
Note: Nginx is the web server used by the SEL RTAC devices.
Listening network services
- HTTP (TCP 80) (redirects to HTTPS on TCP 443)
- HTTPS (TCP 443)
- PostgreSQL (TCP 5432)
Supported models
- SEL-3530
- SEL-3530-4
- SEL-3350
Authors
- Ryan Vrecenar
- Christopher Goes
"""
import base64
import functools
import math
import re
import tarfile
import xml.etree.ElementTree as ET
from copy import deepcopy
from pathlib import Path
from time import sleep
from typing import Literal
import psycopg2
from peat import (
DeviceData,
DeviceModule,
Interface,
IPMethod,
datastore,
exit_handler,
state,
utils,
)
from peat.consts import DeviceError
from peat.data.data_utils import merge_models
from peat.data.models import User
from peat.data.validators import validate_hash
from peat.protocols import HTTP
from .rtac_parse import (
parse_accesspointrouters,
parse_contact_ios,
parse_devices,
parse_maincontroller,
parse_pous,
parse_systemtags,
parse_tagprocessor,
)
from .sel_consts import RTAC_DB_NAME, RTAC_DB_TABLES, RTAC_TRANSIENT_KEYS
from .sel_http import SELHTTP
class SELRTAC(DeviceModule):
"""
SEL Real-Time Automation Controller (RTAC).
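
    A minimal offline-parsing sketch (the archive path is illustrative)::

        from pathlib import Path

        dev = SELRTAC._parse(Path("rtacexport3.tar.gz"))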
"""
device_type = "RTAC"
vendor_id = "SEL"
vendor_name = "Schweitzer Engineering Laboratories"
brand = "SEL"
can_parse_dir = True
filename_patterns = [
# "*.tar.gz"
"*accesspointrouter*.tar.gz",
"*AccessPoints*.tar.gz",
"devices.tar.gz",
"*rtacexport*.tar.gz", # rtacexport3.tar.gz
"*rtac*.tar*",
"*rtac*.tgz",
"Tag Processor.xml",
"SystemTags.xml",
"Main Controller.xml",
"Contact I_O.xml",
]
    # Models known to work; others may work as well
supported_models = ["3530", "3530-4", "3350"]
module_aliases = [f"sel-{x}" for x in supported_models]
default_options = {
"web": {
"user": "",
"pass": "",
"users": [
"Admin",
"admin",
"administrator",
],
"passwords": ["admin", "rtac"],
},
"postgres": {
"user": "",
"pass": "",
"users": ["Admin", "admin", "administrator", "rtac"],
"passwords": ["admin", "rtac"],
},
"sel": {
"pull_http": True,
"pull_postgres": True,
"rtac_monitor_enable": False,
"rtac_monitor_count": 3,
"rtac_monitor_pause_for": 4.0,
},
}
_tag_to_parser_map = {
"AccessPointRouter": parse_accesspointrouters,
"Device": parse_devices,
"TagProcessor": parse_tagprocessor,
"ContactIO": parse_contact_ios,
"SystemTags": parse_systemtags,
"MainController": parse_maincontroller,
"POU": parse_pous,
}
@classmethod
def _verify_http(cls, dev: DeviceData, protocol: Literal["http", "https"] = "http") -> bool:
"""
Verify a device is a SEL RTAC via the HTTP web interface.
"""
port = dev.options[protocol]["port"]
timeout = dev.options[protocol]["timeout"]
cls.log.debug(
f"Verifying RTAC HTTP for {dev.ip}:{port} using {protocol} (timeout: {timeout})"
)
session = SELHTTP(dev.ip, port, timeout)
logged_in = False
if dev._cache.get("verified_web_user") and dev._cache.get("verified_web_pass"):
logged_in = session.login_rtac(
dev._cache["verified_web_user"],
dev._cache["verified_web_pass"],
protocol,
)
        # Try each username/password combination until login succeeds or all are exhausted
else:
if dev.options["web"]["user"]:
users = [dev.options["web"]["user"]]
else:
users = dev.options["web"]["users"]
if dev.options["web"]["pass"]:
passwords = [dev.options["web"]["pass"]]
else:
passwords = dev.options["web"]["passwords"]
for username in users:
cls.log.debug(f"Attempting RTAC login to {dev.ip} with user '{username}'")
for password in passwords:
logged_in = session.login_rtac(username, password, protocol)
if logged_in:
dev._cache["verified_web_user"] = username
dev._cache["verified_web_pass"] = password
dev.related.user.add(username)
break
if logged_in:
break
if logged_in:
try:
dashboard = session.view_dashboard(dev)
except Exception:
cls.log.exception(f"Failed to view dashboard for {dev.ip}")
session.disconnect()
return False
# Check if dashboard has appropriate fields
if (
(not dashboard.get("web_device_info"))
or ("Firmware Version" not in dashboard["web_device_info"])
or ("Host Name" not in dashboard["web_device_info"])
):
session.disconnect()
return False
            # Only cache the session if it's a known RTAC
if any(m in dev.description.model for m in cls.supported_models) or (
"3530" in dashboard["web_device_info"].get("Firmware Version", "")
and "3530" in dashboard["web_device_info"].get("Host Name", "")
):
# Cache the session using this protocol
# TODO: we should try to prefer HTTPS when possible for security
if not dev._cache.get("web_session"):
dev._cache["web_session"] = session
dev._cache["web_protocol"] = protocol
exit_handler.register(session.disconnect, "CONNECTION")
else:
session.disconnect()
return True
session.disconnect()
cls.log.debug(f"RTAC HTTP verification failed for {dev.ip}:{port}")
return False
@classmethod
def _verify_https_ssl_certificate(cls, dev: DeviceData) -> bool:
"""
Verify a device is a SEL RTAC via inspection of the Common Name
attribute of a SSL certificate.
"""
timeout = dev.options["https"]["timeout"]
port = dev.options["https"]["port"]
cls.log.debug(f"Verifying {dev.ip}:{port} using SSL (timeout: {timeout})")
with HTTP(dev.ip, port, timeout, dev=dev) as http:
parsed_cert = http.get_ssl_certificate()
if not parsed_cert:
return False
merge_models(dev.x509, parsed_cert)
entity = parsed_cert.subject
if not entity.common_name:
entity = parsed_cert.issuer
if not entity.common_name:
return False
dev._cache["selrtac_ssl_fingerprinted"] = True
if "SEL" in entity.common_name and "RTAC" in entity.common_name:
cls.log.debug(f"SSL verification successful for {dev.ip}:{port}")
dev.description.model = entity.common_name.partition(" ")[0].partition("-")[2]
dev.type = "RTAC"
cls.update_dev(dev)
return True
return False
@classmethod
def _verify_postgres(cls, dev: DeviceData) -> bool:
"""
Verify a device is a SEL RTAC via a PostgreSQL database connection.
"""
# TODO: cache the connection in dev._cache
conn = None
curs = None
try:
            # Prefer explicit options, then previously verified credentials, then defaults
try:
user = dev.options["postgres"]["user"]
if not user:
user = dev._cache.get("verified_pg_user")
if not user:
user = "postgres"
kwargs = {
"host": dev.ip,
"port": dev.options["postgres"]["port"],
"dbname": RTAC_DB_NAME,
"user": user,
}
if dev.options["postgres"]["pass"]:
kwargs["password"] = dev.options["postgres"]["pass"]
elif dev._cache.get("verified_pg_pass"):
kwargs["password"] = dev._cache["verified_pg_pass"]
conn = psycopg2.connect(**kwargs)
curs = conn.cursor()
except BaseException: # Always cleanup regardless of error
if conn is not None:
conn.close()
return False
# No valid logged in session created
if conn is None:
return False
dev._cache["verified_pg_user"] = kwargs["user"]
dev.related.user.add(kwargs["user"])
if "password" in kwargs:
dev._cache["verified_pg_pass"] = kwargs["password"]
cmd = "SELECT * from device_info;"
curs.execute(cmd)
conn.commit()
            # if the table has no columns, verification fails
if curs.description is None:
curs.close()
conn.close()
return False
data = curs.fetchall()
cols = [desc[0] for desc in curs.description]
for datum in data:
for col_idx in range(len(cols)):
if "3530" in datum[col_idx]:
curs.close()
conn.close()
return True
curs.close()
conn.close()
except Exception:
cls.log.exception("CLEANED UP")
if curs is not None:
curs.close()
if conn is not None:
conn.close()
return False
@classmethod
def _pull(cls, dev: DeviceData) -> bool:
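        """
        Pull device data once, or repeatedly with change detection when the
        ``sel.rtac_monitor_enable`` option is set.
        """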
# Ensure postgres cursor and connection is properly cleaned up on exit
# TODO: better way of ensuring postgres connections are cleaned up
def _ensure_close_db():
for k in ["pg_cursor", "pg_conn"]:
if dev._cache.get(k):
try:
dev._cache[k].close()
except Exception:
pass
exit_handler.register(_ensure_close_db, "CONNECTION")
# Pull and compare history over time
if dev.options["sel"]["rtac_monitor_enable"]:
pull_count = dev.options["sel"]["rtac_monitor_count"]
pause_time = dev.options["sel"]["rtac_monitor_pause_for"]
cls.log.info(
f"Pulling {pull_count} times with a pause for "
f"{pause_time:.2f} seconds between each pull"
)
dev._cache["history"] = {}
            for i in range(1, pull_count + 1):  # one history entry per pull
try:
cls.log.info("Pulling config")
cls.log.info(f"\tRecording history {i}")
cls.update_history(dev)
sleep(dev.options["sel"]["rtac_monitor_pause_for"])
cls.log.info("\tChecking history")
cls.check_history(dev)
except Exception:
cls.log.exception(f"failed history check of {dev.ip}")
return False
return True
# Regular pull and return
cls.pull_config(dev)
return True
# --------- Monitor Functionality ---------
[docs]
@classmethod
def chk_transient_path(cls, path: list[str]) -> bool:
"""Check if path is defined in transient keys."""
        for key in RTAC_TRANSIENT_KEYS:
            # if lengths do not match, this key cannot match the path
            if len(key) != len(path):
                continue
            # every pattern in the key must match the corresponding path element
            if all(re.match(pattern, part) for pattern, part in zip(key, path)):
                return True
        return False
@classmethod
def compare_dictionaries(
cls,
dict_1: dict,
dict_2: dict,
dict_1_name: str,
dict_2_name: str,
path: list[str] | None = None,
err: list[str] | None = None,
key_err: list[str] | None = None,
value_err: list[str] | None = None,
) -> tuple[list[str], list[str], list[str]]:
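        """
        Recursively compare two dictionaries, skipping any path that matches
        ``RTAC_TRANSIENT_KEYS``, and return ``(key_err, value_err, err)``
        lists of human-readable difference messages.
        """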
if err is None:
err = []
if key_err is None:
key_err = []
if value_err is None:
value_err = []
for k1, v1 in dict_1.items():
if path is None:
old_path = []
else:
old_path = path[:]
old_path.append(f"{k1}")
# Check if test path in transient keys
if cls.chk_transient_path(old_path):
continue
if k1 not in dict_2:
key_err.append(f"Key {dict_2_name}{old_path} not in {dict_2_name}\n")
elif isinstance(v1, dict) and isinstance(dict_2[k1], dict):
                cls.compare_dictionaries(
                    v1, dict_2[k1], dict_1_name, dict_2_name, old_path[:], err, key_err, value_err
                )
elif v1 != dict_2[k1]:
value_err.append(
f"Value of {dict_1_name}{old_path} ({v1}) not same "
f"as {dict_2_name}{old_path} ({dict_2[k1]})\n"
)
for k in dict_2.keys():
if path is None:
old_path = []
else:
old_path = path[:]
old_path.append(f"{k}")
if k not in dict_1:
key_err.append(f"Key {dict_2_name}{old_path} not in {dict_1_name}\n")
return key_err, value_err, err
@classmethod
def update_history(cls, dev: DeviceData) -> None:
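        """Pull the current config and snapshot ``dev.extra`` into the history cache."""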
cls.pull_config(dev)
timestamp = utils.utc_now().strftime("%Y-%m-%dT%H:%M:%S")
dev._cache["history"][timestamp] = deepcopy(dev.extra)
dev.extra = {}
@classmethod
def check_history(cls, dev: DeviceData) -> None:
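        """Compare the two most recent config snapshots and alert on any change."""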
if len(dev._cache["history"].keys()) < 2:
cls.log.info("baselined.")
return
d1_name: str = list(dev._cache["history"].keys())[-1]
d2_name: str = list(dev._cache["history"].keys())[-2]
d1 = dev._cache["history"][d1_name]
d2 = dev._cache["history"][d2_name]
cls.log.info(f"Comparing dictionaries '{d1_name}' and '{d2_name}'")
        res = cls.compare_dictionaries(d1, d2, "d1", "d2")
        if not any(res):
            cls.log.info("nominal")
        else:
            cls.log.error("abnormal")
if state.elastic:
# update action to key that is changed
state.elastic.push(
"alerts",
{
"message": "device configuration changed",
"event": {
"action": "config-change",
"category": "device-config-change",
"kind": {"alert"},
"module": cls.__name__, # "SELRTAC"
"severity": 10,
},
},
)
@classmethod
def pull_config(cls, dev: DeviceData) -> None:
"""Pull and parse configuration from an RTAC."""
# NOTE: this method is used by update_history() and _pull()
# NOTE: if web pull fails, attempt postgres pull anyway to make sure we get data
web_exception = None
if dev.options["sel"]["pull_http"]:
try:
cls.pull_web(dev)
cls.update_dev(dev)
except Exception as ex:
cls.log.exception(f"failed http pull from {dev.ip}")
state.error = True
web_exception = ex
else:
cls.log.warning(f"Skipping HTTP pull from {dev.ip} (sel.pull_http=False)")
if dev.options["sel"]["pull_postgres"]:
cls.pull_postgres(dev)
cls.update_dev(dev)
else:
cls.log.warning(f"Skipping postgres pull from {dev.ip} (sel.pull_postgres=False)")
        # NOTE: exceptions are handled by pull_api; it doesn't check method
        # return status at the moment, so we re-raise the web exception here.
if web_exception:
raise web_exception from None
@classmethod
def pull_web(cls, dev: DeviceData, protocol: str | None = None) -> dict | None:
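        """
        Pull data from the RTAC web interface, running each view/download
        method in turn. Returns the pulled data, or None if login failed.
        """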
cls.log.info(f"Pulling RTAC data via web interface from {dev.ip}")
if not protocol:
protocol = dev._cache.get("web_protocol", "http")
if not dev._cache.get("web_protocol"):
dev._cache["web_protocol"] = protocol
port = dev.options[protocol]["port"]
timeout = dev.options[protocol]["timeout"]
if not dev._cache.get("web_session"):
cls.log.info(f"No web session cached for {dev.ip}, creating new session...")
dev._cache["web_session"] = SELHTTP(dev.ip, port, timeout)
session = dev._cache["web_session"]
if not session.rtac_logged_in:
username = dev._cache.get("verified_web_user")
if not username:
username = dev.options["web"]["user"]
if not username:
username = "ACC"
password = dev._cache.get("verified_web_user")
if not password:
password = dev.options["web"]["pass"]
if not password:
password = "OTTER"
if not session.login_rtac(username, password, protocol):
cls.log.error(
f"Failed to login to web interface on {dev.ip} with user '{username}'"
)
return None
dev.related.user.add(username)
web_methods = [
session.view_dashboard,
session.view_usagepolicy,
session.view_filesondevice,
session.view_features, # TODO: not working currently
session.view_accounts, # NOTE: added to pull
session.view_user_roles,
session.view_ldap,
session.download_http_files,
session.view_ip_tables, # TODO: not implemented yet
session.view_syslog, # this is slowest, do last
]
cls.log.info(
f"Completed web session setup, beginning pull from "
f"{dev.ip}:{port} using {len(web_methods)} web methods"
)
pulled_config = {}
dev._cache["num_successful_methods"] = 0
for method in web_methods:
sleep(0.3) # attempt to avoid overloading web server
cls.log.info(f"Running '{method.__name__}' for {dev.ip}:{port}")
try:
method_result = method(dev) # type: dict
if not method_result or not any(bool(val) for val in method_result.values()):
cls.log.warning(
f"No data from HTTP method '{method.__name__}' on {dev.ip}:{port}"
)
else:
pulled_config.update(method_result)
dev._cache["num_successful_methods"] += 1
except Exception:
cls.log.exception(f"'{method.__name__}' failed on {dev.ip}:{port}")
continue
if dev._cache["num_successful_methods"] == 0:
dev._cache["web_session"].disconnect()
raise DeviceError(f"all web methods failed from {dev.ip}")
cls.log.info(
f"Finished pulling data via web interface from {dev.ip}:{port} "
f"({dev._cache['num_successful_methods']} methods were "
f"successful out of {len(web_methods)} methods attempted)"
)
dev.write_file(pulled_config, "pulled-web-config.json")
# Close the web session
dev._cache["web_session"].disconnect()
# Remove fields we don't want in elasticsearch
        for bad_field in ["web_syslog_events", "web_usagepolicy", "web_device_info"]:
            if bad_field in pulled_config:
                del pulled_config[bad_field]
return pulled_config
@classmethod
def pull_postgres(cls, dev: DeviceData) -> dict | None:
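        """
        Pull the RTAC configuration tables over PostgreSQL and map fields of
        interest (hostname, firmware, users, interfaces, ports) onto the device.
        """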
cls.log.info(f"Pulling postgres data from {dev.ip}")
if not dev._cache.get("pg_conn"): # TODO: check if connected
username = dev._cache.get("verified_pg_user")
if not username:
username = dev.options["postgres"]["user"]
if not username:
username = "postgres"
dev.related.user.add(username)
password = dev._cache.get("verified_pg_pass")
if not password:
password = dev.options["postgres"]["pass"]
dev._cache["pg_conn"] = psycopg2.connect(
host=dev.ip,
dbname=RTAC_DB_NAME,
user=username,
password=password,
port=dev.options["postgres"]["port"],
)
# TODO: check if login succeeded, if not return None
conn = dev._cache["pg_conn"]
if not dev._cache.get("pg_cursor"): # TODO: check if connected
dev._cache["pg_cursor"] = conn.cursor()
cursor = dev._cache["pg_cursor"]
config = {}
for table in RTAC_DB_TABLES:
config[table] = {}
command = f"SELECT * from {table};"
cls.execute(cursor, command)
conn.commit()
# if no columns in table continue
if cursor.description is None:
continue
data = cursor.fetchall()
cols = [desc[0] for desc in cursor.description]
for datum in data:
config[table][datum[0]] = {}
for col_idx in range(len(cols)):
# do conversions based on column label
if cols[col_idx] == "data" or cols[col_idx] == "project_image":
                        # if binary blob, base64-encode it
if datum[col_idx] is None:
config[table][datum[0]][cols[col_idx]] = ""
continue
# TODO: should we save hash in one place and raw in another?
decoded = (base64.b64encode(datum[col_idx].tobytes())).decode()
config[table][datum[0]][cols[col_idx]] = decoded
elif cols[col_idx] in [
"set_dst_start_time",
"set_dst_stop_time",
"set_system_time_dst_start_time",
"set_system_time_dst_stop_time",
"last_access",
"account_creation",
"last_modified_time",
]:
# if datetime object, convert to string
# TODO: use utils.parse_date().isoformat()
# to generate ISO-format time string.
config[table][datum[0]][cols[col_idx]] = str(datum[col_idx])
elif cols[col_idx] == "new_event_reset_location_value":
                        # if the float is non-finite (NaN/inf), replace with a sentinel
val = datum[col_idx]
if not math.isfinite(datum[col_idx]):
val = float(-999999999)
config[table][datum[0]][cols[col_idx]] = val
else:
config[table][datum[0]][cols[col_idx]] = datum[col_idx]
# TODO: compare device data with what's pulled from web and emit warning if it differs
# e.g. firmware rev, checksums, etc.
dev_info = config.get("device_info")
if dev_info:
if len(dev_info) > 1:
cls.log.warning(
f"Multiple devices found in PostgreSQL data "
f"in 'device_info' from {dev.ip}: {dev_info}"
)
first = next(iter(dev_info.values()))
host_name = first.get("host_name")
if host_name and not dev.hostname:
dev.hostname = host_name
dev.related.hosts.add(host_name)
dev_name = first.get("device_name")
if dev_name and not dev.name:
dev.name = dev_name
dev.related.hosts.add(dev_name)
location = first.get("location") # TODO: is this lat/lon?
if location and not dev.geo.name:
dev.geo.name = location
dev.extra["postgres_device_info"] = dev_info
fw_checksum = first.get("current_fw_checksum")
if fw_checksum:
if not dev.firmware.checksum:
dev.firmware.checksum = fw_checksum
# This may or may not be an actual hash, and in some
# cases it's definitely not, e.g. "B0AB".
try:
checksum_hash = validate_hash(fw_checksum)
dev.related.hash.add(checksum_hash)
if not dev.firmware.hash.md5 and len(fw_checksum) == 32:
dev.firmware.hash.md5 = checksum_hash
except ValueError:
pass
desc = first.get("description")
if desc and not dev.description.description:
dev.description.description = desc
fw_rev = first.get("fw_release_version")
if fw_rev and not dev.firmware.revision:
dev.firmware.revision = fw_rev
projects = config.get("projects", {})
if projects:
if len(projects) > 1:
cls.log.warning(
f"{len(projects)} projects on {dev.ip}, using "
f"metadata from first one to populate "
f"fields for 'host.logic'"
)
projs = list(projects.values())
if not projs:
cls.log.error(f"Failed to get a project for {dev.ip}")
state.error = True
else:
proj = projs[0]
dev.logic.name = proj["project_name"]
dev.logic.description = proj["description"]
dev.related.user.add(proj["last_modified_user"])
if not dev.logic.author:
dev.logic.author = proj["last_modified_user"]
dev.logic.last_updated = utils.parse_date(proj["last_modified_time"])
# Add device names from "remote_devices"
# TODO: models
# "32": {
# "remote_device_id": 32,
# "project_id": 1,
# "name": "SEL_411L_E1",
# "model": "411L",
# "version_hash": null,
# "manufacturer": null,
# "version": null
# },
remote_devices = config.get("remote_devices", {})
for rem_dev in remote_devices.values():
# Skip tunnels, e.g "Eng_Access_40"
if "Eng_Access_" not in rem_dev["name"]:
dev.related.hosts.add(rem_dev["name"])
# "role_master" and "system_roles" have almost the same format
for role_key in ["role_master", "system_roles"]:
# TODO: system_roles has a "unix_home_dir" value, useful?
role_dict = config.get(role_key)
if role_dict:
for rv in role_dict.values():
# Note: db_name usually matches name, and
# lc_name is lowercase version of name.
for key in ["name", "lc_name", "db_name"]:
if rv.get(key):
dev.related.roles.add(rv[key])
root_devices = config.get("root_devices")
if root_devices:
for rd in root_devices.values():
if rd.get("name"):
dev.related.hosts.add(rd["name"])
# TODO:
# role_master
# role_membership
# system_permissions
# system_role_resource_assoc
# system_roles
# user_lockouts
# user_membership
users = config.get("users", {})
for user_dict in users.values():
user = User()
# Values could be null, so gotta check
if user_dict.get("description"):
user.description = user_dict["description"]
if user_dict.get("username"):
user.name = user_dict["username"]
for extra_key in [
"user_id",
"status",
"account_creation",
"last_access",
"account_expiration",
"auto_login",
]:
if user_dict.get(extra_key) is not None:
user.extra[extra_key] = str(user_dict[extra_key])
dev.store("users", user, lookup="name")
users_transient_history = config.get("users_transient_history", {})
for user_dict in users_transient_history.values():
dev.related.user.add(user_dict.get("username", ""))
serial_ports = config.get("serial_ports", {}) # type: dict
for sp in serial_ports.values():
extra = {}
for m_key in [
"connection_method_id",
"protocol_id",
"serial_port_type_id",
]:
if sp.get(m_key):
extra[m_key] = str(sp[m_key])
if "full_duplex" in sp:
extra["full_duplex"] = bool(sp["full_duplex"])
flow_control = "" # "none"
for fc_key in ["hardware_flow_control", "software_flow_control"]:
if sp.get(fc_key):
flow_control = fc_key
s_type = "serial"
s_port_id = str(sp.get("serial_port_type_id", ""))
if s_port_id == "232":
s_type = "rs-232"
iface = Interface(
connected=bool(sp.get("port_power")),
name=str(sp.get("device_file", "")),
type=s_type,
id=str(sp.get("serial_port_id", "")),
serial_port=str(sp.get("device_file", "")),
baudrate=int(sp.get("baud_rate")) if "baud_rate" in sp else None,
data_bits=int(sp.get("data_bits")) if "data_bits" in sp else None,
# TODO: determine parity value
stop_bits=int(sp.get("stop_bit")) if "stop_bit" in sp else None,
flow_control=flow_control,
extra=extra,
)
dev.store("interface", iface, lookup="serial_port")
for conn_group_name in [
"local_tcp_network_port_connections",
"local_udp_network_port_connections",
]:
conn_group = config.get(conn_group_name, {}) # type: dict
for conn_value in conn_group.values():
for port_key in ["tcp_port", "udp_port"]:
if conn_value.get(port_key):
dev.related.ports.add(int(conn_value[port_key]))
# If protocol_id isn't null, it may be of interest
for port_group_name in [
"local_tcp_network_ports",
"local_udp_network_ports",
]:
port_group = config.get(port_group_name, {}) # type: dict
for port_values in port_group.values():
if port_values.get("protocol_id") and port_values.get("port_number"):
dev.related.ports.add(int(port_values["port_number"]))
# Pull some useful info out of various setting groups
for eth_group_name in [
"dnp_client_ethernet_settings",
"dnp_client_unique_settings",
"sel_client_ethernet_settings",
# NOTE: this section is untested
"modbus_ethernet_settings",
]:
dnp_group = config.get(eth_group_name, {}) # type: dict
for eth_settings in dnp_group.values():
if eth_settings.get("server_ip_address"):
dev.related.ip.add(eth_settings["server_ip_address"])
for port_key in [
"server_ip_port",
"local_tcp_port",
"local_udp_port",
"client_udp_port",
]:
if eth_settings.get(port_key):
dev.related.ports.add(int(eth_settings[port_key]))
# This only appears in dnp_client_ethernet_settings
if eth_settings.get("ssh_username"):
dev.related.user.add(eth_settings["ssh_username"])
        # The full config is large (it would bloat device-data-summary.json) and
        # contains data types that don't serialize cleanly to JSON, so save the
        # raw dump as text and drop the largest tables before writing the JSON
        dev.write_file(config, "raw-pulled-postgres-config.txt")
        config.pop("tag_task_settings", None)
        config.pop("tags", None)
dev.write_file(config, "pulled-postgres-config.json")
cursor.close() # TODO: set cache to None?
conn.close() # TODO: set cache to None?
cls.log.debug(f"Finished pulling postgres data from {dev.ip}")
return config
@classmethod
def execute(cls, cursor, statement: str, args: tuple | None = None) -> None:
"""Wrapper code for database execute."""
if args is None:
args = ()
        while True:  # TODO: change to better method than while loop
            try:
                cursor.execute(statement, args)
            except Exception as e:
                # if the db is locked, sleep briefly and try again
                cls.log.trace2(f"Postgres operational error: {e}")
                sleep(0.1)
                continue
            # break out when successful
            break
@classmethod
def parse_xml(cls, data: str, filename: str, device_info: dict):
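        """Dispatch exported XML data to the parser matching the root element's first child tag."""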
xml_root = ET.fromstring(data)
tag = xml_root[0].tag
if tag in cls._tag_to_parser_map:
cls._tag_to_parser_map[tag](xml_root, device_info)
else:
cls.log.debug(f"Skipping unknown XML file '{filename}' with tag '{tag}'")
@classmethod
def _parse(cls, file: Path, dev: DeviceData | None = None) -> DeviceData:
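        """Parse an exported RTAC configuration from a directory, tarball, or single XML file."""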
if not dev:
dev = datastore.get(file.name.partition(".")[0], "id")
device_info = {}
# Possible inputs:
# - .tar.gz/.tar file containing XML files
# - directory of XML files
# - single XML file?
#
# NOTE: all RTAC XML files must be read with encoding "utf-8-sig".
# The RTAC editor appears to save XML files as "UTF-8 with BOM" encoding.
# The XML parser handles this fine on Linux, but for some reason it
# isn't handled well on Windows. This is fixed by decoding the text
# using the "utf-8-sig" encoding, which will skip the BOM if present.
# Per the Python documentation:
# "On decoding utf-8-sig will skip those three bytes if they appear
# as the first three bytes in the file."
# Ref: https://docs.python.org/3/library/codecs.html#encodings-and-unicode
if file.is_dir():
xml_files = file.rglob("*.xml")
for xf in xml_files:
cls.parse_xml(xf.read_text(encoding="utf-8-sig"), xf.name, device_info)
dev.related.files.add(xf.name)
elif file.suffix == ".xml":
cls.parse_xml(file.read_text(encoding="utf-8-sig"), file.name, device_info)
dev.related.files.add(file.name)
elif tarfile.is_tarfile(file):
if any("gz" in s for s in file.suffixes):
mode = "r:gz"
else:
mode = "r" # transparent, i think this auto-determines compression
device_info["existing_files"] = []
with tarfile.open(name=file.as_posix(), mode=mode, encoding="utf-8") as tar:
for member in tar.getmembers():
cls.log.debug(f"Parsing '{member.name}' from '{file.name}'")
f_handle = tar.extractfile(member)
if f_handle is not None: # skip directories
device_info["existing_files"].append(member.name)
# see note earlier in code about utf-8-sig encoding
data = f_handle.read().decode(encoding="utf-8-sig")
try:
cls.parse_xml(data, member.name, device_info)
except Exception as ex:
# still save the data to file even if parsing failed
cls.log.error(
f"Failed to parse '{member.name}' from '{file.name}': {ex}"
)
dev.write_file(data, f"extracted_files/{member.name}")
dev.related.files.add(Path(member.name).name)
# TODO: there is useful information being parsed we're not saving yet
# Interfaces (Ethernet, serial, and USB)
# Services (e.g. SSH, Telnet, SEL FastMessage)
# Remote device endpoints and registers (Device)
# Get a DeviceData from datastore, then update:
# registers, IP, services, interfaces, model/brand/vendor
# Internal Tags?
dev.extra.update(device_info)
# TODO: hack, defaulting model to 3530 for now (it's not being extracted...)
if not dev.description.model:
cls.log.warning(
"RTAC parsing doesn't currently extract the model, "
"it defaults to '3530' for the time being"
)
dev.description.model = "3530"
cls.update_dev(dev)
return dev
SELRTAC.ip_methods = [
IPMethod(
name="SEL RTAC login HTTP",
description=str(SELRTAC._verify_http.__doc__).strip(),
type="unicast_ip",
identify_function=functools.partial(SELRTAC._verify_http, protocol="http"),
reliability=6, # Modern RTACs just redirect from HTTP to HTTPS
protocol="http",
transport="tcp",
default_port=80,
),
IPMethod(
name="SEL RTAC login HTTPS",
description=str(SELRTAC._verify_http.__doc__).strip(),
type="unicast_ip",
identify_function=functools.partial(SELRTAC._verify_http, protocol="https"),
reliability=8,
protocol="https",
transport="tcp",
default_port=443,
),
IPMethod(
name="SEL RTAC SSL certificate",
description=str(SELRTAC._verify_https_ssl_certificate.__doc__).strip(),
type="unicast_ip",
identify_function=SELRTAC._verify_https_ssl_certificate,
reliability=9,
protocol="https",
transport="tcp",
default_port=443,
),
IPMethod(
name="SEL RTAC login PostgreSQL",
description=str(SELRTAC._verify_postgres.__doc__).strip(),
type="unicast_ip",
identify_function=SELRTAC._verify_postgres,
reliability=5,
protocol="postgres",
transport="tcp",
default_port=5432, # TODO: don't auto scan for postgres
# TODO: better workaround for postgres port identification
    # The TCP SYN check may put the database in a weird state or get us banned,
    # since the device's Postgres server is configured with a strict limit on
    # connections, and the port check counts as a new connection.
# Implement a custom port_function?
),
]
__all__ = ["SELRTAC"]