# Source code for peat.data.models

from __future__ import annotations

import copy
import inspect
import json
import mimetypes
from copy import deepcopy
from datetime import datetime, timedelta
from pathlib import Path, PurePath
from typing import Any, AnyStr, Literal

from pydantic import (
    Field,
    PositiveInt,
    PrivateAttr,
    confloat,
    conint,
    constr,
    validator,
)

import peat
from peat import Elastic, config, consts, state, utils, log
from peat.es_mappings import KEYWORD_AND_TEXT
from peat.consts import SYSINFO, WINDOWS, PeatError
from peat.data.default_options import DEFAULT_OPTIONS
from peat.protocols import (
    address_to_pathname,
    addresses,
    ip_to_mac,
    mac_to_ip,
    mac_to_vendor_string,
)

from .base_model import BaseModel
from .data_utils import (
    DeepChainMap,
    dedupe_model_list,
    lookup_by_str,
    match_all,
    merge_models,
    sort_model_list,
    strip_empty_and_private,
    strip_key,
)
from .validators import (
    clean_protocol,
    cleanstr,
    convert_arbitrary_path_to_purepath,
    strip_quotes,
    validate_ecs,
    validate_hash,
    validate_hex,
    validate_ip,
    validate_mac,
)

# NOTE:
# - values added directly to sets/lists are not validated (e.g. related.*) (TODO: fix this somehow?)
# - validation is done on assignment because validate_assignment is true
# - validation is NOT done when items are added to a default value, e.g. "related.add("hash")"
# - Field(...) is ONLY used for schema customization, validation configuration is done using the type annotation.
# - on export, pydantic preserves the order fields are defined in the models
#     https://github.com/samuelcolvin/pydantic/issues/593#issuecomment-501735842
# https://en.wikipedia.org/wiki/God_object


# Monkey-patch PurePath so pydantic can represent path fields when generating
# the JSON schema and the documentation.
@classmethod
def __pathlib_modify_schema__(
    cls,  # noqa: ARG001
    field_schema: dict[str, Any],
) -> None:
    """Describe ``PurePath`` fields as ``string``/``path`` in the JSON schema."""
    field_schema["type"] = "string"
    field_schema["format"] = "path"


PurePath.__modify_schema__ = __pathlib_modify_schema__


class Vendor(BaseModel):
    """
    Identifies a device vendor (SEL, Schneider Electric, Siemens, etc).
    """

    id: constr(strip_whitespace=True) = ""
    """
    Abbreviated version of the vendor name that can be used for lookups.

    Examples
        - ``SEL``
        - ``WindRiver``
        - ``Schneider``
        - ``Siemens``
        - ``Sandia``
    """

    name: constr(strip_whitespace=True) = ""
    """
    The full expanded vendor name. Used for display in a visualization or dashboard.

    Examples
        - ``Schweitzer Engineering Laboratories``
        - ``Wind River Systems``
        - ``Schneider Electric``
    """
class Description(BaseModel):
    """
    Identifying information such as vendor, brand, and model.
    """

    brand: constr(strip_whitespace=True) = ""
    """
    Brand of the device. Can be empty string if not applicable,
    such as for most SEL devices.

    Examples
        - ``Modicon``
        - ``PowerLogic ION``
        - ``""``
    """

    contact_info: str = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    Contact info for the device, e.g. an email address, name, or phone number.
    This is commonly retrieved from :term:`SNMP`.
    """

    description: str = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    Free-form description of the device, such as a "description" configuration
    value extracted from the device or other general information that is
    useful to note.
    """

    full: constr(strip_whitespace=True) = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    Combination of vendor, brand, model, and any other identifiers.
    Used to perform lookups with fuzzy string matching.

    Examples
        - ``Schneider Electric Modicon M340``
        - ``SEL-351S``
    """

    model: constr(strip_whitespace=True) = ""
    """
    Model of the device.

    Examples
        - ``M340``
        - ``351S``
    """

    product: constr(strip_whitespace=True) = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    The product identifier for the device, minus the vendor.
    This includes the brand and model.

    Examples
        - ``Modicon M340``
        - ``351S``
    """

    vendor: Vendor = Vendor()
    """The manufacturer/vendor of the device."""

    # Strip surrounding quote characters from free-form string fields
    _strip_quotes = validator("description", "contact_info", allow_reuse=True)(strip_quotes)
class Hardware(BaseModel):
    """
    Hardware information of the device, e.g. amount of :term:`RAM`.
    """

    cpu: Description = Description()
    """
    Information about the CPU on the device, such as the vendor and model.
    """

    id: constr(strip_whitespace=True) = ""
    """
    Hardware ID of the device.
    """

    storage_available: conint(ge=0) | None = None
    """
    Amount of persistent storage currently available on the device, in bytes.
    """

    storage_usage: conint(ge=0) | None = None
    """
    Amount of persistent storage currently in use on the device, in bytes.
    """

    storage_total: conint(ge=0) | None = None
    """
    Total amount of storage on the device, in bytes.
    """

    storage_type: constr(strip_whitespace=True, to_lower=True) = ""
    """
    Type of storage on the device. Values should be lowercase and underscore-separated.

    Examples
        - ``hdd``
        - ``ssd``
        - ``nvram``
    """

    memory_available: conint(ge=0) | None = None
    """
    Amount of volatile memory (e.g. :term:`RAM`) currently available, in bytes.
    """

    memory_usage: conint(ge=0) | None = None
    """
    Amount of volatile memory (e.g. :term:`RAM`) currently in use, in bytes.
    """

    memory_total: conint(ge=0) | None = None
    """
    Total amount of volatile memory (e.g. :term:`RAM`) on the device, in bytes.
    """

    memory_type: constr(strip_whitespace=True, to_lower=True) = ""
    """
    Type of volatile memory on the device, lowercase and underscore-separated.

    Examples
        - ``ddr2_sdram``
    """

    revision: constr(strip_whitespace=True) = ""
    """
    Hardware revision of the device (e.g. MinorRev field in Rockwell L5X).

    This is distinct from the software (e.g., firmware or OS version), and is
    purely for the hardware itself (e.g., the mainboard or module).
    The detailed meaning of the value in this field is device-dependent.
    """

    version: constr(strip_whitespace=True) = ""
    """
    Hardware version of the device (e.g. MajorRev field in Rockwell L5X).

    This is distinct from the software (e.g., firmware or OS version), and is
    purely for the hardware itself (e.g., the mainboard or module).

    The detailed meaning of the value in this field is device-dependent.
    """

    def annotate(self, dev: DeviceData | None = None):  # noqa: ARG002
        """Derive total memory/storage when only available + usage are known."""
        # If available + usage are set, and total is not, auto-calculate total
        if not self.memory_total and self.memory_available and self.memory_usage:
            self.memory_total = self.memory_available + self.memory_usage
        if not self.storage_total and self.storage_available and self.storage_usage:
            self.storage_total = self.storage_available + self.storage_usage
class Hash(BaseModel):
    """
    Hashes of raw data or a file.

    .. note:: All hashes are uppercase hexadecimal strings, per :term:`ECS`
    """

    # NOTE: "Optional" is used here since a default of "" will fail validation checks
    md5: constr(min_length=32, max_length=32, strip_whitespace=True) | None = Field(
        default=None, title="MD5 hash"
    )
    """MD5 hash."""

    sha1: constr(min_length=40, max_length=40, strip_whitespace=True) | None = Field(
        default=None, title="SHA1 hash"
    )
    """SHA1 hash."""

    sha256: constr(min_length=64, max_length=64, strip_whitespace=True) | None = Field(
        default=None, title="SHA256 hash"
    )
    """SHA256 hash."""

    sha512: constr(min_length=128, max_length=128, strip_whitespace=True) | None = Field(
        default=None, title="SHA512 hash"
    )
    """SHA512 hash."""

    # Validators
    _validate_hash = validator("md5", "sha1", "sha256", "sha512", allow_reuse=True)(validate_hash)
class User(BaseModel):
    """
    Information describing a user on a device.
    """

    description: constr(strip_whitespace=True) = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    General description of the user (this is open to interpretation).
    """

    domain: constr(strip_whitespace=True) = ""
    """
    Name of the domain the user is a member of.
    For example, an LDAP or Active Directory domain name.
    """

    # TODO: validation of email? also, use to replace utils.is_email().
    #   - https://pypi.org/project/email-validator
    #   - EmailStr (https://pydantic-docs.helpmanual.io/usage/types/#pydantic-types)
    email: constr(strip_whitespace=True) = ""
    """
    User email address.

    Examples
        - example@example.com
    """

    full_name: constr(strip_whitespace=True) = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    The user's full name, if known.

    Examples
        - Billy Bob Joe
        - Administrator
    """

    id: str = ""
    """
    Unique identifier of the user.
    """

    name: constr(strip_whitespace=True) = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    Short name or login of the user.
    """

    permissions: set[str] = set()
    """
    Permissions the user has available.
    """

    roles: set[str] = set()
    """
    The user's roles, as strings.

    Examples
        - Administrator
        - User
        - engineers
    """

    uid: constr(strip_whitespace=True) = ""
    """
    The user's numeric user ID, if applicable.
    """

    gid: constr(strip_whitespace=True) = ""
    """
    The user's numeric group ID, if applicable.
    """

    extra: dict = Field(default={}, elastic_type="flattened")
    """
    Additional data or metadata about the user. This also includes
    unstructured raw data from the device that may be relevant.
    """

    _sort_by_fields: tuple[str] = PrivateAttr(default=("id", "name", "full_name", "description"))

    def annotate(self, dev: DeviceData | None = None):
        """Propagate user identifiers into the device's ``related`` fields."""
        if dev:
            if self.email:
                dev.related.emails.add(self.email)
            if self.full_name:
                dev.related.user.add(self.full_name)
            if self.id:
                dev.related.user.add(self.id)
            if self.name:
                dev.related.user.add(self.name)
            if self.roles:
                for role in self.roles:
                    dev.related.roles.add(role)
class File(BaseModel):
    """
    Contextual information and metadata for a file.

    The file could be on disk, in memory, a directory, or simply represent
    an artifact that's known to be on the device but PEAT doesn't have
    the ability to access.
    """

    created: datetime | None = None
    """
    File creation time.
    """

    description: constr(strip_whitespace=True) = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    General human-readable description of what the file is.
    """

    device: constr(strip_whitespace=True) = ""
    """
    Device that is the source of the file. If this is a static parse,
    then it should be the name of the system the file was recovered from
    or parsed on. Otherwise, this should be the ID of the device it was
    pulled from.
    """

    directory: constr(strip_whitespace=True) = ""
    """
    Path to the directory where the file is located.
    """

    extension: constr(strip_whitespace=True) = ""
    """
    File extension, without a leading ``.`` character.

    Examples
        - ``txt``
        - ``tar.gz``
        - ``xml``
        - ``zip``
    """

    hash: Hash = Hash()
    """
    Hash(es) of the file's contents.
    """

    local_path: Path | None = None
    """
    Concrete path of the file on the local system (the system running PEAT).
    """

    path: PurePath | None = None
    """
    Path of the file, in its original form. This may be either the path to
    the file on the device, or the path from the system it originated from
    (e.g. as extracted from a project file or using PEAT Pillage).
    """

    peat_module: constr(strip_whitespace=True) = ""
    """
    PEAT module associated with this file artifact.
    """

    gid: constr(strip_whitespace=True) = ""
    """
    Primary group ID (GID) of the file.
    """

    group: constr(strip_whitespace=True) = ""
    """
    File's owning group name.
    """

    mime_type: constr(strip_whitespace=True) = ""
    """
    MIME type should identify the format of the file or stream of bytes using
    the `IANA official types <https://datatracker.ietf.org/doc/html/rfc6838>`__,
    where possible. When more than one type is applicable, the most specific
    type should be used.
    """

    mode: constr(strip_whitespace=True) = ""
    """
    Mode of the file in octal representation.

    Examples:
        - ``0640``
        - ``0644``
        - ``0777``
    """

    mtime: datetime | None = None
    """
    Last time the file content was modified.
    """

    name: constr(strip_whitespace=True) = ""
    """
    File's name, including extension (e.g. ``SET_ALL.txt``).
    """

    original: bytes = b""
    """
    Raw contents of the file.
    """

    owner: constr(strip_whitespace=True) = ""
    """
    File owner's username.
    """

    size: conint(ge=0) | None = None
    """
    Size of the file in bytes.
    Only relevant when ``file.type`` is ``"file"``.
    """

    target_path: PurePath | None = None
    """
    Target path for symlinks.
    Only relevant when ``file.type`` is ``"symlink"``.
    """

    type: Literal["file", "dir", "symlink", ""] = ""
    """
    File type, following the :term:`ECS`.

    **Allowed values**
        - ``file`` : It's a file
        - ``dir`` : It's a directory
        - ``symlink`` : It's a symbolic link
    """

    uid: constr(strip_whitespace=True) = ""
    """
    Numeric user ID (UID) or security identifier (SID) of the file owner.
    """

    extra: dict = Field(default={}, elastic_type="flattened")
    """
    Additional information about the file that doesn't fit into the data model.
    """

    _es_index_varname: str = PrivateAttr(default="ELASTIC_FILES_INDEX")
    _sort_by_fields: tuple[str] = PrivateAttr(
        default=(
            "directory",
            "type",
            "name",
            "device",
        )
    )

    # Validators
    @validator("extension", allow_reuse=True)
    def clean_file_extension_string(cls, v):
        """Normalize an extension: trimmed, lowercase, no surrounding dots."""
        return v.strip().lower().strip(".")

    # Coerce arbitrary path-like inputs into PurePath before validation
    _convert_paths = validator("path", "target_path", allow_reuse=True, pre=True)(
        convert_arbitrary_path_to_purepath
    )

    def annotate(self, dev: DeviceData | None = None):
        """Auto-populate derived fields and the device's ``related.files``."""
        # Auto-populate many of the fields
        process_file(self)
        if dev:
            if not self.device and (self.path or self.local_path or self.extension):
                dev_id = dev.get_id()
                # Set to current Device ID
                if "unknown-dev-" not in dev_id:
                    self.device = dev_id
            # Add paths to related.files, if it isn't the local path
            # or a path to the same directory on the local system.
            # Technically, this will exclude files collected by a PEAT
            # run on, say, a SCADA server or Engineering Workstation,
            # but the common case is "peat parse" or calling DeviceModule.parse()
            # on a pulled file.
            if (
                self.path
                and self.path != self.local_path
                and (not self.local_path or self.path.parent != self.local_path.parent)
            ):
                dev.related.files.add(self.path.as_posix())
            if self.target_path:
                dev.related.files.add(self.target_path.as_posix())
            _add_hashes_to_related(dev, self.hash)

    def gen_elastic_content(self, dev: DeviceData | None = None) -> dict:
        """Build the Elasticsearch document body for this file artifact.

        Prefers ``created`` then ``mtime`` for ``@timestamp``, falling back
        to the current time; attaches host fields and tags when ``dev`` is given.
        """
        self.annotate(dev)  # populate fields
        time_now = Elastic.time_now()
        timestamp = time_now
        if self.created is not None:
            timestamp = Elastic.convert_tstamp(self.created)
        elif self.mtime is not None:
            timestamp = Elastic.convert_tstamp(self.mtime)
        content = {
            "@timestamp": timestamp,
            "message": self.description if self.description else str(self.path),
            "tags": ["file"],
            "event": {"ingested": time_now},
            "file": self.dict(exclude_defaults=True),
        }
        if dev:
            # host.{basic fields}
            content["host"] = dev.gen_base_host_fields_content()
            content["tags"].extend(
                [
                    dev.description.vendor.name,
                    dev.description.product,
                ]
            )
            if dev._module:
                content["event"]["module"] = dev._module.__name__
        return content
