import ipaddress
import re
import traceback
from collections import defaultdict
from datetime import timedelta
from urllib.parse import parse_qs, urlparse
from requests.auth import HTTPDigestAuth
from peat import DeviceData, Event, Interface, Memory, Service, log, utils
from peat.protocols import HTTP, clean_mac
from peat.protocols.enip.vendor_ids import VENDOR_NAMES
[docs]
class ClxHTTP(HTTP):
"""
HTTP interface for Allen-Bradley ControlLogix Ethernet communication modules.
Supported communication modules and functions
- EN2TR: all functions except ``serverlog``
- ``memory``
- ``network``
- ``home``
- ``device_identity``
- ``syslog``
- ``diagnetwork``
- ``modules``
- ``module_list``
- EWEB: all functions except ``memory``, ``syslog``, and ``device_identity``
- ``network``
- ``home``
- ``serverlog``
- ``diagnetwork``
- ``modules``
- ``module_list``
- L8 CPU (built-in Ethernet): ``network`` (no other functions work)
- ``network``
"""
# Memory regions exposed on the comm module's diagnostic web pages.
# "name" is the PEAT-internal label, "page" is the value of the "name="
# query parameter in the page URL, and "search" is the link text used to
# look up the page's numeric ID on the Apex index page.
MEMORY_GROUPS = [
    {
        # rokform/SysListDetail?name=WatchDog%20Log&id=216&comp=Apex
        "name": "watchdog_log",
        "page": "WatchDog",
        "search": "WatchDog Log",
        # "ids": ["216", "185"],
    },
    {
        # rokform/SysListDetail?name=Internal%20Memory&id=217&comp=Apex
        "name": "internal_memory",
        "page": "Internal%20Memory",
        "search": "Internal Memory",
        # "ids": ["217", "186"],
    },
    {
        # rokform/SysListDetail?name=Parameter%20Area&id=218&comp=Apex
        "name": "parameter_area",
        "page": "Parameter%20Area",
        "search": "Parameter Area",
        # "ids": ["218", "187"]
    },
]
# Relative URLs of the informational pages scraped by this class
HOME = "home.asp"
"""Device metadata page."""
SERVERLOG = "serverlog.asp"
"""HTTP server log page."""
DIAGNETWORK = "diagnetwork.asp"
"""Network diagnostics page."""
CHASSIS_WHO = "chassisWho.asp"
"""Page with a list of modules."""
# Matches uptime strings such as "28 days, 15h:43m:33.775s"; the
# year/month/day and millisecond components are all optional
UPTIME_RE = re.compile(
    r"((?P<years>\d+) years)?[ ,]*"
    r"((?P<months>\d+) months)?[ ,]*"
    r"((?P<days>\d+) days)?[ ,]*"
    r"(?P<hours>\d{1,2})h:(?P<minutes>\d{1,2})m:(?P<seconds>\d{1,2})"
    r"\.?(?P<milliseconds>\d{3})?s",
    re.ASCII | re.IGNORECASE,
)
# Captures the numeric "id" of the "Full List" syslog page from a link
# on the SysLog index page
SYSLOG_PAGE_RE = re.compile(
    r"rokform/SysListDetail\?name=Full List&id=(\d+)&comp=SysLog",
    re.ASCII | re.IGNORECASE,
)
def __init__(self, *args, **kwargs) -> None:
    """Initialize the HTTP session and the per-session page-ID caches."""
    super().__init__(*args, **kwargs)
    # Page IDs are discovered once per session and cached on the instance
    self.syslog_page_id: str = ""
    self.memory_page_ids: dict[str, str] = {}
@staticmethod
def _clean_data(data: dict) -> dict:
clean = {}
for key, value in data.items():
key = key.replace("\u00a0?", "").strip()
if isinstance(value, str):
# Replace multiple spaces with a single space
value = re.sub(" {2,}", " ", value.replace("\t", " ")).strip()
clean[key] = value
return clean
@classmethod
def _extract_rows(cls, text: str, start: int, **kwargs) -> list[list]:
    """
    Return the <td> cells of each row of the first matching table.

    :param text: HTML text to parse
    :param start: number of leading <tr> rows to skip
    :param kwargs: attributes used to locate the <table> element
    """
    soup_table = cls.gen_soup(text).find("table", **kwargs)
    return [tr.find_all("td") for tr in soup_table.find_all("tr")[start:]]
@classmethod
def _extract_list(cls, text: str, start: int = 1, **kwargs) -> list[dict]:
    """
    Table with rows of values, and with keys defined in the columns.

    Transforms into a list of dict, with the dict keys associated with
    the column names and values being the values from that row.

    :param text: HTML text containing the table
    :param start: number of leading rows to skip before the header row
    :param kwargs: attributes used to locate the <table> element
    """
    rows = cls._extract_rows(text, start, **kwargs)
    if not rows:
        # BUGFIX: a table with no rows past `start` previously raised
        # IndexError on rows[0]; treat it as "no data"
        return []
    labels = [str(column.string) if column.string else str(column.text) for column in rows[0]]
    return [
        {label: str(val.string) for label, val in zip(labels, row, strict=False)}
        for row in rows[1:]
    ]
