Source code for peat.parsing.command_parsers

"""
Parse and/or process the output of various Linux commands and files,
such as ``arp -a``, ``/proc/meminfo``, ``/var/log/messages``, etc.

References for /proc (aka, "procfs"):

- https://www.kernel.org/doc/html/latest/filesystems/proc.html
- https://tldp.org/LDP/Linux-Filesystem-Hierarchy/html/proc.html
- https://www.man7.org/linux/man-pages/man5/procfs.5.html
- https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/deployment_guide/s1-proc-topfiles

"""

import datetime
import itertools
import json
import os.path
import posixpath
import re
from collections import defaultdict
from pathlib import PurePosixPath

import humanfriendly
from humanfriendly.text import split_paragraphs

from peat import DeviceData, Event, Interface, log, utils
from peat.data.models import File, Service, User


class NixParserBase:
    """
    Base class for nix file and command parsers (Linux, VxWorks, etc.).

    A subclass sets either ``file`` (for file parsers) or ``command``
    (for command parsers) and implements :meth:`parse` and :meth:`process`.
    """

    # file paths/names for file parsers
    file: PurePosixPath | None = None
    paths: list[PurePosixPath] = []

    # list of commands with arguments
    command: str = ""
    commands: list[str] = []

    @classmethod
    def parse_and_process(cls, to_parse: str, dev: DeviceData) -> bool:
        """
        Parse the data, then process it into the device data model.

        The raw input is written to ``raw_files`` or ``raw_commands`` and
        the parsed result to ``parsed``, all under ``dev.get_out_dir()``.

        Args:
            to_parse: Raw file contents or command output.
            dev: Device the data was collected from.

        Returns:
            ``True`` if the data was parsed and processed. ``False`` if
            there was nothing to parse, the raw JSON was invalid, or
            parsing/processing failed (failures are logged, not raised).

        Raises:
            ValueError: If neither ``file`` nor ``command`` is set.
        """
        if not to_parse:
            log.warning(f"{cls.__name__}: no data to parse")
            return False

        # Save raw file or command output to file
        if cls.file:
            # PEAT's auto-serialization of JSON works against
            # us here, so manually load it then save.
            file_data = to_parse
            if cls.file.suffix == ".json":
                try:
                    file_data = json.loads(file_data)
                except json.JSONDecodeError as ex:
                    # Treat bad JSON like any other parse failure:
                    # log it and report failure instead of raising.
                    log.error(f"{cls.__name__}: Exception loading JSON from {cls.file}: {ex}")
                    return False

            dev.write_file(
                data=file_data,
                filename=cls.file.name,
                out_dir=dev.get_out_dir() / "raw_files",
            )
        elif cls.command:
            dev.write_file(
                data=to_parse,
                filename=convert_filename(cls.command) + ".txt",
                out_dir=dev.get_out_dir() / "raw_commands",
            )
        else:
            raise ValueError(f"'file' and 'command' unset for {cls.__name__}")

        try:
            parsed = cls.parse(to_parse)
        except Exception as ex:
            log.error(f"{cls.__name__}: Exception parsing {cls.type()} data: {ex}")
            return False

        if not parsed:
            log.warning(f"{cls.__name__}: no parsed data, parse probably failed")
            return False

        # Set file extension based on type of parsed data
        if isinstance(parsed, (dict, list)):
            file_ext = ".json"
        else:
            file_ext = ".txt"

        # Determine file basename
        if cls.file:
            dev.related.files.add(str(cls.file))
            f_base = cls.file.stem
        else:
            f_base = convert_filename(cls.command)

        # Save parsed format to file
        dev.write_file(
            data=parsed,
            filename=f_base + file_ext,
            out_dir=dev.get_out_dir() / "parsed",
        )

        # Save parsed format to extra (these two parsers are excluded)
        if cls not in [
            VarLogMessagesParser,
            LsRecursiveParser,
        ]:
            if cls.file and str(cls.file) not in dev.extra:
                dev.extra[str(cls.file)] = parsed
            elif cls.command and cls.command not in dev.extra:
                dev.extra[cls.command] = parsed

        # Process the data
        try:
            cls.process(parsed, dev)
        except Exception as ex:
            log.error(f"{cls.__name__}: Exception processing {cls.type()} data: {ex}")
            return False

        return True

    @classmethod
    def type(cls) -> str:
        """
        Return ``"file"`` or ``"command"`` depending on which attribute is set.

        Raises:
            ValueError: If neither ``file`` nor ``command`` is set.
        """
        if cls.file:
            return "file"
        elif cls.command:
            return "command"
        else:
            raise ValueError(f"'file' and 'command' unset for {cls.__name__}")

    @classmethod
    def parse(cls, to_parse: str):
        """
        Parse raw data into a Python data structure, such as a dict or list.
        """
        ...

    @classmethod
    def process(cls, to_process, dev: DeviceData) -> None:
        """
        Process parsed data into the device data model.
        """
        ...