class Firmware(BaseModel):
    """
    Device firmware.
    """

    checksum: constr(strip_whitespace=True) = ""
    """
    Checksum used by the device to verify the firmware image is valid.
    This is usually found in or with the firmware image file or the
    device configuration.
    """

    extra: dict = Field(default={}, elastic_type="flattened")
    """
    Additional unstructured information related to the firmware, generally
    vendor-specific information such as "settings_version".
    """

    file: File = File()
    """
    Firmware image metadata, such as size, hashes, etc.
    """

    hash: Hash = Hash()
    """
    Hashes of the raw firmware (the contents of ``original``).

    .. note:: This may differ from the file hash, if present.
    """

    id: constr(strip_whitespace=True) = ""
    """
    Firmware identification string, e.g. the "FID" or "BFID" strings in SEL devices.
    """

    last_updated: datetime | None = None
    """
    The timestamp of when the firmware was last updated on the device.
    """

    original: bytes = b""
    """
    Full raw unmodified binary image of the device's firmware.
    """

    revision: constr(strip_whitespace=True) = ""
    """
    Revision of the device's current firmware (or operating system).
    Common field seen on devices that's sometimes distinct from the
    canonical version string.
    """

    release_date: datetime | None = None
    """
    The release date of the firmware.
    """

    timestamp: datetime | None = None
    """
    Timestamp as extracted from the device or firmware, device-dependent meaning.
    Often represents when the firmware was compiled/built or released.
    """

    version: constr(strip_whitespace=True) = ""
    """
    Version of the device's current firmware (or operating system).
    """

    def annotate(self, dev: DeviceData | None = None):
        """Populate hashes from the raw image and mark the file's role."""
        if self.original and not _all_hashes_set(self.hash):
            self.hash = Hash.parse_obj(utils.gen_hashes(self.original))
        if dev:
            _add_hashes_to_related(dev, self.hash)
            # !! Hack to set file description properly !!
            if not self.file.description and self.hash.md5:
                if self.hash.md5 == dev.boot_firmware.hash.md5:
                    self.file.description = "Boot firmware for the device"
                else:
                    self.file.description = "Firmware for the device"
class Logic(BaseModel):
    """
    What the device has been programmed to do (it's "logic").

    In a :term:`PLC`, the logic is one or more of the five
    `IEC 61131-3 <https://en.wikipedia.org/wiki/IEC_61131-3>`_ languages:

    - Ladder Diagram (LD)
    - Function Block Diagram (FBD)
    - Structured Text (ST)
    - Instruction List (IL)
    - Sequential Function Chart (SFC)

    In a Relay, the logic is the protection schemes.
    In a Power Meter, the logic is the programmed metering/monitoring setpoints.

    .. note:: Logic is separate from protocol register mappings or values,
       such as Modbus or DNP3, as well as memory values. There is sometimes
       overlap, as some devices have been known to store their logic as
       e.g. a set of Modbus registers.
    """

    author: constr(strip_whitespace=True) = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    Name of the person/organization/program that wrote the logic.
    """

    created: datetime | None = None
    """
    :term:`UTC` timestamp of when the logic was first created
    (when the source project file was created) or first uploaded to the device.
    """

    description: str = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    Description for the logic or project file.
    """

    file: File = File()
    """
    File or directory of the logic.
    """

    formats: dict[str, AnyStr | dict] = Field(default={}, elastic_type="nested")
    """
    Sub-formats the logic has been parsed into, such as ``"structured_text"``
    or ``"tc6"``. Device dependent.
    """

    hash: Hash = Hash()
    """
    Hashes of the raw unparsed logic (the contents of ``original``).

    .. note:: This may differ from the file hash, if present.
    """

    id: constr(strip_whitespace=True) = ""
    """
    Project ID or a similar identifier for the logic, e.g. a
    machine-generated :term:`UUID` for the logic stored by the device.
    """

    last_updated: datetime | None = None
    """
    :term:`UTC` timestamp of when the logic was last updated on the device.
    """

    name: constr(strip_whitespace=True) = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    Project name or other such identifier for the logic, e.g. a
    human-readable name for the logic stored by the device.
    """

    original: str = Field(default="", elastic_type="text")
    """
    Unparsed device program logic, in whatever format makes sense
    for that device. It is the file(s) that make up the process state logic,
    e.g., the ladder logic on a :term:`PLC` or the configured protection
    schemes on a substation relay.
    """

    parsed: str = Field(default="", elastic_type="text")
    """
    Complete parsed device program logic, in whatever format makes sense
    for that device. It is the file(s) that make up the process state logic,
    e.g., the ladder logic on a :term:`PLC` or the configured protection
    schemes on a substation relay.
    """

    # Strip surrounding quote characters from free-form string fields
    _strip_quotes = validator("author", "description", "name", "id", "parsed", allow_reuse=True)(
        strip_quotes
    )

    def annotate(self, dev: DeviceData | None = None):
        """Populate hashes from the raw logic and add them to ``dev.related``."""
        if self.original and not _all_hashes_set(self.hash):
            self.hash = Hash.parse_obj(utils.gen_hashes(self.original))
        if dev:
            _add_hashes_to_related(dev, self.hash)
class CertEntity(BaseModel):
    """
    Issuer or Subject in a x509 certificate.

    `ECS documentation: x509 Certificate Fields
    <https://www.elastic.co/guide/en/ecs/current/ecs-x509.html>`__
    """

    common_name: constr(strip_whitespace=True) = ""
    """Common name (CN)."""

    country: constr(strip_whitespace=True) = ""
    """Country code."""

    distinguished_name: constr(strip_whitespace=True) = ""
    """Distinguished Name (DN)."""

    locality: constr(strip_whitespace=True) = ""
    """Locality (L)."""

    organization: constr(strip_whitespace=True) = ""
    """Organization (O)."""

    organizational_unit: constr(strip_whitespace=True) = ""
    """Organizational Unit (OU)."""

    state_or_province: constr(strip_whitespace=True) = ""
    """State or province names (ST, S, or P)."""
class X509(BaseModel):
    """
    x509 certificate.

    `ECS documentation: x509 Certificate Fields
    <https://www.elastic.co/guide/en/ecs/current/ecs-x509.html>`__
    """

    alternative_names: list[constr(strip_whitespace=True)] = []
    """List of subject alternative names (SAN)."""

    hash: Hash = Hash()
    """Hashes of raw certificate contents (the data stored in ``original``)."""

    issuer: CertEntity = CertEntity()
    """Issuing certificate authority."""

    not_after: datetime | None = None
    """Time at which the certificate is no longer considered valid."""

    not_before: datetime | None = None
    """Time at which the certificate is first considered valid."""

    original: str = ""
    """The raw certificate data."""

    public_key_algorithm: constr(strip_whitespace=True) = ""
    """Algorithm used to generate the public key."""

    public_key_curve: constr(strip_whitespace=True) = ""
    """The curve used by the elliptic curve public key algorithm."""

    public_key_exponent: conint(ge=0) | None = None
    """Exponent used to derive the public key."""

    public_key_size: conint(ge=0) | None = None
    """The size of the public key space in bits."""

    serial_number: constr(strip_whitespace=True) = ""
    """
    Unique serial number issued by the certificate authority.
    For consistency, if this value is alphanumeric, it should be formatted
    without colons and uppercase characters.
    """

    signature_algorithm: constr(strip_whitespace=True) = ""
    """Identifier for certificate signature algorithm."""

    subject: CertEntity = CertEntity()
    """Certificate subject."""

    version_number: constr(strip_whitespace=True) = ""
    """Version of x509 format."""

    def annotate(self, dev: DeviceData | None = None):
        """Hash the raw cert and surface SANs/common names in ``dev.related``."""
        if self.original and not _all_hashes_set(self.hash):
            self.hash = Hash.parse_obj(utils.gen_hashes(self.original))
        if dev:
            _add_hashes_to_related(dev, self.hash)
            if self.alternative_names:
                for an in self.alternative_names:
                    if utils.is_ip(an):
                        dev.related.ip.add(an)
                    elif utils.is_email(an):
                        dev.related.emails.add(an)
            # Common names that look like URLs get tracked as related URLs
            if "://" in self.issuer.common_name:
                dev.related.urls.add(self.issuer.common_name)
            if "://" in self.subject.common_name:
                dev.related.urls.add(self.subject.common_name)
class UEFIHash(BaseModel):
    """
    UEFI model that specifically labels objects from a UEFI file hash file.
    This model is different because it includes all file systems,
    not just the EFI File system.

    File system is either FS0 or FS1.
    pathname is the pathname of the files in the file system.
    hash is the SHA256 hash of the files computed via python script.
    """

    # File system identifier (e.g. FS0 or FS1)
    file_system: str = ""
    # Pathname of the file within the file system
    pathname: str = ""
    # SHA256 hash of the file
    hash: str = ""

    _es_index_varname: str = PrivateAttr(default="ELASTIC_UEFI_HASHES_INDEX")
    _sort_by_fields: tuple[str] = PrivateAttr(default=("file_system", "pathname", "hash"))

    def gen_elastic_content(self, dev: DeviceData | None = None) -> dict:
        """Build the Elasticsearch document body for this UEFI hash entry."""
        self.annotate(dev)  # populate fields
        time_now = Elastic.time_now()
        content = {
            # @timestamp is the time mentioned above
            "@timestamp": time_now,
            "event": {"ingested": Elastic.time_now()},
            "uefi": self.dict(exclude_defaults=True),
        }
        if dev:
            # host.{basic fields}
            content["host"] = dev.gen_base_host_fields_content()
            if dev._module:
                content["event"]["module"] = dev._module.__name__
        return content
class UEFIFile(BaseModel):
    """
    UEFI model that specifically labels objects from a UEFIExtract report file.
    This model is different because it includes only the SPI file system
    included in an SPI file dump.

    type is the type of entry. Examples are "Region, Volume".
    subtype is the subtype of the entry. Can be blank, can be empty or invalid.
    base is the start of location in memory in HEX.
    Size is the end of location in memory in HEX.
    CRC32 is the calculated crc32 for the file.
    Name is the name of the file.
    path is the path of the file since the dumps are given in a file like structure.
    """

    # Entry type (e.g. "Region", "Volume")
    type: str = ""
    # Entry subtype; may be blank, empty, or invalid
    subtype: str = ""
    # Start of the location in memory, as hex
    base: str = ""
    # End of the location in memory, as hex
    size: str = ""
    # CRC32 calculated for the file
    crc32: str = ""
    guid: str | None = ""  # Only occasionally exists
    # File name
    name: str = ""
    # File path within the dump's file-like structure
    path: str = ""
    # Creation timestamp, if known
    created: datetime | None = None

    _es_index_varname: str = PrivateAttr(default="ELASTIC_UEFI_FILES_INDEX")
    _sort_by_fields: tuple[str] = PrivateAttr(default=("name", "subtype", "type"))

    def gen_elastic_content(self, dev: DeviceData | None = None) -> dict:
        """Build the Elasticsearch document body for this UEFI file entry."""
        self.annotate(dev)  # populate fields
        time_now = Elastic.time_now()
        content = {
            # @timestamp is the time mentioned above
            "@timestamp": time_now,
            "event": {"ingested": Elastic.time_now()},
            "uefi": self.dict(exclude_defaults=True),
        }
        if dev:
            # host.{basic fields}
            content["host"] = dev.gen_base_host_fields_content()
            if dev._module:
                content["event"]["module"] = dev._module.__name__
        return content
class Service(BaseModel):
    """
    Communication protocol "service" configured or running on the device.
    Services can be over a variety of transports, including IP/Ethernet,
    serial direct, cellular, serial bus, field bus, etc.
    """

    configured_port: conint(ge=0, le=65535) | None = None
    """
    Port the service is *configured* to listen on (for TCP or UDP transports).

    This field should only be set from values read from a device configuration,
    e.g. a config file, config dump, project file, etc. It should NOT be set
    using information from a live port list, scanning, etc. This is intended
    to supplement the "port" field, e.g. if the listening port differs from
    what's in the config, that's forensically interesting.

    .. note:: The value must be between 0 and 65,535. Port 0 is allowed for
       the ``configured_port`` field, but not the ``port`` field, since there
       may be cases when it's set to 0 in a config (e.g. to disable).

    Examples
        - ``80``
        - ``161``
        - ``502``
    """

    enabled: bool | None = None
    """
    If the service is enabled in the device configuration.

    .. warning:: This can differ from ``status``, don't assume they will match!
    """

    extra: dict = Field(default={}, elastic_type="flattened")
    """
    Additional unstructured information about the service, such as a banner
    grab, odd behavior, or other miscellaneous data.
    """

    listen_address: str = Field(default="", elastic_type="ip")
    """
    IP address the service is listening on.
    """

    listen_interface: str = ""
    """
    Network interface or serial port the service is listening on.
    """

    process_name: str = ""
    """
    Name of the system process or task associated with the service.
    """

    process_pid: conint(ge=0) | None = None
    """
    Process ID associated with the service.
    This is the PID of the network service's process.
    """

    port: conint(ge=1, le=65535) | None = None
    """
    Port the service is listening on (for TCP or UDP transports).

    .. note:: The value must be between 1 and 65,535.
       While a port of ``0`` is technically accurate, it's not allowed since
       it's not something that should be seen in the real world, and if it is,
       then there's probably a bug in PEAT or one of it's modules.

    Examples
        - ``80``
        - ``161``
        - ``502``
    """

    protocol: constr(strip_whitespace=True, to_lower=True) = ""
    """
    Protocol name of the service. Must be lowercase with underscore separators.
    Format will be automatically checked and enforced.
    This is a short name or acronym, not an expanded or colloquial name.

    Examples
        - ``http``
        - ``snmp``
        - ``modbus_tcp``
        - ``icmp``
    """

    protocol_id: constr(strip_whitespace=True) = ""
    """
    Unique protocol identifier for the device, such as the Modbus Unit ID.

    Examples
        - ``"10"``
        - ``"119"``
    """

    role: constr(strip_whitespace=True) = ""
    """
    The operational role of the device for a given protocol.
    """

    status: Literal["open", "closed", "verified", ""] = ""
    """
    State of the service.

    .. note:: ``verified`` means verified over a live connection, not just
       read from a configuration file. Instead, the ``enabled`` field should
       be used to reflect the configuration state.

    Valid values
        - ``open``: something is listening, though it may not be the named service
        - ``closed``: port is not able to be accessed.
        - ``verified``: service was positively identified (high certainty)
        - ``""``: the live status is unknown, such as when seen in a
          configuration or project file parsed offline.
    """

    transport: constr(strip_whitespace=True, to_lower=True) = ""
    """
    :term:`OSI` Layer 4 transport protocol.

    Examples
        - ``udp``
        - ``tcp``
        - ``icmp``
    """

    _sort_by_fields: tuple[str] = PrivateAttr(
        default=("port", "protocol", "transport", "status", "enabled")
    )

    # Validators
    _clean_protocol = validator("protocol", allow_reuse=True)(clean_protocol)
    _validate_ip = validator("listen_address", allow_reuse=True)(validate_ip)

    def annotate(self, dev: DeviceData | None = None):
        """Infer the transport from the protocol and update ``dev.related``."""
        # auto-populate "transport" field if it's unset and protocol is set
        if not self.transport and self.protocol:
            if self.protocol in [
                "telnet",
                "ftp",
                "http",
                "https",
                "modbus_tcp",
                "postgres",
                "smtp",
            ]:
                self.transport = "tcp"
            elif self.protocol in ["snmp", "sntp"]:
                self.transport = "udp"
        if dev:
            # Only record ports/protocols that are enabled, open/verified,
            # or whose live status is unknown (empty string)
            if self.port and (
                self.enabled or self.status in ["open", "verified"] or not self.status
            ):
                dev.related.ports.add(self.port)
            if self.protocol and (
                self.enabled or self.status in ["open", "verified"] or not self.status
            ):
                dev.related.protocols.add(self.protocol)
class Interface(BaseModel):
    """
    Communication interface, such as a Ethernet port or Serial link.

    .. note:: Currently, the ``ip``, ``subnet_mask``, and ``gateway`` fields
       are assumed to be :term:`IP` version 4 (IPv4). However, they can and
       will hold IPv6 values in the future when PEAT adds IPv6 support.
    """

    alias: str = ""
    """
    Interface alias as reported by the system, typically used in firewall
    implementations for e.g. inside, outside, or dmz logical interface naming.
    """

    application: str = ""
    """
    Higher-level communication protocol being used regardless of whether
    the device is connected via serial or :term:`IP`.
    This field should be lowercase and without separators, when possible,
    or with underscore (``_``) separators otherwise.

    Examples
        - ``modbus``
        - ``dnp3``
        - ``sel``
    """

    connected: bool | None = None
    """
    If the interface is currently connected to something (e.g. carrier signal
    on Ethernet or connected to a tower for wireless interfaces).
    """

    description: Description = Description()
    """
    Identifying information for the interface's hardware or software,
    such as vendor, brand, and model.
    """

    duplex: Literal["half", "full", "auto", ""] = ""
    """
    Duplex mode for Ethernet interfaces.

    Allowed values
        - half
        - full
        - auto
        - "" (empty string)
    """

    enabled: bool | None = None
    """
    If the interface is enabled in the device's configuration.
    """

    extra: dict = Field(default={}, elastic_type="flattened")
    """
    Additional unstructured information related to the interface,
    generally this is vendor-specific information.
    """

    name: constr(strip_whitespace=True) = ""
    """
    Interface name, as defined by the device. For example, SEL relays refer
    to the serial and network ports by names such as ``1``, ``2``, ``3``,
    or ``F``.

    Examples
        - ``PF``
        - ``ens0``
        - ``eth1``
        - ``F``
    """

    type: str = ""
    """
    The type of physical communication medium the communication interface
    utilizes. Lowercase, underscore separators.

    Examples
        - ``ethernet``
        - ``loopback``
        - ``point_to_point``
        - ``rs_232``
        - ``rs_422``
        - ``rs_485``
        - ``usb``
    """

    hostname: constr(strip_whitespace=True) = ""
    """
    Hostname of the network interface.

    Examples
        - ``some-relay.local``
    """

    mac: str = ""
    """
    The IEEE 802 standard 48-bit :term:`MAC` address of the interface.
    This is the current MAC address used by the interface.
    Only applicable to Ethernet-type interfaces.

    The MAC address is formatted as a uppercase colon-separated string.

    Examples
        - ``00:00:00:FF:FF:FF``
    """

    mac_vendor: str = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    Vendor name resolved from the :term:`MAC` address :term:`OUI`.
    This field is auto-populated by PEAT if the ``mac`` field is set.
    """

    mtu: PositiveInt | None = Field(default=None, elastic_type="integer")
    """
    Maximum Transmission Unit (MTU) size configured for the interface.
    This generally only applies to Ethernet interfaces.
    """

    physical: bool | None = None
    """
    If the interface is a physical interface (e.g. is a port on the device).
    If false, then it's likely a virtual interface or software-defined.
    Use the "type" and "description" fields to store additional details.
    """

    promiscuous_mode: bool | None = None
    """
    If the interface is in Promiscuous Mode (passive capture).
    """

    speed: conint(ge=0) | None = Field(default=None, elastic_type="integer")
    """
    Transmission rate of the interface, in Mbps (megabits per second).
    Example: for Gigabit Ethernet, this would be 1000.
    """

    uptime: timedelta | None = None
    """
    How long the interface has been connected, in milliseconds or as a
    :class:`~datetime.timedelta` instance.

    NOTE: normal integers can be assigned to this! (e.g. ``iface.uptime = 123``)
    """

    hardware_mac: str = ""
    """
    The hardware :term:`MAC` address of the interface. This is intrinsic to
    the physical :term:`NIC`, and may differ from the :term:`MAC` address
    currently in use by the interface.
    Only applicable to Ethernet-type interfaces.

    The MAC address is formatted as a uppercase colon-separated string.

    Examples
        - ``00:00:00:FF:FF:FF``
    """

    id: constr(strip_whitespace=True) = ""
    """
    Identifier for the interface. The meaning of this value is device-dependent.
    """

    ip: str = Field(default="", elastic_type="ip")
    """
    The :term:`IP` address of the interface. This is usually applicable to
    Ethernet-type interfaces, but could be applicable to Serial interfaces
    as well (e.g. on SEL devices).

    Examples
        - ``192.0.2.123``
    """

    subnet_mask: str = Field(default="", elastic_type="ip")
    """
    :term:`IP` subnet mask of the interface.

    Examples
        - ``255.255.255.0``
        - ``255.255.255.192``
    """

    gateway: str = Field(default="", elastic_type="ip")
    """
    IPv4 address of the default gateway of the interface.

    Examples
        - ``192.0.2.1``
    """

    serial_port: constr(strip_whitespace=True) = ""
    """
    Serial port on the local system connected to the device.
    This could be a Windows COM port, e.g. ``COM4``, or a Linux file path,
    e.g. ``/dev/ttyS0``. This is also used for USB connections.

    Examples
        - ``COM4``
        - ``/dev/ttyS0``
        - ``/dev/ttyUSB0``
    """

    baudrate: PositiveInt | None = Field(default=None, elastic_type="integer")
    """
    Data rate for a serial link.

    Examples
        - ``56700``
    """

    data_bits: conint(ge=0) | None = Field(default=None, elastic_type="byte")
    """
    Number of data bits for a serial link.

    Examples
        - ``8``
    """

    parity: Literal["none", "even", "odd", ""] = ""
    """
    Parity setting for a serial link.

    Allowed values
        - none
        - even
        - odd
        - "" (empty string)
    """

    stop_bits: conint(ge=0) | None = Field(default=None, elastic_type="byte")
    """
    Number of stop bits for a serial link.

    Examples
        - 0
        - 1
    """

    flow_control: str = ""
    """
    Flow control setting for a serial link.
    Should be ``none`` or ``rts/cts`` in most cases.

    Examples
        - none
        - rts/cts
    """

    services: list[Service] = []
    """
    Communication protocols configured or running on the interface.
    """

    version: str = ""
    """
    Version of the interface's firmware or software.
    """

    _sort_by_fields: tuple[str] = PrivateAttr(
        default=("name", "type", "ip", "serial_port", "id", "application")
    )

    # Validators
    _clean_str = validator("application", "type", "parity", "flow_control", allow_reuse=True)(
        cleanstr
    )
    _validate_ip = validator("ip", "subnet_mask", "gateway", allow_reuse=True)(validate_ip)
    _validate_mac = validator("mac", "hardware_mac", allow_reuse=True)(validate_mac)
    _strip_quotes = validator("description", allow_reuse=True)(strip_quotes)