@classmethod
def _extract_tabular(cls, text: str, start: int = 1, **kwargs) -> dict:
    """
    Table with rows of key-value pairs.

    Rows that do not have exactly two cells are skipped; duplicate keys
    keep the last value seen.
    """
    result = {}
    for cells in cls._extract_rows(text, start, **kwargs):
        # Only two-column rows are key/value pairs
        if len(cells) == 2:
            result[cells[0].string] = cells[1].string
    return result
@staticmethod
def _convert_serial_num(value: str) -> str:
"""
Convert Rockwell serial number from Hexadecimal to int.
"""
try:
if not value or not value.strip():
return ""
return str(int(value.strip(), 16))
except Exception:
return value
[docs]
def get_memory(self) -> dict[str, dict[str, str]]:
    """
    Extracts and aggregates memory information from several pages.

    Page IDs are discovered (and cached) on first use; groups whose ID
    could not be found are skipped with a warning.
    """
    if not self.memory_page_ids and not self._cache_memory_page_ids():
        self.log.warning("Failed to get memory: couldn't find memory page IDs")
        return {}
    results: dict[str, dict[str, str]] = {}
    for group in self.MEMORY_GROUPS:
        group_name = group["name"]
        page_id = self.memory_page_ids.get(group_name)
        if not page_id:
            self.log.warning(
                f"No cached page ID for '{group_name}' "
                f"(page doesn't exist on this comm module model)"
            )
            continue
        url = f"rokform/SysListDetail?name={group['page']}&id={page_id}&comp=Apex"
        results[group_name] = self.retrieve_memory_page(url)
        self.log.info(f"Read memory for {group_name} (page id: {page_id})")
    return results
def _cache_memory_page_ids(self) -> bool:
    """
    Get the page IDs of the memory-related pages.

    Scrapes the Apex index page for links to each memory region and
    caches the numeric ``id`` query parameter in ``self.memory_page_ids``.

    :return: True if the index page was retrieved, False otherwise
        (a True return does not guarantee every group's ID was found).
    """
    self.log.debug("Attempting to find memory page IDs")
    apex = self.get("rokform/SysDataDetail?name=Apex")
    if not apex or not apex.text:
        return False
    for group in self.MEMORY_GROUPS:
        # re.escape guards against regex metacharacters in the search
        # text being interpreted as part of the pattern
        pat = r"rokform/SysListDetail\?name={}&id=(\d+)&comp=Apex".format(
            re.escape(group["search"])
        )
        match = re.search(pat, apex.text, re.ASCII | re.IGNORECASE)
        if match:
            self.memory_page_ids[group["name"]] = match.groups()[0]
    self.log.trace(f"Memory page IDs: {self.memory_page_ids}")
    return True
[docs]
def retrieve_memory_page(self, memory_page: str) -> dict:
    """
    Parses memory from webpage to the values as a hex string.

    :param memory_page: relative URL of the SysListDetail memory page
    :return: dict with "blob", "lines_by_offset", and "timestamp" keys,
        or an empty dict if the page could not be fetched or parsed
    """
    response = self.get(memory_page, use_cache=False)
    if not response or not response.text:
        return {}
    blob, lines_by_offset = self.parse_memory_page(response.text)
    if not blob:
        return {}
    return {
        "blob": blob,
        "lines_by_offset": lines_by_offset,
        # Time the page was pulled by PEAT
        "timestamp": response.response_timestamp,
    }
[docs]
@classmethod
def parse_memory_page(cls, text: str) -> tuple[str, dict[str, str]]:
    """
    Parse a memory dump table into (full hex blob, lines keyed by offset).

    The first cell of each row is the offset; the remaining cells (except
    the last, which is skipped) are zero-padded to 8 hex characters and
    concatenated. The ``[2:]`` slices drop a leading "0x"-style prefix.
    """
    rows = cls._extract_rows(text, start=1, width="100%", cellpadding=4)
    binary_blob = ""
    by_offset: dict[str, str] = {}
    for row in rows:
        line_value = "".join(cls.add_padding(cell.string[2:]) for cell in row[1:-1])
        binary_blob += line_value
        by_offset[cls.add_padding(row[0].string[2:])] = line_value
    return binary_blob, by_offset
