# Source code for peat.modules.ge.ge_relay

import copy
import re
from pathlib import Path

from bs4 import BeautifulSoup

from peat import (
    DeviceData,
    DeviceError,
    DeviceModule,
    Event,
    Interface,
    IPMethod,
    consts,
    log,
    state,
    utils,
)
from peat.protocols import HTTP, clean_mac

# TODO: these devices may support HTTPS in some cases, should we handle that somehow?
# TODO: make a "GEHTTP" subclass of HTTP to more cleanly implement custom functionality


class GERelay(DeviceModule):
    """
    PEAT module for GE Multilin Relays.

    Listening services
        - HTTP (TCP 80)

    Web pages
        - /IEC61850InfoMenu.htm
        - /CustomerSupport.htm
        - /ProcessCardMenu.htm
        - /DeviceInfoMenu.htm
        - /DisplayDump.htm
        - /DNPPoints.htm
        - /HF03DisableStatus.htm
        - /FlexInteger.htm
        - /FlexAnalog.htm
        - /FlexLogicParameters.htm
        - /memoryMap.htm
            - ?0x<HEXADDRESS>
        - /USBStats.htm
        - /FaultReport.htm
        - /RoutingAndArpTable.htm
        - /EventRecorder.htm
            - ?<# of alerts, 0 to show all>
        - /DefaultSettingsDiagnostics.htm
        - /FlexOperandStates.htm

    Authors
        - Christopher Goes
        - Daniel Hearn, Idaho National Laboratory (INL)
    """

    device_type = "Relay"
    vendor_id = "GE"
    vendor_name = "General Electric"
    brand = "Multilin"
    can_parse_dir = True

    # NOTE: F35 has not been tested with PEAT after integration changes
    supported_models = ["D30", "F35", "N60", "T60", "L90"]

    # Map of data-type key => URL path downloaded from the relay's web server
    URLS = {
        "default_settings_diagnostics": "/DefaultSettingsDiagnostics.htm",
        "dnp_points": "/DNPPoints.htm",
        "event_recorder": "/EventRecorder.htm?0",
        "flex_analog": "/FlexAnalog.htm",
        "flex_integer": "/FlexInteger.htm",
        "flex_logic": "/FlexLogicParameters.htm",
        "flex_operand_states": "/FlexOperandStates.htm",
        "mb_map_product_info": "/memoryMap.htm?0x0000",
        "mb_map_administrator": "/memoryMap.htm?0x0D00",
        "routing": "/RoutingAndArpTable.htm",
        "usb_stats": "/USBStats.htm",
    }

    # Source for signatures: https://github.com/pnnl/ssass-e
    # License: BSD-3
    MODEL_SIGS = {
        # Signatures of models we know of
        "D30 Distance Relay": "D30",
        "N60 Network Relay": "N60",
        "T60 Transformer": "T60",
        "L90 Line Relay": "L90",
    }

    @classmethod
    def _verify_http(cls, dev: DeviceData) -> bool:
        """
        Verify GERelay by checking for strings in homepage.
        """
        cls.log.debug(f"Verifying {dev.ip} using HTTP")
        try:
            with HTTP(
                ip=dev.ip,
                port=dev.options["http"]["port"],
                timeout=dev.options["http"]["timeout"],
            ) as http:
                response = http.get()
                if not response or not response.content:
                    return False
                data = response.content.decode()
                lower_data = data.lower()
                verified = False

                # Check for an exact model signature first
                # TODO: use process_header_data() for this
                for sig, model in cls.MODEL_SIGS.items():
                    if sig.lower() in lower_data:
                        verified = True
                        dev.description.model = model
                        break

                # Fall back to a generic GE relay signature
                if not verified and ("ge power" in lower_data and "relay" in lower_data):
                    verified = True
                    if "f35 " in lower_data:
                        dev.description.model = "F35"

                if verified:
                    cls.log.info(f"Verified {dev.ip} via HTTP")
                    # Best-effort firmware version extraction from the homepage.
                    # Source: https://github.com/pnnl/ssass-e
                    # License: BSD-3
                    if "Revision " in data:
                        try:
                            fw_id = data.partition("Revision ")[2].split("<")[0]
                            dev.firmware.version = fw_id.strip()
                        except Exception as ex:
                            cls.log.warning(f"Failed to parse firmware version: {ex}")
                    return True
        except Exception:
            cls.log.exception(f"failed to verify {dev.ip} via HTTP due to an unhandled exception")
        return False

    @classmethod
    def _pull(cls, dev: DeviceData) -> bool:
        """
        Download, parse, and process all known pages from the relay.

        Returns True only if every stage produced data.
        """
        raw_pages = cls._download_pages(dev)
        if not raw_pages:
            cls.log.error(f"No pages were downloaded from {dev.ip}!")
            return False
        page_tables = cls._parse_pages(raw_pages)
        if not page_tables:
            return False
        relay_data = cls._process_pages(dev, page_tables)
        if not relay_data:
            return False
        return True

    @classmethod
    def _parse(cls, file: Path, dev: DeviceData | None = None) -> DeviceData | None:
        """
        This will parse a directory of scraped HTML files.
        It is intended to be used for testing and development of PEAT,
        such as generation of test data.

        Raises:
            DeviceError: if ``file`` isn't a directory, or an expected
                HTML file is missing or ambiguous (multiple matches).
        """
        if not file.is_dir():
            raise DeviceError(f"{file.name} must be a directory")
        if not dev:
            dev = DeviceData()
        raw_pages = {}
        for d_type, f_name in cls.URLS.items():
            # Translate the URL path into the scraped filename on disk,
            # e.g. "/EventRecorder.htm?0" => "EventRecorder.htm0.html"
            f_name = f_name.replace("/", "")
            if "?" in f_name:
                f_name = f_name.replace("?", "") + ".html"
            path = list(file.glob(f"*{f_name}"))
            if not path:
                raise DeviceError(f"Failed to find file '{f_name}' in '{file}'")
            if len(path) > 1:
                raise DeviceError(f"Multiple versions of file '{f_name}' in '{file}'")
            raw_pages[d_type] = path[0].read_text(encoding="utf-8")

            # Hack to make PEAT tests work. I'm fine with this since
            # GERelay._parse() was originally created to use for
            # PEAT test data generation in the first place.
            f_name = path[0].name
            for model in cls.supported_models:
                m_str = f"{model.lower()}_"
                if f_name.startswith(m_str):
                    f_name = f_name.replace(m_str, "")
                    break
            dev.related.files.add(f_name)
        page_tables = cls._parse_pages(raw_pages)
        cls._process_pages(dev, page_tables)
        cls.update_dev(dev)
        return dev

    @classmethod
    def _download_pages(cls, dev: DeviceData) -> dict[str, str]:
        """
        Download every page in ``URLS`` from the device over HTTP.

        Returns:
            Mapping of data-type key => raw HTML text. Pages that failed
            to download are omitted.
        """
        raw_pages = {}
        cls.log.info(f"Downloading pages from {dev.ip}")
        # NOTE: flex_logic and flex_operand_states take a long time (html page
        # takes a long time to load).
        with HTTP(
            ip=dev.ip,
            port=dev.options["http"]["port"],
            timeout=dev.options["http"]["timeout"],
        ) as http:
            for d_type, path in cls.URLS.items():
                url = f"http://{dev.ip}{path}"
                page_data = cls._get_page(http, dev, url)
                if page_data:
                    # BUGFIX: reuse the already-downloaded page instead of
                    # calling _get_page() a second time, which re-fetched
                    # every URL twice over the network.
                    raw_pages[d_type] = page_data
        cls.log.info(f"Finished downloading pages from {dev.ip}")
        return raw_pages

    @classmethod
    def _get_page(cls, http: HTTP, dev: DeviceData, url: str) -> str:
        """
        Download a single page, returning "" (and setting the global error
        flag) on failure.
        """
        cls.log.info(f"Downloading {url}")
        response = http.get(url=url, dev=dev)
        if response and response.text:
            return response.text
        cls.log.warning(f"Failed to download {url}")
        state.error = True
        return ""

    @staticmethod
    def _parse_pages(raw_pages: dict[str, str]) -> dict[str, dict]:
        """
        Parse each raw HTML page into tables. The USB stats page is
        parsed as key-value pairs instead of as a row-oriented table.
        """
        return {
            d_type: parse_ge_html(data, key_value_pairs=d_type in ["usb_stats"])
            for d_type, data in raw_pages.items()
        }

    @classmethod
    def _process_pages(cls, dev: DeviceData, page_tables: dict) -> dict | None:
        """
        Post-process the parsed page tables into the PEAT data model
        (metadata, interfaces, routes, logic, events).

        Returns:
            The processed relay data dict (also merged into ``dev.extra``).
        """
        relay_data = copy.deepcopy(page_tables)

        # Process header information from each page (IP, name, model, etc.)
        for tables in relay_data.values():
            if "page_header" in tables:
                process_header_data(dev, tables.pop("page_header"))
        if dev.id and dev.name and "ge_html_files" in dev.id:
            dev.id = dev.name
        elif dev.id and dev.ip and "ge_html_files" in dev.id:
            dev.id = dev.ip

        dev.write_file(page_tables, "parsed-page-data.json")

        # Issue warnings to make code a bit cleaner
        for d_type in cls.URLS.keys():
            if not relay_data.get(d_type):
                cls.log.warning(f"No '{d_type}' data on {dev.ip}")

        # Flatten if there's only one table
        for d_type, tables in relay_data.items():
            if len(tables) == 1:
                relay_data[d_type] = tables.popitem()[1]

        # TODO: default_settings_diagnostics isn't getting used anywhere
        #   Memory values
        #   Modbus registers
        #   Setting names (add this as a Set to .extra)
        #
        # TODO: add to dev.memory (Memory model) from DefaultSettingsDiagnostics
        #   flash_address => value_hex
        #   value_hex: 00 00 (2 bytes of memory), 00 00 00 00 (4 bytes of memory)
        #   extrapolate address using flash_address as offset
        #   Set memory reads dataset as "Event #1, <date> <time>"
        #   Combine all reads for a event into a single Memory object
        #   annotate with setting_name and modbus_address
        # if relay_data.get("default_settings_diagnostics"):
        #     for table_name, table in relay_data["default_settings_diagnostics"].items():
        #         if table_name == "collected_data":
        #             pass  # TODO
        #         elif "event" in table_name:
        #             pass  # TODO
        #         else:
        #             cls.log.warning(
        #                 f"Unknown DefaultSettingsDiagnostics table: {table_name}"
        #             )

        # TODO: add Register models for DNP3 registers
        # if relay_data.get("dnp_points"):
        #     dnp_points = relay_data["dnp_points"]

        # TODO: add to dev.registers for Modbus points?
        # Pull related server IPs out of the "Administrator" Modbus map page
        for row in relay_data.get("mb_map_administrator", []):
            if row.get("value") and row.get("name") in [
                "GDOI KDC IP",
                "OCSP Server IP",
                "SCEP Server IP",
            ]:
                if utils.is_ip(row["value"]):
                    dev.related.ip.add(row["value"])

        # Process device metadata from "Product Info" ModbusMap page
        for row in relay_data.get("mb_map_product_info", []):
            if not row.get("name") or not row.get("value"):
                cls.log.warning(f"Bad mb_map_product_info row: {row}")
                continue
            # TODO: other interesting fields
            #   "UR Product Type"
            #   "Modification Number"
            #   "CPU Module Serial Number" (dev.module.*?)
            #   "CPU Supplier Serial Number" (dev.module.*?)
            #   "Main Board HW ID"
            #   "Daughter Board HW ID"
            if row["name"] == "Product Version":
                dev.firmware.version = row["value"]
            elif row["name"] == "Serial Number":
                dev.serial_number = row["value"]
            elif row["name"] == "Manufacturing Date":
                dev.manufacturing_date = utils.parse_date(row["value"])
            elif row["name"] == "Order Code":
                dev.firmware.id = row["value"]
                dev.firmware.extra["order_code"] = row["value"]
                # Extract device model from start of order code
                dev.description.model = row["value"].split("-")[0]
            elif row["name"] == "Ethernet MAC Address":
                # Convert MAC: "DC3752FFFFFF" => "DC:37:52:FF:FF:FF"
                dev.extra["mac_address"] = ":".join(re.findall(r"..", row["value"]))
            elif row["name"] == "FPGA Version":
                dev.boot_firmware.id = "FPGA"
                dev.boot_firmware.version = row["value"]
            elif row["name"] == "FPGA Date":
                dev.boot_firmware.release_date = utils.parse_date(row["value"])
            else:
                dev.extra[convert_key(row["name"])] = row["value"]

        # Network interfaces
        for port in relay_data.get("routing", {}).get("port_status", []):
            iface = Interface(
                connected=(True if port.get("link_status", "").upper() == "UP" else False),
                enabled=(True if port.get("function", "").upper() == "ENABLED" else False),
                name=port.get("port", ""),
                id=port.get("port", ""),
                extra={
                    "link_status": port.get("link_status", ""),
                    "redundancy": port.get("redundancy", ""),
                    "active_if_redundancy": str(port.get("active_if_redundancy", "")),
                    "port": port.get("port", ""),
                },
            )
            # "127.0.0.1" check is to avoid duplicating "lo0" interface
            if port.get("ip_address") and port["ip_address"] != "127.0.0.1":
                iface.type = "ethernet"
                iface.ip = port["ip_address"]
                if port.get("subnet_mask"):
                    iface.subnet_mask = port["subnet_mask"]
                # Prefer the routing-table interface name when one matches
                if relay_data["routing"].get("ipv4_routing_table"):
                    for route in relay_data["routing"]["ipv4_routing_table"]:
                        if iface.ip == route.get("gateway", "") and route.get("if"):
                            iface.name = route["if"]
                            break
            if port.get("ip_address") == dev.ip and "mac_address" in dev.extra:
                dev.mac = clean_mac(dev.extra.pop("mac_address"))
                iface.mac = dev.mac
            dev.store("interface", iface)

        lo_found = False
        for v4_route in relay_data.get("routing", {}).get("ipv4_routing_table", []):
            if "/" not in v4_route.get("destination", ""):
                dev.related.ip.add(v4_route["destination"])
            if "." in v4_route.get("gateway", ""):
                dev.related.ip.add(v4_route["gateway"])
            elif ":" in v4_route.get("gateway", ""):
                dev.related.mac.add(clean_mac(v4_route["gateway"]))
            # Add localhost "lo0" interface standalone
            if not lo_found and v4_route.get("if") == "lo0":
                lo_iface = Interface(
                    connected=True,
                    enabled=True,
                    name="lo0",
                    hostname="localhost",
                    ip="127.0.0.1",
                    type="ethernet",
                )
                dev.store("interface", lo_iface)
                lo_found = True

        # TODO: ipv6 routing table entries (ipv6_routing_table)

        # Process ARP table entries
        for arp in relay_data.get("routing", {}).get("arp_table", []):
            dev.related.ip.add(arp.get("internet_address", ""))
            dev.related.mac.add(clean_mac(arp.get("physical_address", "")))

        # Add the fact that there's a USB interface I guess
        if relay_data.get("usb_stats"):
            u_stats = relay_data["usb_stats"]
            usb_iface = Interface(
                type="usb",
                # TODO: use to determine enabled/active status:
                #   usb_initialized, usb_configured, usb_enumerated,
                #   rx_frame_count, tx_frame_count
                # speed=0,  # TODO: usb_speed, "Full Speed: 12Mbit/s"
                extra=copy.deepcopy(u_stats),
            )
            if "metric" in usb_iface.extra:
                del usb_iface.extra["metric"]
            dev.store("interface", usb_iface)

        # Process logic. Currently this is just cleaning up the fields,
        # someday we should turn this into useful logic.
        for logic_section in ["flex_logic", "flex_operand_states"]:
            if not relay_data.get(logic_section):
                continue
            try:
                cleaned_logic = []
                for logic_entry in relay_data[logic_section]:
                    cleaned_entry = {}
                    for key, value in logic_entry.items():
                        clean_key = utils.clean_replace(key, "", "(),. ")
                        if "ok_" in clean_key:
                            cleaned_entry[clean_key] = bool(value)
                        elif clean_key == "value_decimal":
                            cleaned_entry["value"] = int(value)
                        # Trim the hex value since there's no reason to keep it
                        elif clean_key != "value_hex":
                            cleaned_entry[clean_key] = value
                    cleaned_logic.append(cleaned_entry)
                relay_data[logic_section] = cleaned_logic
            except Exception:
                cls.log.exception(f"Error processing {logic_section}")

        # Process logs
        if relay_data.get("event_recorder"):
            cls._process_event_recorder(dev, relay_data["event_recorder"])

        # Dump data with any post-processing applied
        dev.write_file(relay_data, "processed-data.json")

        cls.update_dev(dev)
        dev.extra.update(relay_data)

        # Remove extraneous data we've already processed
        for key in [
            "flex_analog",
            "flex_integer",
            "flex_logic",
            "flex_operand_states",
            "event_recorder",
            "mb_map_product_info",
            "mb_map_administrator",
        ]:
            if key in dev.extra:
                del dev.extra[key]

        return relay_data

    @classmethod
    def _process_event_recorder(cls, dev: DeviceData, event_recorder: list[dict]) -> None:
        """
        Process GE device logs (event recorder) into the PEAT data model.
        """
        try:
            for raw in event_recorder:
                # Normalize the event cause into a machine-friendly action name
                action = utils.clean_replace(raw["event_cause"], "-", " /()").lower()
                action = action.replace("'", "").replace("--", "-").strip("-")
                event = Event(
                    action=action,
                    category={"host"},
                    created=utils.parse_date(raw["time_and_date"]),
                    dataset="event_recorder",
                    kind={"event"},
                    module=cls.__name__ if not dev._module else dev._module.__name__,
                    original=raw["event_cause"],
                    sequence=int(raw["event_number"]),
                    type={"info"},
                )
                # Classify the event based on keywords in the cause string
                lowercase = raw["event_cause"].lower()
                if any(x in lowercase for x in ["closed", "open", "trigger"]):
                    event.outcome = "success"
                    event.type.add("change")
                if "change" in lowercase:
                    event.category.add("configuration")
                    event.type.add("change")
                if "power" in lowercase:
                    event.type.add("change")
                if "failure" in lowercase:
                    event.outcome = "failure"
                    event.type.add("error")
                    event.type.remove("info")
                dev.store("event", event, lookup="sequence")
        except Exception:
            cls.log.exception(f"error while processing 'event_recorder' data for {dev.ip}")
            state.error = True
