Source code for peat.modules.schneider.sage.sage_parse

"""
Parsing of files from Sage RTUs.

Works with 2400 and 3030M models, and should work with others.

Authors

- Christopher Goes
- Ryan Vrecenar
"""

import re
import xml.etree.ElementTree as ET
from collections.abc import Callable
from datetime import datetime
from pathlib import PurePath, PurePosixPath
from typing import IO, Any, Final

from peat import DeviceData, log, utils
from peat.data.models import Event, Interface, User

ElasticType = None | int | bool | str | float


def _convert_to_elastic(_input: str) -> ElasticType:
    """
    Cast input to inferred type (int, bool, None) for Elasticsearch.
    """
    try:
        return int(_input)
    except Exception:
        try:
            return float(_input)
        except Exception:
            if _input == "N":
                return False
            elif _input == "Y":
                return True
            elif _input.upper() == "NONE":
                return None
            else:
                return _input


def _store_dict_items(elem_tree: ET.Element, element_dict: dict[str, ElasticType]) -> None:
    for item in elem_tree.items():
        try:
            element_dict[item[0]] = int(item[1])
        except Exception:
            if item[1] == "N":
                element_dict[item[0]] = False
            elif item[1] == "Y":
                element_dict[item[0]] = True
            else:
                element_dict[item[0]] = item[1]


def _parse_element_into_dict(element: ET.Element, info: dict, key_map: dict) -> None:
    if element.tag in key_map and element.text and element.text.lower() != "changeme":
        info[key_map[element.tag]] = element.text.strip()