[docs] def annotate(self, dev: DeviceData | None = None): # Resolve host if not set OR if IP is changed and host is not if config.RESOLVE_HOSTNAME and (self.ip and not self.hostname): self.hostname = addresses.resolve_ip_to_hostname(self.ip) # Resolve IP from MAC (may make an ARP request) if config.RESOLVE_IP and self.mac and not self.ip: self.ip = mac_to_ip(self.mac) # Resolve IP from hostname (may make a DNS request or broadcast) if config.RESOLVE_IP and self.hostname and not self.ip: self.ip = addresses.resolve_hostname_to_ip(self.hostname) # Resolve MAC from IP if config.RESOLVE_MAC and self.ip and not self.mac: self.mac = addresses.clean_mac(ip_to_mac(self.ip)) if self.mac and not self.mac_vendor: self.mac_vendor = mac_to_vendor_string(self.mac) # Add to the host's "related" fields if dev: if self.ip and self.ip not in dev.related.ip: dev.related.ip.add(self.ip) if self.mac and self.mac not in dev.related.mac: dev.related.mac.add(addresses.clean_mac(self.mac)) if self.hardware_mac and self.hardware_mac not in dev.related.mac: dev.related.mac.add(addresses.clean_mac(self.hardware_mac)) if self.gateway and self.gateway not in dev.related.ip: # sanity check if utils.is_ip(self.gateway): dev.related.ip.add(self.gateway) if self.hostname and self.hostname not in dev.related.hosts: dev.related.hosts.add(self.hostname)
class Register(BaseModel):
    """
    Configured I/O protocol data point ("registers"), e.g. DNP3 or Modbus/TCP.
    """

    address: constr(strip_whitespace=True) = ""
    """
    Address of the data. Tells protocol parser how to identify a data field
    in a packet. A number, string, or more complex identifier. For Object
    Oriented protocols, this field flattens the data_address.

    Examples

    - ``12``
    - ``123456``
    - ``pump-jack-six-example``
    - ``device-example_1234_trend-log``
    """

    data_type: str = ""
    """
    Data type of the register. Tells the user or code reading our data how to
    interpret the field. Format: Lowercase, underscore-separated string.

    Examples

    - ``float_16``
    - ``string``
    - ``int_32``
    """

    description: str = Field(default="", elastic_type="text")
    """
    Human-readable description of the register (some device configurations
    or project files have this).

    Examples

    - ``"Intake Fuel - Valve 1 - Second Boiler"``
    """

    enabled: bool | None = None
    """
    If the register is considered to be "enabled", e.g. has a valid
    configuration or is otherwise enabled for use on the device.
    """

    extra: dict = Field(default={}, elastic_type="flattened")
    """
    Additional metadata for the register.
    """

    group: constr(strip_whitespace=True) = ""
    """
    Logical mapping or settings group (e.g. on SEL relays) associated with
    the Register.

    Examples:

    - D1
    - D3
    - M
    - DNPA
    """

    io: constr(strip_whitespace=True) = ""
    """
    I/O point it's attached to (e.g. protocol register or physical I/O).
    This allows direct reference to an IO object without requiring a Tag.

    Examples

    - ``rtu-8_I0``
    """

    measurement_type: constr(strip_whitespace=True) = ""
    """
    Type of information the register is tracking (e.g. analog I/O, Discrete
    I/O). Tells analytic which algorithms to deploy. For example, in Modbus
    a 16-bit register can track an event count (Discrete), a temperature
    (analog), or could be a set of 16 Boolean flags (alarms).

    Examples

    - ``analog``
    - ``binary``
    """

    name: constr(strip_whitespace=True) = ""
    """
    Name or unique descriptor of the register (if different from the address).

    Examples

    - ``AI_99``
    - ``MOD_005``
    """

    protocol: str = ""
    """
    The Parser uses this to distinguish protocols. Not all vendors follow
    the protocol spec. To indicate if this is a vendor-specific deviation
    from the standard, use the syntax ``[protocol]_[device or vendor name]``.

    Examples

    - ``dnp3``
    - ``modbus``
    """

    read_write: Literal["read", "write", "read_write", ""] = ""
    """
    Direction of information flow. Is register read, write, or both?

    **Allowed values**

    - ``read``
    - ``write``
    - ``read_write``
    """

    tag: constr(strip_whitespace=True) = ""
    """
    Register tag given in config file. Provides analytic with some register
    context. May be a human-readable display name.

    Examples

    - ``valve_1``
    """

    _es_index_varname: str = PrivateAttr(default="ELASTIC_REGISTERS_INDEX")
    _sort_by_fields: tuple[str] = PrivateAttr(
        default=("protocol", "group", "measurement_type", "tag", "io", "read_write")
    )

    # Validators
    _cleanstr = validator("protocol", "data_type", allow_reuse=True)(cleanstr)
    _clean_protocol = validator("protocol", allow_reuse=True)(clean_protocol)
    _strip_quotes = validator("description", allow_reuse=True)(strip_quotes)

    def __lt__(self, other):
        """
        Allows for sorting of objects.
        """
        return (self.protocol, self.measurement_type, self.address, self.tag) < (
            other.protocol,
            other.measurement_type,
            other.address,
            other.tag,
        )
    def gen_elastic_content(self, dev: DeviceData | None = None) -> dict:
        """
        Build the Elasticsearch document body for this register.

        Calls :meth:`annotate` first to populate derivable fields, then
        assembles a document containing a human-readable ``message``
        summarizing the register, the register's non-default fields, and
        (when ``dev`` is given) basic host fields and tags from the parent
        device.

        :param dev: parent device, used for host fields and tags (optional)
        :return: document body ready for indexing into Elasticsearch
        """
        self.annotate(dev)  # populate fields

        # Hacky way to make a nice fancy message describing the Register.
        # A description is used as-is (prefixed with protocol/tag when known);
        # otherwise a short " - "-separated summary is built from the parts,
        # capped at roughly 80 characters.
        message = ""
        if self.description and self.protocol and self.tag:
            message = f"{self.protocol} - {self.tag} - {self.description}"
        elif self.description and self.protocol:
            message = f"{self.protocol} - {self.description}"
        elif self.description and self.tag:
            message = f"{self.tag} - {self.description}"
        elif self.description:
            message = self.description
        else:
            if self.protocol:
                message += self.protocol
            if self.tag:
                if message:
                    message += " - "
                message += self.tag
            if self.read_write and len(message) < 80:
                if message:
                    message += " - "
                message += self.read_write
            if self.measurement_type and len(message) < 80:
                if message:
                    message += " - "
                message += self.measurement_type
            if self.data_type and len(message) < 80:
                if message:
                    message += " - "
                message += self.data_type

        time_now = Elastic.time_now()

        # TODO: include service information related to the protocol?
        #   e.g. if DNP3, include info from DNP3 Service in dev.services
        content = {
            "@timestamp": time_now,
            "message": message,
            "tags": ["register"],
            "event": {"ingested": time_now},
            "register": self.dict(exclude_defaults=True),
        }

        if dev:
            # host.{basic fields}
            content["host"] = dev.gen_base_host_fields_content()
            content["tags"].extend(
                [
                    dev.description.vendor.name,
                    dev.description.product,
                ]
            )
            if dev._module:
                content["event"]["module"] = dev._module.__name__

        return content