class VarLogMessagesParser(NixParserBase):
    """
    Parse syslog-style entries from ``/var/log/messages`` and turn each
    one into an ``Event`` in the device data model.
    """

    file = PurePosixPath("/var/log/messages")

    MESSAGE_REGEX = (
        r"(?P<timestamp>\w+[ \t]+\d+[ \t]+\d{2}:\d{2}:\d{2})[ \t]+"
        r"(?P<hostname>\S+)[ \t]+(?P<logger>\S+)\.(?P<level>\S+)[ \t]+"
        r"(?P<process>[^:]+): (?P<message>.*)"
    )  # type: str

    @classmethod
    def parse(cls, to_parse: str) -> list[dict[str, str]]:
        """
        Split the log into lines and extract the named groups of
        ``MESSAGE_REGEX`` from each one. The untouched line is kept
        under ``"raw_line"``. Lines that don't match are logged and skipped.
        """
        messages = []

        for raw_line in to_parse.strip().splitlines():
            matched = re.match(cls.MESSAGE_REGEX, raw_line.strip(), re.IGNORECASE)
            if matched is None:
                log.warning(f"Failed to parse message line: {raw_line}")
                continue

            entry = matched.groupdict()
            entry["raw_line"] = raw_line
            messages.append(entry)

        return messages

    @classmethod
    def process(cls, to_process, dev: DeviceData) -> None:
        """
        Store one ``Event`` per parsed message, and record hostnames and
        source IP addresses in ``dev.related``.
        """
        for sequence, entry in enumerate(to_process):
            created = utils.parse_date(entry["timestamp"])

            # !! NOTE: this relies on DateParser running prior this !!
            # make year for timestamps relative to the correct year
            current_time = dev.extra.get("current_time")
            if current_time and created.year > current_time.year:
                created = created.replace(year=current_time.year)

            text = entry["message"]
            text_lower = text.lower()  # type: str

            categories = {"host"}  # type: set[str]
            types = set()  # type: set[str]
            outcome = ""  # type: str

            # add to event.category
            if entry["logger"] == "auth":
                categories.add("authentication")

            # event.type
            if entry["level"] in ["err", "error"] or "error:" in text_lower:
                types.add("error")

            if "starting" in text_lower:
                types.add("start")

            if "failed password" in text_lower:
                categories.add("authentication")
                types.update(("denied", "access"))
                outcome = "failure"

            if "accepted password" in text_lower:
                categories.add("authentication")
                types.update(("allowed", "access"))
                outcome = "success"

            # Create the Event object
            event = Event(
                category=categories,
                created=created,
                dataset="/var/log/messages",
                message=text,
                original=entry["raw_line"],
                outcome=outcome,
                sequence=sequence,
                type=types,
            )

            # TODO: log.* fields
            #   msg["logger"] => "syslog", "auth", "kernel"
            #   msg["level"] => "notice", "err", "info"
            # TODO: add file.* metadata, e.g. /var/log/messages
            # TODO: pull more information out of messages
            #   related.ip
            #   related.ports
            # TODO: extract PID, e.g. "sshd[7050]"

            # Add hostname to related.hosts
            dev.related.hosts.add(entry["hostname"])

            if " from " in text:
                ip_match = re.search(
                    r" from (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) ",
                    text,
                    re.IGNORECASE | re.ASCII,
                )
                if ip_match:
                    for candidate in ip_match.groups():
                        if utils.is_ip(candidate):
                            dev.related.ip.add(candidate)

            dev.store("event", event, append=True)
class ProcCmdlineParser(NixParserBase):
    """
    Parse ``/proc/cmdline`` (the kernel's startup command line arguments).

    Parsing returns a dict with the arguments as key-value pairs.
    """

    file = PurePosixPath("/proc/cmdline")

    @classmethod
    def parse(cls, to_parse: str) -> dict[str, bool | str]:
        """
        Split the command line on whitespace and map each ``key=value``
        token to its value. A token with no value, such as ``rw`` or
        ``quiet``, maps to ``True``.
        """
        arguments = {}

        for token in to_parse.strip().split():
            name, _, value = token.partition("=")
            # An empty value means a bare flag, stored as True
            arguments[name] = value or True

        return arguments

    @classmethod
    def process(cls, to_process: dict[str, bool | str], dev: DeviceData) -> None:
        """
        Store the parsed arguments in ``dev.extra`` under the file path.
        """
        # TODO: do more with this data?
        extra_key = str(cls.file)
        dev.extra[extra_key] = to_process
class ProcCpuinfoParser(NixParserBase):
    """
    Parse ``/proc/cpuinfo`` into a dict and fill in ``dev.hardware.cpu``.
    """

    file = PurePosixPath("/proc/cpuinfo")

    @classmethod
    def parse(cls, to_parse: str) -> dict[str, list | str]:
        """
        Parse blank-line separated groups of ``key : value`` lines.

        Groups that begin with ``processor`` are collected as dicts in
        the ``"processors"`` list. Fields of any other group are stored
        at the top level. Keys are lower-cased with spaces replaced by
        underscores.
        """
        parsed = {}
        normalized = to_parse.replace("\r\n", "\n")

        # Groups are separated by an empty line
        for block in normalized.strip().split("\n\n"):
            block = block.strip()
            is_processor = block.startswith("processor")
            fields = {}

            for entry in block.splitlines():
                name, _, val = entry.partition(":")
                name = name.strip().replace(" ", "_").lower()
                target = fields if is_processor else parsed
                target[name] = val.strip()

            if fields:
                parsed.setdefault("processors", []).append(fields)

        return parsed

    @classmethod
    def process(cls, to_process: dict[str, list | str], dev: DeviceData) -> None:
        """
        Build the CPU model, full name and description strings from the
        platform/model fields and the first processor entry, and set
        whichever of them are non-empty on ``dev.hardware.cpu``.
        """
        model_parts = []
        full_parts = []
        description_parts = []

        # Platform takes precedence over model for the name prefix
        prefix = to_process.get("platform") or to_process.get("model")
        if prefix:
            full_parts.append(prefix)
            description_parts.append(prefix)

        if to_process.get("processors"):
            first = to_process["processors"][0]

            if first.get("cpu"):
                model_parts.append(first["cpu"])
                full_parts.append(first["cpu"])
                description_parts.append(first["cpu"])
            elif to_process.get("model"):
                model_parts.append(to_process["model"])

            if first.get("clock"):
                description_parts.append(first["clock"])

            if first.get("revision"):
                description_parts.append(f"revision {first['revision']}")

        if model_parts:
            dev.hardware.cpu.model = " ".join(model_parts).strip()

        if full_parts:
            dev.hardware.cpu.full = " ".join(full_parts).strip()

        if description_parts:
            dev.hardware.cpu.description = " ".join(description_parts).strip()
