Source code for peat.settings_manager

import json
import os
from collections import ChainMap
from pathlib import Path
from types import UnionType  # NOTE: won't be needed once minimum python is 3.14
from typing import Any, Union, get_args, get_origin, get_type_hints

import yaml
from loguru import logger as log

from .consts import PeatError, convert, lower_dict, str_to_bool


# Source: https://github.com/python-discord/bot
def _env_var_constructor(loader, node):
    """
    Custom YAML constructor backing the ``!ENV`` tag.

    The tag accepts two node shapes:

    - A scalar naming the environment variable. The variable's value is
      returned, or :obj:`None` if it is not set.
    - A sequence whose first item is the variable name and whose optional
      second item is the value to return when the variable is not set.

    Example usage in the YAML configuration:

    .. code-block:: yaml

        key: !ENV 'MY_APP_KEY'
        other: !ENV ['MY_OTHER_KEY', 'fallback value']

    Args:
        loader: YAML loader used to construct the node's Python value
        node: The YAML node the tag was applied to

    Returns:
        The value of the environment variable, or the fallback
        (:obj:`None` unless one was supplied in the sequence form).
    """
    if node.id == "scalar":
        # Plain string node: it is the variable name, with no fallback
        env_name = str(loader.construct_scalar(node))
        fallback = None
    else:
        # Sequence node: [name] or [name, fallback, ...]
        items = loader.construct_sequence(node)
        env_name = items[0]
        fallback = items[1] if len(items) >= 2 else None

    return os.getenv(env_name, fallback)
def _join_var_constructor(loader, node):
    """
    Custom YAML constructor backing the ``!JOIN`` tag.

    Every item of the tagged sequence is converted to a string and the
    pieces are concatenated with no separator. Items may themselves be
    produced by other tags, which keeps the configuration file DRY
    (Don't Repeat Yourself).

    Args:
        loader: YAML loader used to construct the node's Python value
        node: The YAML sequence node the tag was applied to

    Returns:
        The concatenation of the string form of each item in the sequence
    """
    pieces = loader.construct_sequence(node)
    return "".join(map(str, pieces))