[docs]
@staticmethod
def add_padding(input_data: str) -> str:
"""
Adds padding for memory values, e.g. convert ``01`` to ``00000001``.
"""
return "0" * (8 - len(input_data)) + input_data
[docs]
@classmethod
def process_memory(cls, dev: DeviceData, memory_regions: dict) -> None:
    """
    Add data from the parsed memory to a
    :class:`~peat.data.models.DeviceData` object.

    Also writes all blobs, grouped by region and timestamp, to
    ``memory-blobs.json``.
    """
    log.info(f"Processing memory reads from {dev.ip} (this may take a while)")
    blobs = defaultdict(dict)
    for region_name, region in memory_regions.items():
        timestamp = region["timestamp"]
        ts_key = timestamp if isinstance(timestamp, str) else timestamp.isoformat()
        blobs[region_name][ts_key] = region["blob"]
        dev.memory.append(
            Memory(
                # TODO: where in memory is this actually? ask our Rockwell experts
                address=next(iter(region["lines_by_offset"])),
                # Set "created" to the time the page was pulled by PEAT
                created=timestamp,
                dataset=region_name,  # "watchdog_log", "internal_memory", etc.
                device=dev.ip,
                # TODO: size (calculate this)
                value=region["blob"],
            )
        )
    dev.write_file(blobs, "memory-blobs.json")
[docs]
def get_network(self) -> dict[str, dict | list]:
    """
    Extracts and aggregates network information from several pages.

    Statistics pages parse to dicts; table pages parse to lists of dicts.
    """
    self.log.info("Pulling network information...")
    stats = self.retrieve_statistics
    table = self.retrieve_net_table
    return {
        "arp_table": table("arp"),
        "icmp_statistics": stats("icmp"),
        "if_statistics": stats("if"),
        "ip_statistics": stats("ip"),
        "ip_routes": table("iproute"),
        "tcp_statistics": stats("tcp"),
        "tcp_connections": table("tcpconn"),
        "udp_statistics": stats("udp"),
        "udp_table": table("udptable"),
    }
[docs]
def retrieve_statistics(self, name: str) -> dict[str, str]:
    """
    Retrieves and parses a page of statistics into a dictionary.

    :param name: value for the "pageReq" query parameter, e.g. "icmp"
    """
    response = self.get(f"rokform/advancedDiags?pageReq={name}", use_cache=False)
    if response and response.text:
        return self.parse_statistics(response.text)
    return {}
[docs]
@classmethod
def parse_statistics(cls, text: str) -> dict[str, str]:
    """
    Parse and extract network statistics from HTML text.

    Thin wrapper around :meth:`_extract_tabular`; the table is located
    by its ``cellpadding=2`` attribute.
    """
    return cls._extract_tabular(text, cellpadding=2)
[docs]
def retrieve_net_table(self, name: str) -> list[dict[str, str]]:
    """
    Retrieves and parses a table from a network information
    page into a dictionary.

    :param name: value for the "pageReq" query parameter, e.g. "arp"
    """
    response = self.get(f"rokform/advancedDiags?pageReq={name}", use_cache=False)
    if response and response.text:
        return self.parse_net_table(response.text)
    return []
[docs]
@classmethod
def parse_net_table(cls, text: str) -> list[dict[str, str]]:
    """
    Parse and extract a network data table from HTML text.

    Thin wrapper around :meth:`_extract_list`; the table is located by
    its ``cellpadding=2`` attribute.
    """
    return cls._extract_list(text, cellpadding=2)