class ProcMeminfoParser(NixParserBase):
    """
    Parse output of ``/proc/meminfo`` and return dict with
    the formatted data, with integer values in bytes.
    """

    file = PurePosixPath("/proc/meminfo")

    @classmethod
    def parse(cls, to_parse: str) -> dict[str, int]:
        """
        Convert each ``Key: value`` line into a snake_case key mapped to
        a size in bytes.

        Lines without a ``:`` separator or without a value (for example
        blank lines) are skipped. Passing them to
        ``humanfriendly.parse_size`` would raise and discard every
        other line that parsed fine.
        """
        results = {}

        for line in to_parse.splitlines():
            key, sep, value = line.partition(":")
            value = value.strip()

            if not sep or not value:
                if line.strip():
                    log.trace(f"Skipping bad /proc/meminfo line: {line}")
                continue

            key = utils.convert_to_snake_case(key)
            key = utils.clean_replace(key, "_", "()")
            key = key.strip("_").replace("__", "_")

            # Convert size strings like "24M" or "42" into a raw integer
            # NOTE: while this says "kB", it's actually kibibytes
            # Therefore, we set binary=True for size parsing
            results[key] = humanfriendly.parse_size(value, binary=True)

        return results

    @classmethod
    def process(cls, to_process: dict[str, int], dev: DeviceData) -> None:
        """
        Copy ``mem_total`` and ``mem_free`` (when present) into
        ``dev.hardware.memory_total`` and ``dev.hardware.memory_available``.
        """
        if "mem_total" in to_process:
            dev.hardware.memory_total = to_process["mem_total"]

        if "mem_free" in to_process:
            dev.hardware.memory_available = to_process["mem_free"]
class ProcModulesParser(NixParserBase):
    """
    Parse output of ``/proc/modules`` and return a list of the module names.
    """

    file = PurePosixPath("/proc/modules")

    @classmethod
    def parse(cls, to_parse: str) -> list[str]:
        """
        Return the first whitespace-separated field of every non-blank line.

        Blank lines are skipped so they don't produce empty module names.
        Splitting on any whitespace also copes with tabs and leading spaces.
        """
        return [fields[0] for line in to_parse.splitlines() if (fields := line.split())]

    @classmethod
    def process(cls, to_process: list[str], dev: DeviceData) -> None:
        """
        Store the module names in ``dev.extra`` under the file path.
        """
        # TODO: do more with /proc/modules data
        dev.extra[str(cls.file)] = to_process
class ProcUptimeParser(NixParserBase):
    """
    Parse ``/proc/uptime`` into a timedelta of how long the system
    has been up. Processing sets ``dev.uptime``.
    """

    file = PurePosixPath("/proc/uptime")

    @classmethod
    def parse(cls, to_parse: str) -> datetime.timedelta:
        """
        Interpret the first space-separated field as seconds of uptime.
        """
        seconds_up, *_ = to_parse.strip().split(" ")
        return datetime.timedelta(seconds=float(seconds_up))

    @classmethod
    def process(cls, to_process: datetime.timedelta, dev: DeviceData) -> None:
        """
        Record the uptime on the device.
        """
        dev.uptime = to_process
class ProcNetDevParser(NixParserBase):
    """
    Parse and process ``/proc/net/dev``
    """

    file = PurePosixPath("/proc/net/dev")

    @classmethod
    def parse(cls, to_parse: str) -> dict[str, dict[str, int]]:
        """
        Parse the per-interface counters.

        Column names come from the second header line, whose receive and
        transmit sections are separated by ``|``. They are prefixed with
        ``recv_`` and ``trans_`` respectively.

        Returns:
            Counters keyed by interface name, or an empty dict if the
            header is missing or malformed.
        """
        lines = _extract_lines(to_parse)
        if not lines:
            return {}

        # Inspired by https://stackoverflow.com/a/1052628
        # The header needs "<label>|<receive cols>|<transmit cols>"
        cols_sects = lines[1].split("|") if len(lines) > 1 else []
        if len(cols_sects) < 3:
            log.warning("/proc/net/dev parsing failed: unexpected header format")
            return {}

        recv_cols = [f"recv_{c}" for c in cols_sects[1].split()]
        trans_cols = [f"trans_{c}" for c in cols_sects[2].split()]
        cols = recv_cols + trans_cols

        # keyed by interface name
        results = {}
        for line in lines[2:]:
            # Split on the LAST colon: the counters never contain one,
            # but an interface name may (e.g. an alias like "eth0:1").
            iface, sep, raw_data = line.rpartition(":")
            if not sep:
                continue

            results[iface] = {
                name: int(raw) for name, raw in zip(cols, raw_data.split(), strict=False)
            }

        return results

    @classmethod
    def process(cls, to_process: dict[str, dict[str, int]], dev: DeviceData) -> None:
        """
        Make sure every interface named in the file exists in the data
        model, with a type inferred from the name for ``lo`` and ``eth*``.
        """
        # TODO: dev.interface
        # NOTE(review): statement nesting was reconstructed from flattened
        # source; type assignment and store are assumed to apply to both
        # new and existing interfaces - confirm against upstream.
        for iface_name in to_process:
            iface = dev.retrieve("interface", search={"name": iface_name})

            if not iface:
                iface = Interface(
                    name=iface_name,
                )

            if iface_name == "lo":
                iface.type = "loopback"
            elif iface_name.startswith("eth"):
                iface.type = "ethernet"

            dev.store("interface", iface, lookup="name")