# Register the custom tags on the safe loader at import time, so that YAML
# documents parsed with ``yaml.safe_load`` can use ``!ENV`` and ``!JOIN``.
yaml.SafeLoader.add_constructor("!ENV", _env_var_constructor)
yaml.SafeLoader.add_constructor("!JOIN", _join_var_constructor)
[docs] class SettingsManager(dict): """ Stores and manages configuration values from multiple sources. Typical usage is to subclass this class and configure the possible variables and default values as class attributes, much like :mod:`dataclasses`. .. warning:: All class attributes MUST have a type! Otherwise, they will be skipped over and not appear in the list of defaults. This is due to the implementation being a hack on top of ``__annotations__``. Order of precedence for configurations - Runtime changes (example: ``config.DEBUG = 2``), including CLI arguments - Environment variables (example: ``export PEAT_DEBUG=2``) - Configuration file (YAML or JSON) - Default values set in subclasses of this class (example: ``DEBUG: int = 0``) Precedence is managed by a :class:`collections.ChainMap`. Args: label: The type of information being stored env_prefix: Prefix to use for environment variables init_env: Load values from environment variables during object initialization """ def __init__(self, label: str, env_prefix: str, init_env: bool = True) -> None: # Initialize the "dict" parent class super().__init__() # Label is used to refer to this instance self["label"] = label # NOTE(cegoes): dict preserves order in CPython 3.6+ and Python 3.7+ self["runtime_configs"] = {} self["env_configs"] = {} self["file_configs"] = {} # Use the class's variable attributes and annotations for defaults, # much like a dataclass (In fact, it's exactly what a dataclass does!). # https://github.com/python/cpython/blob/3.7/Lib/dataclasses.py#L828 annotations: dict = getattr(self, "__annotations__", {}) defaults: dict = { anno_name: dict.__getattribute__(self, anno_name) for anno_name in annotations.keys() } self["default_configs"] = defaults # Set the environment variable prefix self.env_prefix: str = env_prefix # ChainMap dynamically manages the lookup order. Changes to the # underlying objects will be immediately reflected in the map. 
self["CONFIG"] = ChainMap( self["runtime_configs"], self["env_configs"], self["file_configs"], self["default_configs"], ) # NOTE: this must occur AFTER the ChainMap (self["CONFIG"]) has been set if init_env: self.load_from_environment(env_prefix=env_prefix)
[docs] def _load_values(self, conf: dict[str, Any], load_to: str, key_prefix: str = "") -> None: """ Read and set configuration values from a input dictionary. .. note:: Any values that are :obj:`None` are skipped and will NOT be loaded Args: conf: Configuration to load load_to: Where in the Settings object the loaded configuration should be stored. Valid options are: ``runtime_configs``, ``env_configs``, and ``file_configs``. key_prefix: Optional value to prepend to the keys being looked up. Example use case is for loading environment variables prefixed with ``PEAT_``, where ``config=dict(os.environ)``. """ # Convert input config keys to upper case # for consistent case-insensitive key lookups. upper_conf = {k.upper(): v for k, v in conf.items()} # type: dict[str, Any] # Check if each possible option is in the input object, # since the input set is likely larger than the default set. # Furthermore, we do NOT want to accidentally add new # options that are not in the defaults to the object. for set_key in self["default_configs"].keys(): key = f"{key_prefix}{set_key}".upper() # Skip values that are None if upper_conf.get(key) is not None: try: self[load_to][set_key] = self.typecast(set_key, upper_conf[key]) except Exception as ex: log.critical(f"Failed to load config '{key}': {ex}") # !! Hack to make metadata directory configuration seamless and flexible !! if self["label"] == "configuration" and upper_conf.get("OUT_DIR"): self.OUT_DIR = self.OUT_DIR if self["label"] == "configuration" and upper_conf.get("RUN_DIR"): self.RUN_DIR = self.RUN_DIR
[docs] def load_from_dict(self, conf: dict[str, Any]) -> None: """ Update runtime configuration values from a dictionary. Args: conf: Configuration values to load Raises: AttributeError: If a configuration option in ``conf`` is not already defined on the class """ self._load_values(conf=conf, load_to="runtime_configs")
[docs] def load_from_environment(self, env_prefix: str | None = None) -> None: """ Update configuration values from environment variables. Args: env_prefix: String prefixing the environment variable names, e.g. ``PEAT_`` to load variables such as ``PEAT_DEBUG`` into ``DEBUG``. If :obj:`None`, then this is set to ``self.env_prefix``. Raises: AttributeError: If a configuration option in the environment is not already defined on the class """ if env_prefix is None: env_prefix = self.env_prefix self._load_values(conf=dict(os.environ), load_to="env_configs", key_prefix=env_prefix)
[docs] def load_from_file(self, file: Path) -> bool: """ Load stored values from a YAML or JSON file. Note that these settings can be overridden by environment variables or values set at runtime. Args: file: Path to a YAML or JSON file to load settings from Returns: If the load was successful Raises: AttributeError: If a configuration option loaded from the file is not already defined on the class """ log.info(f"Loading configuration from file '{file.name}'...") if not file.is_file(): log.error(f"Configuration file '{file.name}' is not a file or does not exist") return False if file.suffix.lower() in [".yml", ".yaml"]: log.debug(f"Loading configuration from YAML file '{file.name}'") with file.open(encoding="utf-8") as yaml_file: file_config = yaml.safe_load(yaml_file) elif file.suffix.lower() == ".json": log.debug(f"Loading configuration from JSON file '{file.name}'") with file.open(encoding="utf-8") as json_file: file_config = json.load(json_file) else: log.error( f"Unknown extension '{file.suffix}' for configuration file " f"'{file.name}', it should be '.json', '.yaml', or '.yml'. " f"You might have accidentally selected the wrong file." ) return False # Legacy config structure that allowed multi-app configs (other tools) if "PEAT" in file_config: file_config = file_config["PEAT"] self._load_values(file_config, load_to="file_configs") return True
[docs] def save_to_file(self, outdir: Path, save_yaml: bool = True, save_json: bool = True) -> None: """ Save the currently stored values to YAML and JSON files. Args: outdir: Directory path to save the files to save_yaml: set to False to disable YAML file saving save_json: if settings should be saved as JSON """ if not save_yaml and not save_json: raise PeatError("Either save_yaml or save_json must be true for save_to_file") if save_yaml: yaml_file = outdir / f"peat_{self['label']}.yaml" else: yaml_file = None if save_json: json_file = outdir / f"peat_{self['label']}.json" else: json_file = None # Hack to ensure the "state" and "configuration" files get included # in the set of files written by PEAT. # NOTE: this is done before the files are written to ensure # state.written_files includes the state paths as well. try: import peat if yaml_file: peat.state.written_files.add(yaml_file.as_posix()) if json_file: peat.state.written_files.add(json_file.as_posix()) except ImportError: pass # YAML format if yaml_file: if yaml_file.is_file(): log.warning( f"YAML {self['label'].capitalize()} file already exists " f"at {yaml_file.name}, overwriting existing data..." ) elif not yaml_file.parent.exists(): yaml_file.parent.mkdir(parents=True, exist_ok=True) # NOTE: newline argument to Path.write_text() requires Python 3.10+ with yaml_file.open("w", encoding="utf-8", newline="\n") as outfile: outfile.write(self.yaml()) # JSON format if json_file: data_to_save = self.export() # type: dict if json_file.is_file(): log.warning( f"JSON {self['label'].capitalize()} file already exists " f"at {json_file.name}, overwriting existing data..." ) elif not json_file.parent.exists(): json_file.parent.mkdir(parents=True, exist_ok=True) with json_file.open("w", encoding="utf-8", newline="\n") as outfile: json.dump(data_to_save, outfile, indent=4)
[docs] def export(self) -> dict[str, Any]: """ Current values in a deterministic format that can be exported. Returns: JSON-serializable :class:`dict` with uppercase keys, sorted "alphabetically" by key (well, technically UNICODE order). """ dict_config = dict(self.json_dict()) sorted_config = sorted(dict_config.items(), key=lambda x: str(x[0])) return dict(sorted_config)
[docs] def yaml(self) -> str: """ Export the current settings as YAML text. """ return yaml.dump(lower_dict(self.export()), line_break="\n")
[docs] def json(self) -> str: """ Export the current settings as JSON text. """ return json.dumps(self.export(), indent=4)
[docs] def json_dict(self, include_none_vals: bool = False) -> dict[str, Any]: """ Convert the current settings to a JSON dictionary. Returns: The current setting values as a JSON-serializable :class:`dict` with uppercase keys. """ return { key.upper(): self._serialize_value(value) for key, value in self["CONFIG"].items() if include_none_vals or value is not None # Strip Nones }
[docs] def get_serialized_value(self, item: str) -> Any: """ Get a configuration value in a JSON-serializable format. Args: item: Case-sensitive name of the attribute to get Returns: The configuration value in a JSON-serializable format Raises: KeyError: If the attribute named by ``item`` doesn't exist """ return self._serialize_value(self["CONFIG"][item])
[docs] @staticmethod def _serialize_value(value: Any) -> Any: # This allows nesting of SettingsManager instances as values if isinstance(value, SettingsManager): return value.export() else: return convert(value)
[docs] def typecast(self, key: str, value: Any) -> Any: """ Convert and store a value as the appropriate Python data type. Store the variable as the appropriate Python data type, such as bool, int, float, str, Path, list, etc. This converts "0.5" to 0.5, "/home/" to Path("/home/"), etc. If the type is properly annotated (e.g. VAR: str = 'stuff'), then we use the annotation, otherwise try to infer the type from the default value. However, the backup method does not work if the default value is :obj:`None`. Args: key: Case-sensitive name of the value (what attribute will be changed) value: The raw value to typecast (e.g. a string from an environment variable) Returns: The typecasted value as a valid Python datatype matching the annotation Raises: KeyError: If the attribute named by ``key`` doesn't exist """ fallback: type = type(self["default_configs"][key]) typecast: type = get_type_hints(self.__class__).get(key, fallback) # Handle complex types, e.g. typing.Union, typing.List, typing.Set, etc. # Resolves the type to it's container class, e.g: # get_origin(typing.List) => list # get_origin(typing.Union[str, Path]) => typing.Union og = get_origin(typecast) # Resolves arguments to the type, e.g: # get_args(Union[str, Path]) => (str, Path) # get_args(Optional[Union[Path, str]]) => (Path, str, None) args = get_args(typecast) if og and (og is Union or og is UnionType): if type(None) in args and value is None: # Value is None, don't accidentally return the string "None" return None elif Path in args: # If Path is in a Union, make it the typecast typecast = Path else: # Set typecast to the first type class in a Union that isn't "None" typecast = next(iter(filter(lambda x: not isinstance(x, type(None)), args))) # The typing container is a base Python type, e.g. list, set, dict, etc. elif og and isinstance(og, type): # TODO: typecast items in a container, e.g. 
a list of path strings # should be type-casted to a list of Path objects typecast = og elif og: log.warning( f"Unknown type container '{og}' for setting '{key}', " f"type casting to '{fallback}' as a fallback " f"(value being typecast: '{repr(value)}')" ) typecast = fallback # Expand paths if typecast is Path and isinstance(value, str): if value == "": # If value is empty string, don't set the path casted = "" else: casted = Path(os.path.realpath(os.path.expanduser(value))) # Handle boolean strings (e.g environment variable "true") elif typecast is bool and isinstance(value, str): # Accepts: yes, no, true, false, 0, 1 (case-insensitive) casted = str_to_bool(value) else: casted = typecast(value) # type: ignore return casted
[docs] def non_default(self, key: str) -> bool: """ If an item was sourced by a non-default method (env, file, runtime). Args: key: Name of the item to check Returns: If the item has a value that *overrides* the default value. Note that this method will also return :class:`False` if the key isn't valid. """ return any(key in self[k] for k in ["runtime_configs", "env_configs", "file_configs"])
[docs] def is_default_value(self, key: str) -> bool: """ If an item's current value matches the default value. Args: key: Name of the item to check Returns: If the item has a value that *matches* the default value Raises: AttributeError: If the attribute named by ``key`` doesn't exist """ return getattr(self, key) == self["default_configs"][key]
[docs] def fixup_dirs( self, new_parent: str | Path | None, dir_name: str, override_all: bool = False, ) -> None: if dir_name == "OUT_DIR": dirs = ["RUN_DIR"] elif dir_name == "RUN_DIR": dirs = [ "DEVICE_DIR", "ELASTIC_DIR", "META_DIR", "LOG_DIR", "SUMMARIES_DIR", "TEMP_DIR", "ZEEK_LOGDIR", "HEAT_ARTIFACTS_DIR", ] else: raise ValueError(f"invalid dir_name {dir_name}") for d in dirs: if new_parent is None and override_all: setattr(self, d, None) continue # Only change values that are still at their default values if not override_all and self.non_default(d): continue if new_parent is None: new_path = None else: old = getattr(self, d) # type: Path new_path = Path(os.path.realpath(Path(new_parent, old.name))) setattr(self, d, new_path)
def __getattribute__(self, item: str) -> Any: if not item.startswith("__") and item in self["CONFIG"]: return self["CONFIG"][item] else: return dict.__getattribute__(self, item) def __setattr__(self, key: str, value: Any) -> None: # !! Hack to make metadata directory configuration seamless and flexible !! if self["label"] == "configuration": if key in ["OUT_DIR", "RUN_DIR"]: self.fixup_dirs(value, key) # This makes class attribute assignments put stuff in the # runtime configs, which makes "config.DEBUG = 1" equivalent # to "config["runtime_configs"]["DEBUG"] = 1". self["runtime_configs"][key] = value
# Names exported by ``from peat.settings_manager import *``
__all__ = ["SettingsManager"]