Source code for peat.device

import io
import shutil
from pathlib import Path
from typing import Any, get_args

from peat import (
    DeviceData,
    DeviceError,
    IPMethod,
    SerialMethod,
    config,
    consts,
    datastore,
    log,
    utils,
)


[docs] class DeviceModule: """ Base class for all PEAT device modules. The methods of this class represent the core PEAT API, and should be implemented by all devices that inherit from it. Sub-classes may add their own device-specific methods and data structures as needed. """ # This is used to dynamically set the "log" attribute on subclasses # to add the sub-class's name as metadata for the logger. # https://docs.python.org/3/reference/datamodel.html#object.__init_subclass__ def __init_subclass__(cls, *args, **kwargs): super().__init_subclass__(*args, **kwargs) cls.log = log.bind(classname=cls.__name__, peat_module=cls.__name__) ip_methods: list[IPMethod] = [] """ Methods for identifying devices via IP or Ethernet. """ serial_methods: list[SerialMethod] = [] """ Methods for identifying devices via a serial connection. """ device_type: str = "" """ Type of device, e.g "PLC", "Relay", "RTU", "RTAC", etc. Elasticsearch field: ``host.type``. """ vendor_id: str = "" """ Short-form vendor name. Elasticsearch field: ``host.description.vendor.id``. """ vendor_name: str = "" """ Long-form vendor name. Elasticsearch field: ``host.description.vendor.name``. """ brand: str = "" """ Device brand. Elasticsearch field: ``host.description.brand``. """ model: str = "" """ Device's default model (if not known). Elasticsearch field: ``host.description.model``. """ supported_models: list[str] = [] """ Device models this module supports or is known to work with. """ filename_patterns: list[str] = [] """ Patterns of files the module is capable of parsing, if any. Patterns are a literal name (``*SET_ALL.TXT``) or a Unix shell glob (``*.rdb``), are case-insensitive, and must start with a wildcard character (``*``). Globs can be anything accepted by :mod:`glob`. """ can_parse_dir: bool = False """ If the module will accept a directory as the source path for parsing. If this is True, a Path like ``Path("./some_files/")`` would be a valid target for parsing. Any handling of files in the directory will have to be handled by the module, not PEAT. """ module_aliases: list[str] = [] """ Alternative names for looking up the module, e.g. for the ``-d`` :term:`CLI` option. Aliases that can be used to refer to the device via the PEAT's Module API (:mod:`peat.module_manager`). """ annotate_fields: dict[str, Any] = {} """ Fields that will be annotated (populated) by default for most operations, such as scan pull, parse, etc. Examples include known OS versions or hardware architecture. These fields will populated ONLY IF they are already unset on the device being annotated. Format is the path to the field to populate, e.g. ``os.name``, ``os.vendor.id``, etc. """ default_options: dict[str, Any] = {} """ Define module-specific options and/or override global defaults, such as default ports for protocols or default credentials. """
[docs] @classmethod def pull(cls, dev: DeviceData) -> bool: """ Pull artifacts from the device, such as logic, configuration, or firmware. This wraps and calls ``_pull()``. PEAT modules implementing the pull interface should implement ``_pull()``, instead of this method. Args: dev: existing :class:`~peat.data.models.DeviceData` object representing the device to pull from. Returns: If the pull was successful, as a bool Raises: DeviceError: If a critical error occurred """ if not cls.method_implemented("_pull"): raise DeviceError(f"_pull() is not implemented by {cls.__name__}") if not isinstance(dev, DeviceData): raise DeviceError(f"dev has type '{type(dev)}', expected DeviceData") cls.log.info(f"Pulling from {dev.get_comm_id()}") result = cls._pull(dev) if not result: cls.log.debug(f"_pull() failed for {dev.get_comm_id()}") return False cls.log.info(f"Finished pulling from {dev.get_comm_id()}") cls.update_dev(dev) dev.purge_duplicates(force=True) return True
[docs] @classmethod def _pull(cls, dev: DeviceData) -> bool: """ Implemented by modules. Subclass :class:`~peat.device.DeviceModule` and override this method. """ pass
[docs] @classmethod def push( cls, dev: DeviceData, to_push: str | bytes | Path, push_type: consts.PushType, ) -> bool: """ Upload (push) configuration or firmware to a device. This wraps and calls ``_push()``. PEAT modules implementing the push interface should implement ``_push()``, instead of this method. Args: dev: existing :class:`~peat.data.models.DeviceData` object representing the device to push to. to_push: the information to push, either as a Path object pointing to a file or directory with config files to upload, or a raw string or bytes of file to upload. push_type: What information is being pushed, either 'config' or 'firmware'. This comes from the ``-t`` command line argument. Returns: If the push was successful, as a bool Raises: DeviceError: If a critical error occurred """ if not cls.method_implemented("_push"): raise DeviceError(f"_push() is not implemented by {cls.__name__}") if not isinstance(dev, DeviceData): raise DeviceError(f"dev has type '{type(dev)}', expected DeviceData") if push_type not in get_args(consts.PushType): raise DeviceError( f"Invalid '{push_type}' (supported types: {get_args(consts.PushType)})" ) # if Path, ensure path exists if isinstance(to_push, Path): file = utils.check_file(to_push) if not file: cls.log.error(f"Push failed: '{to_push}' doesn't exist") return False to_push = file cls.log.debug(f"Loading push data from file '{file}'") cls.log.info(f"Pushing {push_type} to {dev.get_id()}") result = cls._push(dev, to_push, push_type) if not result: cls.log.error(f"{push_type.capitalize()} push to {dev.get_id()} failed") return False cls.log.info(f"{push_type.capitalize()} push to {dev.get_id()} was successful") return True
[docs] @classmethod def _push( cls, dev: DeviceData, to_push: str | bytes | Path, push_type: consts.PushType, ) -> bool: """ Implemented by modules. Subclass :class:`~peat.device.DeviceModule` and override this method. """ pass
[docs] @classmethod def parse( cls, to_parse: str | bytes | Path | io.IOBase, dev: DeviceData | None = None, ) -> DeviceData | None: """ Parse device information from collected data or file artifacts. Args: to_parse: Data to be parsed. This can either be the :class:`~pathlib.Path` of a file or the raw data to parse. dev: existing :class:`~peat.data.models.DeviceData` object to use instead of the one created by the module Returns: Exported version of the parsed data object Raises: DeviceError: If a critical error occurred """ if not cls.method_implemented("_parse"): raise DeviceError(f"_parse() is not implemented by {cls.__name__}") if dev is not None and not isinstance(dev, DeviceData): raise DeviceError(f"dev has type '{type(dev)}', expected DeviceData") if isinstance(to_parse, Path): file = to_parse.resolve() if not file.exists(): cls.log.error(f"Parse failed: '{to_parse.as_posix()}' doesn't exist") return None cls.log.debug(f"Parsing data from {to_parse.as_posix()}") # Copy file or directory to temp dir if config.TEMP_DIR: if file.is_file(): utils.copy_file(file, config.TEMP_DIR / file.name) elif file.is_dir(): # NOTE: we allow directories for things like the SEL or the ION tmp_path = config.TEMP_DIR / file.name if tmp_path.exists(): shutil.rmtree(tmp_path) shutil.copytree(file, tmp_path, dirs_exist_ok=True) else: # Treat file streams as raw data if isinstance(to_parse, (io.RawIOBase, io.StringIO)): cls.log.debug(f"Parsing data from file stream ({to_parse.__class__.__name__}") to_parse = to_parse.read() elif isinstance(to_parse, io.TextIOWrapper): # Regular text file cls.log.debug("Parsing data from file buffer (TextIOWrapper)") to_parse = to_parse.buffer.read() # Don't decode, read raw else: cls.log.debug(f"Parsing raw data with type '{to_parse.__class__.__name__}'") # TODO: use "magic" fingerprinting to determine type # Implement using a "file_magic_methods" class # attribute with list of functions # sceptre: check for a string in XML file to "fingerprint" the file label = "raw-unparsed-data" ext = "" for pat in cls.filename_patterns: if label in pat and "." in pat: ext = f".{pat.partition('.')[2]}" break if ext and not ext.startswith("."): ext = f".{ext}" file = config.TEMP_DIR / consts.sanitize_filename(f"{label}{ext}") if not utils.write_file(data=to_parse, file=file): cls.log.error("Parse failed due to an error during file writing") return None # TODO: improve handling of empty files # Example: if you pass a 0-byte apx file to M340._parse(), # it will generate a bunch of boilerplate and say it's a M340, # TC6 XML, etc. when really there's nothing there. In these cases, # we really should just generate file metadata and return a device # based on file name instead of passing it to the module for parsing. if file.is_file() and file.stat().st_size == 0: cls.log.warning(f"Input file '{file.name}' is empty, PEAT may behave strangely") # Check if directory is empty elif file.is_dir() and not list(file.iterdir()): raise DeviceError(f"Input directory is empty: {file}") # TODO: flesh out a single-device result and single file parse # ION: parse multiple files to get one result # ION, Sage, etc: multiple devices potentially present in one file parse_dev = cls._parse(file=file, dev=dev) if parse_dev is None: cls.log.debug("device.parse() failed, dev from _parse() is None") return None if not isinstance(parse_dev, DeviceData): raise DeviceError( f"_parse() returned object of invalid type '{type(parse_dev)}', " f"expected DeviceData. This is either a bug in the device module " f"or a bug in PEAT." ) if parse_dev not in datastore.objects: cls.log.debug(f"Parsed device {parse_dev.get_id()} not in datastore, adding it now...") datastore.objects.append(parse_dev) cls.update_dev(parse_dev) # Ensure duplicates get purged since this is a fresh parse parse_dev.purge_duplicates(force=True) if config.DEVICE_DIR: parse_dev.export_to_files() if file.is_file(): utils.move_file(config.TEMP_DIR / file.name, parse_dev.get_out_dir() / file.name) elif file.is_dir(): # NOTE: we allow directories for things like the SEL or the ION shutil.move( str(config.TEMP_DIR / file.name), str(parse_dev.get_out_dir() / file.name), ) return parse_dev
[docs] @classmethod def _parse(cls, file: Path, dev: DeviceData | None = None) -> DeviceData | None: """ Implemented by modules. Subclass :class:`~peat.device.DeviceModule` and override this method. """ pass
[docs] @classmethod def update_dev(cls, dev: DeviceData) -> None: """ Update the device's data with metadata, inferences, and lookups. .. note:: Data values are only changed if they're *unset*. In other words, existing values will NOT be overwritten. What's populated: - Basic Module attributes, e.g. ``cls.vendor_name => dev.description.vendor.name``. - Any Module-defined fields in ``cls.annotate_fields``, if present. - Calls :meth:`~peat.data.models.DeviceData.populate_fields`, which populates fields such as adding description values, network interfaces, and other values. This call will also implicitly lookup MAC addresses, IP addresses, and/or hostnames, unless disabled with the appropriate PEAT global configuration options. Args: dev: DeviceData instance to annotate. """ # Annotate with the standard class fields (vendor info, etc.) if not dev.description.vendor.name: dev.description.vendor.name = cls.vendor_name if not dev.description.vendor.id: dev.description.vendor.id = cls.vendor_id if not dev.description.brand: dev.description.brand = cls.brand if not dev.description.model: dev.description.model = cls.model if not dev.type: dev.type = cls.device_type # Set the module if not dev._module: dev._module = cls elif dev._module != cls: cls.log.warning(f"Existing module {dev._module} != update_dev() module {cls}") # Add any module-defined fields if cls.annotate_fields: for field, value in cls.annotate_fields.items(): # If the field isn't populated, then set it to the module's preferred value if not utils.rgetattr(dev, field): utils.rsetattr(dev, field, value) # Fill in fields (inferences) and add common fields, like description.product dev.populate_fields()
[docs] @classmethod def method_implemented(cls, method_name: str) -> bool: """ Checks if a method of a subclass of :class:`~peat.device.DeviceModule` is implemented and overrides the method in :class:`~peat.device.DeviceModule`). """ return ( getattr(cls, method_name).__code__ is not getattr(DeviceModule, method_name).__code__ )
__all__ = ["DeviceModule"]