Source code for peat.data.store

from typing import Any

from peat import log

from .data_utils import DeepChainMap, merge_models
from .models import DeviceData


class Datastore:
    """
    `Registry <https://martinfowler.com/eaaCatalog/registry.html>`__
    of :class:`~peat.data.models.DeviceData` instances.
    """

    objects: list[DeviceData] = None
    """:class:`~peat.data.models.DeviceData` instances in this datastore."""

    global_options: dict[str, Any] = None
    """
    Options that will be applied and used by all devices in this datastore.
    Changes to the values in this :class:`dict` will be reflected in
    the options of all devices in this datastore.
    """

    def __init__(self) -> None:
        """Create an empty datastore with no devices and no global options."""
        self.objects = []
        self.global_options = {}
        # Lazily created by the ``device_options`` property.
        self._data_obj: DeviceData | None = None

    def create(self, ident: str, ident_type: str) -> DeviceData:
        """
        Create a new :class:`~peat.data.models.DeviceData` instance,
        add it to the datastore, and return it.

        Args:
            ident: Value of the identifier, such as ``192.168.0.1``
            ident_type: Name of the :class:`~peat.data.models.DeviceData`
                field that ``ident`` is assigned to, e.g. ``"ip"``

        Returns:
            The newly created object, which has already been appended
            to :attr:`objects`.
        """
        dev_data = DeviceData(**{ident_type: ident})
        self.objects.append(dev_data)
        return dev_data

    def get(self, ident: str, ident_type: str = "ip") -> DeviceData:
        """
        Get a :class:`~peat.data.models.DeviceData` instance from the datastore.

        .. warning::
           If you want to search for an existing device and fail if one
           isn't found, then use :meth:`~peat.data.store.Datastore.search`
           instead.

        .. code-block:: python
           :caption: Examples of ``datastore.get()``

           >>> from pprint import pprint
           >>> from peat import SCEPTRE, datastore
           >>> device = datastore.get("192.0.2.20")
           >>> device.ip
           '192.0.2.20'
           >>> device = datastore.get("192.0.2.123")  # doctest: +SKIP
           >>> SCEPTRE.pull(device)  # doctest: +SKIP
           >>> pprint(device.export())  # doctest: +SKIP
           >>> parsed_name = "relay_1"  # Name extracted from the pulled config
           >>> device = datastore.get(parsed_name, "name")  # doctest: +SKIP
           >>> device.data.name == parsed_name  # doctest: +SKIP
           True

        Args:
            ident: Identifier to search for, such as ``192.168.0.1``
            ident_type: What ``ident`` is, e.g. ``"serial_port"`` or ``"ip"``

        Returns:
            The :class:`~peat.data.models.DeviceData` object found or a new
            object if there wasn't a device found that matched the arguments.
        """
        dev = self.search(ident, ident_type)
        # Explicit None check: a device that was found must be returned even
        # if the object itself happens to evaluate as falsy.
        if dev is not None:
            return dev
        return self.create(ident, ident_type)

    def search(self, ident: str, ident_type: str) -> DeviceData | None:
        """
        Search for the device with a given identifier.

        Example: During passive scanning a device was initialized with a MAC
        which gets resolved to an IP. Later during active scanning, we want to
        add information to the device. We can look it up by its IP, even if
        it was originally added to the datastore using its MAC.

        Args:
            ident: Identifier to search for, such as ``192.168.0.1``
            ident_type: What ``ident`` is, e.g. ``"serial_port"`` or ``"ip"``

        Returns:
            The first :class:`~peat.data.models.DeviceData` in
            :attr:`objects` whose ``ident_type`` attribute equals ``ident``,
            or :obj:`None` if the search failed
        """
        for obj in self.objects:
            if getattr(obj, ident_type, None) == ident:
                return obj
        return None

    def remove(self, to_remove: DeviceData) -> bool:
        """
        Remove a device from the datastore.

        The device is matched by identity (``is``), not equality, so a
        different object with identical field values is never removed.

        Args:
            to_remove: :class:`~peat.data.models.DeviceData` instance to remove

        Returns:
            If the device was successfully found and removed
        """
        for index, candidate in enumerate(self.objects):
            if candidate is to_remove:
                # Safe to delete while iterating: we return immediately.
                del self.objects[index]
                return True
        return False

    def prune_inactive(self) -> None:
        """Remove inactive devices from the datastore (``dev._is_active == False``)."""
        if not self.objects:
            return

        inactive_devs = [x for x in self.objects if not x._is_active]
        # Single in-place filter instead of one linear ``remove()`` per
        # device; slice assignment keeps the same list object.
        self.objects[:] = [x for x in self.objects if x._is_active]
        log.debug(f"Pruned {len(inactive_devs)} inactive devices from datastore")

    def deduplicate(self, prune_inactive: bool = True) -> None:
        """
        Cleanup duplicate devices in the datastore.

        Duplicates are only merged if they have the same IP, MAC or
        serial port. Any duplicates found are merged into a single
        :class:`~peat.data.models.DeviceData` object, and the additional
        copies are removed from the list of objects.

        Args:
            prune_inactive: If inactive devices should be removed
                before beginning deduplication
        """
        if not self.objects:
            return

        if prune_inactive:
            self.prune_inactive()

        # Only log at INFO level if there are enough objects to possibly make it lag
        msg = f"Searching for duplicates in {len(self.objects)} objects..."
        if len(self.objects) > 2:
            log.info(msg)
        else:
            log.debug(msg)

        for obj in self.objects:
            obj.purge_duplicates()

        # id() of every object that has been merged into another one.
        # All objects stay referenced by self.objects during the scan,
        # so their ids are stable and unique.
        removed: set[int] = set()

        for obj in self.objects:
            # Object was removed as a duplicate
            if id(obj) in removed:
                continue

            # Compare anything that isn't the object
            # and wasn't already removed as a duplicate
            for comp in self.objects:
                if comp is obj or id(comp) in removed:
                    continue

                if obj.is_duplicate(comp):
                    log.info(f"Merging duplicate {comp.get_id()} into {obj.get_id()}")

                    # TODO: copy/merge stuff other than the data, e.g. options?
                    # TODO: delete duplicate timeseries document from Elasticsearch

                    # Merge in data from the duplicate
                    merge_models(obj, comp)

                    # NOTE(review): presumably clears the flag so the purge
                    # below is not skipped - confirm against DeviceData.
                    obj._is_deduplicated = False

                    # Purge any new duplicates from the now-merged object
                    obj.purge_duplicates()

                    # Add duplicate to exclusion list
                    removed.add(id(comp))

        # Build the surviving list only after the scan has finished. An object
        # processed early can still be merged into a later object (once a
        # merge makes them match), so it must not be kept just because it
        # was visited first.
        self.objects = [obj for obj in self.objects if id(obj) not in removed]

        log.debug(
            f"Finished deduplicating objects, {len(removed)} duplicates were merged and removed"
        )

    @property
    def verified(self) -> list[DeviceData]:
        """Devices that have been verified (``dev._is_verified == True``)."""
        return [d for d in self.objects if d._is_verified]

    @property
    def device_options(self) -> DeepChainMap:
        """
        Get global options with module defaults and injects applied.

        This is a hack, to be sure, but refactoring will
        take more time than it's worth.
        """
        # Create the backing DeviceData once and reuse it afterwards.
        if self._data_obj is None:
            self._data_obj = DeviceData()
        return self._data_obj._options
#: Global singleton for managing :class:`~peat.data.models.DeviceData`
#: instances and making them available throughout PEAT.
datastore = Datastore()


# Public names exported by this module.
__all__ = ["Datastore", "datastore"]