class Tag(BaseModel):
    """
    Variable in a :term:`ICS`/:term:`OT` device, often mapping to physical
    I/O.

    These are commonly stored in a "tag database" in a :term:`SCADA` system
    or the configuration of a device.
    """

    address: constr(strip_whitespace=True) = ""
    """
    Address of the tag.

    Examples

    - ``29``
    """

    description: str = Field(default="", elastic_type="text")
    """
    Human-readable description of the tag.
    """

    io: constr(strip_whitespace=True) = ""
    """
    I/O point it's attached to (e.g. protocol register or physical I/O).

    Examples

    - ``rtu-8_I0``
    """

    name: constr(strip_whitespace=True) = ""
    """
    Tag name or label (e.g. how it's referenced).

    Examples

    - ``var_rtu-8_I0``
    """

    type: str = ""
    """
    Data type of the tag, lowercase and underscore-separated.

    Examples

    - ``analog``
    - ``binary``
    """

    _es_index_varname: str = PrivateAttr(default="ELASTIC_TAGS_INDEX")
    _sort_by_fields: tuple[str] = PrivateAttr(default=("type", "address", "name", "io"))

    # Validators
    _cleanstr = validator("type", allow_reuse=True)(cleanstr)
    _strip_quotes = validator("description", allow_reuse=True)(strip_quotes)

    def __lt__(self, other):
        """Allows for sorting of objects."""
        return (self.type, self.name, self.address) < (
            other.type,
            other.name,
            other.address,
        )
    def gen_elastic_content(self, dev: DeviceData | None = None) -> dict:
        """
        Build the Elasticsearch document body for this tag.

        Calls :meth:`annotate` first to populate derivable fields, then
        assembles a document containing a human-readable ``message``
        summarizing the tag, the tag's non-default fields, and (when ``dev``
        is given) basic host fields and tags from the parent device.

        :param dev: parent device, used for host fields and tags (optional)
        :return: document body ready for indexing into Elasticsearch
        """
        self.annotate(dev)  # populate fields

        # Hacky way to make a nice fancy message describing the Tag.
        # A description is used as-is (prefixed with name/io when known);
        # otherwise a short summary is built from the parts.
        message = ""
        if self.description and self.name:
            message = f"{self.name} - {self.description}"
        elif self.description and self.io:
            message = f"{self.io} - {self.description}"
        elif self.description:
            message = self.description
        else:
            if self.name:
                message += self.name
            elif self.io:
                message += self.io
            if self.type and len(message) < 80:
                if message:
                    message += " - "
                message += self.type

        time_now = Elastic.time_now()

        content = {
            "@timestamp": time_now,
            "message": message,
            "tags": ["tag"],
            "event": {"ingested": time_now},
            "tag": self.dict(exclude_defaults=True),
        }

        if dev:
            # host.{basic fields}
            content["host"] = dev.gen_base_host_fields_content()
            content["tags"].extend(
                [
                    dev.description.vendor.name,
                    dev.description.product,
                ]
            )
            if dev._module:
                content["event"]["module"] = dev._module.__name__

        return content
class IO(BaseModel):
    """
    Physical Input/Output (I/O) connections on a device.

    Physical I/O points are distinct from :class:`~peat.data.models.Register`,
    which handle communication protocols and may not necessarily map to
    physical I/O. Physical I/O points are typically referenced by a
    :class:`~peat.data.models.Tag`, though this may not always be the case.

    On module-based devices like a :term:`PLC`, Physical I/O points may be
    associated with a module, however this may not always be the case.
    """

    address: constr(strip_whitespace=True) = ""
    """
    Address of the I/O point (if applicable).

    Examples

    - ``29``
    """

    description: str = Field(default="", elastic_type="text")
    """
    Human-readable description of the I/O point.
    """

    direction: Literal["input", "output", ""] = ""
    """
    Direction of the I/O point.

    **Allowed values**

    - ``input``
    - ``output``
    """

    extra: dict = Field(default={}, elastic_type="flattened")
    """
    Additional metadata for the I/O point.
    """

    id: constr(strip_whitespace=True) = ""
    """
    ID of the I/O point.

    Examples

    - ``rtu-1_I16``
    - ``O0``
    """

    name: constr(strip_whitespace=True) = ""
    """
    I/O point name or label (typically referenced by a
    :class:`~peat.data.models.Tag`).

    Examples

    - ``var_rtu-1_I16``
    """

    type: constr(strip_whitespace=True) = ""
    """
    Data type of the I/O point. Possible values are device-dependent.

    Examples

    - ``analog``
    - ``binary``
    - ``EBOOL``
    - ``DATE``
    """

    slot: list[constr(strip_whitespace=True)] = []
    """
    Slot number(s) of the module(s) the point is associated with, if any.
    """

    _es_index_varname: str = PrivateAttr(default="ELASTIC_IO_INDEX")
    _sort_by_fields: tuple[str] = PrivateAttr(
        default=("name", "direction", "id", "address", "type")
    )

    # Validators
    _cleanstr = validator("direction", allow_reuse=True)(cleanstr)
    _strip_quotes = validator("description", allow_reuse=True)(strip_quotes)

    def __lt__(self, other):
        """Allows for sorting of objects."""
        # BUG FIX: the fourth element of the right-hand tuple was "self.id"
        # instead of "other.id", which compared this object's ID to itself
        # and produced incorrect orderings whenever type, direction, and
        # name were equal. All other elements already compared self to other.
        return (self.type, self.direction, self.name, self.id, self.address) < (
            other.type,
            other.direction,
            other.name,
            other.id,
            other.address,
        )
    def gen_elastic_content(self, dev: DeviceData | None = None) -> dict:
        """
        Build the Elasticsearch document body for this I/O point.

        Calls :meth:`annotate` first to populate derivable fields, then
        assembles a document containing a human-readable ``message``
        summarizing the I/O point, the point's non-default fields, and (when
        ``dev`` is given) basic host fields and tags from the parent device.

        :param dev: parent device, used for host fields and tags (optional)
        :return: document body ready for indexing into Elasticsearch
        """
        self.annotate(dev)  # populate fields

        # Hacky way to make a nice fancy message describing the IO point.
        # A description is used as-is (prefixed with name/id when known);
        # otherwise a short " - "-separated summary is built from the parts,
        # capped at roughly 80 characters.
        message = ""
        if self.description and self.name:
            message = f"{self.name} - {self.description}"
        elif self.description and self.id:
            message = f"{self.id} - {self.description}"
        elif self.description:
            message = self.description
        else:
            if self.name:
                message += self.name
            elif self.id:
                message += self.id
            if self.type and len(message) < 80:
                if message:
                    message += " - "
                message += self.type
            if self.direction and len(message) < 80:
                if message:
                    message += " - "
                message += self.direction

        time_now = Elastic.time_now()

        content = {
            "@timestamp": time_now,
            "message": message,
            "tags": ["io"],
            "event": {"ingested": time_now},
            "io": self.dict(exclude_defaults=True),
        }

        if dev:
            # host.{basic fields}
            content["host"] = dev.gen_base_host_fields_content()
            content["tags"].extend(
                [
                    dev.description.vendor.name,
                    dev.description.product,
                ]
            )
            if dev._module:
                content["event"]["module"] = dev._module.__name__

        return content
class LatLon(BaseModel):
    """
    Latitude and Longitude (geographical coordinates).
    """

    # NOTE: floats in JSON schema are "number", so we define ES type here
    # as "double"
    lat: confloat(ge=-90.0, le=90.0) | None = Field(
        default=None, title="Latitude", elastic_type="double"
    )
    """
    Latitude.
    """

    lon: confloat(ge=-180.0, le=180.0) | None = Field(
        default=None, title="Longitude", elastic_type="double"
    )
    """
    Longitude.
    """
class Geo(BaseModel):
    """
    Geolocation information (the device's physical location).
    """

    city_name: constr(strip_whitespace=True) = ""
    """
    Name of the city where the device is physically located.

    Examples

    - ``Albuquerque``
    """

    country_name: constr(strip_whitespace=True) = ""
    """
    Name of the country where the device is physically located, in whatever
    form is reasonable.

    Examples

    - ``USA``
    - ``United States of America``
    - ``Canada``
    """

    location: LatLon = Field(default=LatLon(), elastic_type="geo_point")
    """
    Latitude ("lat") and Longitude ("lon") of the device's physical location.
    """

    name: constr(strip_whitespace=True) = ""
    """
    Custom location name, as retrieved from the device.

    Examples

    - ``abq-dc``
    - ``1st floor network closet``
    """

    timezone: constr(strip_whitespace=True) = ""
    """
    Timezone configured for the device.

    Acceptable timezone formats are: a canonical ID (e.g. ``America/Denver``)
    or abbreviated (e.g. ``EST``). Canonical ID is preferred for PEAT.

    Examples

    - ``America/Denver``
    - ``Etc/UTC``
    - ``EST``
    - ``MST``
    - ``UTC``
    """
class Event(BaseModel):
    """
    Device log entry, such as logins, metering reads, or system events.
    """

    action: constr(strip_whitespace=True, to_lower=True) = Field(
        default="", elastic_type=KEYWORD_AND_TEXT
    )
    """
    Type of event.

    Examples

    - ``alarm``
    """

    category: set[str] = set()
    """
    :term:`ECS` category of the event, out of the `allowed values defined by
    ECS <https://www.elastic.co/guide/en/ecs/current/ecs-allowed-values-event-category.html>`__.

    This is a set of values, and is an array in Elasticsearch, which allows
    for Kibana queries such as
    ``host.event.type:alert and host.event.category:authentication``.

    Allowed values

    - ``authentication``
    - ``configuration``
    - ``database``
    - ``driver``
    - ``file``
    - ``host``
    - ``iam``
    - ``intrusion_detection``
    - ``malware``
    - ``network``
    - ``package``
    - ``process``
    - ``registry``
    - ``session``
    - ``web``
    """

    created: datetime | None = None
    """
    When the event occurred.
    """

    dataset: constr(strip_whitespace=True) = ""
    """
    What log the event came from. This is especially important on devices
    with multiple log types.

    Examples

    - ``metering_reads``
    """

    extra: dict = Field(default={}, elastic_type="flattened")
    """
    Other event metadata that doesn't fit anywhere in the model, but is
    still worth capturing.
    """

    hash: Hash = Hash()
    """
    Hash of raw field to be able to demonstrate log integrity.
    """

    id: str = ""
    """
    Unique identifier for the Event, if any.
    """

    ingested: datetime | None = None
    """
    When the event was generated by PEAT, e.g. when it was parsed or pulled
    from a device.

    .. warning:: This should almost always differ from ``created`` and the
        two should NOT be confused.
    """

    kind: set[str] = set()
    """
    Gives high-level information about what type of information the event
    contains, without being specific to the contents of the event. For
    example, values of this field distinguish alert events from metric
    events.

    `Further reading <https://www.elastic.co/guide/en/ecs/current/ecs-allowed-values-event-kind.html>`__

    This is a set of values, and is an array in Elasticsearch, which allows
    for Kibana queries such as
    ``host.event.kind:event and host.event.type:deleted``.

    Allowed values

    - ``alert``
    - ``event``
    - ``metric``
    - ``state``
    - ``pipeline_error`` : Used for indicating there was an error processing
      the event
    """

    message: constr(strip_whitespace=True) = Field(default="", elastic_type=KEYWORD_AND_TEXT)
    """
    Simplified message body, for example a human-readable portion of the raw
    event. This should be set *in addition to* setting the ``original``
    field.
    """

    module: constr(strip_whitespace=True) = ""
    """
    Name of the module this data is coming from, e.g. the PEAT module.
    """

    original: str = Field(default="", elastic_type="text")
    """
    Original raw text of the log entry.
    """

    outcome: constr(strip_whitespace=True) = ""
    """
    Outcome of the event.

    `Further reading <https://www.elastic.co/guide/en/ecs/current/ecs-allowed-values-event-outcome.html>`__

    Allowed values

    - ``success``
    - ``failure``
    - ``unknown``
    """

    provider: constr(strip_whitespace=True) = ""
    """
    Source of the event. This is almost always the Device ID.
    """

    sequence: int | None = None
    """
    Sequence number of the event.

    The sequence number is a value published by some event sources, to make
    the exact ordering of events unambiguous, regardless of the timestamp
    precision.
    """

    severity: constr(strip_whitespace=True) = ""
    """
    Severity or log level of the event as stored on the device.

    Examples

    - ``debug``
    - ``ERR``
    """

    timezone: constr(strip_whitespace=True) = ""
    """
    Timezone for the event.

    This field should be populated when the event's timestamp does not
    include timezone information already. It's optional otherwise.

    .. note:: This field will be auto-populated from the device's timezone
        field (DeviceData.geo.timezone), if the timestamp isn't
        timezone-aware and the device's timezone is known.

    Acceptable timezone formats are: a canonical ID (e.g.
    ``Europe/Amsterdam``) or abbreviated (e.g. ``EST``). Canonical ID is
    preferred for PEAT.

    Examples

    - ``Europe/Amsterdam``
    - ``America/Denver``
    - ``Etc/UTC``
    - ``EST``
    - ``MST``
    - ``UTC``
    """

    type: set[str] = set()
    """
    List of event category "sub-buckets" the event falls under. The valid
    values depend on the value for ``category``, refer to the `ECS
    documentation for
    <https://www.elastic.co/guide/en/ecs/current/ecs-allowed-values-event-category.html>`__
    details.

    This is a set of values, and is an array in Elasticsearch, which allows
    for Kibana queries such as
    ``host.event.type:user and host.event.type:deleted``.

    Allowed values (refer to the `ECS documentation
    <https://www.elastic.co/guide/en/ecs/current/ecs-allowed-values-event-type.html>`__)

    - ``access``
    - ``admin``
    - ``allowed``
    - ``change``
    - ``connection``
    - ``creation``
    - ``deletion``
    - ``denied``
    - ``end``
    - ``error``
    - ``group``
    - ``info``
    - ``installation``
    - ``protocol``
    - ``start``
    - ``user``
    """

    _es_index_varname: str = PrivateAttr(default="ELASTIC_EVENTS_INDEX")
    _sort_by_fields: tuple[str] = PrivateAttr(
        default=("provider", "dataset", "sequence", "created")
    )

    # Validators
    _validate_ecs = validator(
        "category", "kind", "outcome", "type", each_item=True, allow_reuse=True
    )(validate_ecs)
[docs] def annotate(self, dev: DeviceData | None = None): if dev: # If provider isn't set, set it to the device's ID if not self.provider and dev.ip: self.provider = dev.ip elif not self.provider and dev.id: self.provider = dev.id # Set the module if not set if not self.module and dev._module: self.module = dev._module.__name__ # If timezone isn't set, set to the timezone of the timestamp if # the timestamp is present and timezone-aware. # If it's not timezone-aware, then set it to the device's # timezone, if it's set. if not self.timezone: if self.created and self.created.tzinfo: self.timezone = self.created.tzname() elif dev and dev.geo.timezone: self.timezone = dev.geo.timezone # Add "event" to kind if nothing has been added if not self.kind: self.kind = {"event"} elif self.kind and "event" not in self.kind: self.kind.add("event")
[docs] def gen_elastic_content(self, dev: DeviceData | None = None) -> dict: self.annotate(dev) # populate fields time_now = Elastic.time_now() timestamp = time_now if self.created is not None: timestamp = Elastic.convert_tstamp(self.created) ingested = time_now if self.ingested is not None: ingested = Elastic.convert_tstamp(self.ingested) content = { "@timestamp": timestamp, "message": self.message if self.message else self.original, "tags": ["events"], "event": { # event.* **self.dict(exclude_defaults=True), # event.ingested "ingested": ingested, }, } if self.original: content["hash"] = utils.gen_hashes(self.original) if dev: # host.{basic fields} content["host"] = dev.gen_base_host_fields_content() content["tags"].extend( [ dev.description.vendor.name, dev.description.product, ] ) return content
class OS(BaseModel):
    """
    Operating System (OS) information, such as the name and version.
    """

    family: constr(strip_whitespace=True, to_lower=True) = ""
    """
    Operating system family, such as Debian, Windows, etc. Lowercase value.
    This can be general (e.g. ``linux``) or specific (e.g. ``debian``).

    Examples

    - ``debian``
    - ``windows``
    - ``linux``
    """

    full: constr(strip_whitespace=True) = ""
    """
    Full operating system name, including the version or code name.

    Examples

    - ``WindRiver VxWorks 7``
    """

    kernel: constr(strip_whitespace=True) = ""
    """
    Operating system kernel version as a raw string.

    Examples:

    - ``4.4.0-112-generic``
    """

    name: constr(strip_whitespace=True) = ""
    """Operating system name, without the version.

    Examples

    - ``VxWorks``, ``Linux``
    """

    timestamp: datetime | None = None
    """
    Timestamp of the OS, as extracted from the device or firmware.
    Device-dependent meaning. Often represents when the OS was
    compiled/built or released.
    """

    vendor: Vendor = Vendor()
    """
    The vendor of the OS, if known.
    """

    version: constr(strip_whitespace=True) = ""
    """
    Operating system version as a raw string.
    """