class EtcPasswdParser(NixParserBase):
    """
    Parse ``/etc/passwd`` and return the extracted user data.

    Process adds the users to ``dev.users`` in the device data model.
    """

    file = PurePosixPath("/etc/passwd")

    @classmethod
    def parse(cls, to_parse: str) -> list[dict[str, str]]:
        """
        Return one dict per valid account line.

        Blank lines are ignored. Any other line that doesn't have
        exactly seven colon-delimited fields is logged and skipped.
        """
        users = []

        for line in to_parse.splitlines():
            # Blank lines aren't malformed entries, don't warn about them
            if not line.strip():
                continue

            # To read manpage: "man 5 passwd"
            # /etc/passwd contains one line for each user account,
            # with seven fields delimited by colons (":")
            sections = line.split(":")
            if len(sections) != 7:
                log.warning(f"Bad /etc/passwd line: {line}")
                continue

            # login name
            # optional encrypted password, "x" if in shadow file
            # numerical user ID
            # numerical group ID
            # user name or comment field
            # user home directory
            # optional user command interpreter
            #   If this field is empty, it defaults to the value /bin/sh
            user = {
                "login_name": sections[0],
                "password": sections[1] if sections[1] != "x" else "",
                "user_id": sections[2],
                "group_id": sections[3],
                "comment": sections[4],
                "home_directory": sections[5],
                "shell": sections[6],
            }
            users.append(user)

        return users

    @classmethod
    def process(cls, to_process: list[dict[str, str]], dev: DeviceData) -> None:
        """
        Store a ``User`` for each parsed account, and add the login name
        and shell path to ``dev.related``.
        """
        for raw_user in to_process:
            dev.related.user.add(raw_user["login_name"])

            if raw_user["shell"]:
                dev.related.files.add(raw_user["shell"])

            user_obj = User(
                description=raw_user["comment"].strip().strip(","),
                id=raw_user["login_name"],
                name=raw_user["login_name"],
                uid=str(raw_user["user_id"]),
                gid=str(raw_user["group_id"]),
                extra={
                    "home_directory": raw_user["home_directory"],
                    "shell": raw_user["shell"],
                },
            )

            dev.store("users", user_obj, lookup="name")
class DateParser(NixParserBase):
    """
    Parse output of the ``date`` command.

    This provides timezone information, as well as a baseline for what
    year it is, for the purposes of timestamping logs from sources such
    as ``/var/log/messages``.
    """

    command = "date"

    @classmethod
    def parse(cls, to_parse: str) -> datetime.datetime | None:
        """
        Return the parsed date, or ``None`` when there is no input.
        """
        return utils.parse_date(to_parse) if to_parse else None

    @classmethod
    def process(cls, to_process: datetime.datetime, dev: DeviceData) -> None:
        """
        Save the device's current time in ``dev.extra["current_time"]``.
        If the timestamp is timezone-aware and no timezone is known yet,
        also set ``dev.geo.timezone``.
        """
        if to_process.tzinfo and not dev.geo.timezone:
            dev.geo.timezone = to_process.tzname()

        dev.extra["current_time"] = to_process
class EnvParser(NixParserBase):
    """
    Parse the output of the ``env`` command.

    Environment variables for current shell session
    """

    command = "env"

    @classmethod
    def parse(cls, to_parse: str) -> dict[str, str]:
        """
        Map each ``NAME=value`` line to an entry, splitting on the
        first ``=``. Lines without ``=`` are skipped.
        """
        variables = {}

        for entry in to_parse.splitlines():
            name, sep, val = entry.partition("=")
            if not sep:
                log.trace(f"Skipping bad env line (missing '='): {entry}")
                continue
            variables[name] = val

        return variables

    @classmethod
    def process(cls, to_process: dict[str, str], dev: DeviceData) -> None:
        """
        Store the variables in ``dev.extra["env"]``.
        """
        # TODO: do more with environment variables and data model
        dev.extra["env"] = to_process
