Source code for peat.protocols.http

from __future__ import annotations

import re
import socket
import ssl
import tempfile
import urllib.parse
from pathlib import Path
from typing import Literal

from bs4 import BeautifulSoup
from requests import Response, Session

import peat  # Avoid circular imports
from peat import config, consts, log, utils


[docs] class HTTP: """ Basic set of reusable HTTP functionality. """ page_cache: dict[str, Response] = {} # Global cache of page data, keyed by URL DEFAULT_HEADERS: dict = {} def __init__( self, ip: str, port: int = 80, timeout: float = 5.0, dev: peat.data.models.DeviceData | None = None, protocol: Literal["http", "https", ""] = "", ) -> None: """ Args: ip: IP address of HTTP host port: TCP port to use timeout: Default timeout for requests dev: Default :class:`~peat.data.models.DeviceData` instance to use for various things like saving files """ self.ip: str = ip self.port: int = port self.timeout: float = timeout self.protocol: Literal["http", "https", ""] = protocol if not self.protocol and self.port == 80: self.protocol = "http" elif not self.protocol and self.port == 443: self.protocol = "https" # Instance-level logger self.log = log.bind( classname=self.__class__.__name__, target=f"{self.protocol}://{self.ip}:{self.port}", ) self._session: Session | None = None # default device object to use self.dev: peat.data.models.DeviceData | None = dev # WARNING: do NOT uncomment the code below UNLESS Elasticsearch # export is disabled, otherwise all Elasticsearch traffic will # be emitted to STDOUT (including logging!). # # if config.DEBUG >= 3: # # Enable request/response debugging output (print statements) # # can we monkeypatch this so it goes to logging instead of just stdout? # from http.client import HTTPConnection # HTTPConnection.debuglevel = 1 self.log.trace(f"Initialized {repr(self)}") @property def url(self) -> str: return f"{self.protocol}://{self.ip}:{self.port}" @property def session(self) -> Session: if self._session is None: self._session = self.gen_session() if self.DEFAULT_HEADERS: self._session.headers.update(self.DEFAULT_HEADERS) return self._session @session.setter def session(self, sess: Session) -> None: if self._session is not None: self._session.close() self._session = sess @property def connected(self) -> bool: return bool(self._session)
[docs] def disconnect(self) -> None: if self._session is not None: self._session.close()
def __enter__(self) -> HTTP: return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.disconnect() if exc_type: self.log.debug(f"{exc_type.__name__}: {exc_val}") def __str__(self) -> str: return self.ip def __repr__(self) -> str: return f"{self.__class__.__name__}({self.ip}, {self.port}, {self.timeout})"
[docs] def _save_response_to_file( self, response: Response, page: str, url: str, dev: peat.data.models.DeviceData | None, ) -> Path | None: """ Save raw text data from response to disk, even if bad status code. """ try: if response.text: f_name = "" cd = response.headers.get("content-disposition", "") if cd and "filename" in cd: fn_match = re.findall(r"filename=(.+)", cd) if fn_match: f_name = fn_match[0].strip('"') if not f_name: if not page and url: parts = urllib.parse.urlparse(url) page = parts.path.strip("/") # handle paths with args, e.g. ?0x0D00 vs ?0X0000 are different if parts.query: page = f"{page}{parts.query}.html" if not page or page == "/": f_name = "index.html" elif page.endswith(".html") or page.endswith(".htm"): f_name = page else: f_name = f"{page}.html" if not dev: dev = peat.data.datastore.get(self.ip) # Sanitize characters that are invalid filenames on Windows # This avoids a warning in utils.write_file() for char in ["?", ":", '"']: if char in f_name: f_name = f_name.replace(char, "_") path = dev.write_file( response.text, filename=f_name, out_dir=dev.get_sub_dir("http_files"), ) dev.related.files.add(f_name) self.log.trace2(f"Saved response from {url} to {path.as_posix()}") return path except Exception: self.log.exception(f"Failed to write page '{page}' to file")
[docs] def get( self, page: str = "", protocol: Literal["http", "https", ""] = "", url: str = "", use_cache: bool = True, params: dict | None = None, auth=None, allow_errors: bool = False, dev: peat.data.models.DeviceData | None = None, timeout: float | None = None, **kwargs, ) -> Response | None: """ Perform a HTTP ``GET`` request and return the response. .. warning:: Results of queries for an identical URL are cached by default for a single run of PEAT. If your tool is querying the status or looking for changes within a single run of PEAT, then set ``use_cache`` to :obj:`False`. .. note:: The response object will have three additional attributes: ``request_timestamp``, ``response_timestamp``, ``file_path``. Args: page: URL path of the page to get protocol: Name of the protocol to use If empty string (default), the HTTP instance's "protocol" will be used, if set. Otherwise, it will default to "http". url: URL to use instead of the auto-constructed one use_cache: If the internal page cache should be used. params: Additional HTTP parameters to include in the request auth: Authentication to use for the request (refer to Requests docs) dev: DeviceData object to save files to timeout: Timeout for the query. If :obj:`None`, the default timeout for this class instance is used instead. kwargs: Additional keyword arguments that will be passed directly to ``Requests.get()`` Returns: The response object, or :obj:`None` if the request failed. The response object will have three additional attributes: ``request_timestamp``, ``response_timestamp``, ``file_path``. """ if not protocol and self.protocol: protocol = self.protocol elif not protocol: protocol = "http" if not url: if protocol == "https" and self.port == 80: # TODO: hack self.log.debug( f"Protocol is https and port is 80, hardcoding " f"to port 443 for request for page {page}" ) port = 443 else: port = self.port # trim leading slash for ergonomics if page.startswith("/"): page = page[1:] # TODO: use urllib.parse.urljoin() url = f"{protocol}://{self.ip}:{port}/{page}" # TODO: add a lifetime to the cache if use_cache and self.page_cache.get(url): self.log.info(f"GET -> {url} (using cached response)") return self.page_cache[url] self.log.info(f"GET -> {url}") if not dev and self.dev: dev = self.dev if timeout is None: timeout = self.timeout try: req_ts = utils.utc_now() # rough timestamp of send time response: Response = self.session.get( url, timeout=timeout, params=params, auth=auth, **kwargs ) file_path = self._save_response_to_file(response, page, url, dev) if not allow_errors and response.status_code != 200: err = f"status code {response.status_code}" else: # Record rough timestamps of request and response response.request_timestamp = req_ts response.response_timestamp = req_ts + response.elapsed # Record where the file was saved as a Path object response.file_path = file_path self.page_cache[url] = response return response except Exception as ex: err = str(ex) self.log.warning(f"Failed to GET '{url}': {err}") return None
[docs] def post( self, url: str, timeout: float | None = None, dev: peat.data.models.DeviceData | None = None, use_cache: bool = False, **kwargs, ) -> Response | None: """ Perform a HTTP ``POST`` request and return the response. .. note:: The response object will have three additional attributes: ``request_timestamp``, ``response_timestamp``, ``file_path``. Args: url: URL to use for the request timeout: Timeout for the query. If :obj:`None`, the default timeout for this class instance is used instead. dev: DeviceData object to save files to use_cache: If the internal page cache should be used kwargs: Additional keyword arguments that will be passed directly to ``Requests.post()`` Returns: The response object, or :obj:`None` if the request failed. The response object will have three additional attributes: ``request_timestamp``, ``response_timestamp``, ``file_path``. """ # TODO: add a lifetime to the cache if use_cache and self.page_cache.get(url): self.log.info(f"POST -> {url} (using cached response)") return self.page_cache[url] self.log.info(f"POST -> {url}") if not dev and self.dev: dev = self.dev if timeout is None: timeout = self.timeout try: req_ts = utils.utc_now() # rough timestamp of send time response: Response = self.session.post(url, timeout=timeout, **kwargs) # Save the raw response text body to disk as an artifact parts = urllib.parse.urlparse(url) page = parts.path.strip("/") # handle paths with args, e.g. ?0x0D00 vs ?0X0000 are different if parts.query: page = f"{page}{parts.query}.html" file_path = self._save_response_to_file(response, page, url, dev) if response.status_code != 200: err = f"status code {response.status_code}" else: # Record rough timestamps of request and response response.request_timestamp = req_ts response.response_timestamp = req_ts + response.elapsed # Record where the file was saved as a Path object response.file_path = file_path self.page_cache[url] = response return response except Exception as ex: err = str(ex) self.log.warning(f"Failed to POST '{url}': {err}") return None
[docs] def get_ssl_certificate(self) -> peat.data.models.X509 | None: """ Retrieve and parse the server's SSL certificate. Returns: SSL certificate data in Elastic Common Schema (:term:`ECS`)-compliant format """ emsg = "Failed to get SSL certificate" try: # (01/04/2022) Workaround issues in newer Pythons (3.10+) # with ssl.get_server_certificate() when used with older # ICS/OT devices (in other words, what PEAT does). # # References: # https://stackoverflow.com/a/49132495 # https://stackoverflow.com/a/71007463 # # These code snippets may be useful in future: # context.options &= ~ssl.OP_NO_SSLv3 # context.check_hostname = False # context.verify_mode = ssl.CERT_NONE # context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.set_ciphers("DEFAULT") with socket.create_connection( address=(self.ip, self.port), timeout=self.timeout ) as connection: with context.wrap_socket(connection, server_hostname=self.ip) as ssl_sock: der_cert = ssl_sock.getpeercert(True) if not der_cert: self.log.warning(f"{emsg}: No certificate returned from server") return None raw_cert = ssl.DER_cert_to_PEM_cert(der_cert) # type: str except Exception as ex: self.log.exception(f"{emsg}: {ex}") return None if not raw_cert: self.log.warning(f"{emsg}: Empty certificate or no certificate was returned") return None decoded = self.decode_ssl_certificate(raw_cert) return self.parse_decoded_ssl_certificate(decoded[0], decoded[1])
[docs] def decode_ssl_certificate( self, source: str | bytes | Path ) -> tuple[dict[str, str | tuple | int], str]: """ Decode a raw SSL certificate retrieved from a server into a raw :class:`dict`. Args: source: SSL certificate in string or bytes format, or the file path to a certificate (as a :class:`~pathlib.Path` object). Returns: Decoded SSL certificate data as a :class:`dict` """ tmp_name_base = f"{self.ip.replace('.', '_')}_{self.port}" if isinstance(source, Path): raw = source.read_text(encoding="utf-8") path = source.resolve() else: raw = source if isinstance(raw, bytes): raw = raw.decode() f_name = f"{tmp_name_base}_raw-ssl-certificate.crt" if config.TEMP_DIR: path = utils.write_temp_file(raw, f_name) else: # Create a temporary directory to put the cert data to be parsed t_dir = tempfile.mkdtemp() path = Path(t_dir, f_name) path.write_text(raw, encoding="utf-8") # Source: https://stackoverflow.com/a/50072461 decoded = ssl._ssl._test_decode_cert(path) utils.write_temp_file(decoded, f"{tmp_name_base}_decoded-ssl-certificate.json") self.log.trace2(f"Decoded SSL cert\n{decoded}") return decoded, raw
[docs] def parse_decoded_ssl_certificate( self, decoded: dict[str, str | tuple | int], raw: str ) -> peat.data.models.X509: """ Parse a decoded SSL certificate into Elastic Common Schema (:term:`ECS`) format usable with the x509 data model (:class:`~peat.data.models.X509`). Args: decoded: Decoded SSL certificate, usually obtained from calling :meth:`~peat.protocols.http.decode_ssl_certificate`. raw: The original SSL certificate text Returns: SSL certificate data in Elastic Common Schema (:term:`ECS`)-compliant format """ serial_number = str(decoded.get("serialNumber", "")) serial_number = serial_number.strip().upper().replace(":", "") # NOTE: hashes will get generated by annotate() cert = peat.data.models.X509( original=raw, serial_number=serial_number, version_number=str(decoded.get("version", "")), not_after=(utils.parse_date(decoded["notAfter"]) if decoded.get("notAfter") else None), ) if decoded.get("notAfter"): cert.not_after = utils.parse_date(decoded["notAfter"]) if decoded.get("notBefore"): cert.not_before = utils.parse_date(decoded["notBefore"]) # Extract Issuer and Subject fields alternative_names = set() for group in ["issuer", "subject"]: if group not in decoded: continue entity = peat.data.models.CertEntity() for field in decoded[group]: if len(field) > 1 or len(field[0]) > 2: self.log.warning(f"Abnormal length for SSL field {field}") f_name = utils.convert_to_snake_case(field[0][0]) if "common" not in f_name and "distinguished" not in f_name: f_name = f_name.replace("_name", "") val = str(field[0][1]).strip() if hasattr(entity, f_name): setattr(entity, f_name, val) elif f_name == "email_address": alternative_names.add(val) else: self.log.warning( f"Skipping value '{f_name}' with value '{val}' for " f"'{group}' since it's not a valid CertEntity field" ) setattr(cert, group, entity) cert.alternative_names.extend(alternative_names) cert.annotate(None) self.log.trace2(f"Parsed SSL cert\n{cert}") return cert
[docs] @staticmethod def gen_soup(text: str | bytes) -> BeautifulSoup: """ Generate a BeautifulSoup instance from the text using the efficient ``lxml`` library if it's available or ``html.parser`` otherwise. Returns: A ``bs4.BeautifulSoup`` instance with the parser set to the value of :data:`peat.consts.BS4_PARSER` """ return BeautifulSoup(text, features=consts.BS4_PARSER)
[docs] @staticmethod def gen_session() -> Session: """ Session with SSL certificate verification disabled and no proxies from environment (e.g. ``http_proxy``/``https_proxy``). """ session = Session() session.verify = False session.trust_env = False return session
__all__ = ["HTTP"]