Source code for peat.api.pillage_api

import os
import shutil
import subprocess
import sys
import time
from pathlib import Path

# Allow module to be imported without errors
# on non-Linux systems (Windows and OSX).
try:
    import kmodpy

    KMODPY_INSTALLED = True
except (ImportError, OSError):
    KMODPY_INSTALLED = False

from peat import config, consts, log

# Records whether the "nbd" kernel module was already loaded before pillage
# tried to load it. mount_image() sets this to True when it finds the module
# present; remove_nbd_module() then leaves the module loaded, so the system
# is kept in the same state it was in before pillaging.
NBD_ALREADY_LOADED = False

# TODO: integrate pillage with parse. Pillage results should go in peat_results.


def pillage(source: str) -> bool:
    """
    Search a directory or a disk image for files matching the Pillage config.

    A directory is searched in place. A regular file is treated as a disk
    image: it is mounted on a temporary ``pillage_temp`` directory in the
    current working directory, searched, and then unmounted again.
    Results are written under ``pillage_results``.

    Args:
        source: Path of the directory or disk image to pillage

    Returns:
        If pillaging was successful
    """
    if not config.PILLAGE:
        log.error(
            "No Pillage config specified! Make sure the 'pillage' section "
            "is included in the peat config file!"
        )
        return False

    results_path = Path("pillage_results")
    if not results_path.exists():
        results_path.mkdir()

    log.info(f"Pillage source path: {source}")
    source_path = Path(source)

    # Validate source exists
    if not source_path.exists():
        log.error(f"Source ({source}) does not exist")
        return False

    if source_path.is_dir():
        log.info(f"Pillaging directory {source}")
        return search(source_path, results_path)

    if not source_path.is_file():
        log.error("Unsupported source type")
        return False

    # TODO: build in support for images made of up smaller image files
    if not validate_image_mounting():
        # The failed requirement has already been logged by
        # validate_image_mounting(); don't also claim the source is missing.
        return False

    log.info(f"Pillaging image {source}")
    return _pillage_image(source_path, results_path)


def _pillage_image(image_path: Path, results_path: Path) -> bool:
    """
    Mount a disk image on a temporary directory, search it, and clean up.

    Args:
        image_path: Path of the disk image to mount
        results_path: Directory that is passed to ``search`` for the results

    Returns:
        If the image was mounted and the search reported success
    """
    # Create temporary directory to mount image to in current directory
    temp_path = Path("pillage_temp")
    if temp_path.exists():
        try:
            shutil.rmtree(temp_path)
        except OSError:
            # Could not delete the leftover directory; try unmounting it in
            # case an image is still mounted there.
            unmount_image(temp_path)

    # exist_ok: when the rmtree above failed the directory is still present,
    # and a plain mkdir() would raise FileExistsError.
    temp_path.mkdir(exist_ok=True)

    # Mount image
    log.debug(f"Mount image to {temp_path}")
    success = False
    if mount_image(image_path, temp_path):
        try:
            success = search(temp_path, results_path)
        except (Exception, KeyboardInterrupt) as ex:
            # Swallow the error (including Ctrl-C) so the image is always
            # unmounted below.
            log.critical(f"Failed pillage search: {ex}")

        # remove Mount
        if unmount_image(temp_path):
            log.debug(f"Unmounted image from '{temp_path}'")
    else:
        log.error("Failed to mount image")

    if temp_path.exists():
        try:
            shutil.rmtree(temp_path)
        except OSError as ex:
            # Don't lose the search result because cleanup failed
            log.error(f"Failed to delete temp directory '{temp_path}': {ex}")
        else:
            log.debug(f"Deleted temp directory '{temp_path}'")

    return success
def validate_image_mounting() -> bool:
    """
    Check that every prerequisite for mounting a disk image is met.

    Mounting an image for searching requires all of the following:

    - PEAT is running on Linux, and not on Windows Subsystem for Linux (WSL)
    - PEAT is running as ``root``
    - The ``kmodpy`` Python package is installed
    - ``qemu-nbd`` is installed and available in the system PATH

    The first requirement that is not met is logged as an error.

    Returns:
        If the requirements for mounting are met
    """
    # Platform check comes first: os.geteuid() is not available on Windows.
    if consts.WSL or not consts.LINUX:
        log.error(
            "To mount an image for searching, you must be running on Linux "
            "(Windows, WSL, or OSX are not supported at this time)"
        )
        return False

    if os.geteuid() != 0:
        log.error("To mount an image for searching, you must be running as root")
        return False

    if not KMODPY_INSTALLED:
        log.error("To mount an image for searching, you must have kmodpy installed")
        return False

    if not shutil.which("qemu-nbd"):
        log.error(
            "Could not find 'qemu-nbd', which is required to mount a disk "
            "image. To fix this, install the 'qemu-utils' package: "
            "'sudo apt-get install qemu-utils'"
        )
        return False

    return True