@classmethod
def _make_svc(cls, dev: DeviceData, svc_info: dict, transport: str) -> None:
port = svc_info.get("Local Port")
if not port:
return
port = int(port)
_protocol_lookups = {
68: "bootp",
80: "http",
161: "snmp",
2221: "enip_secure",
2222: "enip",
44818: "cip",
}
protocol_name = _protocol_lookups.get(port, "")
dev.related.ports.add(port)
dev.related.protocols.add(protocol_name)
to_search = {"port": port, "transport": transport}
# TODO: should service be added to all ethernet interfaces?
# (since localaddress == 0.0.0.0)?
existing_service = dev.retrieve("service", search=to_search)
if existing_service:
if isinstance(existing_service, list):
log.warning(f"Multiple services found: {existing_service}")
existing_service = existing_service[0]
# hack to not overwrite existing "verified" status with "open"
if existing_service.status != "verified":
existing_service.status = "open"
if not existing_service.protocol and protocol_name:
existing_service.protocol = protocol_name
else:
# If the "protocol" field is blank, then it fails to add the
# service because the matching logic is flawed and thinks
# it's a duplicate or something.
new_service = Service(
port=port, protocol=protocol_name, status="open", transport=transport
)
dev.store("service", new_service)
[docs]
@classmethod
def process_network(cls, dev: DeviceData, info: dict) -> None:
    """
    Add data from the parsed network information pages to a
    :class:`~peat.data.models.DeviceData` object.

    :param dev: device object to annotate in place
    :param info: aggregated output of :meth:`get_network`
    """
    # ARP table, has MACs and IPs
    for arp_entry in info.get("arp_table", []):
        mac = clean_mac(arp_entry["Physical Address"])
        dev.related.mac.add(mac)
        dev.related.ip.add(arp_entry.get("Net Address"))
        # infer MAC if it's not known from IP in arp table
        if not dev.mac and mac and arp_entry["Net Address"] == dev.ip:
            dev.mac = mac
    # Interface info
    if info.get("if_statistics") and info["if_statistics"].get("MAC address"):
        mtu = int(info["if_statistics"]["MTU"])
        description = info["if_statistics"]["Description"]
        mac = clean_mac(info["if_statistics"]["MAC address"])
        dev.related.mac.add(mac)
        if not dev.interface:
            interface = Interface(
                enabled=True, name=description, mac=mac, mtu=mtu, type="ethernet"
            )
            dev.store("interface", interface)
        else:
            # Update the first interface matched by MAC (preferred), or
            # failing that by name; stop after the first match
            for iface in dev.interface:
                if mac == iface.mac:
                    iface.name = description
                    iface.mtu = mtu
                    break
                if description == iface.name:
                    iface.mac = mac
                    iface.mtu = mtu
                    break
    # udp_table: listening ports (0.0.0.0-only), infer service names
    for udp in info.get("udp_table", []):
        dev.related.ip.add(udp.get("Local Address"))
        if udp.get("Local Address") == "0.0.0.0" and udp.get("Local Port"):
            cls._make_svc(dev, udp, "udp")
        elif udp.get("Local Port"):
            try:
                dev.related.ports.add(int(udp["Local Port"]))
            except Exception:
                # best-effort: ignore non-numeric port values
                pass
    # tcp_connections: listening ports (0.0.0.0-only), infer service names
    for conn in info.get("tcp_connections", []):
        dev.related.ip.add(conn.get("Local Address"))
        dev.related.ip.add(conn.get("Remote Address"))
        if conn.get("State") == "LISTEN" and conn.get("Local Address") == "0.0.0.0":
            cls._make_svc(dev, conn, "tcp")
        elif conn.get("Local Port"):
            try:
                dev.related.ports.add(int(conn["Local Port"]))
            except Exception:
                # best-effort: ignore non-numeric port values
                pass
    # ip_routes: IP routing table
    for ip_route in info.get("ip_routes", []):
        # Infer the subnet of the route using destination and subnet mask
        # If any of the interfaces on the device have an IP that lies in
        # that subnet and doesn't have a known subnet mask, then set the
        # subnet mask to the mask derived from the IP route.
        dest = ip_route.get("Destination", "")
        raw_mask = ip_route.get("Mask", "")
        if not dest or not raw_mask:
            continue
        # NOTE(review): the page appears to report the mask with its
        # octets reversed, so they are flipped back here -- confirm
        # against real device output
        subnet_mask = ".".join(reversed(raw_mask.split(".")))
        try:
            network = ipaddress.ip_network(f"{dest}/{subnet_mask}")
        except Exception as ex:
            log.warning(f"Failed to parse ip_route '{ip_route}': {ex}")
            continue
        for interface in dev.interface:
            if not interface.ip:
                continue
            iface_ip = ipaddress.ip_address(interface.ip)
            if not interface.subnet_mask and iface_ip in network:
                interface.subnet_mask = subnet_mask
    # Keep the full raw result for reference/export
    dev.extra["network"] = info
[docs]
def get_home(self) -> dict:
    """
    Home page with basic information about device.

    :return: cleaned key/value data from ``home.asp``, or an empty dict
        if the page could not be retrieved
    """
    page = self.get(self.HOME, use_cache=False)  # uptime changes every time
    if not page or not page.text:
        return {}
    # Delegate to parse_home() instead of duplicating its logic, keeping
    # this getter consistent with the other get_*/parse_* method pairs
    return self.parse_home(page.text)
[docs]
@classmethod
def parse_home(cls, text: str) -> dict:
    """
    Parse ``home.asp`` HTML data.

    Extracts the key/value table and normalizes whitespace in keys and
    values.
    """
    data = cls._extract_tabular(text, start=0, width="100%", cellpadding=4)
    return cls._clean_data(data)