class Memory(BaseModel):
    """
    Physical memory values (e.g. :term:`RAM`, EEPROM).
    """

    address: str = ""
    """
    Starting address of the read, as a hexadecimal string.

    This should be zero-padded hex bytes, without a leading hex identifier,
    and uppercase characters.

    Examples:

    - ``00000003``
    - ``D3ADB33F``
    """

    created: datetime | None = None
    """
    When the read occurred. Represents when in time the memory address had
    the value.
    """

    dataset: constr(strip_whitespace=True) = ""
    """
    Data source of the memory read, such as the memory region or log it was
    extracted from, if applicable.

    Examples

    - ``watchdog_log``
    - ``internal_memory``
    - ``RAM``
    - ``EEPROM``
    """

    device: constr(strip_whitespace=True) = ""
    """
    Device that was the source of the read. This is almost always the
    device ID.
    """

    process: constr(strip_whitespace=True) = ""
    """
    Name of the system process or task this memory read is associated with.
    """

    size: int | None = None
    """
    Size of the memory read, in bytes.
    """

    value: str = ""
    """
    The value read from memory, as a hexadecimal string. Each hex pair
    (e.g. ``3f``) represents 1 byte. The length of this string should be
    twice the value of ``size`` (size*2).

    This should be zero-padded hex bytes, without a leading hex identifier,
    and uppercase characters.

    Examples:

    - ``00000003``
    - ``D3ADB33F``
    """

    extra: dict = Field(default={}, elastic_type="flattened")
    """
    Additional data or metadata about the memory read.
    """

    _es_index_varname: str = PrivateAttr(default="ELASTIC_MEMORY_INDEX")
    _sort_by_fields: tuple[str] = PrivateAttr(default=("device", "dataset", "address", "created"))

    # Validators
    _validate_hexes = validator("address", "value", allow_reuse=True)(validate_hex)
[docs] def annotate(self, dev: DeviceData | None = None): if dev: # Automatically set the device field if it's not already if not self.device: dev_id = dev.get_id() if "unknown-dev-" not in dev_id: self.device = dev_id
[docs] def gen_elastic_content(self, dev: DeviceData | None = None) -> dict: self.annotate(dev) # populate fields # Hacky way to make a nice fancy message describing the Memory message = "" if self.dataset: message += f"[{self.dataset}] " message += f"0x{self.address}: 0x{self.value}" time_now = Elastic.time_now() timestamp = time_now if self.created is not None: timestamp = Elastic.convert_tstamp(self.created) content = { "@timestamp": timestamp, "message": message, "tags": ["memory"], "event": {"ingested": time_now}, "memory": self.dict(exclude_defaults=True), } if dev: # host.{basic fields} content["host"] = dev.gen_base_host_fields_content() content["tags"].extend( [ dev.description.vendor.name, dev.description.product, ] ) if dev._module: content["event"]["module"] = dev._module.__name__ return content
class SSHKey(BaseModel):
    """
    SSH keys (public or private).
    """

    description: constr(strip_whitespace=True) = ""
    """
    Description of the SSH key and/or any comments.
    """

    file: File = File()
    """
    The file associated with the key, if any.
    """

    host: constr(strip_whitespace=True) = ""
    """
    Host associated with the key (hostname, DNS name, or IP).
    """

    id: constr(strip_whitespace=True) = ""
    """
    Unique identifier for the key, if any.
    """

    original: constr(strip_whitespace=True) = ""
    """
    Complete contents of the key, with any trailing whitespace removed.
    """

    # BUG FIX: the Literal was ("public", "public", "") — a duplicated
    # "public" where "private" was clearly intended (per the docstring and
    # the class description), which made it impossible to ever mark a key
    # as "private" (assignment would fail validation).
    type: Literal["public", "private", ""] = ""
    """
    Type of key, either ``public`` or ``private``.
    """

    user: constr(strip_whitespace=True) = ""
    """
    Name of user associated with the key.
    """

    _sort_by_fields: tuple[str] = PrivateAttr(
        default=("host", "user", "type", "id", "description", "original")
    )
[docs] def annotate(self, dev: DeviceData | None = None): if not dev: return if self.file.path: dev.related.files.add(self.file.path.as_posix()) elif self.file.name: dev.related.files.add(self.file.name) if self.user: dev.related.user.add(self.user) if self.host: if utils.is_ip(self.host): dev.related.ip.add(self.host) else: dev.related.hosts.add(self.host)
[docs] class DeviceData(BaseModel): """ Container and manager of all data about a device, e.g. name, :term:`IP` address, version, etc. .. note:: If unset, the :attr:`~peat.data.models.DeviceData.id` attribute on this object will be set to the IP of the first Interface added via :meth:`~peat.data.models.DeviceData.store`. There are two main methods of storing and retrieving data: - Directly via class attributes. This should be used for most operations. - Via :meth:`~peat.data.models.DeviceData.store` and :meth:`~peat.data.models.DeviceData.retrieve`. These are used for containers of objects, e.g. ``dev.interface`` or ``dev.event"``. .. note:: See the documentation for :meth:`~peat.data.models.DeviceData.store` and :meth:`~peat.data.models.DeviceData.retrieve` for detailed examples of how to use those methods. Storing data: - Direct assignment: ``dev.os.version = "7"`` - Storing to a list: ``dev.store("interface", Interface(ip="192.0.2.10"))`` Reading data: - General data: ``value = dev.os.version`` - List of data: ``value = dev.retrieve("interface", {"ip": "192.0.2.10"})`` Methods for exporting data: - :meth:`~peat.data.models.DeviceData.export` - :meth:`~peat.data.models.DeviceData.export_summary` - :meth:`~peat.data.models.DeviceData.elastic` - :meth:`~peat.data.models.DeviceData.dict` - :meth:`~peat.data.models.DeviceData.json` - :meth:`~peat.data.models.DeviceData.export_to_elastic` - :meth:`~peat.data.models.DeviceData.export_to_files` .. note:: The device can be module or component of a larger system, e.g. a module in a :term:`PLC` or a wireless add-on module on a power meter. The :attr:`~peat.data.models.DeviceData.module` field is an example of this use case (a :class:`list` of :class:`~peat.data.models.DeviceData`). """ successful_pulls: dict = Field(default={}) """ Indicates the success of the peat pull per protocol """ architecture: constr(strip_whitespace=True) = "" """ Architecture of the device :term:`CPU`. 
""" boot_firmware: Firmware = Firmware() """ Boot firmware information, if applicable. """ description: Description = Description() """ Identifying information such as vendor, brand, and model. """ endian: Literal["big", "little", ""] = "" """ "Endianness" of the CPU of the system where the memory was read from. """ firmware: Firmware = Firmware() """ Device firmware information. """ hardware: Hardware = Hardware() """ Information about the device's hardware specifications and configuration (RAM, storage, etc.). """ hostname: constr(strip_whitespace=True) = "" """ Hostname of the device (if resolved). In the case of a device with multiple communication modules, this is the hostname of the module PEAT primarily uses to communicate (or first discovered). """ id: constr(strip_whitespace=True) = "" """ Unique identifier for the device. Can be anything, as long as it's consistent in the module. Defaults to the device MAC, IP, or COM port. """ ip: str = Field(default="", elastic_type="ip") """ :term:`IP` address of the device. In the case of a device with multiple communication modules, this is the IP address of the module PEAT primarily uses to communicate (or first discovered). """ mac: str = "" """ :term:`MAC` address of the device. In the case of a device with multiple communication modules, this is the :term:`MAC` address of the module PEAT primarily uses to communicate (or first discovered). """ mac_vendor: str = Field(default="", elastic_type=KEYWORD_AND_TEXT) """ Vendor name resolved from the :term:`MAC` address :term:`OUI`. This field is auto-populated by PEAT if the ``mac`` field is set. """ serial_port: constr(strip_whitespace=True) = "" """ Serial port on the local system connected to the device. This could be a Windows COM port, e.g. ``COM4``, or a Linux file path, e.g. ``/dev/ttyS0``. This is also used for USB connections. To get the specific serial settings, lookup the interface with the matching port in ``data.interface``. 
""" name: constr(strip_whitespace=True) = "" """ Name to refer to the device as, e.g. as pulled from a config or resolved via :term:`DNS`. Defaults to :term:`FQDN` resolved from the IP address, if hostname resolutions are enabled in the PEAT configuration. """ label: str = "" """ User-specified label from the PEAT configuration file. This field is automatically set by PEAT, and device modules shouldn't write to this field. """ comment: str = "" """ User-specified comment from the PEAT configuration file. This field is automatically set by PEAT, and device modules shouldn't write to this field. """ part_number: constr(strip_whitespace=True) = "" """ Part number of the device, as defined by the vendor and stored on the device. """ type: constr(strip_whitespace=True) = "" """ The type/class of device, e.g. "PLC", "Relay", "RTU", "Controller" (catch-all), etc. Examples of type for a module include Communications Adapter, General Purpose Discrete I/O, or CPU. """ serial_number: constr(strip_whitespace=True) = "" """ Unique serial number of the device, as defined by the vendor and stored on the device. """ manufacturing_date: datetime | None = None """ When the device was manufactured (physically created). """ run_mode: constr(strip_whitespace=True) = "" """ Run mode of the device. For example, on a PLC, there may be a key in the front of the device that sets PROG or RUN (program vs running). What this field means depends on the device, for instance a PLC's potential run modes will differ from a RTU's potential run modes. """ slot: constr(strip_whitespace=True) = "" """ Position of the device in a rack or larger device. This can be a relative position, e.g. "0" for the first module in a :term:`PLC`, or a name or other identifier for the position (such as an internal bus address). """ start_time: datetime | None = None """ :term:`UTC` timestamp of when the device last powered on. """ status: constr(strip_whitespace=True) = "" """ Status of the device. 
The meaning of this field is device-dependant. """ uptime: timedelta | None = None """ Number of seconds the host has been up (powered on/online), as either a integer or :class:`~datetime.timedelta`. .. note:: Normal Python integers (:class:`int`) can be assigned to this directly and they will be automatically converted to a :class:`~datetime.timedelta`. """ os: OS = OS() """ Operating System (OS) information, such as the name and version. """ geo: Geo = Geo() """ Geolocation information. This includes the device's physical location and configured timezone. """ logic: Logic = Logic() """ What the device has been programmed to do, aka the "process logic". """ files: list[File] = [] """ All files that are present on the device, or were present at some point in time. """ interface: list[Interface] = [] """ All communication interfaces configured or present on the device. """ service: list[Service] = [] """ All communication services configured or running on the device. """ ssh_keys: list[SSHKey] = [] """ Any SSH keys found on the device or associated with the device. """ related: Related = Related() """ Information that is related to a host or interface. """ # NOTE: "register" shadows a field from pydantic.BaseModel # It has been renamed to "registers" to avoid conflict registers: list[Register] = [] """ All Input/Output (I/O) protocol data points configured on the device, e.g. DNP3 and Modbus. """ tag: list[Tag] = [] """ Data variables ("tags") in a device. Often mapped in a device's logic to physical I/O and/or registers. """ io: list[IO] = [] """ Physical Input/Output (I/O) connections on a device. """ event: list[Event] = [] """ Event log entries on the device, aggregated from all log sources. """ memory: list[Memory] = [] """ Physical memory values (e.g. RAM, EEPROM). """ module: list[DeviceData] = [] """ Physical add-on modules in a device, e.g. slots in a :term:`PLC` or rack. These also include add-on components, such as a wireless radio. 
These can include analog and digital I/O modules, COMMs modules (Ethernet, various serial protocols, Wi-Fi, LTE, etc.), CPU modules, and anything else really. While there are general sorts of modules that are typically seen in devices like a :term:`PLC`, the reality is there are a ton of modules that sometimes highly specific to a vendor or application. Therefore, while we define a set of module types, they are not required to be used if the module does not fall in the set of defined types. """ users: list[User] = [] """ Users on the device. """ x509: X509 = X509() """ x509 certificate associated with the device, e.g. from a HTTPS/TLS service or extracted from a file in a blob. """ uefi_image: list[UEFIFile] = [] """ uefi_image holds all the spi files for the UEFIFile object """ uefi_hashes: list[UEFIHash] = [] """ uefi_hashes holds all the hashes for a file that is linked to a device. """ # NOTE: trying out "flattened" to prevent type mapping explosion # https://www.elastic.co/guide/en/elasticsearch/reference/current/flattened.html extra: dict = Field(default={}, elastic_type="flattened") """ Additional vendor/model-specific information that doesn't currently fit into the defined model, but may be useful and we don't want to leave on the cutting room floor, so to speak. In other words: a piece of data belongs here if it's useful and doesn't fit elsewhere in the model. """ _cache: dict = PrivateAttr(default={}) """ Cache for internal usage by device modules. Can include data such as Python sockets, protocol classes (e.g. :class:`~peat.protocols.ftp.FTP`) or other objects or state. """ _es_index_varname: str = PrivateAttr(default="ELASTIC_HOSTS_INDEX") _sort_by_fields: tuple[str] = PrivateAttr( default=( "slot", "ip", "id", "name", "serial_port", "mac", "hostname", "serial_number", ) ) _module: type | None = PrivateAttr(default=None) """ :class:`~peat.device.DeviceModule` class used for this device (generally set after identification). 
""" _out_dir: Path | None = PrivateAttr(default=None) """ Output directory for any file output and results associated with this device. Don't use this directly, instead use :func:`~peat.data.DeviceData.get_out_dir`, :func:`~peat.data.DeviceData.get_sub_dir`, or :func:`~peat.data.DeviceData.write_file`. """ _options: DeepChainMap = PrivateAttr() """ Master ChainMap combining all of the device options. .. warning:: Don't access this, :attr:`peat.data.DeviceData.options`. Modifications to this will show up in :attr:`peat.data.DeviceData.options`. NOTE: to change values, either add individual fields or use ``.update({...})``. !!! DON'T DIRECTLY ASSIGN A NEW DICT TO THIS FIELD, this will break pydantic !!! """ _runtime_options: dict = PrivateAttr(default={}) """ Options set or overridden at runtime. .. warning:: Don't access this, :attr:`peat.data.DeviceData.options`. Modifications to this will show up in :attr:`peat.data.DeviceData.options`. NOTE: to change values, either add individual fields or use ``.update({...})``. !!! DON'T DIRECTLY ASSIGN A NEW DICT TO THIS FIELD, this will break pydantic !!! """ _host_option_overrides: dict = PrivateAttr(default={}) """ Overrides for this particular host. .. warning:: Don't access this, :attr:`peat.data.DeviceData.options`. Modifications to this will show up in :attr:`peat.data.DeviceData.options`. NOTE: to change values, either add individual fields or use ``.update({...})``. !!! DON'T DIRECTLY ASSIGN A NEW DICT TO THIS FIELD, this will break pydantic !!! """ _module_default_options: dict = PrivateAttr(default={}) """ Default options injected from a module. .. warning:: Don't access this, :attr:`peat.data.DeviceData.options`. Modifications to this will show up in :attr:`peat.data.DeviceData.options`. NOTE: to change values, either add individual fields or use ``.update({...})``. !!! DON'T DIRECTLY ASSIGN A NEW DICT TO THIS FIELD, this will break pydantic !!! 
""" _last_module: str = PrivateAttr(default="") """ Used by :attr:`peat.data.DeviceData.options`. """ _is_active: bool = PrivateAttr(default=False) """ If the device is active and responding. """ _is_verified: bool = PrivateAttr(default=False) """ If the device has been verified as a specific type. """ _is_deduplicated: bool = PrivateAttr(default=False) """ If the device has already had :meth:`~peat.datamodels.DeviceData.purge_duplicates` called on it. """ _ID_KEY_ORDER: list[str] = [ "label", "ip", "name", "hostname", "mac", "serial_port", "id", "part_number", ] """ Device identification precedence with a focus on being human-friendly. """ _COMM_ID_KEY_ORDER: list[str] = [ "serial_port", "ip", "hostname", "mac", "name", "id", ] """ Device identifier precedence order with a focus on getting a communication ID. """ _DEFAULT_OPTIONS: dict = PrivateAttr(default=DEFAULT_OPTIONS) """ Default options for all devices. These get set from ``peat/data/default_options.py`` """ # Validators _validate_ip = validator("ip", allow_reuse=True)(validate_ip) _validate_mac = validator("mac", allow_reuse=True)(validate_mac) def __init__(self, **data): super().__init__(**data) # BUGFIX: import here to make imports work with ipython notebooks from peat import module_api module_defaults = [ # deepcopy to prevent changes to self._options from modifying source deepcopy( {k: v for k, v in module.default_options.items() if k not in self._DEFAULT_OPTIONS} ) for module in module_api.modules.values() if getattr(module, "default_options", None) ] if self._module: self._module_default_options.clear() self._module_default_options.update(deepcopy(self._module.default_options)) self._options = DeepChainMap( # Options changed at runtime for this specific device self._runtime_options, # Options changed at initialization time for this specific device self._host_option_overrides, # Global option runtime changes, managed in datastore peat.data.store.datastore.global_options, # minor hack with direct 
reference... # Defaults from the module associated with this device (or from the caller) self._module_default_options, # Fallback to module-specific defaults if the current module isn't known # e.g. "sel", keys that (mostly) aren't generalized *module_defaults, # Standard defaults # deepcopy to prevent changes to self._options from modifying source deepcopy(self._DEFAULT_OPTIONS), ) # If data is passed to initialization and there are hosts from YAML config, # then attempt to automatically set the label from the config. # # This will miss data objects that don't have any parameters passed during # initialization, but that is by design. if data and not self.label and config.HOSTS: # At initialization time, likely only one of these will be set, and certainly # only in the case of a device read from a config. Therefore, the first one # that's set should be used for the label lookup. ident_key = "" ident_val = "" # TODO: config identifier keys probably should be defined somewhere...meh for key in ["ip", "mac", "serial_port", "name", "hostname"]: if getattr(self, key): ident_key = key ident_val = getattr(self, key) break if ident_key: for host in config.HOSTS: if not host.get("label") or not host.get("identifiers"): continue if host["identifiers"].get(ident_key) == ident_val: self.label = host["label"] if host.get("comment"): self.comment = host["comment"] @property def address(self) -> str: """ Communication address of the device (``ip``, ``serial_port`` or ``mac``). """ valid_addresses = ["ip", "serial_port", "mac"] for key in valid_addresses: value = getattr(self, key, None) if value: if not isinstance(value, str): value = str(value) # convert ipaddress objects to str return value raise PeatError( f"'.address': no valid address defined out of {valid_addresses} (dev.id: {self.id})" ) @property def options(self) -> DeepChainMap: """ PEAT configuration options for this device. This includes Service configurations (timeout, port, etc.), login credentials, etc. 
The options are composed from multiple sources and use the following order of precedence: #. Runtime changes (``self._runtime_options``) #. Host-specific changes (``self._host_option_overrides``) #. Global option changes (stored in :attr:`datastore.global_options <peat.data.store.Datastore.global_options>`) #. Module-specific defaults (from :attr:`peat.device.DeviceModule.default_options` for the module) #. Global defaults (:attr:`peat.data.DeviceData._DEFAULT_OPTIONS`) """ if self._module and not self._module_default_options: self._module_default_options.update(deepcopy(self._module.default_options)) elif self._module and self._last_module: self._last_module = "" self._module_default_options.clear() self._module_default_options.update(deepcopy(self._module.default_options)) elif not self._module: try: c_locals = inspect.stack()[1][0].f_locals klass = c_locals.get("cls", c_locals.get("self")) if klass and hasattr(klass, "default_options"): k_name = getattr(klass, "__name__", klass.__class__.__name__) # type: str if not self._last_module or ( self._last_module and k_name != self._last_module ): self._last_module = k_name self._module_default_options.clear() self._module_default_options.update(deepcopy(klass.default_options)) except Exception: pass return self._options
def get_id(self, attribute_precedence: list[str] | None = None) -> str:
    """Return a canonical, human-friendly device ID.

    Attributes are probed in precedence order and the first one with a
    non-empty value becomes the ID. If every attribute is empty, a random
    ID is generated via :func:`~peat.consts.gen_random_dev_id` and cached
    so repeated calls return the same value.

    Args:
        attribute_precedence: Custom attribute precedence order. When
            :obj:`None`, :attr:`~peat.data.DeviceData._ID_KEY_ORDER` is used.

    Returns:
        Device ID, or a cached randomly generated ID if the lookup fails
    """
    keys = self._ID_KEY_ORDER if attribute_precedence is None else attribute_precedence
    for key in keys:
        candidate = getattr(self, key, None)
        if not candidate:
            continue
        # Non-string values (e.g. ipaddress objects) are stringified
        return candidate if isinstance(candidate, str) else str(candidate)
    if not self._cache.get("_rand_id"):
        self._cache["_rand_id"] = consts.gen_random_dev_id()
    log.critical(f"Failed to find a valid ID! Using '{self._cache['_rand_id']}' as the ID")
    return self._cache["_rand_id"]