GERelay.ip_methods = [
    IPMethod(
        name="GE Relay HTTP homepage",
        description=str(GERelay._verify_http.__doc__).strip(),
        type="unicast_ip",
        identify_function=GERelay._verify_http,
        reliability=7,
        protocol="http",
        transport="tcp",
        default_port=80,
    ),
]


def process_header_data(dev: DeviceData, header: list) -> None:
    """
    Merge a page's "page_header" rows into the device data.

    The first row carries "<product identifier> Revision <version>"; the
    second carries the relay name and IP ("Relay Name: ... IP Address: ...").
    """
    if not header:
        return
    prod_rev = header[0].partition("Revision")
    dev.firmware.version = prod_rev[2].strip()
    if not dev.description.model:
        prod = prod_rev[0].strip()
        for sig, model in GERelay.MODEL_SIGS.items():
            if sig.lower() in prod.lower():
                dev.description.model = model
                break
        else:
            # Unknown product string: keep it and guess model from first word
            dev.extra["product_identifier"] = prod
            dev.description.model = prod.partition(" ")[0]
    # BUGFIX: a truncated header (only one row) previously raised
    # IndexError on header[1], aborting processing for the whole device.
    if len(header) < 2:
        log.warning("GE page header is missing the name/IP row")
        return
    name_ip = header[1].partition("IP Address: ")
    ip = name_ip[2].strip()
    if not dev.ip:
        dev.ip = ip
    if ip != dev.ip:
        log.warning(f"Configured IP {ip} does not match current IP {dev.ip}!")
        dev.related.ip.add(ip)
    name = name_ip[0].partition("Relay Name: ")[2].strip()
    if dev.name and name != dev.name:
        log.warning(f"Configured Name {name} does not match current name {dev.name}")
    dev.name = name