# TODO: WIP. This currently extracts: # interface name # mtu # options (e.g. "POINTOPOINT,MULTICAST,NOARP") # state # link_type # # from peat.protocols.common import IPV4_RE, MAC_RE_COLON # class IpAddrParser(NixParserBase): # """ # Parse output of ``ip addr`` command. # # Shows all network interfaces. # This is usually seen on modern Linux distributions # that install the ``iproute2`` package by default. # """ # # command = "ip addr" # # @classmethod # def parse(cls, to_parse: str) -> dict[str, dict]: # if not to_parse: # return {} # # interfaces = [s.strip() for s in re.split(r"\d+: (\w+): ", to_parse) if s] # pairs = list(zip(interfaces[0::2], interfaces[1::2])) # # if_pat = ( # r"\<(?P<if_info>[\w,]+)\> " # r"mtu (?P<mtu>\d+) .* " # r"state (?P<state>\w+).*\s+" # r"link/(?P<link_type>\w+) " # ) # # all_ifs = {} # for if_name, if_raw in pairs: # print(if_name) # if_info = re.match(if_pat, if_raw).groupdict() # if_info["mtu"] = int(if_info["mtu"]) # # for line in if_raw.splitlines(): # line = line.strip() # if line.startswith("link/none"): # continue # elif line.startswith("link/ether"): # parts = line.split(" ") # if_info["mac_addr"] = parts[1] # if_info["mac_bcast"] = parts[3] # elif line.startswith("link/"): # parts = line.split(" ") # # TODO: this is not the IP, usually 0.0.0.0 # if_info["ip_addr"] = parts[1] # if_info["ip_bcast"] = parts[3] # elif line.startswith("inet6"): # inet6_pat = r"inet6 ([0-9a-fA-F:]+)/(\d{1,2})" # pass # elif line.startswith("inet"): # inet4_pat = r"inet " + IPV4_RE + r"/(\d{1,2})" # pass # # all_ifs[if_name] = if_info # # return False # # @classmethod # def process(cls, to_process: dict[str, dict], dev: DeviceData) -> None: # pass
class IfconfigParser(NixParserBase):
    """
    Parse output of ``ifconfig -a`` command.

    Shows all network interfaces.
    This is usually seen on older Linux distributions
    that install the ``net-utils`` package by default,
    as well as BusyBox systems.
    """

    command = "ifconfig -a"

    @classmethod
    def parse(cls, to_parse: str) -> dict[str, dict]:
        """
        Split the output into one chunk per interface and apply each
        regex to every chunk.

        Returns:
            Interface details keyed by interface name. Empty if no
            interface header could be found.
        """
        if not to_parse:
            return {}

        # The interface parsing code below is a heavily adapted version of
        # @KnightWhoSayNi's ifconfig-parser library (MIT-licensed).
        # https://github.com/KnightWhoSayNi/ifconfig-parser

        # TODO: make the regex objects class attributes
        iface_re = re.compile(
            r"(?P<name>[a-zA-Z0-9:._-]+)\s+Link (type|encap):(?P<type>\S+\s?\S+\s?\S+)",
            re.IGNORECASE | re.ASCII,
        )
        mac_re = re.compile(
            r"(\s+HWaddr\s+\b(?P<mac>[0-9A-Fa-f:?]+))?\s+Queue:(?P<queue>\w+)",
            re.IGNORECASE | re.ASCII,
        )
        cap_re = re.compile(r"capabilities: (?P<capabilities>[\w ]+)\s", re.IGNORECASE | re.ASCII)
        ip_re = re.compile(
            r"\s+inet (?P<ip>(?:[0-9]{1,3}\.){3}[0-9]{1,3})"
            r"\s+mask (?P<subnet_mask>(?:[0-9]{1,3}\.){3}[0-9]{1,3})"
            r"(\s+broadcast (?P<broadcast>(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?"
            r"(\s+peer (?P<peer>(?:[0-9]{1,3}\.){3}[0-9]{1,3}))?",
            re.IGNORECASE | re.ASCII,
        )
        flags_re = re.compile(
            r"\W+(?P<flags>(?:\w+\s)+)(?:\s+)?"
            r"\s+MTU:(?P<mtu>\d+)"
            r"\s+metric:(?P<metric>[0-9]+)"
            r"\s+VR:(?P<vr>[0-9]+)\s+ifindex:(?P<ifindex>[0-9]+)",
            re.IGNORECASE | re.ASCII,
        )
        rx_re = re.compile(
            r"\s+RX packets:(?P<rx_packets>[0-9]+)"
            r"\s+mcast:(?P<rx_multicast>[0-9]+)"
            r"\s+errors:(?P<rx_errors>[0-9]+)"
            r"\s+dropped:(?P<rx_dropped>[0-9]+)",
            re.IGNORECASE | re.ASCII,
        )
        tx_re = re.compile(
            r"TX packets:(?P<tx_packets>[0-9]+)"
            r"\s+mcast:(?P<tx_multicast>[0-9]+)"
            r"\s+errors:(?P<tx_errors>[0-9]+)"
            r"\s+collisions:(?P<collisions>[0-9]+)"
            r"\s+unsupported proto:(?P<unsupported_protocol>[0-9]+)",
            re.IGNORECASE | re.ASCII,
        )
        bytes_re = re.compile(
            r"\s+RX bytes:(?P<rx_bytes>\w+)\s+TX bytes:(?P<tx_bytes>\w+)",
            re.IGNORECASE | re.ASCII,
        )

        re_vxworks = [iface_re, mac_re, cap_re, ip_re, flags_re, rx_re, tx_re, bytes_re]

        # Each interface chunk starts one character before its header match
        positions = [max(found.start() - 1, 0) for found in iface_re.finditer(to_parse)]
        if not positions:
            log.warning("ifconfig parsing failed: couldn't find interface positions")
            return {}

        # The end of the text closes the last chunk
        positions.append(len(to_parse))

        all_interfaces = {}
        for chunk_start, chunk_end in itertools.pairwise(positions):
            chunk = to_parse[chunk_start:chunk_end].replace("\t", "\n")

            interface = {}
            for pattern in re_vxworks:
                match = pattern.search(chunk)
                if not match:
                    continue
                # Optional groups that didn't participate are None; drop them
                for k, v in match.groupdict().items():
                    if isinstance(v, str):
                        interface[k] = v.strip()

            for key, value in list(interface.items()):
                if key in ["capabilities", "flags"]:
                    interface[key] = [x.strip() for x in value.split(" ") if x.strip()]
                elif key[:2] in ["tx", "rx"] or key in [
                    "collisions",
                    "unsupported_protocol",
                ]:
                    try:
                        # Convert size strings like "24M" or "42" into a raw integer
                        interface[key] = humanfriendly.parse_size(value)
                    except Exception:
                        # Best-effort: keep the raw string if it isn't a size
                        pass
                elif key in ["mtu", "metric", "vr", "ifindex"]:
                    try:
                        interface[key] = int(value)
                    except (TypeError, ValueError):
                        # Best-effort: keep the raw string
                        pass

            if "name" in interface:
                all_interfaces[interface["name"]] = interface

        if not all_interfaces:
            log.warning("ifconfig parsing failed: no interfaces found")

        return all_interfaces

    @classmethod
    def process(cls, to_process: dict[str, dict], dev: DeviceData) -> None:
        """
        Store an ``Interface`` for every parsed interface. Remaining
        details and traffic statistics go into ``iface.extra``.
        """
        for name, if_dict in to_process.items():
            iface = Interface(
                name=name,
                id=str(if_dict.get("ifindex", "")),
                mtu=if_dict.get("mtu"),
                ip=if_dict.get("ip", ""),
                mac=if_dict.get("mac", "").upper(),
                subnet_mask=if_dict.get("subnet_mask", ""),
            )

            # Interface type
            if_type = if_dict["type"].lower().replace(" ", "_")
            if "loopback" in if_type:
                iface.type = "loopback"
            else:
                iface.type = if_type

            # "fei0" is primary. "fei1" is secondary
            if name == "fei0":
                iface.description.description = "Primary interface"
            elif name == "fei1":
                iface.description.description = "Secondary interface"

            # Peer for Point to Point interfaces
            if if_dict.get("peer"):
                dev.related.ip.add(if_dict["peer"])

            # ifconfig flags reference:
            # https://docs.oracle.com/cd/E19253-01/816-5166/ifconfig-1m/index.html
            #
            # "flags" is only present when the flags regex matched, so
            # default to an empty list rather than raising KeyError and
            # aborting processing of every interface.
            flags = if_dict.get("flags", [])

            if "RUNNING" in flags:
                iface.connected = True

            if "UP" in flags:
                iface.enabled = True
            elif "DOWN" in flags:
                iface.enabled = False

            for key, value in if_dict.items():
                if value is None:
                    continue

                # Add various keys to "extra" field
                if key in [
                    "queue",
                    "flags",
                    "capabilities",
                    "peer",
                    "metric",
                    "ifindex",
                    "vr",
                    "broadcast",
                ]:
                    iface.extra[key] = value

                # Store statistics in a "statistics" sub-dict in "extra"
                elif key[:2] in ["tx", "rx"] or key in [
                    "collisions",
                    "unsupported_protocol",
                ]:
                    if not iface.extra.get("statistics"):
                        iface.extra["statistics"] = defaultdict(dict)

                    # key[3:] drops the "tx_" / "rx_" prefix
                    if key[:2] == "tx":
                        iface.extra["statistics"]["transmitted"][key[3:]] = value
                    elif key[:2] == "rx":
                        iface.extra["statistics"]["received"][key[3:]] = value
                    else:
                        iface.extra["statistics"][key] = value

            dev.store("interface", iface, lookup=["name", "ip"])