def get_comm_id(self) -> str:
    """Return a canonical communication-protocol ID for this device.

    Identical to :meth:`~peat.data.models.DeviceData.get_id`, but the
    precedence order is
    :attr:`~peat.data.models.DeviceData._COMM_ID_KEY_ORDER`, which favors
    addresses usable for communication (serial port, IP, hostname, ...).

    Returns:
        Communication protocol ID of the device (e.g. IP, MAC, serial port)
    """
    return self.get_id(self._COMM_ID_KEY_ORDER)
def service_status(self, lookup: dict) -> str:
    """Return the status of a service, or ``"unknown"`` if the service isn't found.

    Args:
        lookup: Attribute name/value pairs passed to
            :meth:`~peat.data.models.DeviceData.retrieve` to find the service.

    Returns:
        The matched service's ``status`` string, or ``"unknown"``
    """
    hsvc = self.retrieve("service", lookup)
    # BUGFIX: the duplicate-service hack must run BEFORE the isinstance
    # guard. Previously `not isinstance(hsvc, Service)` rejected lists,
    # so the list branch was unreachable and duplicate services always
    # came back as "unknown".
    if isinstance(hsvc, list):  # !! hack for duplicate services !!
        hsvc = hsvc[0]
    if not hsvc or not isinstance(hsvc, Service):
        return "unknown"
    return hsvc.status
def annotate_edge_cases(self):
    """Run ``annotate(self)`` on nested models that were set directly.

    Covers two cases the normal :meth:`store` API already handles:
    models assigned at initialization (e.g. X509), and items appended
    straight onto list attributes with ``list.append()`` (generally done
    for performance reasons).
    """
    # Models set on initialization (X509 and friends)
    for attr_name in self.get_attr_names(BaseModel):
        model = getattr(self, attr_name, None)
        if model and hasattr(model, "annotate"):
            model.annotate(self)
    # Items added directly to lists, bypassing .store()
    for attr_name in self.get_attr_names(list):
        models = getattr(self, attr_name, None)
        if not models:
            continue
        for model in models:
            if hasattr(model, "annotate"):
                model.annotate(self)
def export(
    self,
    include_original: bool = False,
    exclude_fields: list[str] | None = None,
    only_fields: str | list[str] | None = None,
) -> dict:
    """Return device data as a normalized JSON-friendly :class:`dict`.

    Args:
        include_original: Keep ``original`` keys (raw data such as raw
            firmware or raw logic) in the output.
        exclude_fields: Field names (keys) to drop from the result.
            This excludes fields recursively!
        only_fields: Restrict the result to the named top-level fields
            (e.g. ``name``, ``firmware``).

    Returns:
        The exported data as a JSON-serializable :class:`dict`

    .. note:: Output key order matches the field order defined on the models.
    """
    self.populate_fields()
    self.purge_duplicates()
    self.annotate_edge_cases()

    def _as_flags(names):
        # pydantic's include/exclude expect {field_name: True};
        # None disables the filter entirely.
        if not names:
            return None
        if isinstance(names, str):
            names = [names]
        return {name: True for name in names}

    results = self.dict(
        exclude=_as_flags(exclude_fields),
        include=_as_flags(only_fields),
        exclude_defaults=True,
    )
    if not include_original:
        # Note: parsed logic is stored in "parsed"
        results = strip_key(results, "original")
    results = consts.convert(results)  # Normalize to JSON-friendly values
    results = strip_empty_and_private(results)
    # minor hack to apply IP sorting to the list of IPs in related.ip
    related_ips = results.get("related", {}).get("ip")
    if related_ips:
        results["related"]["ip"] = addresses.sort_ips(related_ips)
    return results
def export_summary(self, cached_export: dict | None = None) -> dict:
    """Return a summarized, JSON-friendly :class:`dict` of the device data
    with certain large fields removed.

    Args:
        cached_export: Result of a previous
            ``self.export(include_original=True)`` call to summarize instead
            of re-exporting. The passed dict is deep-copied, never mutated.

    Returns:
        The exported data as a JSON-serializable :class:`dict`

    .. note:: Output key order matches the field order defined on the models.
    """
    if cached_export:
        summary = copy.deepcopy(cached_export)
    else:
        summary = self.export(include_original=True)
    # Drop raw "original" blobs only from firmware/logic,
    # not from other things like events.
    for blob_key in ["logic", "firmware", "boot_firmware"]:
        if summary.get(blob_key, {}).get("original"):
            del summary[blob_key]["original"]
    # Large/excessive fields are left out of the summary entirely
    for big_key in ["memory", "registers", "tag", "io", "event", "files"]:
        if summary.get(big_key):
            del summary[big_key]
    # "raw-values" logic format is also too large for a summary
    if summary.get("logic", {}).get("formats", {}).get("raw-values"):
        del summary["logic"]["formats"]["raw-values"]
        if not summary["logic"]["formats"]:
            del summary["logic"]["formats"]
    return summary
def export_to_files(self, overwrite_existing: bool = False) -> bool:
    """Export data to ``device-data-full`` and ``device-data-summary`` files.

    Args:
        overwrite_existing: Replace files that already exist instead of
            writing new copies alongside them.

    Returns:
        :obj:`True` if the writes completed, :obj:`False` if an exception occurred
    """
    self.populate_fields()
    self.purge_duplicates()
    # TODO: export large fields (memory, event, registers) to separate JSON files
    try:
        # Everything, including "original" and other large fields
        full_data = self.export(include_original=True)
        self.write_file(
            data=full_data,
            filename="device-data-full.json",
            overwrite_existing=overwrite_existing,
        )
        # Trimmed-down view, reusing the export computed above
        self.write_file(
            data=self.export_summary(cached_export=full_data),
            filename="device-data-summary.json",
            overwrite_existing=overwrite_existing,
        )
    except Exception:
        log.exception(f"Failed to export data to files for device '{self.get_id()}'")
        state.error = True
        return False
    return True
def export_to_elastic(self, elastic: Elastic | None = None) -> bool:
    """Save device data to an Elasticsearch database.

    Args:
        elastic: The :class:`~peat.elastic.Elastic` instance to use;
            defaults to the global instance in
            :attr:`~peat.settings.State.elastic` when unspecified.

    Returns:
        If the export was successful
    """
    dev_id = self.get_id()
    elastic = resolve_es_instance(elastic, dev_id)
    if not elastic:
        return False
    log.info(f"Exporting device {dev_id} to {elastic.type}")
    # Export this DeviceData object (self) first
    success = bool(export_models_to_elastic([self], self, elastic))
    # Then every list-of-models attribute that is exportable
    for attr_name in self.get_attr_names(list):
        models = getattr(self, attr_name, None)
        if not models:
            log.trace2(
                f"Skipping {elastic.type} export of DeviceData.{attr_name} (no data)"
            )
            continue
        head = models[0]
        # Only export lists whose models declare an ES index variable,
        # and never nested DeviceData (i.e. skip self.module).
        if getattr(head, "_es_index_varname", None) and not isinstance(head, DeviceData):
            log.info(
                f"Exporting {len(models)} {head.__repr_name__()} "
                f"models (DeviceData.{attr_name}) to {elastic.type} for "
                f"device '{dev_id}'"
            )
            if not export_models_to_elastic(models=models, dev=self, elastic=elastic):
                success = False
    if success:
        log.debug(f"Exported device '{dev_id}' to {elastic.type}")
        return True
    log.error(f"Failed to export device '{dev_id}' to {elastic.type}")
    state.error = True
    return False
def elastic(self) -> dict[str, Any]:
    """Generate the ``host`` portion of Elasticsearch data.

    .. note:: Attributes with an empty value, or whose name starts with
        an underscore (``_``), are not included in the return value.

    Returns:
        The host's data as an elasticsearch-friendly dictionary
    """
    self.populate_fields()
    self.purge_duplicates()
    # self.export() is deliberately NOT used: consts.convert() would turn
    # objects like datetime() into strings, and the PeatSerializer
    # Elasticsearch serializer handles those conversions itself.
    payload = strip_empty_and_private(self.dict(exclude_defaults=True))
    if not config.ELASTIC_SAVE_BLOBS:
        payload = strip_key(payload, "original")
    # minor hack to apply IP sorting to the list of IPs in related.ip
    # (parse_api calls dev.elastic() instead of dev.export())
    related_ips = payload.get("related", {}).get("ip")
    if related_ips:
        payload["related"]["ip"] = addresses.sort_ips(related_ips)
    return payload
def gen_elastic_content(
    self,
    dev: DeviceData | None = None,  # noqa: ARG002
) -> dict:
    """Build the Elasticsearch document body for this device.

    Args:
        dev: Unused; kept for signature compatibility with other
            ``gen_elastic_content`` implementations.

    Returns:
        Document body with ``@timestamp``, ``message``, ``tags``, and ``host``
    """
    self.populate_fields()
    host_data = self.elastic()  # Export the current data
    content = {
        "@timestamp": Elastic.time_now(),
        "message": f"{self.description.vendor.name} {self.type}",
        "tags": [
            self.description.vendor.name,
            self.description.product,
            "devices",
        ],
        "host": host_data,
    }
    logic = host_data.get("logic", {})
    # If the logic is a dict (like with raw-values), convert it
    # to a JSON string before pushing to elastic.
    if logic.get("original") and isinstance(logic["original"], dict):
        logic["original"] = json.dumps(consts.convert(logic["original"]))
    # Exclude "raw-values" from Elasticsearch data
    if logic.get("formats", {}).get("raw-values"):
        del logic["formats"]["raw-values"]
        if not logic["formats"]:
            del logic["formats"]
    return content
def gen_base_host_fields_content(self) -> dict:
    """Populate ``host`` field values for new indices."""
    wanted = (
        "hostname",
        "id",
        "ip",
        "mac",
        "mac_vendor",
        "serial_port",
        "name",
        "label",
        "comment",
        "type",
        "serial_number",
        "slot",
        "description",
        "geo",
    )
    content = {}
    for field in wanted:
        # Skip fields still at their default or with an empty value
        if self.is_default(field) or not getattr(self, field):
            continue
        value = getattr(self, field)
        if isinstance(value, BaseModel):
            value = value.dict(exclude_defaults=True, exclude_none=True)
        content[field] = value
    return content
