Source code for peat.modules.camlin.totus_http

from collections.abc import Callable
from datetime import timedelta
from pathlib import PurePath

from bs4 import BeautifulSoup

from peat import DeviceData, config, consts, log, utils
from peat.data import Interface, Register, Service, SSHKey, User
from peat.protocols import HTTP, clean_mac


class TotusHTTP(HTTP):
    """
    HTTP interface for the Camlin Totus Dissolved Gas Analyzer (DGA).

    Each entry in ``self.methods`` maps a label to an API page, a
    ``process_method`` that copies the retrieved data into the device data
    model, and optionally a ``parse_method`` for pages that return text
    (HTML) instead of JSON.
    """

    DEFAULT_HEADERS = {
        "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="90"',
        "X-Requested-With": "XMLHttpRequest",
        "sec-ch-ua-mobile": "?0",
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/90.0.4430.212 Safari/537.36"
        ),
    }

    def __init__(self, *args, **kwargs) -> None:
        """
        Initialize the parent class and build the table of retrieval methods.

        The table is iterated in insertion order by get_and_process_all(),
        so ordering matters: "hardware_info" sets dev.uptime, which
        "timedate" reads, and "roles" must come after "users".
        """
        super().__init__(*args, **kwargs)

        self.methods: dict[str, dict[str, str | Callable]] = {
            "hardware_info": {
                "page": "system/api/1/hardware-info",
                "process_method": self.process_hardware_info,
            },
            "timedate": {
                "page": "system/api/1/timedate",
                "process_method": self.process_timedate,
            },
            "ntp_config": {
                "page": "system/api/1/ntp/config",
                "process_method": self.process_ntp_config,
            },
            "ntp_status": {
                "page": "system/api/1/ntp/status",
                "process_method": self.process_ntp_status,
            },
            "network_configuration": {
                "page": "system/api/1/network/configuration",
                "process_method": self.process_network_configuration,
            },
            "users": {
                "page": "auth/api/v0/users",
                "process_method": self.process_users,
            },
            # NOTE: role processing MUST be called after user processing!
            "roles": {
                "page": "auth/api/v0/roles",
                "process_method": self.process_roles,
            },
            "system_info": {
                "page": "totus/api/1.0/system/info",
                "process_method": self.process_system_info,
            },
            "serial_ports": {
                "page": "totus/api/1.0/system/serial-ports",
                "process_method": self.process_serial_ports,
            },
            "ssh_keys": {
                "page": "system/api/1/ssh-keys/root",
                "process_method": self.process_ssh_keys,
            },
            "openvpn": {
                "page": "system/api/1/openvpn",
                "process_method": self.process_openvpn,
            },
            "wifihotspot": {
                "page": "system/api/1/wifihotspot",
                "process_method": self.process_wifihotspot,
            },
            "modbus_interfaces": {
                "page": "totus/api/1.0/modbus/interfaces",
                "process_method": self.process_modbus_interfaces,
            },
            "dnp3_channels": {
                "page": "totus/api/1.0/dnp3/channels",
                "process_method": self.process_dnp3_channels,
            },
            "modbus_map": {
                "page": "totus/api/1.0/modbus/register-map",
                "parse_method": self.parse_modbus_map,
                "process_method": self.process_modbus_map,
            },
            "dnp3_map": {
                "page": "totus/api/1.0/dnp3/register-map",
                "parse_method": self.parse_dnp3_map,
                "process_method": self.process_dnp3_map,
            },
        }

    @property
    def logged_in(self) -> bool:
        """
        True if connected and a XSRF token header has been set by login().
        """
        return bool(self.connected and self.session.headers.get("X-XSRF-TOKEN"))

    def login(self, username: str, password: str) -> bool:
        """
        Log in to the device's web interface.

        Requests the profile page to obtain a XSRF token cookie, posts the
        credentials with that token, then stores the token as a session
        header for use by subsequent requests.

        Returns:
            True if already logged in or the login succeeded, False otherwise
        """
        if self.logged_in:
            self.log.debug("Skipping login since we're already logged in")
            return True

        # NOTE(review): the password is logged in plaintext when
        # config.DEBUG is set. Confirm this is intentional.
        if config.DEBUG:
            self.log.trace(f"Logging in with username '{username}' and password '{password}'")
        else:
            self.log.debug(f"Logging in with username '{username}'")

        # Get login XSRF (Cross-site request forgery) token
        # The token is required for all future requests to DGA
        try:
            # NOTE: This request will return 401 unauthorized, which doesn't
            # matter since we just need a valid XSRF token.
            profile_response = self.get(page="auth/api/v0/profile", allow_errors=True)
            if profile_response is None or not profile_response.cookies:
                self.log.error("Login failed: bad response")
                return False

            # Cookies may be present without the one we need. Report that
            # explicitly instead of letting a bare KeyError reach the
            # generic handler below.
            try:
                token = profile_response.cookies["XSRF-TOKEN"]
            except KeyError:
                self.log.error("Login failed: no XSRF-TOKEN cookie in response")
                return False

            # TODO: use self.url, ensures correct protocol (http/https)
            login_response = self.post(
                f"http://{self.ip}:{self.port}/auth/login",
                data={"_csrf": token, "username": username, "password": password},
            )
            if not login_response or login_response.status_code != 200:
                self.log.error("Login failed: no response or non-200 status code")
                return False

            # Setting token for nginx to accept authentication/cookies
            self.session.headers["X-XSRF-TOKEN"] = token
            return True
        except Exception as ex:
            self.log.error(f"Login failed: {ex}")
            return False

    def get_and_process_all(self, dev: DeviceData) -> bool:
        """
        Get all data and process any successful retrievals into device data model.

        A failure of one method (no response, parse failure, or an exception
        during processing) is logged and recorded, and does not prevent the
        remaining methods from running.

        Returns:
            If at least one method was successful
        """
        at_least_one_success = False
        failed_methods = []

        for label, method in self.methods.items():
            self.log.info(f"Getting '{label}' data from {method['page']}")

            try:
                response = self.get(page=method["page"])
                if not response or not response.text:
                    self.log.warning(
                        f"Failed to get {label} from {method['page']}: no data or error response"
                    )
                    failed_methods.append(label)
                    continue

                # methods that work on text data
                if method.get("parse_method"):
                    self.log.debug(f"Parsing raw {label} data...")
                    parsed_data = method["parse_method"](response.text)
                    if not parsed_data:
                        self.log.warning(f"Failed to parse {label} data")
                        failed_methods.append(label)
                        continue
                # methods that work on JSON data
                else:
                    parsed_data = response.json()

                # Only add some data to host.extra
                if label not in ["ssh_keys", "users", "roles"]:
                    dev.extra[label] = parsed_data

                if config.DEVICE_DIR:
                    dev.write_file(
                        data=parsed_data,
                        filename=f"{label}.json",
                        out_dir=dev.get_sub_dir("http_json_data"),
                    )

                # call the process function to put data into device data model
                self.log.debug(f"Processing parsed {label} data into the data model...")
                method["process_method"](dev, parsed_data)
                at_least_one_success = True
            except Exception as ex:
                self.log.exception(f"'{label}' method failed with unhandled exception: {ex}")
                failed_methods.append(label)

        self.log.info(
            f"Finished getting and processing data from {dev.ip} using {len(self.methods)} methods"
        )

        if failed_methods:
            failed_str = "\n\t".join(failed_methods)
            self.log.warning(
                f"{len(failed_methods)} methods failed out of "
                f"{len(self.methods)} total methods for {dev.ip}"
                f"\n** Failed methods **\n{failed_str}"
            )

        return at_least_one_success

    @staticmethod
    def process_hardware_info(dev: DeviceData, hw_info: dict) -> None:
        """
        Populate serial number, firmware, hostname, uptime, hardware
        and operating system fields from the hardware info page.

        All keys read here are required. A missing key raises KeyError.
        """
        dev.serial_number = hw_info["serialNumber"]
        dev.firmware.version = hw_info["softwareVersion"]
        dev.hostname = hw_info["hostname"]

        # TODO: ephemeral data from hw_info: freemem, dataFree, dataUsed
        dev.uptime = int(hw_info["uptime"])

        # Hardware information
        dev.architecture = hw_info["arch"]
        dev.hardware.memory_total = int(hw_info["totalmem"])
        dev.hardware.id = str(hw_info["hardwareID"])
        dev.hardware.storage_available = int(hw_info["dataSize"])

        # Operating system
        dev.os.name = hw_info["platform"]
        # TODO: auto-populate os.version from os.kernel in data model
        dev.os.kernel = hw_info["release"]
        # Version is the part of the kernel release before the first "-"
        dev.os.version = hw_info["release"].partition("-")[0]

    @staticmethod
    def process_timedate(dev: DeviceData, time_info: dict) -> None:
        """
        Set the timezone and, if uptime is already known, derive the
        device's start time from its reported current time.
        """
        dev.geo.timezone = time_info["timezone"]

        if dev.uptime:
            # Calculate start_time from device's timestamp - uptime
            curr_time = utils.parse_date(time_info["time"])
            if curr_time:
                dev.start_time = curr_time - timedelta(seconds=dev.uptime.total_seconds())

    @staticmethod
    def process_ntp_config(dev: DeviceData, ntp_config: dict) -> None:
        """
        Add GPS/NTP remotes to set of "related" hosts and IPs.
        """
        for time_source in ntp_config:
            if utils.is_ip(time_source):
                dev.related.ip.add(time_source)
            else:
                dev.related.hosts.add(time_source)

    @staticmethod
    def process_ntp_status(dev: DeviceData, ntp_status: dict) -> None:
        """
        Add GPS/NTP peers to set of "related" hosts and IPs.
        """
        for peer in ntp_status.get("status", {}).get("peers", []):
            if peer.get("remote"):
                if utils.is_ip(peer["remote"]):
                    dev.related.ip.add(peer["remote"])
                else:
                    dev.related.hosts.add(peer["remote"])

    @staticmethod
    def __process_if_dict(dev: DeviceData, iface: Interface, if_dict: dict) -> None:
        """
        Process fields that are in both "devices" and "connections.status".

        Most fields are only filled in if not already set on the interface,
        so the first dictionary processed for an interface takes precedence.
        """
        if if_dict.get("type"):
            iface.type = if_dict["type"].lower()

        if not iface.ip and if_dict.get("ipAddress"):
            iface.ip = if_dict["ipAddress"]

        if if_dict.get("macAddress"):
            mac = clean_mac(if_dict["macAddress"])
            dev.related.mac.add(mac)
            if not iface.mac:
                iface.mac = mac

        # Add the IP to the interface PEAT is talking to.
        # The MAC must be non-empty: otherwise an interface with no MAC
        # would "match" a device whose MAC is also unknown and wrongly
        # inherit the device's IP and hostname.
        if iface.mac and iface.mac == dev.mac:
            if not iface.ip:
                iface.ip = dev.ip
            if not iface.hostname:
                iface.hostname = dev.hostname

        # this can be zero, hence the "None" checks
        if iface.speed is None and if_dict.get("speed") is not None:
            iface.speed = int(if_dict["speed"])

        if not iface.subnet_mask and if_dict.get("netmask"):
            iface.subnet_mask = if_dict["netmask"]

        if not iface.gateway and if_dict.get("defaultGateway"):
            iface.gateway = if_dict["defaultGateway"]

        if not iface.id and if_dict.get("id"):
            iface.id = if_dict["id"]

        if if_dict.get("deviceState"):
            iface.extra["device_state"] = if_dict["deviceState"]

            if if_dict["deviceState"] != "unmanaged":
                iface.enabled = True
            else:
                iface.enabled = False

            if if_dict["deviceState"] == "connected":
                iface.connected = True
            else:
                iface.connected = False

        if "carrier" in if_dict:
            # "carrier" field indicates if there's a carrier signal on Ethernet
            # interfaces (e.g. a cable is connected to a switch).
            # This overrides any value derived from "deviceState" above.
            iface.connected = bool(if_dict["carrier"])

        # modem interface data
        if if_dict.get("type", "").lower() == "modem":
            if if_dict.get("modemState"):
                if if_dict["modemState"] == "enabled":
                    iface.enabled = True
                elif if_dict.get("modemState") == "connected":
                    iface.enabled = True
                    iface.connected = True
                elif iface.connected is None:
                    iface.connected = False

            if if_dict.get("fwVersion"):
                iface.version = str(if_dict.get("fwVersion", ""))

            if not iface.description.vendor.name:
                iface.description.vendor.name = if_dict.get("manufacturer", "")

            if not iface.description.model:
                iface.description.model = str(if_dict.get("model", ""))

            # For modems the final connected state is decided solely by
            # "modemState", replacing any value assigned earlier.
            if if_dict.get("modemState") == "connected":
                iface.connected = True
            else:
                iface.connected = False

            iface.extra.update(
                {
                    "modem_state": str(if_dict.get("modemState", "")),
                    "access_tech": str(if_dict.get("accessTech", "")),
                    "signal": str(if_dict.get("signal", "")),
                    "primary_port": str(if_dict.get("primaryPort", "")),
                    "device": str(if_dict.get("device", "")),
                    "equipment_id": str(if_dict.get("equipmentId", "")),
                    "registration_state": str(if_dict.get("registrationState", "")),
                    "network": str(if_dict.get("network", "")),
                }
            )

    @staticmethod
    def process_network_configuration(dev: DeviceData, net_config: dict[str, list[dict]]) -> None:
        """
        Create interfaces from "devices", then annotate them with data from
        "connections" and record related users, IPs and MACs.

        Both the "devices" and "connections" keys are required.
        """
        for raw_if in net_config["devices"]:
            iface = Interface(name=raw_if.get("name", ""))

            # Annotate fields on the Interface object
            TotusHTTP.__process_if_dict(dev, iface, raw_if)

            # Store into data model
            dev.store("interface", iface, lookup=["name", "mac", "ip"])

        for conn in net_config["connections"]:
            settings = conn.get("settings", {})
            status = conn.get("status", {})

            if settings.get("username"):
                dev.related.user.add(settings["username"])

            if_name = settings.get("interfaceName")
            if if_name:
                iface = dev.retrieve("interface", {"name": if_name})  # type: Interface
                if not iface:
                    iface = Interface(name=if_name)
                    dev.store("interface", iface, lookup="name")

                # Annotate fields on the Interface object.
                # "status" is applied first, so its values win over
                # "settings" for fields that are only set when empty.
                TotusHTTP.__process_if_dict(dev, iface, status)
                TotusHTTP.__process_if_dict(dev, iface, settings)

                if conn.get("id"):
                    iface.extra["connection_id"] = conn["id"]

                if settings.get("name"):
                    iface.description.description = settings["name"]

                for s_key in ["autoconnect", "apn", "username"]:
                    if s_key in settings:
                        iface.extra[s_key] = settings[s_key]

            # Save any IPs to related.ip
            for ip_path in [
                "settings.ipAddress",
                "settings.defaultGateway",
                "settings.primaryDns",
                "status.ipAddress",
                "status.defaultGateway",
                "status.primaryDns",
            ]:
                ip = utils.deep_get(conn, ip_path, "")
                if ip and utils.is_ip(ip):
                    dev.related.ip.add(ip)

            # Save any MACs to related.mac
            for mac_path in ["settings.macAddress", "status.macAddress"]:
                mac = clean_mac(utils.deep_get(conn, mac_path, ""))
                if mac:
                    dev.related.mac.add(mac)

    @staticmethod
    def process_users(dev: DeviceData, users: dict[str, list[dict[str, str | int]]]) -> None:
        """
        Store a User for every entry in "items" and record related
        usernames, roles and issuer URLs.
        """
        for user_dict in users["items"]:
            dev.related.user.add(user_dict.get("sub", ""))
            dev.related.user.add(user_dict.get("name", ""))

            user = User(
                name=user_dict.get("sub", ""),
                full_name=user_dict.get("name", ""),
            )

            # ID may be 0, hence the "None" check
            if user_dict.get("id") is not None:
                user.id = str(user_dict["id"])

            if user_dict.get("role"):
                dev.related.roles.add(user_dict["role"])
                user.roles.add(user_dict["role"])

            if user_dict.get("state"):
                user.extra["state"] = user_dict["state"]

            if user_dict.get("iss"):
                user.extra["iss"] = user_dict["iss"]
                if "://" in user_dict["iss"]:
                    dev.related.urls.add(user_dict["iss"])

            dev.store("users", user, lookup="name")

    @staticmethod
    def process_roles(dev: DeviceData, roles: dict[str, list[dict]]) -> None:
        """
        Permissions allocated to roles.

        NOTE: This MUST be called after process_users()!
        """
        for role in roles["items"]:
            if not role.get("name"):
                log.warning(f"Skipping role with no name for {dev.get_comm_id()}: {role}")
                continue

            dev.related.roles.add(role["name"])

            # lookup user, if role name in user roles, then add permissions
            # this MUST be called after process_users!
            if role.get("permissions"):
                for user in dev.users:
                    if role["name"] in user.roles:
                        user.permissions.update(role["permissions"])

    @staticmethod
    def process_system_info(dev: DeviceData, system_info: dict) -> None:
        """
        Fill in firmware version and hostname if they are not already set.
        """
        if not dev.firmware.version and system_info.get("softwareVersion"):
            dev.firmware.version = str(system_info["softwareVersion"])

        if not dev.hostname and system_info.get("hostname"):
            dev.hostname = str(system_info["hostname"])

    @staticmethod
    def process_serial_ports(dev: DeviceData, serial_ports: dict) -> None:
        """
        Serial ports on device.
        """
        for ser_dev in serial_ports:
            iface = Interface(name=ser_dev["name"], type="serial", serial_port=ser_dev["device"])

            if ser_dev.get("flow_control") and "none" in ser_dev["flow_control"]:
                iface.flow_control = "none"

            dev.store("interface", iface, lookup=["name", "serial_port"])

    @staticmethod
    def process_ssh_keys(dev: DeviceData, ssh_keys: dict) -> None:
        """
        Extract usernames, IPs, and/or hostnames from SSH public keys.
        """
        for key_dict in ssh_keys:
            if not key_dict:
                continue

            if not key_dict.get("publicKey"):
                log.warning(
                    f"Skipping invalid ssh key for {dev.get_comm_id()}: "
                    f"no 'publicKey' field\nRaw key: {key_dict}"
                )
                continue

            # Possible values for publicKey:
            #   "ssh-ed25519@22:1.2.3.4 0x6c91e1...f8157"
            #   "ssh-rsa AAAAbbbb...cccc user@hostname\n"
            #   "ssh-rsa AAAAbbbb...cccc rsa-key-<timestamp>"
            #   "/root/.ssh/keyname.pub"
            #   "keyname.pub"
            key_obj = SSHKey(
                id=key_dict.get("keyId", "").strip(),
                original=key_dict["publicKey"].strip(),
                type="public",
            )

            # str.split(" ") always yields at least one element, so a
            # single element means the value contained no spaces at all.
            key_parts = key_obj.original.split(" ")
            if len(key_parts) == 1:
                # A lone token containing "." is treated as a key file name/path
                if "." in key_parts[0]:
                    key_obj.file.path = PurePath(key_parts[0])
                    dev.related.files.add(key_parts[0])
                else:
                    log.warning(
                        f"TotusHTTP: failed to parse SSH key from {dev.ip} (key='{key_parts}')"
                    )
                    continue

            # "ssh-ed25519@22:1.2.3.4 <key>"
            if "@" in key_parts[0]:
                p2 = key_parts[0].partition("@")[2]
                if ":" in p2:
                    p2 = p2.partition(":")[2]

                if utils.is_ip(p2):
                    dev.related.ip.add(p2)
                else:
                    dev.related.hosts.add(p2)
                key_obj.host = p2

            # "<key> root@testb"
            if len(key_parts) >= 3:
                if "@" in key_parts[2]:
                    user = key_parts[2].partition("@")[0].strip()
                    dev.related.user.add(user)
                    key_obj.user = user

                    host = key_parts[2].partition("@")[2].strip()
                    if utils.is_ip(host):
                        dev.related.ip.add(host)
                    else:
                        dev.related.hosts.add(host)
                    key_obj.host = host
                elif utils.is_ip(key_parts[2]):
                    dev.related.ip.add(key_parts[2])
                    key_obj.host = key_parts[2]

            dev.store("ssh_keys", key_obj, lookup="id")

    @staticmethod
    def process_openvpn(dev: DeviceData, openvpn: dict) -> None:
        """
        Extract hostnames and/or IPs from OpenVPN configs.
        """
        for ov_profile in openvpn.get("profiles", []):
            # Add any IPs found in "address" to related.ip
            if ov_profile.get("address"):
                for chunk in ov_profile["address"].strip().split(","):
                    if utils.is_ip(chunk):
                        dev.related.ip.add(chunk)

            # Add remote hosts to related.ip and/or related.hosts
            for remote in ov_profile.get("remotes", []):
                if not remote.get("host"):
                    continue

                if utils.is_ip(remote["host"]):
                    dev.related.ip.add(remote["host"])
                else:
                    dev.related.hosts.add(remote["host"])

    @staticmethod
    def process_wifihotspot(dev: DeviceData, wifihotspot: dict) -> None:
        """
        Store the Wi-Fi hotspot as an interface named "wifihotspot".

        The raw hotspot settings are copied into the interface's "extra"
        data, with a non-empty "password" removed.
        """
        # TODO: what interface is this internally? (in output of "ip addr" command)
        iface = Interface(name="wifihotspot", enabled=False, type="wifi")

        if wifihotspot.get("status") != "off":
            iface.enabled = True

        iface.description.description = (
            f"Wi-Fi Hotspot with an SSID of {wifihotspot.get('ssid', '')}"
        )

        iface.extra.update(wifihotspot)
        if iface.extra.get("password"):
            del iface.extra["password"]

        dev.store("interface", iface, lookup="name")

    @staticmethod
    def process_modbus_interfaces(dev: DeviceData, modbus_interfaces: dict) -> None:
        """
        Modbus configuration.
        """
        for mb_if in modbus_interfaces:
            svc = Service(protocol="modbus", enabled=True)

            if mb_if.get("port"):
                svc.port = int(mb_if["port"])

            # Address and ID may be 0, hence the "None" checks
            if mb_if.get("slaveAddress") is not None:
                svc.protocol_id = str(mb_if["slaveAddress"])

            if mb_if.get("name"):
                svc.extra["name"] = str(mb_if["name"])

            if mb_if.get("id") is not None:
                svc.extra["id"] = mb_if["id"]

            # Only TCP interfaces are associated with the device's IP interface
            interface_lookup = None
            if mb_if.get("device") == "tcp":
                svc.protocol = "modbus_tcp"
                svc.transport = "tcp"
                interface_lookup = {"ip": dev.ip}

            if svc.port:
                dev.related.ports.add(svc.port)

            if svc.protocol:
                dev.related.protocols.add(svc.protocol)

            dev.store("service", svc, interface_lookup=interface_lookup)

    @staticmethod
    def process_dnp3_channels(dev: DeviceData, dnp3_channels: dict) -> None:
        """
        DNP3 configuration.

        A channel with no configured port, or with a null "channel" value,
        is still stored instead of aborting processing of all channels.
        """
        for dnp3_ch in dnp3_channels:
            svc = Service(
                protocol="dnp3",
                enabled=True,
                protocol_id=str(dnp3_ch.get("localAddress", "")),
                extra={
                    "id": str(dnp3_ch.get("id", "")),
                    "name": str(dnp3_ch.get("name", "")),
                },
            )

            # Port is optional, same as for Modbus interfaces
            port = (dnp3_ch.get("config") or {}).get("port")
            if port is not None:
                svc.port = int(port)

            # "channel" may be absent or explicitly null
            channel = dnp3_ch.get("channel")
            if channel is not None:
                if str(channel).lower() == "tcp":
                    svc.transport = "tcp"
                else:
                    svc.extra["channel"] = channel

            if dnp3_ch.get("bufferSize") is not None:
                svc.extra["buffer_size"] = int(dnp3_ch["bufferSize"])

            if dnp3_ch.get("remoteAddress") is not None:
                svc.extra["remote_address"] = int(dnp3_ch["remoteAddress"])

            if dnp3_ch.get("keepaliveTimeout") is not None:
                svc.extra["keepalive_timeout"] = int(dnp3_ch["keepaliveTimeout"])

            if svc.port:
                dev.related.ports.add(svc.port)

            if svc.protocol:
                dev.related.protocols.add(svc.protocol)

            dev.store("service", svc, interface_lookup={"ip": dev.ip})

    @staticmethod
    def parse_modbus_map(modbus_map_html: str) -> list[dict]:
        """
        Parse raw HTML page with the Modbus register map.

        Column names are taken from the header row of the first table and
        applied to every table on the page. Empty cells are returned as None.

        Returns:
            One dict per table row, or an empty list if the page has no
            body, no table, or no header row.
        """
        mb_soup = BeautifulSoup(modbus_map_html, features=consts.BS4_PARSER)
        if mb_soup.body is None:
            return []

        mb_tables = list(mb_soup.body.find_all("table"))
        if not mb_tables:
            return []

        ht_body = mb_tables[0]
        # In some cases it has "tbody", in others it doesn't.
        # Might be a difference between software versions.
        if mb_tables[0].find("tbody"):
            ht_body = mb_tables[0].find("tbody")

        header_row = ht_body.find("tr")
        if header_row is None:
            return []

        # "Number of Registers" -> "number_of_registers"
        mb_headers = [
            str(th.string).strip().lower().replace(" ", "_") if th.string else ""
            for th in header_row.find_all("th")
        ]

        mb_data = []
        for m_tbl in mb_tables:
            m_tbl_body = m_tbl
            if m_tbl.find("tbody"):
                m_tbl_body = m_tbl.find("tbody")

            # First row of each table is skipped as the header row
            for tbl_entry in m_tbl_body.find_all("tr")[1:]:
                mb_data.append(
                    {
                        h: str(td.string) if td.string else None
                        for h, td in zip(mb_headers, tbl_entry.find_all("td"), strict=False)
                    }
                )

        return mb_data

    @staticmethod
    def process_modbus_map(dev: DeviceData, mb_data: list[dict]) -> None:
        """
        Store a Modbus Register for every entry of the parsed register map.

        Values may be None for empty table cells (see parse_modbus_map()),
        so text fields are normalized to "" before string operations.
        """
        for entry in mb_data:
            # TODO: handle number_of_registers > 1
            reg = Register(
                protocol="modbus",
                address=entry["address"],
                data_type=(entry.get("format") or "").lower(),
                tag=entry.get("topic") or "",
            )

            if "read only" in (entry.get("access") or "").lower():
                reg.read_write = "read"

            if entry.get("notes"):
                reg.description = entry["notes"]

            if entry.get("register") is not None:
                reg.extra["register"] = int(entry["register"])

            if entry.get("number_of_registers") is not None:
                reg.extra["number_of_registers"] = int(entry["number_of_registers"])

            if entry.get("scaling") is not None:
                reg.extra["scaling"] = float(entry["scaling"])

            if entry.get("offset") is not None:
                reg.extra["offset"] = entry["offset"]

            dev.store(
                "registers",
                reg,
                lookup={"protocol": reg.protocol, "address": reg.address},
            )

    @staticmethod
    def parse_dnp3_map(dnp3_map_html: str) -> dict[str, list[dict]]:
        """
        Parse raw HTML page with the DNP3 register map.

        Each "h2" heading is paired, in document order, with a table.
        Empty cells are returned as None.

        Returns:
            Dict mapping each heading's text to a list with one dict per
            data row of the corresponding table.
        """
        dnp3_soup = BeautifulSoup(dnp3_map_html, features=consts.BS4_PARSER)

        # Analogue Inputs, Digital Inputs
        dnp3_sections = {
            str(table_header.string): table
            for table_header, table in zip(
                list(dnp3_soup.body.find_all("h2")),
                list(dnp3_soup.body.find_all("table")),
                strict=False,
            )
        }

        dnp3_data = {}
        for section_name, section_table in dnp3_sections.items():
            # TODO: add to dev.io?
            # direction = section_name.split(" ")[1].lower().replace("s", "")
            parsed_section = []

            # Headers normally live in "thead". Fall back to the table's
            # first row if there is no "thead".
            header_parent = section_table.find("thead") or section_table
            header_row = header_parent.find("tr")
            if header_row is None:
                # Table has no rows at all
                dnp3_data[section_name] = parsed_section
                continue

            section_headers = [
                str(th.string).strip().lower().replace(" ", "_") if th.string else ""
                for th in header_row.find_all("th")
            ]

            tbl_body = section_table
            if section_table.find("tbody"):
                tbl_body = section_table.find("tbody")

            for tbl_entry in tbl_body.find_all("tr"):
                cells = tbl_entry.find_all("td")
                # Header rows only have "th" cells. Skipping rows without
                # "td" cells (rather than always dropping the first row)
                # keeps the first data row when the header is in "thead"
                # and the data rows are in "tbody".
                if not cells:
                    continue

                parsed_section.append(
                    {
                        h: str(td.string) if td.string else None
                        for h, td in zip(section_headers, cells, strict=False)
                    }
                )

            dnp3_data[section_name] = parsed_section

        return dnp3_data

    @staticmethod
    def process_dnp3_map(dev: DeviceData, dnp3_data: dict[str, list[dict]]) -> None:
        """
        Store a DNP3 Register for every entry of the parsed register map.

        The measurement type is derived from the first word of the section
        name, e.g. "Analogue Inputs" -> "analog".
        """
        # "analogue inputs", "digital inputs"
        for section_name, section_data in dnp3_data.items():
            # NOTE: replace() removes every "s" in the word, not just a
            # trailing plural "s"
            section_type = section_name.split(" ")[0].lower().replace("s", "")
            section_type = section_type.replace("analogue", "analog")

            for entry in section_data:
                # Values may be None for empty table cells
                reg = Register(
                    protocol="dnp3",
                    address=entry.get("index") or "",
                    tag=entry.get("topic") or "",
                    measurement_type=section_type,
                )

                if entry.get("description"):
                    reg.description = entry["description"]

                if entry.get("type") is not None:
                    reg.data_type = entry["type"].lower()

                if entry.get("class") is not None:
                    reg.extra["class"] = entry["class"]

                if entry.get("scale") is not None:
                    reg.extra["scale"] = float(entry["scale"])

                if entry.get("offset") is not None:
                    reg.extra["offset"] = entry["offset"]

                dev.store(
                    "registers",
                    reg,
                    lookup={
                        "protocol": reg.protocol,
                        "address": reg.address,
                        "measurement_type": reg.measurement_type,
                    },
                )
# Public API of this module: only the TotusHTTP class is exported
# by a wildcard import.
__all__ = ["TotusHTTP"]