# Source code for peat.module_manager

import importlib
import inspect
import pkgutil
import sys
from collections.abc import Iterable
from pathlib import Path
from typing import Any

from peat import log, utils
from peat.device import DeviceData, DeviceModule, IPMethod, SerialMethod


class ModuleManager:
    """
    Manager for PEAT modules that implement functionality of a device.

    Attributes:
        modules: All currently imported device modules, keyed by
            lowercased module (class) name
        module_aliases: Mapping of alias keys to device modules
        runtime_imports: Modules that were imported at runtime
        runtime_paths: Paths of modules imported at runtime, if they
            were imported from a path.
    """

    def __init__(self) -> None:
        # Initialize with the modules included with PEAT
        self.modules: dict[str, Any] = self._get_static_modules()
        self.module_aliases: dict[str, set[type[DeviceModule]]] = {}
        for module in self.modules.values():
            self._update_aliases(module)

        # Track modules added at runtime (using import_module)
        self.runtime_imports: set[str] = set()
        self.runtime_paths: set[Path] = set()  # If any were imported from files

    @property
    def names(self) -> list[str]:
        """
        Sorted list of class names of all imported modules.
        """
        return self._mods_to_names(self.modules.values())

    @property
    def classes(self) -> list[type[DeviceModule]]:
        """
        Sorted list of classes of all imported modules.
        """
        return self._sort(self.modules.values())

    @property
    def aliases(self) -> list[str]:
        """
        Sorted list of all currently registered module aliases.
        """
        return sorted(self.module_aliases)

    @property
    def alias_mappings(self) -> dict[str, list[str]]:
        """
        Dict of aliases and the (sorted) module names they map to.
        """
        # The alias values are sets, so the names are sorted to keep the
        # output stable between runs.
        return {
            alias: self._mods_to_names(mods)
            for alias, mods in self.module_aliases.items()
        }

    def filter_names(self, filter_attr: str) -> list[str]:
        """
        Return module names for which the attribute exists and is truthy.

        Example: set filter_attr to ``"ip_methods"`` to get all of the
        modules that support IP identification
        (:attr:`~peat.device.DeviceModule.ip_methods`).

        Args:
            filter_attr: Class attribute string to filter using

        Returns:
            Module names for which the attribute exists and is truthy.
        """
        return self._mods_to_names(self._filter(self.modules.values(), filter_attr))

    def import_module(self, module: Any, remove_aliases: bool = True) -> bool:
        """
        Import a PEAT module.

        Args:
            module: module to import. Can be a path (:class:`str` or
                :class:`~pathlib.Path`), class, or list of paths.
            remove_aliases: if aliases for an existing module should be removed

        Returns:
            If the import was successful. For a list, :obj:`True` if at
            least one entry was imported successfully.
        """
        # List of module paths
        if isinstance(module, list):
            was_successful = False
            for mod in module:
                module_path = utils.check_file(mod)
                if module_path and self.import_module(mod, remove_aliases):
                    was_successful = True
                    self.runtime_paths.add(module_path)
                else:
                    log.warning(f"Import failed for module '{mod!s}'")
            return was_successful

        if isinstance(module, (str, Path)):
            return self.import_module_path(Path(module).resolve(), remove_aliases)

        # Anything else is treated as a module class
        return self.import_module_cls(module, remove_aliases)

    def _extract_members(self, module_path: str) -> list[type[DeviceModule]]:
        """
        Import a Python module e.g. ``module.path`` and return the valid
        PEAT device module classes found in it.

        Note that the module's parent folder (and if there's a package,
        the package's parent) must be in :const:`sys.path`.

        Args:
            module_path: Dotted import name of the Python module

        Returns:
            Classes in the module that pass :meth:`is_valid_module`, or an
            empty list if the module could not be found.
        """
        try:
            pymod = importlib.import_module(module_path)
        except ModuleNotFoundError as ex:
            log.debug(f"Failed to import {module_path}: {ex}")
            return []

        members = inspect.getmembers(pymod, inspect.isclass)
        return [m for _, m in members if ModuleManager.is_valid_module(m)]

    def import_module_path(self, path: Path, remove_aliases: bool = True) -> bool:
        """
        Import all modules from a Python file or a directory.

        .. note::
           The module CANNOT be "hidden" by placing the contents in
           an ``__init__.py`` file, and must reside in its own .py
           file (e.g. ``mymodule.py``).

        Args:
            path: :class:`~pathlib.Path` to the file or directory
                containing modules to import
            remove_aliases: If aliases for an existing module should be removed

        Returns:
            If the import was successful
        """
        log.trace(f"Attempting to import module(s) from file path {path!s}")

        is_file = path.is_file()
        if not is_file and not path.is_dir():
            log.error(f"Module import path does not exist: '{path!s}'")
            return False

        # Unsure if needed, leaving to be safe
        importlib.invalidate_caches()

        # Tell Python where to find the module by adding it to the path.
        # Skip the append if it's already there, so repeated imports
        # don't keep growing sys.path with duplicates.
        parent_dir = path.resolve().parent.as_posix()
        if parent_dir not in sys.path:
            sys.path.append(parent_dir)

        if is_file:
            found = self._extract_members(path.stem)
        else:
            sources = pkgutil.iter_modules([path.as_posix()], prefix=f"{path.name}.")
            found = [m for s in sources for m in self._extract_members(s.name)]

        if found:
            log.trace3(f"Found modules: {found}")
            log.info(f"Attempting to import {len(found)} device modules from {path.name}")

            valid = 0
            for module in found:
                if self.import_module_cls(module, remove_aliases):
                    valid += 1

            if valid:
                log.info(f"Finished importing {valid} valid device modules from {path.name}")
                self.runtime_paths.add(path)
                return True

        log.error(f"No valid modules found in path: {path!s}")
        return False

    def import_module_cls(self, module: type[DeviceModule], remove_aliases: bool = True) -> bool:
        """
        Adds the module object to the registered PEAT device modules.

        If a module with the same (lowercased) class name is already
        registered, it is overwritten and a warning is logged.

        Args:
            module: The module class to import
            remove_aliases: If aliases for an existing module should be removed

        Returns:
            If the import was successful
        """
        if not ModuleManager.is_valid_module(module):
            log.error(f"Invalid PEAT module: {module!s}")
            return False

        name = module.__name__.lower()
        existing = self.modules.get(name)

        if existing is not None:
            log.warning(
                f"Overwriting existing module {existing.__name__} "
                f"defined in {existing.__module__} with the module "
                f"defined in {module.__module__}."
            )

            if remove_aliases:
                log.debug(f"Removing aliases for module {existing.__name__}")
                # Remove the old module from the aliases. Iterate over a
                # copy since aliases left with no modules are deleted.
                for alias, members in list(self.module_aliases.items()):
                    if existing in members:
                        members.remove(existing)
                        if not members:
                            del self.module_aliases[alias]

        self.modules[name] = module
        self._update_aliases(module)
        self.runtime_imports.add(name)

        return True

    def get_module(self, name: str) -> type[DeviceModule] | None:
        """
        Get module by name.

        Args:
            name: Exact name of a PEAT module (normalized before lookup)

        Returns:
            The PEAT module object (subclass of
            :class:`~peat.device.DeviceModule`), or :class:`None`
            if the module isn't imported or doesn't exist.
        """
        return self.modules.get(self._norm_name(name))

    def get_modules(self, name: str, filter_attr: str | None = None) -> list[type[DeviceModule]]:
        """
        Get PEAT device module classes.

        Args:
            name: Module name or alias. A matching module name takes
                precedence over a matching alias.
            filter_attr: Only return modules for which this attribute is true

        Returns:
            List of module classes (subclasses of
            :class:`~peat.device.DeviceModule`)
        """
        mods: list[type[DeviceModule]] = []
        key = self._norm_name(name)
        alias = self._norm_alias(name)

        if key in self.modules:
            mods.append(self.modules[key])
        elif alias in self.module_aliases:
            mods.extend(self.module_aliases[alias])

        if filter_attr:  # Only include if the attribute is Truthy
            mods = self._filter(mods, filter_attr)

        return self._sort(mods)

    def lookup_types(
        self,
        dev_types: Any | None | list[str | type[DeviceModule] | DeviceModule] = None,
        filter_attr: str | None = None,
        subclass_method: str | None = None,
        filter_values: dict | None = None,
    ) -> list[type[DeviceModule]]:
        """
        Process strings and classes into a sorted :class:`list` of module classes.

        Args:
            dev_types: DeviceModule names, aliases, classes, or instances
                to use and resolve into a list of module classes. May be a
                single value or a list/tuple/set of values.
                If :obj:`None`, all currently imported modules searched.
            filter_attr: Only return modules for which this attribute is true
            subclass_method: Filter modules that have implemented a
                method from the base :class:`~peat.device.DeviceModule`
                class, e.g. :meth:`~peat.device.DeviceModule.identify_ip`.
            filter_values: Values to filter modules by, with :class:`dict` keys
                being names of module class attributes and values being the
                values to compare. Comparisons must be exact matches and
                strings are therefore case-sensitive. A module is kept if
                at least one key/value pair matches.
                Example: ``{"device_type": "PLC"}``

        Returns:
            List of module classes (subclasses of
            :class:`~peat.device.DeviceModule`)
        """
        # Default to searching all imported modules
        if dev_types is None:
            dev_types = self.classes
        elif not isinstance(dev_types, (list, tuple, set, frozenset)):
            dev_types = [dev_types]

        resolved: set[type[DeviceModule]] = set()  # set to prevent duplicates

        for dev in dev_types:
            if isinstance(dev, str):
                # Strings that match neither a name nor an alias are skipped
                name = self._norm_name(dev)
                alias = self._norm_alias(dev)
                if name in self.modules:
                    resolved.add(self.modules[name])
                elif alias in self.module_aliases:
                    resolved.update(self.module_aliases[alias])
            elif inspect.isclass(dev) and issubclass(dev, DeviceModule):
                resolved.add(dev)  # Just add the class that was passed
            elif isinstance(dev, DeviceModule):
                resolved.add(type(dev))  # Extract the class from the instance
            else:
                log.error(f"Invalid device type: {dev}")

        mods: list[type[DeviceModule]] = list(resolved)

        if filter_attr:  # Only include if the attribute is Truthy
            mods = self._filter(mods, filter_attr)

        if subclass_method:  # If the method has been overridden in the module
            mods = [m for m in mods if self._implements_method(m, subclass_method)]

        if filter_values:
            mods = [
                m
                for m in mods
                if any(getattr(m, key, None) == value for key, value in filter_values.items())
            ]

        return self._sort(mods)

    def lookup_names(
        self,
        dev_types: Any | None | list[str | type[DeviceModule] | DeviceModule] = None,
        filter_attr: str | None = None,
        subclass_method: str | None = None,
        filter_values: dict | None = None,
    ) -> list[str]:
        """
        Process strings and classes into a sorted :class:`list` of module names.

        This is a thin wrapper around
        :meth:`~peat.module_manager.ModuleManager.lookup_types`.

        Args:
            dev_types: DeviceModule names, aliases, classes, or instances
                to use and resolve into a list of module classes.
                If :obj:`None`, all currently imported modules searched.
            filter_attr: Only return modules for which this attribute is true
            subclass_method: Filter modules that have implemented a
                method from the base :class:`~peat.device.DeviceModule`
                class, e.g. :meth:`~peat.device.DeviceModule.identify_ip`.
            filter_values: Values to filter modules by, with :class:`dict` keys
                being names of module class attributes and values being the
                values to compare. Comparisons must be exact matches and
                strings are therefore case-sensitive.
                Example: ``{"device_type": "PLC"}``

        Returns:
            List of module names
        """
        types = self.lookup_types(dev_types, filter_attr, subclass_method, filter_values)
        return self._mods_to_names(types)

    def alias_to_names(self, alias: str) -> list[str]:
        """
        Resolve an alias into names of modules it corresponds to.

        Args:
            alias: The alias to resolve

        Returns:
            List of names of modules that the alias resolved to,
            or an empty list if it didn't resolve to anything.
        """
        alias = self._norm_alias(alias)

        if alias not in self.module_aliases:
            log.error(f"Alias {alias} is not present")
            return []

        return self._mods_to_names(self.module_aliases[alias])

    @staticmethod
    def _implements_method(mod: type[DeviceModule], method_name: str) -> bool:
        """
        Check if a method is defined on the module class itself or on its
        first base class (when that base isn't ``DeviceModule``).
        """
        if method_name in mod.__dict__:
            return True

        # Subclasses, e.g. for the SELs
        base = mod.__bases__[0]
        return base is not DeviceModule and method_name in base.__dict__

    @staticmethod
    def _sort(mods: Iterable) -> list:
        """
        Sort modules by class name.

        Improve the determinism of repeated operations (e.g. scanning).
        """
        return sorted(mods, key=lambda x: x.__name__)

    @staticmethod
    def _norm_alias(alias: str) -> str:
        """
        Normalize module alias (strip surrounding whitespace and lowercase).
        """
        return alias.strip().lower()  # Any characters are fine in aliases

    @staticmethod
    def _norm_name(name: str) -> str:
        """
        Normalize module name (strip, lowercase, drop dashes and spaces).
        """
        return name.strip().lower().replace("-", "").replace(" ", "")

    @staticmethod
    def _filter(mods: Iterable[type[DeviceModule]], filter_attr: str) -> list[type[DeviceModule]]:
        """
        Filter modules to those with a 'truthy' class attribute.
        """
        return [m for m in mods if bool(getattr(m, filter_attr, False))]

    @staticmethod
    def _mods_to_names(mods: Iterable[type[DeviceModule]]) -> list[str]:
        """
        Convert modules to a sorted list of string names.
        """
        return sorted(x.__name__ for x in mods)

    @staticmethod
    def _get_static_modules() -> dict[str, Any]:
        """
        Get all modules included with PEAT in ``/peat/modules/``.

        Returns:
            Valid device module classes found in ``peat.modules``,
            keyed by lowercased class name.
        """
        if "peat.modules" not in sys.modules:
            importlib.invalidate_caches()
            importlib.import_module("peat.modules")

        mods = inspect.getmembers(sys.modules["peat.modules"], inspect.isclass)
        return {n.lower(): m for n, m in mods if ModuleManager.is_valid_module(m)}

    def _update_aliases(self, module: type[DeviceModule]) -> None:
        """
        Make the module resolvable by any aliases that apply to it.

        Aliases are taken from ``"all"``, the module's ``device_type``,
        ``vendor_id``, ``vendor_name``, ``brand`` and ``model`` attributes,
        and anything listed in the module's ``module_aliases``.
        """
        # Pull aliases from device class attributes and various other things
        # "all" makes the CLI magically work for scan/pull
        sources = [
            "all",
            module.device_type,
            module.vendor_id,
            module.vendor_name,
            module.brand,
            module.model,
            *module.module_aliases,  # Add all the aliases defined in the module
        ]

        # Normalize, remove empty values (including values that are empty
        # only after normalization), and deduplicate using a set
        normalized_aliases: set[str] = set()
        for source in sources:
            if not source:
                continue
            alias = self._norm_alias(source)
            if alias:
                normalized_aliases.add(alias)

        for alias in normalized_aliases:
            self.module_aliases.setdefault(alias, set()).add(module)

    @classmethod
    def is_valid_module(cls, module: Any) -> bool:
        """
        Checks if a Python object is a PEAT device module.

        Args:
            module: Object to check

        Returns:
            :obj:`True` if the object is a class that subclasses
            ``DeviceModule`` and isn't one of the PEAT core classes.
        """
        # Identity comparison, so objects with a custom __eq__ can't
        # accidentally match (or blow up on) the core classes.
        core_classes = (DeviceModule, DeviceData, IPMethod, SerialMethod)
        if any(module is core for core in core_classes):
            # This hack is used to filter the PEAT core classes
            # out of a few areas, so don't log when it fails.
            return False

        if not inspect.isclass(module):
            reason = "module object is not a Python class"
        elif not issubclass(module, DeviceModule):
            reason = "module is not a subclass of peat.DeviceModule"
        else:
            return True

        log.debug(f"Module validation failed for '{module}': {reason}")
        return False
# Shared module-level manager instance. Creating it runs
# ModuleManager.__init__, which loads the modules included with PEAT.
module_api = ModuleManager()

# Names exported by this module
__all__ = ["ModuleManager", "module_api"]