def write_file(
    self,
    data: Any,
    filename: str,
    overwrite_existing: bool = False,
    out_dir: Path | None = None,
    merge_existing: bool = False,
) -> Path:
    """Save data to a file in the device's output directory.

    .. note:: Data will NOT be written if both
        :attr:`~peat.settings.Configuration.DEVICE_DIR` and
        ``self._out_dir`` are unset.

    Args:
        data: Raw data to write
        filename: Name including extension of the file
        overwrite_existing: If existing files with the same name should be
            overwritten instead of being written with a ".<num>" appended
            to the name.
        out_dir: Directory the data should be written to. Defaults to
            result of ``dev.get_out_dir()``
        merge_existing: If the file already exists and is JSON, then read
            the data from the existing file, merge the new data with it,
            then overwrite the file with the merged data.

    Returns:
        Path to the file that was written
    """
    filename = consts.sanitize_filepath(filename)
    # If device file output is disabled, return the filename as a path
    if not self._out_dir and not config.DEVICE_DIR:
        # BUGFIX: this was an f-string with no placeholders ending in
        # "(unknown)"; the filename is known here, so log it to make the
        # skipped write traceable.
        log.debug(f"Device file output is disabled, skipping write to file '{filename}'")
        return Path(filename)
    if not out_dir:
        out_dir = self.get_out_dir()
    full_path = out_dir / filename
    utils.write_file(
        data=data,
        file=full_path,
        overwrite_existing=overwrite_existing,
        merge_existing=merge_existing,
    )
    return full_path
def get_out_dir(self) -> Path:
    """Return the directory for file output and results for this device.

    The directory path is derived from the device ID on first use and
    cached in ``self._out_dir`` for subsequent calls.
    """
    if self._out_dir:
        return self._out_dir
    self._out_dir = config.DEVICE_DIR / address_to_pathname(self.get_id())
    return self._out_dir
def get_sub_dir(self, basename: str) -> Path:
    """Return a directory path for specific file output, e.g. FTP files.

    The path is a sub-directory of the device's results directory.
    """
    parent = self.get_out_dir()
    return parent / basename
def populate_fields(self, network_only: bool = False) -> None:
    """
    Populate new values by extrapolating from other existing values.

    .. note:: This method also removes duplicate services and interfaces

    Example: if the device object only has a ``mac`` field populated,
    this will resolve and update the ``ip`` and ``hostname`` fields, then
    add a ``ethernet`` :class:`~peat.data.models.Interface` with those
    fields populated.

    Args:
        network_only: Only update network-related fields (like ``interface``)
    """
    # Deduplicate network attributes, such as redundant services
    # e.g. Service(port=80) and Service(port=80, protocol="http")
    # are duplicates and the former would be removed.
    for data_attr in ["interface", "service"]:
        data_val = getattr(self, data_attr, None)  # type: Optional[list]
        if data_val:
            deduped = dedupe_model_list(data_val)
            setattr(self, data_attr, deduped)
    # Add a interface and populate other ID fields
    if not self.interface:
        if self.serial_port:
            serial_iface = Interface(serial_port=self.serial_port)
            self.store("interface", serial_iface)
        else:
            # Seed one ethernet interface from the first populated
            # identifier; only one is needed since annotate/store
            # fill in the rest.
            for field_name in ["ip", "mac", "hostname"]:
                val = getattr(self, field_name, None)
                if val:
                    # Note: remaining fields will be auto-populated
                    iface = Interface(type="ethernet")
                    setattr(iface, field_name, val)
                    iface.annotate(self)
                    self.store("interface", iface)
                    break
    # Deduplicate services associated with interfaces
    else:
        for iface in self.interface:
            iface.annotate(self)
            if iface.services:
                iface.services = dedupe_model_list(iface.services)
                sort_model_list(iface.services)
    # Add to Related fields
    if self.ip and self.ip not in self.related.ip:
        self.related.ip.add(self.ip)
    if self.hostname and self.hostname not in self.related.hosts:
        self.related.hosts.add(self.hostname)
    # Always set the device ID to the IP, if it's known
    # NOTE(review): the second clause re-assigns id even when id == ip if
    # a differing name is set (a no-op); condition looks odd — confirm intent.
    if self.ip and (self.id != self.ip or (self.name and self.id != self.name)):
        self.id = self.ip
    # If device ID isn't set, use one of the identifying values (IP, etc.)
    if not self.id:
        if self.ip:
            self.id = self.ip
        elif self.hostname:
            self.id = self.hostname
        elif self.name:
            self.id = self.name
        elif self.mac:
            self.id = self.mac
    # If ID is localhost, attempt to fallback to name or MAC, if they're defined
    if self.id in ["127.0.0.1", "localhost"]:
        if self.name:
            self.id = self.name
        elif self.mac:
            self.id = self.mac
    # Resolve the vendor string from the MAC OUI if not already known
    if self.mac and not self.mac_vendor:
        self.mac_vendor = mac_to_vendor_string(self.mac)
    # Everything below touches non-network fields (files, description, os)
    if network_only:
        return
    # Generate/update file fields and read/save data, if applicable
    for field_name in self.__fields__.keys():
        field_obj = getattr(self, field_name)
        file_obj = getattr(field_obj, "file", None)  # type: Optional[File]
        if file_obj and isinstance(file_obj, File):
            if hasattr(field_obj, "original"):
                # Run annotate_obj_and_file on all attributes
                # with a "file" and "original"
                annotate_obj_and_file(field_obj, field_name, self)
            else:
                # Run process_file on all modules with "File" type field
                file_obj.annotate(self)
                process_file(file_obj)
    # Generate the product string
    if not self.description.product and self.description.brand:
        if self.description.brand == self.description.model:
            self.description.product = self.description.brand
        else:
            self.description.product = (
                f"{self.description.brand} {self.description.model}".strip()
            )
    # Generate the description "full" field
    # (prefer vendor name over vendor id)
    if not self.description.full:
        if self.description.vendor.name and self.description.product:
            self.description.full = (
                f"{self.description.vendor.name} {self.description.product}".strip()
            )
        elif self.description.vendor.id and self.description.product:
            self.description.full = (
                f"{self.description.vendor.id} {self.description.product}".strip()
            )
    # Populate vendor name from ID
    if not self.description.vendor.name:
        self.description.vendor.name = self.description.vendor.id
    # Populate os.full
    if not self.os.full:
        vend = self.os.vendor.id
        if self.os.vendor.name:
            vend = self.os.vendor.name
        self.os.full = " ".join(
            [x for x in [vend, self.os.name, self.os.version] if x]
        ).strip()
def retrieve(self, attr: str, search: dict[str, Any]) -> BaseModel | list[BaseModel] | None:
    """Retrieve a complex device data value by matching attribute values.

    Searches the list attribute named ``attr`` (e.g. ``"interface"``,
    ``"service"``, ``"tag"``) for items whose attributes match every
    key/value pair in ``search``.

    Example::

        >>> iface = dev.retrieve("interface", {"ip": "192.0.2.20"})
        >>> svc = dev.retrieve("service", {"protocol": "http"})

    Args:
        attr: Attribute name to lookup as a string, e.g. ``"interface"``
        search: Dict with key-values to search for. Note that all key-value
            pairs must match for a search to succeed.

    Returns:
        The matching item if exactly one matched, a list of items when
        several matched, or :obj:`None` when nothing matched (or the
        attribute is empty). Items are data model objects such as
        :class:`~peat.data.models.Interface` or
        :class:`~peat.data.models.Service`.

    Raises:
        PeatError: unexpected input or an invalid internal program state
    """
    if not hasattr(self, attr):
        raise PeatError(
            f"Data does not have attribute '{attr}'. "
            f"You should never see this error, if you are "
            f"then there's a severe bug in the PEAT module!"
        )
    container = getattr(self, attr)
    if not container:
        return None
    # An item matches only when EVERY search key/value pair agrees
    matches = [
        item
        for item in container
        if all(getattr(item, key, None) == wanted for key, wanted in search.items())
    ]
    if not matches:
        return None
    if len(matches) == 1:
        return matches[0]
    return matches
def store(
    self,
    key: Literal[
        "interface",
        "service",
        "ssh_keys",
        "registers",
        "tag",
        "io",
        "event",
        "memory",
        "module",
        "users",
        "uefi_image",
        "uefi_hashes",
        "files",
    ],
    value: BaseModel,
    lookup: str | list | dict | None = None,
    interface_lookup: dict | None = None,
    append: bool = False,
) -> None:
    """
    Add or update complex device data.

    .. code-block:: python

        >>> from datetime import datetime
        >>> from pprint import pprint
        >>> from peat.data import DeviceData, Interface, Memory, Tag, Register

        # Create the device instance
        >>> dev = DeviceData()

        # Add a single network interface with IP of 192.0.2.20
        # NOTE: MAC address and hostname will be auto-resolved
        # the next time "dev.populate_fields()" is called.
        >>> dev.store("interface", Interface(ip="192.0.2.20", type="ethernet"))
        >>> dev.export(only_fields="interface")
        {'interface': [{'type': 'ethernet', 'ip': '192.0.2.20'}]}

        # Add a HTTP service to the interface with an IP of 192.0.2.20
        >>> dev.store(
                "service",
                Service(protocol="http", port=80),
                # Lookup the interface for the service to be associated with
                interface_lookup={"ip": "192.0.2.20"})
        >>> dev.export(only_fields="service")
        {'service': [{'port': 80, 'protocol': 'http', 'transport': 'tcp'}]}
        >>> pprint(dev.export(only_fields="interface"))
        {'interface': [{'ip': '192.0.2.20',
                        'services': [{'port': 80,
                                      'protocol': 'http',
                                      'transport': 'tcp'}],
                        'type': 'ethernet'}]}

        # Services are also stored in interfaces
        >>> dev.service[0] == dev.interface[0].services[0]
        True

        # However, keep in mind it's not the same instance, so changes to the
        # interface in dev.service will not be reflected in the one in
        # interface.services. If making changes, use store().
        >>> dev.service[0] is dev.interface[0].services[0]
        False

        # I/O protocol registers, e.g. for Modbus and DNP3
        >>> dev.store("registers", Register(protocol="dnp3", data_type="bool"))
        >>> pprint(dev.export(only_fields="registers"))
        {'registers': [{'data_type': 'bool', 'protocol': 'dnp3'}]}

        # I/O tags, e.g. from a SCADA database
        >>> dev.store("tag", Tag(name="var_rtu-8_I0", type="binary"))
        >>> pprint(dev.export(only_fields="tag"))
        {'tag': [{'name': 'var_rtu-8_I0', 'type': 'binary'}]}

        # Store a raw read from device memory
        >>> dev.store("memory", Memory(
                address="0000FFAB",
                created=datetime(2019, 2, 25, 17, 39, 11, 507318),
                value="D3ADB33F"))
        >>> dev.memory
        [Memory(address='0000FFAB', created=datetime.datetime(2019, 2, 25, 17, 39, 11, 507318), device='192.0.2.20', value='D3ADB33F')]
        >>> pprint(dev.export(only_fields="memory"))
        {'memory': [{'address': '0000FFAB',
                     'created': '2019-02-25 17:39:11.507318',
                     'device': '192.0.2.20',
                     'value': 'D3ADB33F'}]}

        # Adding a module by constructing a new DeviceData object
        >>> io_module = DeviceData(name="digitalIO", type="I/O", slot="1")
        >>> dev.store("module", io_module)
        >>> dev.export(only_fields="module")
        {'module': [{'name': 'digitalIO', 'type': 'I/O', 'slot': '1'}]}

    .. note:: If unset, the :attr:`~peat.data.models.DeviceData.id`
        attribute on this object will be set to the IP of the first
        Interface added via :meth:`~peat.data.models.DeviceData.store`

    .. note:: When adding a service, the interface the service should be
        associated with can be specified by including specific keys in the
        ``interface_lookup`` argument. These keys are: ``name``, ``ip``,
        ``serial_port``, ``mac``, and ``hostname``.
        Example: ``interface_lookup={"ip": "192.0.2.20"}`` will add the
        service to the Interface object with an IP address of ``192.0.2.20``.

    Args:
        key: Name of the field to add or edit, e.g. ``interface`` to add
            data to a new or existing interface.
        value: Value to store. Type and structure depends on the field
            being changed.
        lookup: Values to use to search for an existing item to edit.

            .. note:: If :obj:`None`, then ``lookup`` will fallback to
                hardcoded search defaults if the type is
                :class:`~peat.data.models.Service` or
                :class:`~peat.data.models.Interface`.

            The lookup value can be one of the following:

            - String of an attribute name to compare, e.g. ``"ip"`` to use
              the ``ip`` attribute to compare interfaces.
            - A list of strings of attribute names to compare, e.g.
              ``["name", "ip"]``. The attributes will be checked in order,
              so a interface with the same ``name`` attribute will be merged
              before one that matches the ``ip`` attribute.
            - a dict of values to lookup, with key being attribute name and
              value the value to compare. ALL values MUST match for a lookup
              to be successful!

            .. code-block:: python
                :caption: Examples of different lookup argument data types

                >>> from pprint import pprint
                >>> from peat.data import DeviceData, Memory, Service, IO
                >>> dev = DeviceData(ip="192.0.2.20")

                # Specify name of a service to update
                >>> dev.store("service", Service(protocol="telnet"))
                >>> dev.export(only_fields="service")
                {'service': [{'protocol': 'telnet', 'transport': 'tcp'}]}
                >>> dev.store("service", value=Service(status="open"),
                        lookup={"protocol": "telnet"})
                >>> dev.export(only_fields="service")
                {'service': [{'protocol': 'telnet', 'status': 'open', 'transport': 'tcp'}]}

                # Lookup using a key
                >>> dev.store("memory", Memory(address="0000FFAB"))
                >>> dev.export(only_fields="memory")
                {'memory': [{'address': '0000FFAB', 'device': '192.0.2.20'}]}
                >>> dev.store("memory", value=Memory(
                        address="0000FFAB",
                        created=datetime(2019, 2, 25, 17, 39, 11, 507318),
                    ), lookup="address")
                >>> pprint(dev.export(only_fields="memory"))
                {'memory': [{'address': '0000FFAB',
                             'created': '2019-02-25 17:39:11.507318',
                             'device': '192.0.2.20'}]}

                # Lookup using list of keys
                >>> dev.store("io", IO(address="0001", direction="input"))
                >>> dev.export(only_fields="io")
                {'io': [{'address': '0001', 'direction': 'input'}]}
                >>> dev.store("io",
                        IO(address="0001", direction="input", type="analog"),
                        lookup=["address", "direction"]
                    )
                >>> dev.export(only_fields="io")
                {'io': [{'address': '0001', 'direction': 'input', 'type': 'analog'}]}

        interface_lookup: :class:`dict` with
            :class:`~peat.data.models.Interface` attribute keys and values
            to lookup when storing a :class:`~peat.data.models.Service`
        append: Append the item to the list and don't attempt lookups

    Raises:
        PeatError: Invalid key specified or other errors indicative
            of issues with module code
    """
    # Require value to be a instance of a model. no dict nonsense anymore.
    if not isinstance(value, BaseModel):
        raise PeatError(
            f"Invalid value type '{value.__class__.__name__}', "
            f"expected BaseModel (value={value})"
        )
    if lookup and not isinstance(lookup, (dict, str, list)):
        raise PeatError(
            f"Invalid type '{lookup.__class__.__name__}' for 'lookup' argument "
            f"to store(), expected dict, str, or list (lookup={lookup})"
        )
    # Check the destination list attribute name is valid
    if not hasattr(self, key):
        raise PeatError(f"No attribute for key '{key}'. Value: {value}")
    # Get the list of objects (self.memory => list[Memory], etc.)
    container = getattr(self, key)  # type: list[BaseModel]
    # Automatically call annotate()
    value.annotate(self)
    # Append item to the list and return, don't lookup
    if append:
        container.append(value)
        return
    # !! hack !!
    # Fallback to hardcoded values for Service and Interface if lookup is None
    if not lookup:
        if isinstance(value, Service):
            lookup = ["protocol", "port"]
        elif isinstance(value, Interface):
            lookup = ["name", "mac", "ip", "serial_port", "id"]
        elif isinstance(value, File):
            if value.local_path:
                lookup = "local_path"
            elif value.path:
                lookup = "path"
            else:
                lookup = "name"
    # Add the value to the container, and merge with any existing values
    # if they match the lookup
    value = self._lookup_and_merge(container, value, lookup)
    # If it's a service, add it to relevant interface(s)
    # NOTE: if there isn't a positive match for a interface lookup,
    # then the default is to NOT add it to any interfaces
    if interface_lookup and isinstance(value, Service):
        if_position = match_all(self.interface, interface_lookup)
        if if_position is not None:
            # Merge with existing services on the interface
            self._lookup_and_merge(self.interface[if_position].services, value, lookup)
    # Copy comm attributes from first interface added
    if key == "interface" and len(self.interface) == 1:
        # Ensure serial_port and ip/etc are mutually exclusive
        if not self.serial_port and self.interface[0].serial_port:
            self.serial_port = self.interface[0].serial_port
        else:
            for attr in ["ip", "mac", "hostname"]:
                if not getattr(self, attr, None):
                    val = getattr(self.interface[0], attr, None)
                    if val:
                        setattr(self, attr, val)
                        # First usable comm value also becomes the device ID
                        # (see the note in the docstring above)
                        if not self.id:
                            self.id = val