class ArpParser(NixParserBase):
    """
    Parse and process output of ``arp -a`` command.

    ARP table, shows all known network devices.
    """

    command = "arp -a"

    @classmethod
    def parse(cls, to_parse: str) -> list[str]:
        """
        Return the non-empty, stripped lines of the output.
        """
        return _extract_lines(to_parse)

    @classmethod
    def process(cls, to_process: list[str], dev: DeviceData) -> None:
        """
        Scan every space-separated token and add IP addresses and MAC
        addresses (tokens with exactly five colons) to ``dev.related``.
        """
        for line in to_process:
            for token in line.split(" "):
                token = token.strip()
                if not token:
                    continue

                # "(192.0.2.1)"
                candidate = utils.clean_replace(token, "", "()").strip()

                if utils.is_ip(candidate):
                    # IP address
                    dev.related.ip.add(candidate)
                elif candidate.count(":") == 5:
                    # MAC address
                    dev.related.mac.add(candidate.upper())
class SshdConfigParser(NixParserBase):
    """
    Parse and process ``/etc/ssh/sshd_config``.

    Reduces the sshd_config to just the lines holding configuration,
    leaving out empty lines and comments.
    """

    file = PurePosixPath("/etc/ssh/sshd_config")

    @classmethod
    def parse(cls, to_parse: str) -> list[str]:
        """
        Return the stripped, non-empty lines that don't start with ``#``.
        """
        config_lines = []

        for entry in _extract_lines(to_parse):
            if entry.startswith("#"):
                continue
            config_lines.append(entry)

        return config_lines

    @classmethod
    def process(cls, to_process: list[str], dev: DeviceData) -> None:  # noqa: ARG003
        """
        Currently a no-op, nothing is added to the data model yet.
        """
        # TODO: dev.related.port
        # TODO: dev.services
        return None
class HostnameParser(NixParserBase):
    """
    Set ``dev.hostname`` to the output of the ``hostname`` command
    """

    command = "hostname"

    @classmethod
    def parse(cls, to_parse: str) -> str:
        """
        Return the command output without surrounding whitespace.
        """
        hostname = to_parse.strip()
        return hostname

    @classmethod
    def process(cls, to_process: str, dev: DeviceData) -> None:
        """
        Set the device hostname, unless the parsed value is empty.
        """
        if not to_process:
            return

        dev.hostname = to_process
