Source code for peat.api.push_api

from pathlib import Path
from typing import Literal, get_args

from peat import PeatError, consts, datastore, log, module_api, utils
from peat.protocols import hosts_to_ips, sort_ips

from .scan_api import scan


[docs] def push( targets: list[str], comm_type: consts.AllowedCommTypes, device_types: list[str], input_source: Path | str, push_type: Literal["config", "firmware"], skip_scan: bool = False, ) -> bool: """ Push (upload) configuration or firmware to devices. .. note:: By default all targets are scanned and verified, and only the devices that are successfully verified are pushed to. This enables this function to be used without requiring a scan to be done and ensuring pushes are not performed to invalid devices. This behavior can be disabled by setting ``PUSH_SKIP_SCAN`` to True or passing ``--push-skip-scan`` on the CLI. Args: targets: Devices to push to, such as network hosts or serial ports comm_type: Method of communication for the push. Allowed values: - ``unicast_ip`` - ``broadcast_ip`` - ``serial`` device_types: Names of device modules or module aliases to use. If scanning is disabled this should be a single device type. input_source: Path of the file to push, as a string push_type: Type of push being performed. Valid push types are "config" and "firmware". skip_scan: If device verification (scanning) should be skipped. NOTE: this currently only applies to unicast_ip devices. Returns: If the push was successful Raises: PeatError: If the push failed due to an issue with configuration or arguments, such as invalid device types or push type. """ source = utils.check_file(input_source) if source is None: raise PeatError(f"bad input source '{source}'") if push_type not in get_args(consts.PushType): raise PeatError(f"Invalid '{push_type}' (supported types: {get_args(consts.PushType)})") # TODO: make input a list of # (device_id, device_comm_type, device_peat_module) # Skip scan on push (assume all devices online) if skip_scan: log.warning(f"Skipping verification scan and pushing to {len(targets)} targets") if len(device_types) > 1: raise PeatError("more than 1 device type specified when PUSH_SKIP_SCAN is enabled") modules = module_api.lookup_types(device_types, filter_attr="ip_methods") ips = sort_ips(hosts_to_ips(targets)) if not ips: log.error("Push failed: no IP targets were valid") return False devices = [] # type: list[DeviceData] for ip in ips: dev = datastore.get(ip) dev._is_active = True dev._is_verified = True dev._module = modules[0] devices.append(dev) else: scan_summary = scan(targets, comm_type, device_types) # type: ignore if not scan_summary or not scan_summary["hosts_verified"]: log.error("Push failed: no devices were found in scan") return False devices = datastore.verified # type: list[DeviceData] log.info(f"Beginning push to {len(devices)} devices") for device in devices: try: push_result = device._module.push( dev=device, to_push=source, push_type=push_type, ) except Exception: log.exception(f"Failed to push to {dev.get_id()}") return False if not push_result: return False log.info(f"Finished pushing to {len(devices)} devices") return True
__all__ = ["push"]