[docs] def parse_access_xml(f_handle: IO[bytes], device_info: dict) -> None: """ Parse users and access permissions out of ``ACCESS.XML``. """ access_info = {} root = ET.fromstring(f_handle.read()) access_info["users"] = [] for user in root: user_info: dict[str, Any] = { "id": user.get("ID", ""), } if user.get("SESTIMEOUT"): user_info["session_timeout"] = int(user.attrib["SESTIMEOUT"]) for elem in user: if elem.tag == "DESC" and elem.text: user_info["description"] = elem.text elif elem.tag == "PASSWORD" and elem.text: user_info["password"] = elem.text elif elem.tag == "ACCESS": user_info["permissions"] = {} _store_dict_items(elem, user_info["permissions"]) access_info["users"].append(user_info) if not access_info["users"]: return device_info["access_info"] = access_info
[docs] def process_access_xml(dev: DeviceData, info: dict) -> None: """ Process data extracted from ``ACCESS.XML`` into the PEAT device data model. """ users = info["users"] for user_info in users: user = User( description=user_info.get("description", ""), id=user_info.get("id", ""), name=user_info.get("id", ""), ) if user_info.get("session_timeout"): user.extra["session_timeout"] = user_info["session_timeout"] if user_info.get("permissions"): user.extra["sage_permissions"] = user_info["permissions"] # Add any permissions that are enabled to set of permissions for user # e.g. if FTP="Y", then "FTP" would be added to user.permissions for perm_key, perm_value in user_info["permissions"].items(): if perm_value is True: user.permissions.add(perm_key) dev.store("users", user)
[docs] def parse_rtu_setup(f_handle: IO[bytes], device_info: dict) -> None: """ Parse XML tree from ``rtusetup.xml``. Args: f_handle: file handler to read data from and parse xml device_info: dictionary of keys and values for target device """ info = {} root = ET.fromstring(f_handle.read()) cpu = root.findall("CPU") if not cpu: log.warning("No 'CPU' element found in rtusetup.xml") return cpu = cpu[0] rtu_key_map = { "NAME": "name", "APPFILE": "app_file", "PARTNO": "part_number", "HWVER": "hw_ver", "USERID": "user_id", "SERIALNUM": "serial_number", "MFGNAME": "manufacturer_name", "RUNMODE": "run_mode", "TYPE": "model", } rtu = cpu.findall("RTU") if not rtu: log.warning("No 'RTU' element found in rtusetup.xml") else: rtu = rtu[0] for elem in rtu: _parse_element_into_dict(elem, info, rtu_key_map) vxworks = rtu.findall("VXWORKS") if not vxworks: log.warning("No 'VXWORKS' element found in rtusetup.xml") else: for elem in vxworks[0]: if not elem.text or elem.text == "ChangeMe": continue if elem.tag == "VXCREATED": os_date = datetime.strptime(elem.text, "%b %d %Y") info["vx_created"] = os_date.isoformat(sep="-") elif elem.tag == "VXVERSION": info["vx_version"] = elem.text.strip() config_tree = cpu.findall("CONFIG") if not config_tree: log.warning("No 'CONFIG' element found in rtusetup.xml") else: for elem in config_tree[0]: if not elem.text or elem.text == "ChangeMe": continue if elem.tag == "NAME": info["config_version"] = elem.text elif elem.tag == "LASTDATE": try: fw_date = datetime.strptime(elem.text, "%m-%d-%y") except ValueError: # Date can be '19' or '2019', must handle both accordingly fw_date = datetime.strptime(elem.text, "%m-%d-%Y") info["config_timestamp"] = fw_date.isoformat(sep="-") elif elem.tag == "REVISION": info["config_revision"] = elem.text device_info["rtusetup_info"] = info
[docs] def process_rtusetup_info(dev: DeviceData, info: dict) -> None: """ Process data extracted from ``rtusetup.xml`` into the PEAT device data model. """ if info.get("name"): dev.description.description = info["name"] if info.get("app_file"): dev.related.files.add(info["app_file"]) if info.get("part_number") and not dev.part_number: dev.part_number = info["part_number"] if info.get("user_id"): dev.related.user.add(info["user_id"]) if info.get("serial_number") and not dev.serial_number: dev.serial_number = info["serial_number"] if info.get("run_mode") and not dev.run_mode: dev.run_mode = info["run_mode"] if info.get("model") and not dev.description.model: dev.description.model = info["model"] if info.get("vx_created") and not dev.os.timestamp: dev.os.timestamp = utils.parse_date(info["vx_created"])
[docs] def parse_tte(f_handle: IO[bytes], device_info: dict) -> None: """ Parse XML tree from ``tte.xml`` to find time trigger ethernet settings. Args: f_handle: file handler to read data from and parse xml device_info: dictionary of keys and values for target device """ tte_settings = {} root = ET.fromstring(f_handle.read()) entry = 0 for tte_elem in root: entry_iter = f"entry_{entry}" tte_settings[entry_iter] = {} for tte_item in tte_elem.items(): if tte_item[1] != "": tte_settings[entry_iter][tte_item[0]] = _convert_to_elastic(tte_item[1]) entry += 1 tte_settings["note"] = "Time Triggered Ethernet" device_info["tte"] = tte_settings
[docs] def parse_time_elements(_time: ET.Element, device_info: dict) -> None: """ From TIME branch in ElementTree in time, parse out version and date of update for OS. """ time_settings = {} for item in _time.items(): time_settings[item[0]] = _convert_to_elastic(item[1]) for element in _time: time_settings[element.tag] = {} for item in element.items(): time_settings[element.tag][item[0]] = _convert_to_elastic(item[1]) device_info["time_settings"] = time_settings
[docs] def parse_gblfrz_elements(gblfrz: ET.Element, device_info: dict) -> None: """ From GBLFRZ branch in ElementTree in time, parse out version and date of update for OS. """ gblfrz_settings = {} _store_dict_items(gblfrz, gblfrz_settings) device_info["gblfrz_settings"] = gblfrz_settings
[docs] def parse_gps_elements(gps: ET.Element, device_info: dict) -> None: """ From GPS branch in ElementTree in time, parse out version and date of update for OS. """ gps_settings = {} for item in gps.items(): gps_settings[item[0]] = _convert_to_elastic(item[1]) device_info["gps_settings"] = gps_settings
[docs] def parse_time(f_handle: IO[bytes], device_info: dict) -> None: """ Parse xml tree from ``time.xml`` to find ``time``, ``gblfrz``, and ``gps`` settings. Args: f_handle: file handler to read data from and parse xml device_info: dictionary of keys and values for target device """ root = ET.fromstring(f_handle.read()) _time = root.findall("TIME")[0] parse_time_elements(_time, device_info) gblfrz = root.findall("GBLFRZ")[0] parse_gblfrz_elements(gblfrz, device_info) gps = root.findall("GPS")[0] parse_gps_elements(gps, device_info)
[docs] def bootline_parse(bootline: str | dict) -> dict[str, str]: """ Parse useful information out of vxworks bootline string, which is embedded in ``bootline.xml``. """ if isinstance(bootline, str): bootline_dict = {} cleaned = bootline.strip().replace(",0)", ",0) ").replace("host:", "host=") sections = [x.strip() for x in cleaned.split(" ") if x.strip()] for section in sections: parts = section.split("=") bootline_dict[parts[0]] = parts[1] else: bootline_dict = bootline # e: ethernet ip # b: blackplane IP (optional) # h: host IP # g: gateway IP # f: flags # tn: target name # s: startup script (optional) # o: other _key_map = { "e": "ethernet_ip", "b": "backplane_ip", "h": "host_ip", "g": "gateway_ip", "f": "flags", "tn": "target_name", "s": "startup_script", "o": "other", } info = {} for key, value in bootline_dict.items(): if key in _key_map: key = _key_map[key] info[key] = _convert_to_elastic(value) # Handle if in format "192.0.2.1:ffffff00" for e_key in ["ethernet", "backplane", "host", "gateway"]: if ":" in info.get(f"{e_key}_ip", ""): ip, subnet_mask = _parse_eth_str(info[f"{e_key}_ip"]) info[f"{e_key}_ip"] = ip info[f"{e_key}_subnet_mask"] = subnet_mask return info
def _parse_eth_str(eth_val: str) -> tuple[str, str]: """ Parse string format "IP:Subnet", with the subnet being hex-encoded. Example: ``"192.0.2.1:ffffff00"`` """ ip = eth_val[: eth_val.index(":")] h_subnet = eth_val[eth_val.index(":") + 1 :] subnet_bytes = (h_subnet[0:2], h_subnet[2:4], h_subnet[4:6], h_subnet[6:8]) subnet_mask = ".".join([f"{int(n, 16)}" for n in subnet_bytes]) return ip, subnet_mask
[docs] def parse_bootline_xml(f_handle: IO[bytes], device_info: dict) -> None: """ Parse XML tree from ``bootline.xml`` to find boot options including default IP address, location of operating system, hostname, username and password. Args: f_handle: file handler to read data from and parse xml device_info: dictionary of keys and values for target device """ root = ET.fromstring(f_handle.read()) bootline = root.findall("BOOTLINE")[0] device_info["bootline_from_xml"] = bootline_parse(bootline.attrib)
[docs] def process_bootline(dev: DeviceData, data: dict[str, str]) -> None: """ Process data extracted from VxWorks bootline into the PEAT device data model. """ boot_file = data.get("file") if not boot_file and data.get("host") and "/" in data["host"]: boot_file = data["host"] if boot_file: b_path = PurePosixPath(boot_file) dev.related.files.add(str(b_path)) dev.firmware.file.path = b_path dev.firmware.file.directory = str(b_path.parent) dev.firmware.file.name = b_path.name dev.firmware.file.type = "file" if data.get("ethernet_ip") and utils.is_ip(data["ethernet_ip"]): dev.related.ip.add(data["ethernet_ip"]) iface = dev.retrieve("interface", {"ip": data["ethernet_ip"]}) if not iface: iface = Interface( ip=data["ethernet_ip"], subnet_mask=data.get("ethernet_subnet_mask", ""), gateway=data.get("gateway_ip", ""), type="ethernet", ) dev.store("interface", iface, lookup="ip") else: if not iface.subnet_mask and data.get("ethernet_subnet_mask"): iface.subnet_mask = data["ethernet_subnet_mask"] if not iface.gateway and data.get("gateway_ip") and utils.is_ip(data["gateway_ip"]): iface.gateway = data["gateway_ip"] if data.get("backplane_ip"): dev.related.ip.add(data["backplane_ip"]) if data.get("host_ip") and utils.is_ip(data["host_ip"]): dev.related.ip.add(data["host_ip"]) if data.get("gateway_ip") and utils.is_ip(data["gateway_ip"]): dev.related.ip.add(data["gateway_ip"]) if data.get("raw_bootline"): dev.firmware.extra["bootline"] = data["raw_bootline"]
[docs] def parse_ether(f_handle: IO[bytes], device_info: dict) -> None: """ Parse XML tree from ``ethernet.xml`` to find route settings and ethernet port settings. Args: f_handle: file handler to read data from and parse xml device_info: dictionary of keys and values for target device """ ether_settings = {} root = ET.fromstring(f_handle.read()) entry = 0 for ether_elem in root: ether_iter = f"entry_{entry}" ether_settings[ether_iter] = {} # Save ethernet parameters including ID, Password, Name, etc for item in ether_elem.items(): ether_settings[ether_iter][item[0]] = item[1] # Save Routes routes_elem = ether_elem.findall("ROUTES") if not routes_elem: log.warning("Failed to find 'ROUTES' element in ether_elem") continue routes = ether_elem.findall("ROUTES")[0] routes_settings = {} for route in routes: route_fields = route.items() # Route_ID uses first route field which is value of ID route_id = route_fields[0][1] for field in route_fields[1:]: routes_settings[field[0]] = _convert_to_elastic(field[1]) ether_settings[ether_iter][f"route_{route_id}"] = routes_settings entry += 1 device_info["ethernet"] = ether_settings
[docs] def parse_firewall_rule(firewall_line: str, firewall_rule: dict) -> None: delimiters = "block in ", "from ", " to ", " port = " regex_pattern = "|".join(map(re.escape, delimiters)) # Split on grammar, drop null first byte firewall_syntax = re.split(regex_pattern, firewall_line)[1:] proto = firewall_syntax[0].rstrip() if not firewall_syntax[0] == "" else None firewall_rule["protocol"] = proto firewall_rule["from_cdir"] = firewall_syntax[1] firewall_rule["to_cdir"] = firewall_syntax[2] # Hard coded assumption, port = is last term if "port = " in firewall_line: firewall_rule["port"] = firewall_syntax[-1]
[docs] def parse_firewall(f_handle: IO[bytes], device_info: dict) -> None: firewall_rules = {} lines = f_handle.read().decode("utf-8") i = 0 for line in lines.split("\n"): # Remove comments from firewall rules if len(line.lstrip()) > 1 and line.lstrip()[0] != "#" and line.lstrip()[0:2] != "//": i += 1 firewall_rules[f"entry_{i}"] = {} firewall_entry = firewall_rules[f"entry_{i}"] firewall_entry["original_rule"] = line firewall_entry["semantic_rule"] = {} parse_firewall_rule(line, firewall_entry["semantic_rule"]) else: continue device_info["firewall_settings"] = firewall_rules
[docs] def parse_isagraf(f_handle: IO[bytes], device_info: dict) -> None: isagraf_data = f_handle.read().decode("utf-8") isagraf_version = isagraf_data.split(" ")[1] device_info["isagraf"] = {} device_info["isagraf"]["version"] = isagraf_version.rstrip() device_info["isagraf"]["note"] = "Soft Logic Controller, including runtime, and dev tools"
[docs] def parse_startup_script(f_handle: IO[bytes], device_info: dict) -> None: script_data = f_handle.read().decode("utf-8") device_info["startup_script"] = script_data
[docs] def parse_vxworks_script(f_handle: IO[bytes], device_info: dict) -> None: script_data = f_handle.read().decode("utf-8") device_info["vxworks_script"] = script_data
[docs] def parse_private_key(f_handle: IO[bytes], device_info: dict) -> None: key_data = f_handle.read().decode("utf-8") device_info["server_private_key"] = key_data
[docs] def parse_certificate(f_handle: IO[bytes], device_info: dict) -> None: cert_data = f_handle.read().decode("utf-8") device_info["server_certificate"] = cert_data
[docs] def parse_ike_config(f_handle: IO[bytes], device_info: dict) -> None: if "ike" not in device_info: device_info["ike"] = {} ike_data = "" lines = f_handle.read().decode("utf-8") for line in lines.split("\n"): # Remove comments from firewall rules if len(line.lstrip()) > 0 and not line.lstrip()[0] == "#": ike_data += line device_info["ike"]["config"] = ike_data
[docs] def parse_ike_ca(f_handle: IO[bytes], device_info: dict) -> None: if "ike" not in device_info: device_info["ike"] = {} ca_data = f_handle.read().decode("utf-8") device_info["ike"]["telvent_cert_auth"] = ca_data
[docs] def parse_ike_privkey(f_handle: IO[bytes], device_info: dict) -> None: if "ike" not in device_info: device_info["ike"] = {} ca_data = f_handle.read().decode("utf-8") device_info["ike"]["privkey"] = ca_data
[docs] def parse_ike_cert(f_handle: IO[bytes], device_info: dict) -> None: if "ike" not in device_info: device_info["ike"] = {} ca_data = f_handle.read().decode("utf-8") device_info["ike"]["cert"] = ca_data
[docs] def parse_port_comm_settings(proto: ET.Element, port_settings: dict[str, ElasticType]) -> None: comms = proto.findall("COMM")[0] comm_settings = {} _store_dict_items(comms, comm_settings) port_settings["comm_settings"] = comm_settings
[docs] def parse_port_proto_settings( proto_name: str, proto: ET.Element, port_settings: dict[str, ElasticType] ) -> None: protocol_field = proto.findall(proto_name)[0] _store_dict_items(protocol_field, port_settings["protocol"]) for child in protocol_field: _store_dict_items(child, port_settings["protocol"]) # TODO """ protocol_params = protocol_field.findall('AO_MAP')[0] store_dict_items(protocol_params, port_settings['protocol']) protocol_params = protocol_field.findall('AI_MAP')[0] store_dict_items(protocol_params, port_settings['protocol']) """
[docs] def parse_port(f_handle: IO[bytes], device_info: dict) -> None: if "ports" not in device_info: device_info["ports"] = {} port_settings = {} port = ET.fromstring(f_handle.read()) proto = port.findall("PROTOCOL")[0] for item in port.items(): if not item[0] == "VER" and not item[0] == "PNT": port_settings[item[0]] = item[1] elif item[0] == "PNT": port_name = item[1] for item in proto.items(): if item[0] == "TYPE": if not item[1] == "NONE": port_settings["protocol"] = {} port_settings["protocol"]["name"] = item[1] parse_port_comm_settings(proto, port_settings) parse_port_proto_settings(item[1], proto, port_settings) else: port_settings["protocol"] = None device_info["ports"][f"port_{port_name}"] = port_settings
[docs] def parse_features(f_handle: IO[bytes], device_info: dict) -> None: features = ET.fromstring(f_handle.read()) protocols = features.findall("PROTOCOLS")[0] device_info["feature_protocols"] = {} for proto in protocols: device_info["feature_protocols"][proto.text] = {} for item in proto.items(): device_info["feature_protocols"][proto.text][item[0]] = _convert_to_elastic(item[1])
[docs] def parse_3835(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) c3835 = root.findall("C3835") cards = {} for i in c3835: # Set key for cards dict devid = "" card = {} for item in i.items(): if item[0] == "DEVID": devid = item[1] else: card[item[0]] = _convert_to_elastic(item[1]) cards[devid] = card device_info["c3835"] = cards
[docs] def parse_dcana(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) dcana_settings = {} for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) for grandchild in child: grandchild_dict = {} for item in grandchild.items(): if item[0] == "PNT": key = item[1] else: grandchild_dict[item[0]] = _convert_to_elastic(item[1]) child_dict[key] = grandchild_dict dcana_settings[child.tag] = child_dict device_info["dcana_settings"] = dcana_settings
[docs] def parse_calc(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) calc = {} for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) for grand_child in child: grand_child_dict = {} # Traverse children scope and append to dictionary key = "" for item in grand_child.items(): if item[0] == "PN": key = item[1] else: grand_child_dict[item[0]] = _convert_to_elastic(item[1]) if key != "": child_dict[key] = grand_child_dict calc[child.tag] = child_dict device_info["calc"] = calc
[docs] def parse_rtu_sts(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) rtu_settings = {} for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) for grand_child in child: grand_child_dict = {} for item in grand_child.items(): if item[0] == "PNT": key = item[1] else: grand_child_dict[item[0]] = _convert_to_elastic(item[1]) child_dict[key] = grand_child_dict rtu_settings[child.tag] = child_dict device_info["rtu_settings"] = rtu_settings
[docs] def parse_timing(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) timing_settings = {} for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) timing_settings[child.tag] = child_dict device_info["timing_parameters"] = timing_settings
[docs] def parse_sfb(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) sfb_cards = {} for child in root: devid = child.items()[0] sfb_type = child.items()[1] child_dict = {} child_dict[sfb_type[0]] = _convert_to_elastic(sfb_type[1]) sfb_cards[devid[1]] = child_dict device_info["sfb_cards"] = sfb_cards
[docs] def parse_acc_rate(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) acc_param = root.findall("ACC_PARAM") acc_rate = {} for device in acc_param: device_acc_param = {} for item in device.items(): if item[0] == "DEVID": key = f"DevID_{item[1]}" else: device_acc_param[item[0]] = _convert_to_elastic(item[1]) acc_rate[key] = device_acc_param device_info["acc_param"] = acc_rate
[docs] def parse_alarming(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) alarm_child = root.findall("ALARMS")[0] alarms = {} for item in alarm_child.items(): alarms[item[0]] = _convert_to_elastic(item[1]) for map_child in alarm_child: for item in map_child.items(): alarms[f"{map_child.tag}_{item[0]}"] = _convert_to_elastic(item[1]) device_info["alarming"] = alarms
[docs] def parse_alarms(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) alarm_child = root.findall("ALARMS")[0] alarms = {} for device in alarm_child: alarm_pnt = {} for item in device.items(): if item[0] == "PNT": key = item[1] else: alarm_pnt[item[0]] = _convert_to_elastic(item[1]) alarms[key] = alarm_pnt device_info["alarms"] = alarms
[docs] def parse_almdev(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) dev_attrb = root.findall("DEVICE_ATTRIBUTES")[0] almdev = {} for child in dev_attrb: attribute = {} for item in child.items(): attribute[item[0]] = _convert_to_elastic(item[1]) for grandchild in child: for item in grandchild.items(): attribute[item[0]] = _convert_to_elastic(item[1]) almdev[child.tag] = attribute device_info["almdev"] = almdev
[docs] def parse_bbdi(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) bbdi = {} for item in root.items(): bbdi[item[0]] = _convert_to_elastic(item[1]) points = root.findall("POINTS")[0] for child in points: point = {} point_items = child.items() key = "" for item in point_items: if item[0] == "PNT": key = item[1] for item in point_items: if item[0] != "PNT": point[item[0]] = _convert_to_elastic(item[1]) bbdi[key] = point device_info["bbdi"] = bbdi
[docs] def parse_cbc(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) cbc = {} for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) for grandchild in child: grandchild_dict = {} for item in grandchild.items(): if item[0] == "PNT" or item[0] == "NUM": key = item[1] else: grandchild_dict[item[0]] = _convert_to_elastic(item[1]) child_dict[key] = grandchild_dict cbc[child.tag] = child_dict device_info["cbc_config"] = cbc
[docs] def parse_ast(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) ast = {} for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) for grandchild in child: grandchild_dict = {} for item in grandchild.items(): if item[0] == "PNT" or item[0] == "NUM": key = item[1] else: grandchild_dict[item[0]] = _convert_to_elastic(item[1]) child_dict[key] = grandchild_dict for ggchild in grandchild: ggchild_dict = {} for item in ggchild.items(): if item[0] == "PNT" or item[0] == "NUM": gkey = item[1] else: ggchild_dict[item[0]] = _convert_to_elastic(item[1]) child_dict[f"{grandchild.tag}_{gkey}"] = ggchild_dict ast[child.tag] = child_dict device_info["ast_config"] = ast
[docs] def parse_anunctor(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) annunciator = {} for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) for grandchild in child: gchild_items = grandchild.items() child_dict[gchild_items[0][1]] = _convert_to_elastic(gchild_items[1][1]) annunciator[child.tag] = child_dict device_info["annunciator"] = annunciator
[docs] def parse_com_assignments(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) com_assignments = {} for port in root: port_items = port.items() # First entry is Num key = port_items[0][1] child_dict = {} for item in port_items[1:]: child_dict[item[0]] = _convert_to_elastic(item[1]) com_assignments[f"port_{key}"] = child_dict device_info["com_assignments"] = com_assignments
[docs] def parse_sprcodes(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) spr_codes = {} for spr_code in root: spr_items = spr_code.items() # First entry is PNT key = spr_items[0][1] spr_codes[key] = _convert_to_elastic(spr_items[1][1]) device_info["spr_code_map"] = spr_codes
[docs] def parse_leds(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) leds = {} for child in root: led_dict = {} for grandchild in child: gchild_items = grandchild.items() key = gchild_items[0][1] led_dict[key] = {} for item in gchild_items[1:]: led_dict[key][item[0]] = _convert_to_elastic(item[1]) leds[child.tag] = led_dict device_info["leds"] = leds
[docs] def parse_relays(f_handle: IO[bytes], device_info: dict) -> None: # TODO: may be losing items from Relay tree, double check from config again root = ET.fromstring(f_handle.read()) relay_tree = root.findall("RELAYS")[0] relays = {} for child in relay_tree: relay_dict = {} child_items = child.items() key = child_items[0][1] for item in child_items[1:]: relay_dict[item[0]] = _convert_to_elastic(item[1]) relays[key] = relay_dict device_info["relays"] = relays
[docs] def parse_nrgcalc(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) nrgcalc_tree = root.findall("NRGCALC_PARAM")[0] nrgcalcs = {} nrgcalcs["note"] = "Energy Calc Parameters" # Set top level fields for item in nrgcalc_tree.items(): nrgcalcs[item[0]] = _convert_to_elastic(item[1]) for child in nrgcalc_tree: child_dict = {} child_items = child.items() if child.tag == "DI_LIST": for grandchild in child: gc_dict = {} gc_items = grandchild.items() key = gc_items[0][1] for item in gc_items[1:]: gc_dict[item[0]] = _convert_to_elastic(item[1]) nrgcalcs[f"di_list_{key}"] = gc_dict else: key = child_items[0][1] for item in child_items[1:]: child_dict[item[0]] = _convert_to_elastic(item[1]) ais = [] for grandchild in child: gc_dict = {} gc_items = grandchild.items() for item in gc_items: gc_dict[item[0]] = _convert_to_elastic(item[1]) ais.append(gc_dict) child_dict["ais"] = ais nrgcalcs[key] = child_dict device_info["nrgcalc_settings"] = nrgcalcs
[docs] def parse_bbrelay(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) for grandchild in child: gchild_dict = {} gchild_items = grandchild.items() key = gchild_items[0][1] for item in gchild_items[1:]: gchild_dict[item[0]] = _convert_to_elastic(item[1]) child_dict[f"bb_{key}"] = gchild_dict device_info["bb_relays"] = child_dict
[docs] def parse_btstconf(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) btests = {} for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) for gchild in child: gchild_dict = {} for element in gchild: elem_dict = {} items = element.items() key = items[0][1] for item in items[1:]: elem_dict[item[0]] = _convert_to_elastic(item[1]) gchild_dict[key] = elem_dict child_dict[gchild.tag] = gchild_dict btests[child.tag] = child_dict device_info["btest_config"] = btests
[docs] def parse_aci(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) fmr = root.findall("FMR")[0] fmr_dict = {} fmr_dict["note"] = "Oracle/Cisco ATG Customer Intellegence?" for item in fmr.items(): fmr_dict[item[0]] = _convert_to_elastic(item[1]) device_info["aci_db"] = fmr_dict
[docs] def parse_aci1250(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) aci = {} for child in root: child_dict = {} for item in child.items(): child_dict[item[0]] = _convert_to_elastic(item[1]) for gchild in child: gchild_dict = {} for item in gchild.items(): gchild_dict[item[0]] = _convert_to_elastic(item[1]) for element in gchild: items = element.items() for item in items: gchild_dict[item[0]] = _convert_to_elastic(item[1]) if element.tag == "SENSOR": for sensor in element: elem_dict = {} sensor_items = sensor.items() skey = sensor_items[0][1] for item in sensor_items[1:]: elem_dict[item[0]] = _convert_to_elastic(item[1]) gchild_dict[f"sensor {skey}"] = elem_dict child_dict[gchild.tag] = gchild_dict aci[child.tag] = child_dict device_info["aci1250_config"] = aci
[docs] def parse_userlog_settings_xml(f_handle: IO[bytes], device_info: dict) -> None: """ Parse XML tree from ``userlog.xml`` to find logging settings. Args: f_handle: file handler to read data from and parse xml device_info: dictionary of keys and values for target device """ userlog_settings = {} root = ET.fromstring(f_handle.read()) userlog = root.findall("USRLOG_PARAM")[0] for item in userlog.items(): if item[0] == "TASK_ENABLED": userlog_settings["userlog_enabled"] = _convert_to_elastic(item[1]) elif item[0] == "NUM_EVENTS": userlog_settings["userlog_num_events"] = _convert_to_elastic(item[1]) device_info["userlog_settings"] = userlog_settings
[docs] def parse_cmdlog_settings_xml(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) cmdlog_settings = {} for child in root: for item in child.items(): cmdlog_settings[item[0]] = _convert_to_elastic(item[1]) device_info["cmdlog_settings"] = cmdlog_settings
[docs] def parse_soelog_settings_xml(f_handle: IO[bytes], device_info: dict) -> None: root = ET.fromstring(f_handle.read()) soelog_settings = {} for child in root: for item in child.items(): soelog_settings[item[0]] = _convert_to_elastic(item[1]) device_info["soelog_settings"] = soelog_settings
[docs] def parse_ipcom_syslog(f_handle: IO[bytes], device_info: dict) -> None: """ Parse ``syslog`` and ``syslog.0`` files from ``ipcom/`` directory on the Sage. """ # TODO: unit test data = f_handle.read().decode() events = [] lines = [x.strip() for x in data.strip().splitlines() if x.strip()] for line in lines: parts = [x.strip() for x in line.split(": ") if x.strip()] event = { "timestamp": parts[0], "raw_task": parts[1], # "iptelnets" which is actually "ipcom_telnetspawn" in task list # However, the Task ID here actually resolves to ipcom_telnetd "short_task_name": parts[1].split("[")[0], "task_id": parts[1].split("[")[1].split("]")[0], "level": parts[2], "service_name": parts[3].replace(":", "").strip(), "message": ": ".join(parts[4:]), "original": line, } events.append(event) if not device_info.get("ipcom_syslog_events"): device_info["ipcom_syslog_events"] = [] device_info["ipcom_syslog_events"].extend(events)
[docs] def process_ipcom_syslog(dev: DeviceData, events: list[dict]) -> None: """ Process parsed ``ipcom/syslog`` events into the PEAT device data model. """ for e_dict in events: event = Event( category={"network", "process", "session"}, created=utils.parse_date(e_dict["timestamp"]), dataset="ipcom_syslog", kind={"event"}, message=e_dict["message"], module="Sage", original=e_dict["original"], outcome="failure" if "error" in e_dict["level"].lower() else "unknown", provider=dev.ip, type={"connection"}, extra={ "raw_task": e_dict["raw_task"], "short_task_name": e_dict["short_task_name"], "task_id": e_dict["task_id"], "service_name": e_dict["service_name"], "level": e_dict["level"].lower(), }, ) if "error" in e_dict["level"].lower(): event.type.add("error") if "ipcom_accept" in event.message: event.type.add("access") dev.store("event", event)
[docs] def parse_logfiles(f_handle: IO[bytes], device_info: dict) -> None: """ Parse the contents of ``SYSLOG.LOG``, ``USERLOG.LOG``, and ``soelog.txt``. This creates a list of dicts with structured event information. Known locations of logfiles: - ``/ata0a/Webfiles/LOGS/SYSLOG.LOG`` - ``/ata0a/Webfiles/LOGS/USERLOG.LOG`` - ``/ramDrv/soelog.txt`` """ data = f_handle.read().decode() # Replace NUL characters with spaces data = re.sub("[\x00]", " ", data) lines = [x.strip() for x in data.strip().splitlines() if x.strip()] # First line with "NextSlot" can be ignored. if "NextSlot" in lines[0]: lines = lines[1:] # SYSLOG.LOG has events across two separate lines if "DT=" not in data: lines = ["\n".join(x) for x in zip(lines[0::2], lines[1::2], strict=False)] events = [] for raw_line in lines: # USERLOG.LOG, soelog.txt if raw_line.startswith("<REC"): # ID: identifier? NOTE: these are NOT sequential by time # DT: date. Ex: 01/01/2025 # TM: time. Ex: 00:00:00.000 # USERLOG.LOG: ID, DT, TM, TYPE, TXT # TYPE: "Logged Out" # TXT: "192.0.2.20-FTP, Admin" # soelog.txt: ID, DT, TM, VAL, PN, DN # VAL: "0" or "1" # PN: "MAX LOGIN FAILURES EXCEEDED" # DN: "RTU Internal Status" line_tag = ET.fromstring(raw_line) pairs = {k.strip(): v.strip() for k, v in line_tag.attrib.items()} event = { "timestamp": f"{pairs['DT']} {pairs['TM']}", # NOTE: the events in the file are not listed in order of sequence! # HOWEVER, the ID= field is an integer that IS a proper ordered sequence. "id": int(pairs["ID"]), "original": raw_line, "extra": {}, } # USERLOG.LOG if pairs.get("TYPE"): event["message"] = f"{pairs['TYPE']} {pairs['TXT']}" # logged in => 192.0.2.1-TSHELL, Admin, LCL_DB # logged out => 192.0.2.1-TSHELL, Admin # logged in => 192.0.2.1-FTP, Admin, LCL_DB # logged out => 192.0.2.1-FTP, Admin if "logged" in pairs["TYPE"].lower() and pairs["TXT"].count(".") >= 3: dash_parts = pairs["TXT"].partition("-") event["extra"]["ip"] = dash_parts[0].strip() comma_parts = dash_parts[2].split(",") event["extra"]["service"] = comma_parts[0].strip() if event["extra"]["service"] == "TSHELL": event["extra"]["service"] = "Telnet" event["extra"]["user"] = comma_parts[1].strip() # Power Up => RTU Started Up, Mode: NORMAL elif "Mode:" in pairs["TXT"]: event["extra"]["mode"] = pairs["TXT"].split("Mode:")[1].strip() # soelog.txt if pairs.get("VAL"): event["extra"]["VAL"] = int(pairs["VAL"]) event["message"] = pairs["PN"] # This seems to always be "RTU Internal Status" event["extra"]["DN"] = pairs["DN"] if pairs["DN"] != "RTU Internal Status": event["message"] += " " + pairs["DN"] # SYSLOG.LOG else: line = re.sub(r" {2,}", " ", raw_line).strip() chunks = line.split("\n") m_parts = chunks[0].strip().split(" ") event = { "id": int(m_parts[0]), "timestamp": " ".join(m_parts[1:2]), "message": chunks[1].strip(), "original": raw_line, "extra": { # This seems to always be "startup" "stage": m_parts[3], # "GPST task" # "m1_uif" # "startup" # "inCrashMode" "task": " ".join(m_parts[4:]), }, } events.append(event) # TODO: return a list instead of mutating device_info if not device_info.get("raw_events"): device_info["raw_events"] = [] device_info["raw_events"].extend(events)
[docs] def process_logfile_events(dev: DeviceData, events: list[dict]) -> None: for e_dict in events: event = Event( created=utils.parse_date(e_dict["timestamp"]), dataset="logfiles", kind={"event"}, message=e_dict["message"], module="Sage", original=e_dict["original"], provider=dev.ip, sequence=e_dict["id"], extra={**e_dict["extra"]}, ) if event.extra.get("TYPE"): # USERLOG.LOG event.dataset = "userlog" elif event.extra.get("DN"): # soelog.txt event.dataset = "soelog" elif event.extra.get("stage"): # SYSLOG.LOG event.dataset = "syslog_file" else: # shouldn't ever hit this branch event.dataset = "unknown_log_file" lower_msg = event.original.lower() if event.extra.get("TYPE"): e_type = event.extra["TYPE"].lower() event.action = e_type.replace(" ", "-") if "power" in e_type: event.category.add("host") event.type.add("start") elif e_type.startswith("logged"): event.category.add("authentication") event.category.add("session") event.outcome = "success" event.type.add("access") if "logged in" in e_type: event.type.add("start") elif "logged out" in e_type: event.type.add("end") # NOTE: rtu_sts.xml lists the possible event strings for soelog.txt if "logged in" in lower_msg: event.outcome = "success" if "login" in lower_msg or "logged in" in lower_msg: event.category.add("authentication") event.type.add("access") if "login failure" in lower_msg: event.type.add("denied") # "LOGIN FAILURE" # "TIME SRC FAIL" if "failure" in lower_msg or "fail " in lower_msg: event.outcome = "failure" if "startup" in event.original.lower() or "started up" in event.original.lower(): event.type.add("start") if event.extra.get("ip"): dev.related.ip.add(event.extra["ip"]) dev.related.user.add(event.extra["user"]) event.category.add("network") event.type.add("access") event.type.add("connection") dev.store("event", event)
[docs] def parse_file(path: PurePath, f_handle: IO[bytes], device_info: dict) -> bool: """ Parse a Sage file using the appropriate parsing function, and update the ``device_info`` dict with extracted data from that file. Returns: Bool indicating if a parsing function was found for the file. """ f_name_lower = path.name.lower() # TODO: return dicts from functions instead of mutating device_info inside functions if f_name_lower in FILE_PROCESSING_MAP: FILE_PROCESSING_MAP[f_name_lower](f_handle, device_info) else: path_str = path.as_posix() if re.match(r"firewall/.*\.cfg", path_str, re.IGNORECASE): parse_firewall(f_handle, device_info) elif re.match(r"xml/port.*\.xml", path_str, re.IGNORECASE): parse_port(f_handle, device_info) # syslog or syslog.{x} (where x=an integer, e.g. "syslog.0") elif path.name.startswith("syslog") and len(path.name) in [6, 8]: parse_ipcom_syslog(f_handle, device_info) else: # File is not one of interest, continue log.trace2(f"Skipping file '{path}' (no parser for it)") return False return True
# tar => FTP: # xml/* => /ata0a/Webfiles/xml/* # ethernet.xml => /ata0a/ethernet.xml # bootline.xml => /ata0a/bootline.xml # isagraf/ISaGRAF.TXT => /ata0a/Webfiles/ISaGRAF/ISaGRAF.TXT # scripts/* => /ata0a/scripts/* # server.key => /ata0a/ssl/private/server.key # server.crt => /ata0a/ssl/cert/server.crt # ike/* => /ata0a/ike/* # Dynamic function dispatch to the appropriate parser based on the input file # NOTE: keys must be lowercase to ensure filenames match properly in all cases FILE_PROCESSING_MAP: Final[dict[str, Callable]] = { "syslog.log": parse_logfiles, "userlog.log": parse_logfiles, "soelog.txt": parse_logfiles, "syslog": parse_ipcom_syslog, "syslog.0": parse_ipcom_syslog, "access.xml": parse_access_xml, "rtusetup.xml": parse_rtu_setup, "tte.xml": parse_tte, "userlog.xml": parse_userlog_settings_xml, "time.xml": parse_time, "ethernet.xml": parse_ether, "bootline.xml": parse_bootline_xml, "isagraf.txt": parse_isagraf, "startup.scp": parse_startup_script, "vxworks_start.scp": parse_vxworks_script, "server.key": parse_private_key, "server.crt": parse_certificate, "ike.cfg": parse_ike_config, # "ike/ca_store/telvent_ca.cer": parse_ike_ca, "telvent_ca.cer": parse_ike_ca, # "ike/private/priv0.key": parse_ike_privkey, "priv0.key": parse_ike_privkey, # "ike/cert/cert0.cer": parse_ike_cert, "cert0.cer": parse_ike_cert, "features.xml": parse_features, "c3835.xml": parse_3835, "dcana.xml": parse_dcana, "timing.xml": parse_timing, "calc.xml": parse_calc, "rtu_sts.xml": parse_rtu_sts, "sfb.xml": parse_sfb, "acc_rate.xml": parse_acc_rate, "alarms.xml": parse_alarms, "alarming.xml": parse_alarming, "almdev.xml": parse_almdev, "bbdi.xml": parse_bbdi, "cbcconf.xml": parse_cbc, "astconf.xml": parse_ast, "anunctor.xml": parse_anunctor, "cmdlog.xml": parse_cmdlog_settings_xml, "comasign.xml": parse_com_assignments, "sprcodes.xml": parse_sprcodes, "soelog.xml": parse_soelog_settings_xml, "leds.xml": parse_leds, "relays.xml": parse_relays, "nrgcalc.xml": parse_nrgcalc, "bbrelay.xml": parse_bbrelay, "btstconf.xml": parse_btstconf, "aci.xml": parse_aci, "aci1250.xml": parse_aci1250, }