def mount_image(source: Path, mount_point: Path) -> bool:
    """
    Attach a disk image to ``/dev/nbd1`` and mount it read-only.

    The ``nbd`` kernel module is loaded first if it is not loaded already.
    If it was already loaded, ``NBD_ALREADY_LOADED`` is set so that the
    module is left in place during cleanup. Whatever was set up before a
    failing step is torn down again before returning.

    Args:
        source: Path of the disk image to mount
        mount_point: Existing directory to mount ``/dev/nbd1p1`` on

    Returns:
        If the image was mounted successfully
    """
    global NBD_ALREADY_LOADED

    if "nbd" in list_loaded_modules():
        NBD_ALREADY_LOADED = True
    else:
        try:
            # Equivalent of "modprobe nbd"
            kmodpy.Kmod().modprobe("nbd")
        except Exception as err:
            log.error(f"Failed to load 'nbd' kernel module: {err}")
            return False

    connect_cmd = ["qemu-nbd", "-r", "-c", "/dev/nbd1", str(source)]
    connect_rc = subprocess.call(connect_cmd)
    if connect_rc != 0:
        log.error(f"Failed to run '{' '.join(connect_cmd)}' (return code: {connect_rc})")
        remove_nbd_module()
        return False

    # Give the system time to mount device before attempting to mount to folder
    time.sleep(0.5)

    mount_cmd = ["mount", "-o", "ro", "/dev/nbd1p1", str(mount_point)]
    mount_rc = subprocess.call(mount_cmd)
    if mount_rc == 0:
        return True

    log.error(f"Failed to run '{' '.join(mount_cmd)}' (return code: {mount_rc})")
    remove_nbd_device()
    remove_nbd_module()
    return False
def unmount_image(mount_point: Path) -> bool:
    """
    Unmount an image and release the NBD device and kernel module.

    All three teardown steps (``umount``, disconnecting the NBD device, and
    removing the ``nbd`` module) are always attempted, even when an earlier
    step fails, so that as much as possible is cleaned up.

    Args:
        mount_point: Directory the image is mounted on

    Returns:
        ``True`` only if every teardown step succeeded
    """
    umount_args = ["umount", str(mount_point)]
    retval = subprocess.call(umount_args)
    unmounted = retval == 0
    if not unmounted:
        log.error(f"Failed to run '{' '.join(umount_args)}' (return code: {retval})")

    # Evaluate both steps up front so neither is skipped by short-circuiting
    device_removed = remove_nbd_device()
    module_removed = remove_nbd_module()

    # Report the real outcome instead of an unconditional True
    return unmounted and device_removed and module_removed
def remove_nbd_module() -> bool:
    """
    Remove the ``nbd`` kernel module, unless it was loaded before pillaging.

    Returns:
        ``False`` if removing the module raised an error, ``True`` otherwise
        (including when the module is intentionally left loaded)
    """
    if NBD_ALREADY_LOADED:
        # The module was not loaded by pillage, so leave it in place
        return True

    try:
        kmodpy.Kmod().rmmod("nbd")
    except Exception as err:
        log.error(f"Failed to remove 'nbd' module: {err}")
        return False

    return True
def remove_nbd_device() -> bool:
    """
    Disconnect ``/dev/nbd1`` by running ``qemu-nbd -d``.

    Returns:
        If the command exited with return code 0
    """
    disconnect_cmd = ["qemu-nbd", "-d", "/dev/nbd1"]
    exit_code = subprocess.call(disconnect_cmd)
    if exit_code == 0:
        return True

    log.error(f"Failed to run '{' '.join(disconnect_cmd)}' (return code: {exit_code})")
    return False
