Source code for peat.api.pull_api

import timeit
from operator import itemgetter

from peat import __version__, config, consts, datastore, log, module_api, state, utils

from .scan_api import scan


[docs] def pull( targets: list[str], comm_type: consts.AllowedCommTypes, device_types: list[str] ) -> dict[str, dict | list | str | float | int] | None: """Pull from devices. Args: targets: Devices to pull from, such as network hosts or serial ports comm_type: Method of communication for the pull. Allowed values: - ``unicast_ip`` - ``broadcast_ip`` - ``serial`` device_types: Names of device modules or module aliases to use Returns: :ref:`pull-summary` as a :class:`dict`, or :obj:`None` if an error occurred """ # TODO: option to skip scan and manually specify devices to pull scan_summary = scan(targets, comm_type, device_types) if not scan_summary or not scan_summary["hosts_verified"]: log.error("Pull failed: no devices were found") state.error = True return None # * Combine any duplicate devices * datastore.deduplicate() # TODO: hack. Make this list the user input if scan is skipped. devices = datastore.verified # * Pull from all devices specified * log.info(f"Beginning pull for {len(devices)} devices") start_time = timeit.default_timer() pull_results = [] for device in devices: try: successful = device._module.pull(device) if not successful: log.error(f"Pull failed from {device.get_id()}") state.error = True continue device.purge_duplicates(force=True) exported_data = device.elastic() pull_results.append(exported_data) if state.elastic: device.export_to_elastic() except Exception: log.exception(f"Failed to pull from {device.get_id()}") state.error = True pull_duration = timeit.default_timer() - start_time log.info( f"Finished pulling from {len(devices)} devices in {utils.fmt_duration(pull_duration)}" ) # * Sort pull results by device ID for consistency * pull_results.sort(key=itemgetter("id")) # * Generate pull summary * pull_summary = { "peat_version": __version__, "peat_run_id": str(consts.RUN_ID), "pull_duration": pull_duration, "pull_modules": module_api.lookup_names(device_types), "pull_targets": scan_summary["scan_targets"], "pull_original_targets": targets, "pull_devices": [dev.get_id() for dev in devices], "pull_comm_type": comm_type, "num_pull_results": len(pull_results), "pull_results": pull_results, } # * Save pull results to a file * utils.save_results_summary(pull_summary, "pull-summary") # * Push pull results summary to Elasticsearch * if state.elastic: log.info( f"Pushing pull result summary to Elasticsearch " f"(index basename: {config.ELASTIC_PULL_INDEX})" ) if not state.elastic.push(config.ELASTIC_PULL_INDEX, pull_summary): log.error("Failed to push pull result summary to Elasticsearch") return pull_summary
__all__ = ["pull"]