[docs]
@classmethod
def process_home(cls, dev: DeviceData, info: dict) -> None:
    """
    Add data from the parsed home page to a
    :class:`~peat.data.models.DeviceData` object.

    Recognized fields are popped off ``info`` and mapped onto ``dev``;
    anything left over is stored under ``dev.extra["http_home_info"]``
    with snake_case keys.
    """
    if not info:
        return
    if not dev.extra.get("http_home_info"):
        dev.extra["http_home_info"] = {}
    # Device name: keep an already-known name but flag mismatches
    if info.get("Device Name"):
        name = info.pop("Device Name").strip()
        if not dev.name:
            dev.name = name
        elif dev.name != name:
            log.warning(
                f"Device name '{dev.name}' != HTTP name '{name}', "
                f"which could be weird/interesting"
            )
        dev.extra["http_home_info"]["device_name"] = name
    # MAC address: same keep-but-flag pattern
    if info.get("Ethernet Address (MAC)"):
        mac = clean_mac(info.pop("Ethernet Address (MAC)").strip())
        dev.related.mac.add(mac)
        if not dev.mac:
            dev.mac = mac
        elif dev.mac != mac:
            log.warning(
                f"Device MAC '{dev.mac}' != HTTP MAC '{mac}', which could be weird/interesting"
            )
        dev.extra["http_home_info"]["ethernet_address_mac"] = mac
    # IP address: same keep-but-flag pattern
    if info.get("IP Address"):
        ip = info.pop("IP Address").strip()
        if not dev.ip:
            dev.ip = ip
        elif dev.ip != ip:
            log.warning(
                f"Device IP '{dev.ip}' != HTTP IP '{ip}', which could be weird/interesting"
            )
        dev.extra["http_home_info"]["ip_address"] = ip
    # Firmware release date
    if not dev.firmware.release_date and info.get("Firmware Version Date"):
        fw_release_timestamp = utils.parse_date(info.pop("Firmware Version Date"))
        dev.firmware.release_date = fw_release_timestamp
    # Uptime: also derive the device start time as "now - uptime"
    if not dev.uptime and info.get("Uptime"):
        raw_uptime = info.pop("Uptime").strip()
        parsed_uptime = cls._parse_uptime(raw_uptime)
        if parsed_uptime:
            # dev.uptime = int(parsed_uptime.total_seconds())
            dev.uptime = parsed_uptime
            dev.start_time = utils.utc_now() - parsed_uptime
        else:
            log.warning(f"Failed to parse uptime: {raw_uptime}")
        dev.extra["http_home_info"]["uptime"] = raw_uptime
    if not dev.description.description and info.get("Device Description"):
        dev.description.description = info.pop("Device Description").strip()
    if not dev.geo.name and info.get("Device Location"):
        dev.geo.name = info.pop("Device Location").strip()
    # Any remaining fields go into dev.extra with snake_case keys
    if info:
        for og_key, value in info.items():
            if value and value.strip():
                key = utils.clean_replace(og_key, "", "()").replace(" ", "_").lower()
                # NOTE: this is usually the serial number of the
                # communication adapter, not the CPU.
                if key == "serial_number":
                    value = cls._convert_serial_num(value)
                elif key == "status" and not dev.status:
                    dev.status = value.strip()
                dev.extra["http_home_info"][key] = value.strip()
    # Drop the extras dict if nothing ended up stored in it
    if not dev.extra["http_home_info"]:
        del dev.extra["http_home_info"]
@classmethod
def _parse_uptime(cls, uptime_string: str) -> timedelta | None:
"""
Parse a uptime string, e.g. ``28 days, 15h:43m:33.775s`` or
``28 days, 16h:53m:45s``.
"""
# I spent more time than was probably warranted on this. Time is hard.
uptime_res = cls.UPTIME_RE.search(uptime_string)
if not uptime_res:
return None
ut = uptime_res.groupdict()
days = 0
if ut.get("years"):
days += int(ut["years"]) * 365
if ut.get("months"):
days += int(ut["months"]) * 31
if ut.get("days"):
days += int(ut["days"])
kwargs = {
"days": days,
"hours": int(ut["hours"]),
"minutes": int(ut["minutes"]),
"seconds": int(ut["seconds"]),
}
if ut.get("milliseconds"):
kwargs["milliseconds"] = int(ut["milliseconds"])
return timedelta(**kwargs)
[docs]
def get_device_identity(self) -> dict:
    """
    Get Device Identity page data.
    """
    response = self.get("rokform/SysDataDetail?name=Device%20Identity", use_cache=False)
    if response and response.text:
        return self.parse_device_identity(response.text)
    return {}
[docs]
@classmethod
def parse_device_identity(cls, text: str) -> dict:
    """
    Parse device identity page data.

    Extracts the key/value table and normalizes whitespace in the result.
    """
    data = cls._extract_tabular(text, width="100%", cellpadding=4)
    return cls._clean_data(data)
[docs]
@classmethod
def process_device_identity(cls, dev: DeviceData, info: dict) -> None:
    """
    Store parsed Device Identity fields under
    ``dev.extra["http_device_identity"]``.

    Keys are normalized to snake_case; serial numbers are converted
    from hex to decimal.
    """
    if info and not dev.extra.get("http_device_identity"):
        dev.extra["http_device_identity"] = {}
    for raw_key, raw_value in info.items():
        # Skip blank/whitespace-only values
        if not (raw_value and raw_value.strip()):
            continue
        key = utils.clean_replace(raw_key, "", "()/").replace(" ", "_").lower()
        if key == "serial_number":
            raw_value = cls._convert_serial_num(raw_value)
        dev.extra["http_device_identity"][key] = raw_value.strip()
[docs]
def get_syslog(self) -> list[dict]:
    """
    Get Syslog data (page: ``Syslog -> Full List``).
    """
    # Cache the page ID for subsequent queries in the same session
    if not self.syslog_page_id and not self._cache_syslog_page_id():
        self.log.warning(
            "Failed to get syslog: couldn't find ID of syslog "
            "page. This module may not have syslog data available."
        )
        return []
    url = f"rokform/SysListDetail?name=Full%20List&id={self.syslog_page_id}&comp=SysLog"
    response = self.get(url, use_cache=False)
    if response and response.text:
        return self.parse_syslog_page(response.text)
    return []
