Source code for peat.cli_main

import code
import json
import pdb  # noqa: T100
import sys
import timeit
from pathlib import Path
from pprint import pformat
from typing import Any, get_args

from humanfriendly.text import pluralize

from peat import (
    PeatError,
    cli_args,
    config,
    consts,
    datastore,
    decrypt,
    encrypt,
    exit_handler,
    heat_main,
    initialize_peat,
    log,
    module_api,
    parse,
    pillage,
    pull,
    push,
    scan,
    state,
    utils,
)
from peat.config_builder import launch_builder
from peat.heat import HEAT_EXTRACTORS

TargetsType = tuple[list[str], consts.AllowedCommTypes, list[str]]


def run_peat(args: dict[str, Any], start_time: float) -> None:
    """
    CLI main (note: the entrypoint that calls this is in ``__main__.py``).
    """
    try:
        initialize_peat(conf=args, entrypoint="CLI")
    except Exception as ex:
        log.error(f"Failed to initialize peat: {ex}")
        sys.exit(1)

    log.trace4(f"** Raw CLI arguments **\n{pformat(args, indent=4)}\n")

    # Print imported modules and aliases, then exit
    if args.get("list_modules"):
        print(json.dumps(module_api.names), flush=True)  # noqa: T201
        sys.exit(0)
    if args.get("list_aliases"):
        print(  # noqa: T201
            json.dumps(list(module_api.module_aliases.keys())), flush=True
        )
        sys.exit(0)
    if args.get("list_alias_mappings"):
        print(json.dumps(module_api.alias_mappings), flush=True)  # noqa: T201
        sys.exit(0)
    if args.get("list_all"):
        print(  # noqa: T201
            f"** Modules **\n"
            f"{pformat(module_api.names, compact=True)}"
            f"\n\n** Aliases **\n"
            f"{pformat(list(module_api.module_aliases.keys()), compact=True)}"
            f"\n\n** Alias Mappings **\n"
            f"{pformat(module_api.alias_mappings, compact=True)}",
            flush=True,
        )
        sys.exit(0)

    # Print examples for the current sub-command, e.g. "scan"
    if args.get("examples"):
        print(cli_args.ALL_EXAMPLES[args["func"]].strip(), flush=True)  # noqa: T201
        sys.exit(0)

    # Print examples for all commands, then exit
    if args.get("all_examples"):
        all_examples = ""
        for cmd, examples in cli_args.ALL_EXAMPLES.items():
            all_examples += f"** {cmd} examples **\n{examples}\n\n"
        print(all_examples.strip(), flush=True)  # noqa: T201
        sys.exit(0)

    if args.get("list_heat_protocols"):
        print(  # noqa: T201
            ", ".join(plugin.__name__ for plugin in HEAT_EXTRACTORS), flush=True
        )
        sys.exit(0)

    # Run the configuration builder Textual interface
    if args["func"] == "config-builder":
        launch_builder()
        sys.exit(0)

    # Drop into pdb (the Python debugger)
    if args["pdb"]:
        pdb.set_trace()  # noqa: T100

    # Drop into the interactive interpreter (the "REPL")
    if args["repl"]:
        code.interact(local=globals())

    # Execute CLI commands
    # NOTE: in the future, this will execute continuous
    # monitoring and/or the standard "one-shot" CLI commands.
    if not oneshot_main(args):
        state.error = True

    # Dump config to Elasticsearch
    if state.elastic and config.ELASTIC_SAVE_CONFIG:
        log.info(
            f"Saving configuration to {state.elastic.type} "
            f"(index basename: {config.ELASTIC_CONFIG_INDEX})"
        )
        raw_config = config.export()

        # BUGFIX: remove raw options from the PEAT config pushed to Elasticsearch.
        # The dynamic mappings generated for these fields can vary and cause pushes to fail.
        for opt in ["DEVICE_OPTIONS", "HOSTS"]:
            if raw_config.get(opt):
                del raw_config[opt]

        if not state.elastic.push(config.ELASTIC_CONFIG_INDEX, raw_config):
            log.warning(f"Failed to save configuration to {state.elastic.type}")

    # Dump state to Elasticsearch
    if state.elastic and config.ELASTIC_SAVE_STATE:
        log.info(
            f"Saving state to {state.elastic.type} (index basename: {config.ELASTIC_STATE_INDEX})"
        )
        if not state.elastic.push(config.ELASTIC_STATE_INDEX, state.export()):
            log.warning(f"Failed to save state to {state.elastic.type}")

    # Record the end time and duration in the log
    duration = timeit.default_timer() - start_time
    log.info(f"Finished run in {utils.fmt_duration(duration)} at {utils.utc_now()} UTC")

    # Clean up all empty directories on exit
    if config.OUT_DIR and config.RUN_DIR.exists():
        utils.clean_empty_dirs(config.RUN_DIR)
    elif config.OUT_DIR and config.OUT_DIR.exists():
        utils.clean_empty_dirs(config.OUT_DIR)

    # Fix ownership of peat_results to be the actual user instead of "root".
    # Only executed if running as root on a POSIX system.
    # exit_handler.register() is used so this runs on exit, after the other
    # atexit handlers (e.g. the config/state dumps above).
    if consts.POSIX and state.superuser_privs and config.OUT_DIR and config.RUN_DIR.exists():
        exit_handler.register(utils.fix_file_owners, "FILE", args=(config.RUN_DIR,))

    # Write the README.md file to OUT_DIR (e.g., ./peat_results/README.md)
    if config.OUT_DIR and config.OUT_DIR.exists():
        write_readme()

    # Exit with code 1 on failure, 0 on success
    if state.error:
        log.warning("PEAT run failed! See logs for details.")
        sys.exit(1)
    else:
        log.debug("PEAT run finished successfully (no major errors)")
        sys.exit(0)

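# For reference, a minimal sketch of the entrypoint mentioned in the run_peat()
# docstring. This is illustrative only: the argument-parsing helper name
# ("build_parser") is an assumption, not the actual code in ``__main__.py``.
#
#     import timeit
#     from peat import cli_args
#     from peat.cli_main import run_peat
#
#     def main() -> None:
#         start_time = timeit.default_timer()  # same clock used for the duration log
#         args = vars(cli_args.build_parser().parse_args())  # hypothetical helper
#         run_peat(args, start_time)
#
#     if __name__ == "__main__":
#         main()
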
def oneshot_main(args: dict[str, Any]) -> bool:
    """
    Main logic when running a regular PEAT command, e.g. ``peat scan``.

    This is distinct from the other current and future capabilities, such as
    monitoring or the PEAT HTTP server. "oneshot" refers to the "run and done"
    (in "one shot") and non-persistent nature of the traditional PEAT CLI commands.
    """
    if args["func"] == "heat":
        return heat_main()

    if args["func"] == "encrypt":
        result = encrypt(args["filepath"], args["user-password"])
        if result:
            log.info("Done encrypting file, exiting...")
            sys.exit(0)
        else:
            log.critical("Error encountered while encrypting file, exiting...")
            sys.exit(1)

    if args["func"] == "decrypt":
        result = decrypt(
            config_path=args["filepath"],
            output_path=args["write-path"],
            user_password=args["user-password"],
        )
        if result:
            log.info("Done decrypting file, exiting...")
            sys.exit(0)
        else:
            log.critical("Error encountered while decrypting file, exiting...")
            sys.exit(1)

    targets = []  # type: list[str]
    device_types = set()  # type: set[str]

    # Include any imported third-party modules
    device_types.update(module_api.runtime_imports)

    # Populate the list of devices to use (will be resolved to PEAT modules)
    if args["func"] in ["scan", "pull", "push"]:
        try:
            targets, comm_type, module_names = get_targets(args)
        except PeatError as err:
            log.critical(err)
            return False
        state.comm_type = comm_type  # set the global value
        device_types.update(module_names)
        log.info(
            f"Running {args['func']} of {pluralize(len(targets), 'target')} using "
            f"{pluralize(len(device_types), 'module')} (comm_type: {comm_type})"
        )
        log.debug(f"{args['func']} targets: {targets}")
        log.debug(f"{args['func']} modules: {list(device_types)}")
    elif args["func"] == "parse":
        if args["device_types"] is None and not device_types:
            device_types.add("all")
        elif args["device_types"] is not None:
            device_types.update(args["device_types"])

    # Ensure inputs are deterministic (consistent order every run)
    sorted_device_types: list[str] = sorted(device_types)

    if config.DRY_RUN:
        log.warning("Dry run enabled, skipping calling command functions")
        return True

    # Call the appropriate PEAT function for the command specified
    try:
        if args["func"] == "parse":
            parse_results = parse(args["input_source"], sorted_device_types)
            if not parse_results:
                return False
            if config.PRINT_RESULTS:
                print(  # noqa: T201
                    json.dumps(consts.convert(parse_results), indent=4), flush=True
                )
            return True
        elif args["func"] == "pull":
            pull_results = pull(targets, comm_type, sorted_device_types)
            if not pull_results:
                return False
            success = export_device_data(args)
            if config.PRINT_RESULTS:
                print_results = consts.convert(pull_results.get("pull_results", {}))
                print(json.dumps(print_results, indent=4), flush=True)  # noqa: T201
            return success
        elif args["func"] == "scan":
            scan_summary = scan(targets, comm_type, sorted_device_types)
            if not scan_summary:
                return False
            success = export_device_data(args)
            if config.PRINT_RESULTS:
                print(json.dumps(scan_summary, indent=4), flush=True)  # noqa: T201
            return success
        elif args["func"] == "push":
            if not push(
                targets,
                comm_type,
                sorted_device_types,
                args["input_source"],
                args["push_type"],
                skip_scan=config.PUSH_SKIP_SCAN,
            ):
                return False
            return export_device_data(args)
        elif args["func"] == "pillage":
            return pillage(args["pillage_source"])
        else:
            log.critical(f"Unknown func: {args['func']}")
    except PeatError as ex:
        log.error(f"{args['func']} failed: {ex}")
    except Exception:
        log.exception(f"{args['func']} failed due to unhandled exception")

    return False

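# Illustrative only: the rough shape of the ``args`` dict that oneshot_main()
# expects for a scan. Keys are inferred from the lookups above; real values
# are produced by the argparse layer in ``cli_args``.
#
#     args = {
#         "func": "scan",                             # sub-command name
#         "host_list": ["192.0.2.10", "192.0.2.11"],  # unicast IP targets
#         "device_types": ["all"],
#         # ...plus the other flags run_peat()/get_targets() check
#     }
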
def export_device_data(args: dict[str, Any]) -> bool:
    """
    Export data from all devices in the datastore to files and/or Elasticsearch.
    """
    # Combine any duplicate devices before exporting
    datastore.deduplicate(prune_inactive=args["func"] in ["scan", "pull", "push"])

    devices = [d for d in datastore.objects if d._is_verified or (d._is_active and d._module)]

    success = True
    if not devices:
        log.warning("No device results, skipping export")
        return True

    if config.DEVICE_DIR:
        log.info(f"Exporting data from {pluralize(len(devices), 'host')} to files...")
        for dev in devices:
            if not dev.export_to_files(overwrite_existing=True):
                success = False

    # pull and parse already export
    if state.elastic and args["func"] not in ["pull", "parse"]:
        log.info(
            f"Exporting data from {pluralize(len(devices), 'host')} to {state.elastic.type}..."
        )
        for dev in devices:
            try:
                if not dev.export_to_elastic():
                    success = False
            except Exception:
                log.exception(
                    f"Failed to export data to {state.elastic.type} for device '{dev.get_id()}'"
                )
                success = False

    return success

def get_targets(args: dict[str, Any]) -> TargetsType:
    """
    Collect targets and module names from a file or CLI arguments.
    """
    # Read from a JSON host file (not to be confused with the YAML config)
    if args.get("host_file"):
        file_data = read_host_file(args["host_file"])
        if file_data is None:
            raise PeatError("Bad host file")
        try:
            targets, comm_type, module_names = parse_scan_summary(file_data)
        except Exception as ex:
            raise PeatError(f"Failed to parse host file (scan summary): {ex}") from ex

    # Read from CLI args
    else:
        module_set = set(args["device_types"])  # type: set[str]
        id_key = "ip"
        if args.get("host_list"):
            comm_type = "unicast_ip"
            targets = args["host_list"]  # type: list[str]
        elif args.get("broadcast_list"):
            comm_type = "broadcast_ip"
            targets = args["broadcast_list"]  # type: list[str]
        elif args.get("port_list"):
            comm_type = "serial"
            id_key = "serial_port"
            targets = args["port_list"]  # type: list[str]
        else:
            raise PeatError("Bad target arguments")

        if config.DEBUG >= 2:
            log.debug(
                f"Raw targets before doing lookup of hosts in YAML config"
                f"\ncomm_type: {comm_type}\ntargets: {targets}"
                f"\nmodule_set: {module_set}"
            )

        # Use the hosts in the YAML config to populate the targets list
        if len(targets) == 1 and targets[0] == "all":
            log.info(
                "Attempting to use ALL of the hosts in the YAML config as targets, "
                "since 'all' was specified as the target"
            )
            if not config.HOSTS:
                raise PeatError("No hosts in YAML config to use with the 'all' target")
            if comm_type not in ["unicast_ip", "serial"]:
                raise PeatError("The 'all' target only works with IP or serial hosts")
            targets = []
            for host in config.HOSTS:
                if not host.get("identifiers"):
                    log.warning(
                        f"For 'all' target, skipping host with missing 'identifiers' field: {host}"
                    )
                    continue
                if not host["identifiers"].get(id_key):
                    continue
                # Get the 'ip' or 'serial_port' field
                targets.append(host["identifiers"][id_key])
                if host.get("peat_module"):
                    module_set.add(host["peat_module"])

        # Allow labels from hosts in a YAML config to be used as targets.
        # Label matching is case-insensitive, but the full string must match.
        elif config.HOSTS and comm_type in ["unicast_ip", "serial"]:
            # Build lookup tables mapping host labels to identifiers
            # and modules, e.g. {"host1": "192.0.2.2"}
            lookup_id = {}  # type: dict[str, str]
            lookup_mod = {}  # type: dict[str, str]
            for host in config.HOSTS:
                # Skip hosts without a label since this is for label lookups only
                if not host.get("label"):
                    continue
                # Get the 'ip' or 'serial_port' field and add it to the lookup table
                if host.get("identifiers", {}).get(id_key):
                    lookup_id[host["label"].lower()] = host["identifiers"][id_key]
                # Add the host's PEAT module to the lookup table of modules
                if host.get("peat_module"):
                    lookup_mod[host["label"].lower()] = host["peat_module"]

            # Add PEAT modules from matching hosts to the set of modules to use.
            # This must happen before labels are replaced with identifiers below,
            # since the lookup table is keyed by lowercased label.
            module_set.update(
                lookup_mod[t.lower()] for t in targets if t.lower() in lookup_mod
            )

            # Replace any targets that match the label of a
            # host in the YAML config with that host's identifier.
            targets = [lookup_id.get(target.lower(), target) for target in targets]

        module_names = list(module_set)  # type: list[str]

    module_names = module_api.lookup_names(module_names)
    return targets, comm_type, module_names

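# Illustrative only: a YAML ``hosts`` entry of the kind the label lookup in
# get_targets() resolves. Field names match the keys accessed above; the
# module name "sel" is a hypothetical placeholder.
#
#     hosts:
#       - label: "host1"
#         peat_module: "sel"
#         identifiers:
#           ip: "192.0.2.2"
#
# With this config, a scan targeting "host1" would replace that target with
# "192.0.2.2" and add the "sel" module to the set of modules to use.
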
def parse_scan_summary(summary: dict[str, Any]) -> TargetsType:
    """
    Extract targets, communication method, and PEAT modules from a scan summary.
    """
    if not summary:
        raise PeatError("Empty scan summary passed to parse_scan_summary()")

    module_names = summary.get("scan_modules", [])

    # Minor hack to make results from a sweep scan usable in a future scan
    if "scan_sweep" in module_names:
        config.SCAN_SWEEP = True
        module_names.remove("scan_sweep")

    comm_type = summary.get("scan_type", "")
    if not comm_type:
        raise PeatError("No 'scan_type' variable found in host file")
    if comm_type not in get_args(consts.AllowedCommTypes):
        raise PeatError(
            f"Unknown scan_type value '{comm_type}' in host file, expected "
            f"one of {get_args(consts.AllowedCommTypes)}"
        )

    if comm_type == "broadcast_ip" and not summary.get("hosts_verified"):
        targets = summary["scan_targets"]
    elif summary.get("hosts_verified"):
        # If there were results from a broadcast IP scan, then use unicast IP
        # to the verified devices, since we now know their addresses.
        if comm_type == "broadcast_ip":
            comm_type = "unicast_ip"
        module_names = set()  # Use a set to prevent duplicates
        targets = []
        # Note: "hosts_verified" is a list of dicts
        for dev in summary["hosts_verified"]:
            if dev.get("peat_module"):
                module_names.add(dev["peat_module"])
            for comm_id in ["ip", "serial_port", "mac", "hostname"]:
                if dev.get(comm_id):
                    targets.append(dev[comm_id])
                    break
        module_names = list(module_names)
    elif summary.get("hosts_online"):
        targets = summary["hosts_online"]
    else:
        targets = summary.get("scan_targets", [])

    return targets, comm_type, module_names

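# Illustrative only: a trimmed scan summary of the kind parse_scan_summary()
# consumes. Keys match those accessed above; the values are made up, and the
# module name "sel" is a hypothetical placeholder.
#
#     {
#         "scan_type": "broadcast_ip",
#         "scan_modules": [],
#         "scan_targets": ["192.0.2.255"],
#         "hosts_verified": [
#             {"ip": "192.0.2.2", "peat_module": "sel"}
#         ]
#     }
#
# Because "hosts_verified" is populated, comm_type is rewritten from
# "broadcast_ip" to "unicast_ip" and the verified address becomes the target.
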
def read_host_file(host_file: Path | str) -> dict[str, Any] | None:
    """
    Parse a scan summary from a file or STDIN into a scan summary dict.
    """
    file = utils.check_file(host_file, ext=".json")
    if file is None:
        log.critical(f"Failed to parse host file '{str(host_file)}'")
        return None
    elif str(file) == "-":
        log.info("Parsing device information from standard input")
        return json.loads(sys.stdin.read())
    else:
        file = Path(file)
        log.info(f"Parsing device information from file {file.name}")
        if file.suffix == ".json":
            with file.open(encoding="utf-8") as h_file:
                return json.load(h_file)
        else:
            log.error(
                f"Invalid file extension {file.suffix} for host "
                f"file {file.name}. Must be .json for scan results."
            )
            return None

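# Usage sketch (illustrative only): passing "-" as the host file reads a scan
# summary from STDIN, so one run's summary can feed a later run. The
# "--host-file" flag name is an assumption based on the "host_file" key above.
#
#     cat peat_results/<run-dir>/summaries/scan-summary.json | peat pull --host-file -
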
def write_readme() -> bool:
    """
    Generate a README describing the output from PEAT.

    This file gets written in ``./peat_results/`` (or whatever is configured
    for ``OUT_DIR``).

    Returns:
        True if the file was written successfully (or if it already exists)
    """
    readme_text = """
Process Extraction and Analysis Tool (PEAT).

Refer to the PEAT documentation for details on usage and any other information.
The documentation should've been provided to you via other channels. If you
don't have access to the documentation, please reach out to the point of
contact from whom you acquired this release, or any of the contacts listed below.

If you have questions, feedback, find a bug, or have suggestions for
improvements, please get in touch!

PEAT team: peat@sandia.gov

# Output files and folders

## PEAT results directory (OUT_DIR)

By default, all PEAT runs are saved into `./peat_results/`, which is a
directory in the same directory you were in when running PEAT.

## Description of folders in output

- `devices/` : All data collected from OT devices and/or parsed out of files.
- `elastic_data/` : Copies of documents pushed to Elasticsearch. These can be
  used to rebuild the Elasticsearch data if you only have the files or don't
  have an Elasticsearch server available when running PEAT.
  - `mappings/` : Elasticsearch type mappings for the PEAT indices
- `heat_artifacts/` : Output from HEAT ("peat heat <args>")
- `logs/` : PEAT's log files, including the main log file, JSON-formatted log
  files, and protocol- and module-specific log files (e.g. Telnet logs, ENIP logs).
- `peat_metadata/` : JSON and YAML-formatted dumps of PEAT's configuration and
  internal state.
- `summaries/` : Summary results of a command, e.g. scan-summary, pull-summary,
  or parse-summary. These include metadata about the operation (e.g., how many
  files were parsed), as well as a combined set of device summaries (most of
  the data, but some fields are excluded, like events, memory, blobs, etc.).
  To view the complete results for devices, look in the "devices/" directory.
- `temp/` : Temporary files, used by PEAT during a run to put files temporarily
  before being moved elsewhere.

## Run directory (RUN_DIR)

Every time PEAT is run, a new sub-directory of `./peat_results/` is created.
This is the "run dir" or `RUN_DIR`. The name of this directory is
auto-generated, with the following format:
`<command>_<config-name>_<timestamp>_<run-id>`

- `<command>` : PEAT command, e.g. "scan", "pull", "parse", etc.
- `<config-name>` : name of the YAML config file, set in `metadata: name: "name"`.
  If no config name is specified, then the string "default-config" is used.
- `<timestamp>` : start time of the PEAT run, e.g. `2022-06-15_13-08-59`.
- `<run-id>` : Run ID, aka `agent.id`, e.g. `165532013980`.

Run dir examples:

- pull_sceptre-test-config_2022-06-17_165532013980
- scan_default-config_2022-09-27_165532013980

## Output file structure

NOTE: the file structure below will differ if any of the `_DIR` variables were
configured, e.g. `OUT_DIR`, `ELASTIC_DIR` or `LOG_DIR`.
`...` represents "miscellaneous files".

```
./peat_results/
    README.md
    <command>_<config-name>_<timestamp>_<run_id>/
        devices/
            <device-id>/
                device-data-summary.json
                device-data-full.json
                ...
        elastic_data/
            mappings/
                ...
            ...
        heat_artifacts/
            ...
        logs/
            enip/
                ...
            elasticsearch.log
            debug-info.txt
            json-log.jsonl
            peat.log
            telnet.log
            ...
        peat_metadata/
            peat_configuration.yaml
            peat_configuration.json
            peat_state.json
        summaries/
            scan-summary.json
            pull-summary.json
            parse-summary.json
        temp/
            ...
```
""".strip()  # noqa: E501

    readme_path = config.OUT_DIR / "README.md"
    if not readme_path.is_file():
        log.debug(f"Writing README to {readme_path.as_posix()}")
        return utils.write_file(readme_text, readme_path)

    return True