def list_loaded_modules() -> list[str]:
    """
    List the entries reported by ``kmodpy.Kmod().list()``.

    Returns:
        The first element of each listed entry, decoded as UTF-8
        (presumably the kernel module name)
    """
    return [entry[0].decode("utf-8") for entry in kmodpy.Kmod().list()]
# TODO: function not used yet, still need to decide how to search through specific locations
# How to search through specific directories? Only search those listed?
# Search through all but only validate a file if it's one of those specified?
# How to handle if one brand says search through everything and one
# says only a specific directory?
# Searching through everything and then validating at a file level
# would be easiest to implement but not be the quickest way to search.
# If only a single location is specified that search will take the same
# amount of time as a search with all directories.
# def create_location_list():
#     locations = []
#     all_dirs = False
#     if 'brands' in config.PILLAGE:
#         for brand_data in config.PILLAGE['brands'].values():
#             if 'locations' in brand_data:
#                 if len(brand_data['locations']) > 0:
#                     locations.append(brand_data['locations'])
#                 else:
#                     # if there is a location entry but no locations defined then look
#                     # through all directories
#                     all_dirs = True
#     if 'default' in config.PILLAGE:
#         if 'locations' in config.PILLAGE['default']:
#             if len(config.PILLAGE['default']) > 0:
#                 locations.append(config.PILLAGE['default']['locations'])
#             else:
#                 # if there is a location entry but no locations defined then look
#                 # through all directories
#                 all_dirs = True
#     return all_dirs, locations
def copy_file(src: Path, dst: Path) -> None:
    """
    Copy a file's contents to ``dst``, creating missing parent directories.

    Args:
        src: File to copy
        dst: Destination file path
    """
    target_dir = dst.parent
    if not target_dir.exists():
        target_dir.mkdir(parents=True, exist_ok=True)

    shutil.copyfile(str(src), str(dst))
    log.info(f"[Pillage] Copied {src} to {dst}")
def is_valid_file(_file: Path):
    """
    Decide whether a file matches the search criteria in the Pillage config.

    Every entry under ``brands`` is checked first. The ``default`` criteria
    are only consulted when no brand matched.

    Args:
        _file: Path of the file to check

    Returns:
        A ``(valid_file, config_type)`` tuple. ``config_type`` is the name of
        the matching brand, ``"MULTIPLE"`` if more than one brand matched,
        ``"DEFAULT"`` if only the default criteria matched, or ``""`` if
        nothing matched.
    """
    brand_matches = []
    if "brands" in config.PILLAGE:
        for brand_name, brand_rules in config.PILLAGE["brands"].items():
            matched, reason = check_file_conditions(_file, brand_rules)
            if matched:
                brand_matches.append((brand_name, reason))

    if brand_matches:
        summary = ", ".join(f"{name} ({reason})" for name, reason in brand_matches)
        log.info(
            f"[Pillage] {_file} is valid for the following "
            f"brands and conditions: {summary}"
        )
        if len(brand_matches) > 1:
            return True, "MULTIPLE"
        return True, brand_matches[0][0]

    # No brand-specific match: fall back to the default criteria, if any
    if "default" not in config.PILLAGE:
        return False, ""

    matched, reason = check_file_conditions(_file, config.PILLAGE["default"])
    if not matched:
        return matched, ""

    log.info(f"[Pillage] {_file} is valid for DEFAULT condition {reason}")
    return matched, "DEFAULT"
def check_file_conditions(_file: Path, conditions: dict[str, list]) -> tuple:
    """
    Check a file against one set of filename and extension criteria.

    Comparisons are case-insensitive. Filenames are checked before
    extensions, and extensions are compared without the leading dot.

    Args:
        _file: Path of the file to check
        conditions: Criteria to check against; the ``filenames`` and
            ``extensions`` keys are read, and a missing or empty key is
            skipped

    Returns:
        A ``(found, condition_type)`` tuple, where ``condition_type`` is
        ``"FILENAME"``, ``"EXTENSION"``, or ``""`` when nothing matched
    """
    wanted_names = conditions.get("filenames")
    if wanted_names and any(
        _file.name.lower() == candidate.lower() for candidate in wanted_names
    ):
        return True, "FILENAME"

    wanted_exts = conditions.get("extensions")
    if wanted_exts and any(
        _file.suffix[1:].lower() == candidate.lower() for candidate in wanted_exts
    ):
        return True, "EXTENSION"

    # TODO: Check for magic number
    return False, ""