def _cache_syslog_page_id(self) -> bool:
    """
    Get the page ID of the 'Full List' syslog page.

    :return: True if the ID was found and cached, False otherwise
    """
    self.log.debug("Attempting to find syslog page ID")
    # This page has the URLs of the syslog pages with correct ID values
    index_page = self.get("rokform/SysDataDetail?name=SysLog")
    if not index_page or not index_page.text:
        return False
    # Extract the ID of the "Full List" syslog page
    found = self.SYSLOG_PAGE_RE.search(index_page.text)
    if found is None:
        return False
    self.syslog_page_id = found.group(1)
    self.log.trace(f"Syslog page ID: {self.syslog_page_id}")
    return True
[docs]
@classmethod
def parse_syslog_page(cls, text: str) -> list[dict]:
    """
    Parse Syslog data page.

    Returns one dict per log entry, keyed by the table's column headers.
    """
    data = cls._extract_list(text, start=0, width="100%", cellpadding=4)
    return [cls._clean_data(d) for d in data]
[docs]
@classmethod
def process_syslog(cls, dev: DeviceData, syslog: list[dict]) -> None:
    """
    Add data from the parsed syslog pages to a
    :class:`~peat.data.models.DeviceData` object.

    Each raw syslog row becomes one :class:`Event` appended to
    ``dev.event``.
    """
    log.info(f"Processing syslog from {dev.ip}")
    for raw in syslog:
        # Derive a slug-style action name from the event string, e.g.
        # "Event String": "CloseNode() occurred"
        # "Event String": "CipStatus: Generated on device"
        action = utils.clean_replace(raw["Event String"], "", "/():").lower()
        action = action.replace(" ", "-").replace("--", "-").strip("-")
        event_kind = {"event"}
        # We **should** know the start_time, but not always
        # The timestamps are relative, so the start time is necessary
        # e.g. "28 days, 15h:42m:55.588s"
        t_delta = cls._parse_uptime(raw["Time"])
        if not t_delta:
            log.debug(f"Failed to parse event time: {raw['Time']}")
            event_kind.add("pipeline_error")
        created_ts = None
        if t_delta and dev.start_time:
            # Absolute event time = device start time + relative offset
            created_ts = dev.start_time + t_delta
        extra = {
            "file": raw["File"],  # "File": "CommPortBase.cpp"
            "line": int(raw["Line Number"]),  # "Line Number": "2308"
            "param_1": raw["Param1"],  # "Param1": "0x2"
            "param_2": raw["Param2"],  # "Param2": "0x19001e"
            "task_name": raw["Task Name"],  # "Task Name": "EncapTCP024"
            "event_code": raw["Event Code"],  # "Event Code": "0xd"
        }
        event = Event(
            action=action,
            category={"process"},
            created=created_ts,
            dataset="syslog",
            kind=event_kind,
            module=dev._module.__name__ if dev._module else "ControlLogix",
            original=str(raw),
            sequence=int(raw["Event Number"]),
            type={"info"},
            message=raw["Event String"],
            extra=extra,
        )
        dev.event.append(event)
[docs]
def get_serverlog(self, username: str = "Administrator", password: str = "") -> list[dict]:
    """
    Get ``serverlog`` data from a EWEB module.

    .. note::
        This page requires authentication. The default credentials are
        attempted but could differ on other devices. They are configurable
        via the "web" option in the PEAT configuration file.
    """
    credentials = HTTPDigestAuth(username, password)
    page = self.get(self.SERVERLOG, use_cache=False, auth=credentials)
    if page and page.text:
        return self.parse_serverlog_page(page.text)
    return []
[docs]
@classmethod
def parse_serverlog_page(cls, text: str) -> list[dict]:
    """
    Parse ``serverlog`` data page from a EWEB module.

    Returns one dict per request-log entry, keyed by the table's column
    headers.
    """
    data = cls._extract_list(text, start=0, width="100%", cellpadding=4)
    return [cls._clean_data(d) for d in data]