class LsRecursiveParser(NixParserBase):
    """
    Recursive ls of full file system.

    This assumes BusyBox's ``ls`` output. Other system's output may differ.

    Command: ``ls -lenAR /etc /boot /var/log /root /sysopt /sbin /pkg /bin /common /opt /lib``

    Args:

    - l: one column output
    - e: full date and time
    - n: numeric UIDs and GIDs instead of names
    - A: include files that start with ``.`` and exclude
      the ``.`` and ``..`` "files".
    - R: recurse
    """

    # TODO: do a full recursive in certain circumstances
    # command = "ls -lenAR /"

    # /usr
    command = "ls -lenAR /etc /boot /var/log /root /sysopt /sbin /pkg /bin /common /opt /lib"

    @classmethod
    def parse(cls, to_parse: str) -> list[dict]:
        """
        Parse the listing into one dict per directory entry.

        Every dict has ``type``, ``perms``, ``uid``, ``gid``, ``mtime``,
        ``name`` and ``parent``. On top of that:

        - block/character devices get ``device_driver_major`` and
          ``device_driver_minor``;
        - everything else gets ``size``;
        - symlinks with a readable target get ``symlink_target``.

        Lines with too few fields to be an entry (e.g. ``total 12``)
        are skipped.
        """
        if not to_parse:
            return []

        # NOTE: humanfriendly split_paragraphs assumes "\n\n"
        # we can get CRLF (\r\n) back, so need to fix that
        to_parse = to_parse.replace("\r\n", "\n")

        results = []
        for chunk in split_paragraphs(to_parse):
            lines = _extract_lines(chunk)
            if not lines:
                continue

            # The first line of the chunk is the absolute
            # directory path, followed by a ":".
            # Example: "/etc/network:"
            par_dir = lines[0].rstrip(":")
            if not par_dir.endswith("/"):
                par_dir += "/"
            dir_path = PurePosixPath(par_dir)

            # TODO: combine with parsing code in peat/protocols/ftp.py
            # TODO: refer to this: https://pubs.opengroup.org/onlinepubs/9699919799/utilities/ls.html
            #   and incorporate parsing code from the new Firewall module.

            # The lines following are the items in that directory
            #
            # Col 1: File type and permissions: type+user+group+world
            # Col 2: Number of hard links
            # Col 3: UID (numeric)
            # Col 4: GID (numeric)
            # Col 5: file size in bytes
            # Col 6-9: modification date and time
            # Col 10: name
            #
            # Standard file example:
            #   "-rw-r--r-- 1 0 0 194 Wed Jan 01 00:00:00 1970 interfaces"
            for line in lines[1:]:
                # Exclude failures to read files in /proc
                # "ls: /proc/1045/exe: cannot read link: No such file or directory"
                if "cannot read link" in line or "No such file or directory" in line:
                    continue

                # File type
                raw_type = line[0]

                # Add +1 offset for device files
                offset = 1 if raw_type in ["b", "c"] else 0

                # Limit the number of splits so that everything after
                # the timestamp stays in one piece. This keeps file
                # names containing spaces intact.
                parts = line.split(None, offset + 10)
                if len(parts) < offset + 11:
                    log.trace(f"Skipping unrecognized ls line: {line}")
                    continue

                # If symlink, split what it points to off the name.
                # NOTE: if ls fails to read the target, such as with
                # /proc/*/exe, then there won't be a target specified
                # (and thus no "->" string).
                name = parts[offset + 10]
                raw_target = ""
                if raw_type == "l" and " -> " in name:
                    name, _, raw_target = name.partition(" -> ")

                file_info = {
                    "type": raw_type,
                    "perms": parts[0][1:],
                    "uid": parts[2],
                    "gid": parts[3],
                    "mtime": utils.parse_date(" ".join(parts[offset + 5 : offset + 10])),
                    "name": name,
                    "parent": dir_path,
                }

                # Device files (in /dev) don't have a size.
                # Instead, they have two numbers. These represent
                # the major and minor device number for that device.
                # The major number is the driver associated with the device.
                # The minor number is only used by the driver specified by the major number.
                #
                # Example:
                #   "crw------- 1 0 0 5, 1 Thu Jan 1 00:00:14 1970 console"
                #
                # b: block device
                # c: character device
                if raw_type in ["b", "c"]:
                    file_info["device_driver_major"] = int(parts[4].strip(","))
                    file_info["device_driver_minor"] = int(parts[5])
                else:
                    file_info["size"] = int(parts[4])

                if raw_target:
                    # Absolute: exe -> /usr/sbin/webserver
                    if raw_target.startswith("/"):
                        target = PurePosixPath(raw_target)
                    # Direct: addgroup -> busybox
                    elif "/" not in raw_target:
                        target = PurePosixPath(dir_path, raw_target)
                    # Relative: core -> ../proc/kcore
                    # Convert "/dev/../proc/kcore" -> "/proc/kcore"
                    # posixpath is used (not os.path) so the result stays
                    # a POSIX path even when PEAT runs on Windows.
                    else:
                        target = PurePosixPath(posixpath.normpath(par_dir + raw_target))

                    file_info["symlink_target"] = target

                results.append(file_info)

        return results

    @classmethod
    def process(cls, to_process: list[dict], dev: DeviceData) -> None:
        """
        Append a ``File`` to ``dev.files`` for every parsed entry and add
        the path of every non-directory to ``dev.related.files``.
        """
        for f_data in to_process:
            # File type
            # -: regular file
            # d: directory
            # l: symbolic link
            # p: named pipe
            # c: character device
            # b: block device
            # s: socket
            if f_data["type"] == "d":
                f_type = "dir"
            elif f_data["type"] == "l":
                f_type = "symlink"
            else:
                f_type = "file"

            # Create path object from parent + name
            path = PurePosixPath(f_data["parent"], f_data["name"])

            # Add absolute path to host.related.files
            if f_type != "dir":
                dev.related.files.add(str(path))

            # TODO: add device_driver_major and device_driver_minor to file.extra
            file_obj = File(
                device=dev.get_comm_id(),
                directory=str(f_data["parent"]),
                extension=path.suffix if f_type == "file" else "",
                gid=int(f_data["gid"]),  # typecast to make sure it's an int
                peat_module=dev._module.__name__ if dev._module else "",
                path=path,
                mode=utils.file_perms_to_octal(f_data["perms"]),
                mtime=f_data["mtime"],
                name=f_data["name"],
                type=f_type,
                uid=int(f_data["uid"]),  # typecast to make sure it's an int
            )

            if file_obj.type == "file" and "size" in f_data:
                file_obj.size = f_data["size"]

            if file_obj.type == "symlink" and f_data.get("symlink_target"):
                file_obj.target_path = f_data["symlink_target"]

            # Compare as strings so root is recognized whether the
            # model keeps uid/gid as int or as str.
            if str(file_obj.uid) == "0":
                file_obj.owner = "root"

            if str(file_obj.gid) == "0":
                file_obj.group = "root"

            dev.files.append(file_obj)  # add to data model