def _lookup_and_merge( self, container: list[BaseModel], value: BaseModel, lookup: str | list | dict | None, ) -> BaseModel: """ Handle adding objects to lists. This is needed as a standalone method from store() so services can be added to interfaces in the same manner as they are in store(). """ # Append to a list of objects if no lookup is specified # If there is a lookup specified, and the list is empty, # just append it since there's nothing to look up. if not lookup or not container: container.append(value) return value # Attempt to lookup an existing object in the list of objects position = None # type: Optional[int] if isinstance(lookup, str): # Attribute to use for lookup, e.g. "ip" for Interface position = lookup_by_str(container, value, lookup) elif isinstance(lookup, list): # List of attributes to use for lookup, # e.g. ["port", "protocol"] for Service # NOTE: only one value needs to match for lookup to succeed for lookup_str in lookup: position = lookup_by_str(container, value, lookup_str) if position is not None: break elif isinstance(lookup, dict): # Multiple key-value pairs to look for, e.g. # {"ip": "192.0.2.123", "type": "Ethernet"} # NOTE: ALL values must match for lookup to succeed position = match_all(container, lookup) else: raise PeatError( f"Invalid type '{lookup.__class__.__name__}' for 'lookup' argument " f"to store(), expected str, list, or dict (lookup={lookup})" ) # If the lookup was successful, then copy the values from # the new object to existing object if position is not None: merge_models(container[position], value) container[position].annotate(self) value = container[position] # Otherwise, just append the value to the list else: container.append(value) return value
[docs] def is_duplicate(self, other: DeviceData) -> bool: """ If this device is likely a duplicate of another. .. note:: Only deduplicate if devices have the same communication ID (IP, MAC, Serial port) or label (from a PEAT config file) Args: other: Device to compare Returns: If the device is likely a duplicate of this device """ # NOTE: we assume SEL devices with different serial numbers or part numbers # are different devices. This fixes an issue with parsing large numbers # of SEL files. if self.description.vendor.id == "SEL" and other.description.vendor.id == "SEL": for attr in ["part_number", "serial_number"]: this_val = getattr(self, attr, None) other_val = getattr(other, attr, None) if this_val and other_val and this_val != other_val: return False for attr in ["label", "ip", "mac", "serial_port"]: this_val = getattr(self, attr, None) other_val = getattr(other, attr, None) # If current value is not empty or None, # and equals the other devices value, # then mark it as a duplicate if this_val and other_val == this_val: return True return False
[docs] def purge_duplicates(self, force: bool = False) -> None: """ Removes duplicates from all :class:`list`-type attributes on this object that aren't private. Once performed, ``self._is_deduplicated`` is set to True. If True, subsequent calls won't perform deduplication. To override this behavior, set force=True, or set ``self._is_deduplicated`` to False. """ if self._is_deduplicated and not force: return log.debug(f"Purging duplicates from device '{self.get_id()}'") # Iterate over all list-type attributes for list_attr in self.get_attr_names(list): val = getattr(self, list_attr, None) # TODO: temporary hack to only deduplicate smaller lists # "Large" lists of even just 30,000 objects can take close # to an hour to dedupe! # Need to refactor how models are stored and how deduplication # occurs. if val and len(val) < 5000: deduped = dedupe_model_list(val) sort_model_list(deduped) setattr(self, list_attr, deduped) self._is_deduplicated = True
[docs] def get_attr_names(self, typ: type) -> list[str]: """ Get names of attributes on this instance that aren't private. Args: typ: Class to check for, e.g. :class:`list` or BaseModel """ attrs = [k for k in self.__dict__.keys() if not k.startswith("_")] attribute_names = [attr for attr in attrs if isinstance(getattr(self, attr, None), typ)] return attribute_names
# Required by pydantic to resolve types that are "forward-references" (self-referential) DeviceData.update_forward_refs()
def process_file(file: dict | File) -> File:
    """
    Transform a :class:`dict` into a :class:`~peat.data.models.File`
    object and populate unfilled fields.

    Fields already set on the incoming object are never overwritten;
    every assignment below is guarded by an emptiness check.
    """
    # Convert the dict into a new File instance
    if isinstance(file, dict):
        file = File.parse_obj(file)

    # Create the path from the directory and name (note: the name includes extensions)
    if not file.path and (file.directory and file.name):
        file.path = PurePath(file.directory, file.name)

    # Resolve to absolute path
    if file.local_path and file.local_path.exists():
        file.local_path = file.local_path.resolve()

    # If path isn't set, then set it to the local path
    if not file.path and file.local_path:
        file.path = PurePath(file.local_path)

    # Use local file metadata to fill out fields, e.g. size, modification time, etc.
    if file.local_path and file.local_path.is_file():
        try:
            if not file.mtime:
                mtime = file.local_path.stat().st_mtime
                file.mtime = datetime.fromtimestamp(mtime)
            if not file.created:
                # NOTE(review): st_ctime is inode-change time on Unix, not
                # creation time -- "created" is only accurate on Windows; confirm
                ctime = file.local_path.stat().st_ctime
                file.created = datetime.fromtimestamp(ctime)
            # .owner() and .group() don't work on Windows
            # WINDOWS: NotImplementedError: Path.group() is unsupported on this system
            if not WINDOWS:
                if not file.group:
                    file.group = file.local_path.group()
                if not file.owner:
                    file.owner = file.local_path.owner()
        except (NotImplementedError, OSError, KeyError) as err:
            msg = (
                f"Failed to get ownership info and/or timestamps "
                f"for file '{file.local_path.name}'. This can occur if "
                f"you're running PEAT in a container environment "
                f"(e.g. Docker or podman). Error that occurred: {err}"
            )
            # KeyError: 'getgrgid(): gid not found: 89040'
            # If in a docker container it's not important, so log as debug
            # This shouldn't happen outside of a container, so log that as a warning
            if SYSINFO.get("containerized"):
                log.debug(msg)
            else:
                log.warning(msg)
        # Size and hashes are filled even if ownership lookups failed above
        if not file.size:
            file.size = file.local_path.stat().st_size
        if not _all_hashes_set(file.hash):
            file.hash = Hash.parse_obj(utils.gen_hashes(file.local_path))

    # Derive directory and name from the path when missing
    if not file.directory and file.path:
        file.directory = file.path.parent.as_posix()
    if not file.name and file.path:
        file.name = file.path.name

    # Classify as file vs directory (default "file" when we can't stat it)
    if not file.type and (file.path or file.local_path):
        file_type = "file"
        if file.local_path:
            try:
                if file.local_path.is_dir():
                    file_type = "dir"
            except Exception:
                # best-effort: unreadable/broken paths stay classified as "file"
                pass
        file.type = file_type

    # Infer extension and MIME type from the name/path
    process_file_extension(file)

    # Normalize directory to always end with a trailing slash
    if file.directory and not file.directory.endswith("/"):
        file.directory += "/"

    return file
[docs] def process_file_extension(file: File) -> None: """ Automatically infer and populate unset fields on a :class:`~peat.data.models.File` object. """ if not file.extension and file.path: # NOTE: we just get the last extension even if a file may have more, # e.g. ".tar.gz" will only save as ".gz". This is due to many files # having multiple "."'s in them, so joining suffixes leads to weird # output. # NOTE2: make extension lowercase even if filename is upper case for # consistent matching/searching, since the Elastic type is "keyword". file.extension = file.path.suffix.lower().strip(".") elif not file.extension and not file.path and "." in file.name: file.extension = ".".join(PurePath(file.name).suffixes).lower().strip(".") if file.extension: file.extension = file.extension.lower().strip(".").split(".")[-1] if not file.mime_type and not file.type == "dir": # Hardcode known binary file types if file.extension in [ "rdb", "db", "pkl", "dmk", "apx", "ztx", "stu", "mc7", "firmware", "dcf", ]: file.mime_type = "application/octet-stream" # Hardcode known text file types elif file.extension in ["upg", "st"]: file.mime_type = "text/plain" elif file.name: # guess_type() returns: tuple(type, encoding) guessed_type = mimetypes.guess_type(file.name, strict=False)[0] if guessed_type: # Windows returns "text/xml" instead of "application/xml" # for unknown (though likely historical) reasons. The latter # form is preferred for our use cases, and for consistency # across platforms. Refer to RFC 3023 for more details: # https://tools.ietf.org/html/rfc3023 if guessed_type == "text/xml": guessed_type = "application/xml" file.mime_type = guessed_type
[docs] def annotate_obj_and_file(obj: Firmware | Logic, field_name: str, dev: DeviceData) -> None: """ Populate original field if not set, and save data to file if it hasn't been. """ obj.file.annotate(dev) process_file(obj.file) # If no raw data and the file has data, read data from the file if not obj.original and obj.file.local_path and obj.file.local_path.is_file(): if isinstance(obj, Firmware): obj.original = obj.file.local_path.read_bytes() else: try: with obj.file.local_path.open(encoding="utf-8", newline="") as f: obj.original = f.read() except UnicodeDecodeError: log.warning( f"Skipping save of non-text source for logic (source: {obj.file.local_path})" ) # If there's data and the data hasn't been saved locally (and output directory is set) if obj.original and not obj.file.local_path and config.DEVICE_DIR: file_ext = obj.file.extension if not file_ext: file_ext = ".bin" if isinstance(obj.original, bytes) else ".txt" if not file_ext.startswith("."): file_ext = f".{file_ext}" obj.file.local_path = dev.write_file( data=obj.original, # "firmware", "boot-firmware", "logic" filename=field_name.lower().replace("_", "-") + file_ext, ) process_file(obj.file) # Calculate hashes for original if obj.original and not _all_hashes_set(obj.hash): obj.hash = Hash.parse_obj(utils.gen_hashes(obj.original)) _add_hashes_to_related(dev, obj.hash)
def _all_hashes_set(hash_obj: Hash) -> bool: """ Check all hashes in ``config.HASH_ALGORITHMS`` have been calculated and set. """ for algo in config.HASH_ALGORITHMS: if hasattr(hash_obj, algo) and not getattr(hash_obj, algo, None): return False return True def _add_hashes_to_related(dev: DeviceData, hash_obj: Hash) -> None: """ Add any hashes to ``dev.related.hash``. """ for algo in config.HASH_ALGORITHMS: hash_value = getattr(hash_obj, algo, None) if hash_value and hash_value not in dev.related.hash: dev.related.hash.add(hash_value.upper())
def export_models_to_elastic(
    models: list[BaseModel], dev: DeviceData, elastic: Elastic | None = None
) -> bool:
    """
    Export model objects to an Elasticsearch database.

    Under the hood, this uses the Elasticsearch Bulk API to do efficient
    exporting in parallel and with fewer API requests.

    Args:
        models: the models to export. All models in the list must be
            of the same type (don't mix models). To export a single model,
            wrap the model in a list, e.g. ``models=[mymodel]``.
        dev: the DeviceData object the model(s) are associated with
        elastic: The :class:`~peat.elastic.Elastic` instance to use.
            If unspecified, this defaults to the global
            :class:`~peat.elastic.Elastic` instance in
            :attr:`~peat.settings.State.elastic`.

    Returns:
        True if the export was successful, False if there were any errors
    """
    dev_id = dev.get_id()
    elastic = resolve_es_instance(elastic, dev_id)
    if not elastic:
        return False

    # Resolve configuration name to the configured index name
    # If this fails, you probably messed up your model class :)
    mn = models[0].__repr_name__()
    if not hasattr(config, models[0]._es_index_varname):
        log.critical(
            f"Failed to determine index name for '{mn}': PEAT config "
            f"has no attribute '{models[0]._es_index_varname}'"
        )
        state.error = True
        return False
    if not all(m.__repr_name__() == mn for m in models):
        log.critical("Mixed model types in export_models_to_elastic!")
        state.error = True
        return False

    # Skip push if index is disabled (config value is "" or None)
    # NOTE: returns True (success) since a disabled index is not an error
    ts_index = getattr(config, models[0]._es_index_varname, None)
    if not ts_index:
        return True

    to_push = []  # (doc_id, content) tuples for the bulk API
    for model in models:
        content = model.gen_elastic_content(dev)
        content = utils.sort(content)  # Sort for determinism/consistency
        # Save doc ID of first push, subsequent calls should just update the
        # existing doc. This ensures it's safe to call this at various points
        # of execution without creating a bunch of copies in the same run.
        if not model._elastic_doc_id:
            model._elastic_doc_id = elastic.gen_id()
        elif config.DEBUG >= 3:
            log.trace3(
                f"Updating EXISTING timeseries '{mn}' data from device "
                f"'{dev_id}' to {elastic.type} server {elastic!s}"
            )
        to_push.append((model._elastic_doc_id, content))

    failed_msg = (
        f"Failed {elastic.type} export for '{mn}' timeseries data from "
        f"device '{dev_id}' to {elastic.type} server {elastic!s}"
    )
    try:
        if not elastic.bulk_push(
            index=ts_index,
            contents=to_push,
        ):
            log.error(failed_msg)
            state.error = True
            return False
    except Exception:
        # bulk_push failures are logged (with traceback) and flagged,
        # never raised to the caller
        log.exception(failed_msg)
        state.error = True
        return False
    return True
[docs] def resolve_es_instance(elastic: Elastic | None, dev_id: str) -> Elastic | None: if not elastic: # Use the global instance by default elastic = state.elastic if not elastic: log.error( f"export_models_to_elastic() called for device '{dev_id}' but " f"Elasticsearch or OpenSearch has not been configured or initialized!" ) state.error = True return elastic
__all__ = [ "IO", "OS", "X509", "CertEntity", "Description", "DeviceData", "Event", "File", "Firmware", "Geo", "Hardware", "Hash", "Interface", "LatLon", "Logic", "Memory", "Register", "Related", "SSHKey", "Service", "Tag", "User", "Vendor", "export_models_to_elastic", "process_file", ]