def parse_ge_html(text: str, key_value_pairs: bool = False) -> dict[str, list[dict]]:
    """
    HTML parsing for pages scraped from GE devices.
    Originally based on code by Ryan Vrecenar in ``sel_http.py.read_html()``.

    Args:
        text: the raw HTML text to parse
        key_value_pairs: If a table should be parsed as key-value pairs.
            This mainly applies to the USBStats page, but it could apply
            to other pages as well.

    Returns:
        Mapping of table name => parsed table. Row-oriented tables become
        a list of per-row dicts; short or key-value tables become a single
        dict. A special "page_header" entry holds the header table's first
        row, for consumption by process_header_data().
    """
    soup = BeautifulSoup(text, features=consts.BS4_PARSER)
    table_elements = soup.find_all("table")
    if not table_elements:
        return {}

    # If "multi-table" (e.g., DNPPoints.htm), then a row with one <TD/>
    # (one column) is a new table. There will only be two <TABLE/>
    # elements: the header, and the "table" with multiple tables.
    results = {}
    page_name = "_UNKNOWN_PAGE_NAME"

    # Iterate over all "<TABLE/>" elements
    for table_num, table_element in enumerate(table_elements):
        table_name = f"unknown_table_{table_num}"
        # Sub-table markers found in this <TABLE/>: {"name": ..., "index": row}
        tables: list[dict] = []
        # Cleaned cell text per row; empty/marker rows stay as [] placeholders
        rows: list[list[str]] = []

        # Find rows in html table objects
        for row_index, row in enumerate(table_element.find_all("tr")):
            if not row:  # skip empty
                rows.append([])  # preserve row indices
                continue
            # Find columns in each row
            cols = row.find_all("td")
            if not cols:  # skip empty
                rows.append([])  # preserve row indices
                continue
            if len(cols) == 1:
                # Single-column row marks the start of a new sub-table
                table_name = convert_key(clean_text(cols[0].getText()))
                tables.append({"name": table_name, "index": row_index})
                rows.append([])  # preserve row indices
                continue
            # Iterate over columns
            cleaned = [clean_text(pos.getText()) for pos in cols]
            rows.append([c for c in cleaned if c])

        # The page-header table contains a "Click here ..." link; save its
        # first row and derive the page name from the table name.
        if "click_here" in table_name:
            page_name = table_name.split("click_here")[0]
            results["page_header"] = rows[0]
            continue

        if len(rows) < 3 or key_value_pairs:
            # key-value pairs
            row_values = {}
            for r in rows[1:]:
                if r and len(r) > 1:
                    row_values[convert_key(r[0])] = r[1].strip()
            results[table_name] = row_values
            continue

        # In the case of no table header (e.g. EventRecorder or FlexAnalog)
        if not tables:
            tables.append({"name": page_name, "index": -1})

        header: list[str] = []
        for tbl_idx, table in enumerate(tables):
            # Row immediately after the table marker is the column header
            header_row = rows[table["index"] + 1]
            if not header:
                header = [convert_key(h) for h in header_row]
            if header and len(header_row) == len(header):
                tbl_header = header
            else:
                # header specific to this table
                tbl_header = [convert_key(h) for h in header_row]
            # Data rows run until the next table marker (or end of rows)
            if tbl_idx + 1 < len(tables):
                end_row_idx = tables[tbl_idx + 1]["index"]
            else:
                end_row_idx = len(rows)
            table_values: list[dict[str, str | int]] = []
            for tr_vals in rows[table["index"] + 2 : end_row_idx]:
                if not tr_vals:
                    continue
                row_values: dict[str, str | int] = {
                    col: value.strip()
                    for col, value in zip(tbl_header, tr_vals, strict=False)
                }
                table_values.append(row_values)
            results[table["name"]] = table_values
    return results
def clean_text(data: str) -> str:
    """
    Strip escaped newline sequences and surrounding whitespace from a string.
    """
    without_newlines = data.replace("\\\\n", "").replace("\\n", "")
    return without_newlines.strip()
def convert_key(key: str) -> str:
    """
    Normalize a scraped table/field name into a lowercase snake_case key.
    """
    stripped = utils.clean_replace(key, "", "():,.")
    return stripped.replace(" ", "_").lower()