[docs]
@classmethod
def process_serverlog(cls, dev: DeviceData, serverlog: list[dict]) -> None:
    """
    Add data parsed EWEB ``serverlog`` page to a
    :class:`~peat.data.models.DeviceData` object.

    This log contains a history of web page requests to the EWEB module
    and the source IP of those requests.

    - Possible values for ``event.category``: network, web
    - Possible values for ``event.kind``: event, pipeline_error
    - Possible values for ``event.type``: access, allowed, denied, error, user
    """
    log.info(f"Processing serverlog from {dev.ip} (this may take a while)")
    for raw_event in serverlog:
        dev.related.ip.add(raw_event["IP Address"])
        event_category = {"network", "web"}
        event_kind = {"event"}
        event_type = {"access"}
        http_code = int(raw_event["HTTP Code"])  # "HTTP Code": "200"
        # Classify the outcome from the status code; 3xx stays "unknown"
        outcome = "unknown"
        if http_code < 300:
            outcome = "success"
            if "username" in raw_event["URL"]:
                event_type.add("allowed")
        elif http_code >= 400:
            outcome = "failure"
            event_type.add("error")
            if "username" in raw_event["URL"]:
                event_type.add("denied")
        extra = {
            # TODO: add network/host/http fields to event data model
            "url": raw_event["URL"],  # "URL": "/index.html",
            "http_code": http_code,  # "HTTP Code": "200",
            "ip": raw_event["IP Address"],  # "IP Address": "192.168.0.20"
            "access": raw_event["Access"],  # "Access: "public"
        }
        # Login URLs carry the username as a query parameter
        if "username=" in raw_event["URL"]:
            try:
                # "URL": "/serverlog.asp?username=Administrator&realm=1756-EWEB"
                user = parse_qs(urlparse(raw_event["URL"]).query)["username"][0]
                extra["username"] = user
                event_type.add("user")
                dev.related.user.add(user)
            except Exception as ex:
                log.warning(f"Failed to process username for URL '{raw_event['URL']}': {ex}")
                event_kind.add("pipeline_error")
        event = Event(
            category=event_category,
            # "Timestamp": "MAY 04 17:11:01 2021",
            created=utils.parse_date(raw_event["Timestamp"]),
            dataset="serverlog",
            kind=event_kind,
            module=dev._module.__name__ if dev._module else "ControlLogix",
            original=str(raw_event),
            outcome=outcome,
            type=event_type,
            extra=extra,
        )
        # TODO: de-duplication of events before export
        dev.event.append(event)
[docs]
def get_diagnetwork(self) -> dict:
    """
    Retrieve and parse network interface settings page (``diagnetwork``).
    """
    response = self.get(self.DIAGNETWORK, use_cache=False)
    if response and response.text:
        return self.parse_diagnetwork(response.text)
    return {}
[docs]
@classmethod
def parse_diagnetwork(cls, text: str) -> dict:
    """
    Parse and extract data from ``diagnetwork.asp`` HTML page data.

    The table is located by its ``cellspacing=0``/``cellpadding=4``
    attributes; the result is whitespace-normalized.
    """
    data = cls._extract_tabular(text, cellspacing=0, cellpadding=4)
    return cls._clean_data(data)
[docs]
@classmethod
def process_diagnetwork(cls, dev: DeviceData, info: dict) -> None:
    """
    Add data from the parsed ``diagnetwork`` page to a
    :class:`~peat.data.models.DeviceData` object.
    """
    # NOTE: Interface.annotate() will add hostname and gateway to related.*
    iface = Interface(
        enabled=True,
        type="ethernet",
        mac=clean_mac(info.get("Ethernet Address (MAC)", "")),
    )
    # Map the optional page fields onto interface attributes when present
    field_map = {
        "Host Name": "hostname",
        "IP Address": "ip",
        "Subnet Mask": "subnet_mask",
        "Default Gateway": "gateway",
    }
    for page_key, attr in field_map.items():
        if info.get(page_key):
            setattr(iface, attr, info[page_key])
    dev.store("interface", iface)
[docs]
def get_modules(self) -> list[dict]:
    """
    Get information about all the modules on the chassis.

    Slots with no detail data are skipped; mismatches between the chassis
    table and the per-slot detail page are logged but not fatal.
    """
    modules = []
    for entry in self.get_module_list():
        slot = entry["slot"]
        details = self.retrieve_module(slot)
        if not details:
            self.log.debug(f"No data for slot {slot}")
            continue
        # check if product name and revision match for funsies
        if entry["revision"] != details["Module Revision"]:
            self.log.warning(
                f"Revision '{entry['revision']}' from module "
                f"table doesn't match the module details "
                f"revision '{details['Module Revision']}'"
            )
        if entry["module"] != details["Product Name"]:
            self.log.warning(
                f"Module name '{entry['module']}' from module "
                f"table doesn't match the module details "
                f"name '{details['Product Name']}'"
            )
        modules.append({"slot": slot, **details})
    return modules
[docs]
def get_module_list(self) -> list[dict]:
    """
    Get list of modules on the chassis.
    """
    response = self.get(self.CHASSIS_WHO, use_cache=False)
    if response and response.text:
        return self.parse_module_list(response.text)
    return []
[docs]
@classmethod
def parse_module_list(cls, text: str) -> list[dict]:
    """
    Parse and extract list of modules from HTML text.

    Rows without exactly 5 cells are skipped; the module name and slot
    number are pulled from the link in the fourth cell.
    """
    rows = cls._extract_rows(text, start=1, width="100%", align="left", cellpadding=0)
    parsed = []
    for row in rows:
        if len(row) != 5:
            log.trace(f"Skipping row with {len(row)} elements")
            continue
        link = row[3].a
        parsed.append(
            cls._clean_data(
                {
                    "module": link["title"].split("information about ")[-1],
                    "slot": link["href"].split("slot=")[-1],
                    "revision": row[4].string,
                }
            )
        )
    return parsed