class NetstatSocketsVxWorksParser(NixParserBase):
    """
    Parse output of "netstat -anP" command on VxWorks.

    -a: more sockets
    -n: numeric names instead of hostnames resolved
    -P: show the TID (task ID) that owns the socket
    """

    command = "netstat -anP"

    @classmethod
    def parse(cls, to_parse: str) -> list[dict]:
        """
        Return one dict per socket line.

        Header lines (starting with ``INET`` or containing ``Recv-Q``)
        are ignored. Lines that don't have 6 or 7 whitespace-separated
        fields are logged and skipped.
        """
        sockets = []

        for line in _extract_lines(to_parse):
            if line.startswith("INET") or "Recv-Q" in line:
                continue

            parts = line.split()
            if len(parts) < 6 or len(parts) > 7:
                # Report the field count, since that is what was checked
                log.warning(f"Bad netstat line with length {len(parts)}: '{line}'")
                continue

            # Addresses have the form "<address>.<port>"
            local_address, _, local_port = parts[3].rpartition(".")
            foreign_address, _, foreign_port = parts[4].rpartition(".")

            skt = {
                "protocol": parts[0],  # Prot
                "recv_q": int(parts[1]),  # Recv-Q
                "send_q": int(parts[2]),  # Send-Q
                "local_address": local_address,  # Local Address
                "local_port": local_port,
                "foreign_address": foreign_address,  # Foreign Address
                "foreign_port": foreign_port,
                # UDP and other non-TCP sockets don't have "STATE"
                "state": parts[5] if parts[0] == "TCP" else "",  # State
                # The TID is always the last part
                "tid": parts[-1],  # TID
            }
            sockets.append(skt)

        return sockets

    @classmethod
    def process(cls, to_process: list[dict], dev: DeviceData) -> None:
        """
        Store listening sockets bound to localhost as services, and add
        every IP address and numeric port seen to ``dev.related``.
        """
        for skt in to_process:
            # Use local_address to resolve the interface
            # if interface doesn't exist, create it
            # TODO: add "connections" to data model

            # protocol "115" seems to be some sort of default
            # TCP and UDP
            # TODO: ipv6?
            if (
                skt["foreign_address"] == "0.0.0.0"
                and skt["foreign_port"] == "*"
                and skt["local_port"] != "*"
            ):
                transport = ""
                if not skt["protocol"].isdigit():
                    transport = skt["protocol"].lower()

                svc = Service(
                    port=int(skt["local_port"]),
                    transport=transport,
                    status="open",
                    listen_address=skt["local_address"],
                    process_pid=int(skt["tid"], 16),  # as integer
                    extra={
                        "receive_queue": skt["recv_q"],
                        "send_queue": skt["send_q"],
                        "task_id": skt["tid"],  # TID as hex
                    },
                )

                # TODO: associate with all interfaces
                if skt["local_address"] == "0.0.0.0":
                    # listening on all interfaces
                    pass
                elif skt["local_address"] == "127.0.0.1":
                    # listening on localhost
                    dev.store(
                        "service",
                        svc,
                        lookup="port",
                        interface_lookup={"ip": "127.0.0.1"},
                    )
                else:
                    log.warning(f"netstat: unknown local_address '{skt['local_address']}'")

                # TODO: dev.store service

            # Add IPs and ports to related.ip and related.ports
            for key in ["local", "foreign"]:
                addr = skt[f"{key}_address"]
                if addr != "0.0.0.0" and utils.is_ip(addr):
                    dev.related.ip.add(addr)

                port = skt[f"{key}_port"]
                try:
                    dev.related.ports.add(int(port))
                except ValueError:
                    # Wildcard ports ("*") aren't numeric, nothing to add
                    pass
def _extract_lines(data: str, exclude: str = "") -> list[str]:
    """
    Split ``data`` into stripped lines, dropping the empty ones and,
    when ``exclude`` is given, every line containing that string.
    """
    if not data:
        return []

    stripped = (raw.strip() for raw in data.strip().splitlines())
    return [text for text in stripped if text and not (exclude and exclude in text)]
def convert_filename(to_convert: str) -> str:
    """
    Take command string or file path and make it
    into something that can be saved to disk.

    Each of `` --``, `` -``, space, ``/``, ``;``, ``{`` and ``}`` is
    replaced by an underscore. Runs of underscores are then collapsed
    into one, and surrounding whitespace and underscores are removed.

    Args:
        to_convert: Command (with arguments) or file path.

    Returns:
        The sanitized name, e.g. ``"arp -a"`` becomes ``"arp_a"``.
    """
    # Longer alternatives come first so " --" wins over " -" and " "
    replaced = re.sub(r" --| -|[ /;{}]", "_", to_convert)

    # Collapse a run of any length in one pass. Repeatedly replacing
    # "__" with "_" only halves a run each time.
    collapsed = re.sub(r"_{2,}", "_", replaced)

    return collapsed.strip().strip("_")