[docs]
def retrieve_module(self, slot: int) -> dict:
    """
    Information about a specific module in the chassis.

    :param slot: chassis slot number to query
    """
    response = self.get(f"rokform/chassisDetail?slot={slot}", use_cache=False)
    if not response or not response.text:
        return {}
    # Empty slots return a placeholder page instead of a table
    if "No Information Available" in response.text:
        self.log.info(f"No data available for slot {slot}")
        return {}
    return self.parse_module_page(response.text)
[docs]
@classmethod
def parse_module_page(cls, text: str) -> dict:
    """
    Parse and extract information about a PLC rack module from HTML text.
    """
    # The details table is located by its fixed width of 500
    data = cls._extract_tabular(text, width=500, cellpadding=4)
    return cls._clean_data(data)
[docs]
@classmethod
def process_modules(cls, dev: DeviceData, raw_modules: list[dict]) -> None:
    """
    Add data from the parsed modules to a
    :class:`~peat.data.models.DeviceData` object.

    Each chassis module becomes a child :class:`DeviceData` stored on
    ``dev`` under "module", keyed by slot.
    """
    # First pass: normalize keys to snake_case and convert serial numbers
    cleaned_mods = []
    for raw_mod in raw_modules:
        if not raw_mod:
            continue
        cleaned_mod = {}
        for key, value in raw_mod.items():
            key = key.strip().replace(" ", "_").lower()
            if key == "serial_number":
                value = cls._convert_serial_num(value)
            cleaned_mod[key] = value.strip()
        cleaned_mods.append(cleaned_mod)
    # Second pass: map known fields onto DeviceData objects; leftovers
    # are kept verbatim in mod.extra
    for mod_dict in cleaned_mods:
        mod = DeviceData()
        if mod_dict.get("slot"):
            mod.slot = mod_dict.pop("slot")
        if mod_dict.get("serial_number"):
            mod.serial_number = mod_dict.pop("serial_number")
        if mod_dict.get("product_name"):
            mod.description.product = mod_dict.pop("product_name")
        if mod_dict.get("vendor"):
            vendor_id = int(mod_dict.pop("vendor"))
            if vendor_id in VENDOR_NAMES:
                mod.description.vendor.name = VENDOR_NAMES[vendor_id]
                if vendor_id == 1:
                    mod.description.vendor.id = "Rockwell"
            else:
                log.warning(f"Unknown vendor ID: {vendor_id}")
            mod.extra["vendor_id"] = vendor_id
        mod.extra.update(mod_dict)
        if mod.extra.get("module_status"):
            mod.status = mod.extra["module_status"]
        dev.store("module", mod, lookup="slot")
[docs]
def get_all(self, dev: DeviceData) -> bool:
    """
    Retrieves and processes data using all available methods.

    Each get_*/process_* pair is tried in turn; a failure (e.g. a page
    the comm module doesn't support) is logged and skipped.

    :param dev: device object the results are processed into
    :return: True if every method succeeded, False if any failed
    """
    # TODO: make this a class attribute, type annotate as typing.Final
    # Function pointers to methods to retrieve and process the data
    pull_process_methods = [
        ("home", self.get_home, self.process_home),
        ("network", self.get_network, self.process_network),
        ("device-identity", self.get_device_identity, self.process_device_identity),
        ("diagnetwork", self.get_diagnetwork, self.process_diagnetwork),
        ("modules", self.get_modules, self.process_modules),
        ("memory", self.get_memory, self.process_memory),
        ("syslog", self.get_syslog, self.process_syslog),
        ("serverlog", self.get_serverlog, self.process_serverlog),
    ]
    self.log.info(
        f"Pulling data from {self.ip}:{self.port} via HTTP "
        f"using {len(pull_process_methods)} methods "
        f"(timeout: {self.timeout} seconds)"
    )
    was_successful = True
    for name, get_func, process_func in pull_process_methods:
        self.log.debug(f"Running method for '{name}' data")
        err_msg = (
            f"Failed to process '{name}' data, "
            f"the ControlLogix module likely doesn't "
            f"support this method"
        )
        try:
            # Retrieve the data via HTTP and parse it into a dict/list
            if name == "serverlog":
                # Minor hack to allow configurable creds for web login
                pulled_info = get_func(
                    username=dev.options["web"]["user"],
                    password=dev.options["web"]["pass"],
                )
            else:
                pulled_info = get_func()
            if pulled_info:
                if name == "serverlog":
                    # The login worked, so record the username used
                    dev.related.user.add(dev.options["web"]["user"])
                # Save the raw parsed data alongside the device results
                dev.write_file(
                    data=pulled_info,
                    filename=f"{name}.json",  # Example: "syslog.json"
                    out_dir=dev.get_sub_dir("http_json_results"),
                )
                # Process the extracted data into the DeviceData object
                process_func(dev, pulled_info)
            else:
                self.log.warning(f"No data from '{name}' method")
        except Exception:
            self.log.warning(err_msg)
            self.log.trace(f"'{name}' method traceback\n{traceback.format_exc()}")
            was_successful = False
            continue
    return was_successful
# Explicit public API of this module
__all__ = ["ClxHTTP"]