import os
import shutil
import subprocess
import sys
import time
from pathlib import Path
# Allow module to be imported without errors
# on non-Linux systems (Windows and OSX).
try:
import kmodpy
KMODPY_INSTALLED = True
except (ImportError, OSError):
KMODPY_INSTALLED = False
from peat import config, consts, log
# Keep track if the nbd module was already loaded when pillage started
# don't remove it if it was. Keep the system in the same state when done
# pillaging
NBD_ALREADY_LOADED = False
# TODO: integrate pillage with parse. Pillage results should go in peat_results.
[docs]
def pillage(source: str) -> bool:
"""
peat pillage.
Args:
config_file: Path to pillage config file
source: Path of directory to pillage
Returns:
If pillaging was successful
"""
if not config.PILLAGE:
log.error(
"No Pillage config specified! Make sure the 'pillage' section "
"is included in the peat config file!"
)
return False
results_path = Path("pillage_results")
if not results_path.exists():
results_path.mkdir()
log.info(f"Pillage source path: {source}")
source_path = Path(source)
# Validate source exists
if source_path.exists():
if source_path.is_dir():
log.info(f"Pillaging directory {source}")
return search(source_path, results_path)
elif source_path.is_file():
# TODO: build in support for images made of up smaller image files
if validate_image_mounting():
log.info(f"Pillaging image {source}")
# Create temporary directory to mount image to in current directory
temp_path = Path("pillage_temp")
if temp_path.exists():
try:
shutil.rmtree(temp_path)
except OSError:
unmount_image(temp_path)
temp_path.mkdir()
# Mount image
log.debug(f"Mount image to {temp_path}")
success = False
if mount_image(source_path, temp_path):
try:
success = search(temp_path, results_path)
except (Exception, KeyboardInterrupt) as ex:
log.critical(f"Failed pillage search: {ex}")
# remove Mount
if unmount_image(temp_path):
log.debug(f"Unmout image from '{temp_path}'")
else:
log.error("Failed to mount image")
if temp_path.exists():
shutil.rmtree(temp_path)
log.debug(f"Deleted temp directory '{temp_path}'")
return success
else:
log.error("Unsupported source type")
return False
log.error(f"Source ({source}) does not exist")
return False
[docs]
def validate_image_mounting() -> bool:
"""Ensure all the requirements needed to mount an image are satisfied.
In order to mount an image and search, certain conditions must be met:
- PEAT must be run as ``root``
- PEAT must be run on a Linux system and not on Windows Subsystem for Linux (WSL)
- ``kmodpy`` Python package must be installed
- ``qemu-nbd`` must be installed and available in the system PATH
Returns:
If the requirements for mounting are met
"""
if consts.WSL or not consts.LINUX:
log.error(
"To mount an image for searching, you must be running on Linux "
"(Windows, WSL, or OSX are not supported at this time)"
)
return False
elif os.geteuid() != 0:
log.error("To mount an image for searching, you must be running as root")
elif not KMODPY_INSTALLED:
log.error("To mount an image for searching, you must have kmodpy installed")
elif not shutil.which("qemu-nbd"):
log.error(
"Could not find 'qemu-nbd', which is required to mount a disk "
"image. To fix this, install the 'qemu-utils' package: "
"'sudo apt-get install qemu-utils'"
)
else:
return True
return False
[docs]
def mount_image(source: Path, mount_point: Path) -> bool:
global NBD_ALREADY_LOADED
if "nbd" not in list_loaded_modules():
try:
# Equivalent of "modprobe nbd"
kmodpy.Kmod().modprobe("nbd")
except Exception as e:
log.error(f"Failed to load 'nbd' kernel module: {e}")
return False
else:
NBD_ALREADY_LOADED = True
nbd_args = ["qemu-nbd", "-r", "-c", "/dev/nbd1", str(source)]
nbd_result = subprocess.call(nbd_args)
if nbd_result != 0:
log.error(f"Failed to run '{' '.join(nbd_args)}' (return code: {nbd_result})")
remove_nbd_module()
return False
# Give the system time to mount device before attempting to mount to folder
time.sleep(0.5)
mnt_args = ["mount", "-o", "ro", "/dev/nbd1p1", str(mount_point)]
mnt_result = subprocess.call(mnt_args)
if mnt_result != 0:
log.error(f"Failed to run '{' '.join(mnt_args)}' (return code: {mnt_result})")
remove_nbd_device()
remove_nbd_module()
return False
return True
[docs]
def unmount_image(mount_point: Path) -> bool:
result = True
umount_args = ["umount", str(mount_point)]
retval = subprocess.call(umount_args)
if retval != 0:
log.error(f"Failed to run '{' '.join(umount_args)}' (return code: {retval})")
result &= False
result &= remove_nbd_device()
result &= remove_nbd_module()
return True
[docs]
def remove_nbd_module() -> bool:
if not NBD_ALREADY_LOADED:
try:
kmodpy.Kmod().rmmod("nbd")
except Exception as e:
log.error(f"Failed to remove 'nbd' module: {e}")
return False
return True
[docs]
def remove_nbd_device() -> bool:
nbd_args = ["qemu-nbd", "-d", "/dev/nbd1"]
retval = subprocess.call(nbd_args)
if retval != 0:
log.error(f"Failed to run '{' '.join(nbd_args)}' (return code: {retval})")
return False
return True
[docs]
def list_loaded_modules() -> list[str]:
result = []
km = kmodpy.Kmod()
for mod in km.list():
result.append(mod[0].decode("utf-8"))
return result
[docs]
def search(source: Path, results: Path) -> bool:
if not source.is_dir():
log.error(f"Failed pillage search: source path '{source}' is not a valid directory.")
return False
excluded = [
"Windows",
"System Volume Information",
"PerfLogs",
"MSOCache",
"cygwin64",
"Recovery",
"Boot",
"$Recycle.Bin",
"bootmgr",
]
exclude_counter = 0
for root, _dirnames, filenames in os.walk(str(source)):
log.trace2(f"Searching in: {root}")
parts = root.split(os.sep)
if len(parts) >= 2 and parts[1] in excluded:
log.trace2(f"Skipping excluded dir '{root}'")
exclude_counter += 1
continue
for filename in filenames:
src_file = Path(root) / filename
is_valid, config_type = is_valid_file(src_file)
if is_valid:
dst_file = Path(results) / config_type / filename
# If a file with that name already exists in the results folder
# increment a counter and append that counter to the file name
name_only = dst_file.stem
ext_only = dst_file.suffix
count = 1
while dst_file.exists():
dst_file = Path(results) / config_type / f"{name_only}{ext_only}.{count}"
count += 1
# Prompt user if they want to copy each file
if not config.PILLAGE["auto_copy"]:
auto_copy = input(
f"Found {src_file}, do you want to copy it to {dst_file} (Y or N):"
)
print(auto_copy, file=sys.stderr, flush=True) # noqa: T201
if auto_copy == "Y":
copy_file(src_file, dst_file)
else:
log.warning(
f"[Pillage] Declined to copy {src_file} to "
f"{dst_file} (auto_copy is false)"
)
else:
copy_file(src_file, dst_file)
if not config.PILLAGE["recursive"]:
break
if exclude_counter:
log.info(f"Skipped {exclude_counter} excluded subdirectories")
return True
# TODO: function not used yet, still need to decide how to search through specific locations
# How to search through specific directories? Only search those listed?
# Search through all but only validate a file if its one of those specified?
# How to handle if one brand says search through everything and ones
# says only a specific directory?
# Searching through everything and then validating at a file level
# would be easiest to implement but not be the quickest way to search.
# If only a single location is specified that search will take the same
# amount of time as a search with all directories.
# def create_location_list():
# locations = []
# all_dirs = False
# if 'brands' in config.PILLAGE:
# for brand_data in config.PILLAGE['brands'].values():
# if 'locations' in brand_data:
# if len(brand_data['locations']) > 0:
# locations.append(brand_data['locations'])
# else:
# # if there is a location entry but no locations defined then look
# # through all directories
# all_dirs = True
# if 'default' in config.PILLAGE:
# if 'locations' in config.PILLAGE['default']:
# if len(config.PILLAGE['default']) > 0:
# locations.append(config.PILLAGE['default']['locations'])
# else:
# # if there is a location entry but no locations defined then look
# # through all directories
# all_dirs = True
# return all_dirs, locations
[docs]
def copy_file(src: Path, dst: Path) -> None:
if not dst.parent.exists():
dst.parent.mkdir(exist_ok=True, parents=True)
shutil.copyfile(str(src), str(dst))
log.info(f"[Pillage] Copied {src} to {dst}")
[docs]
def is_valid_file(_file: Path):
"""
Determine if a file is valid based on the search criteria
in the Pillage configuration.
"""
brand_list = []
valid_file = False
config_type = ""
if "brands" in config.PILLAGE:
for brand, brand_data in config.PILLAGE["brands"].items():
valid_file, condition = check_file_conditions(_file, brand_data)
if valid_file:
brand_list.append((brand, condition))
if len(brand_list) > 0:
brand_str = ""
for brand, condition in brand_list:
brand_str += f"{brand} ({condition}), "
log.info(
f"[Pillage] {_file} is valid for the following "
f"brands and conditions: {brand_str[:-2]}"
)
valid_file = True
config_type = brand_list[0][0]
if len(brand_list) > 1:
config_type = "MULTIPLE"
# If a valid file wasn't found using the specific brand conditions check for default files
if not valid_file and "default" in config.PILLAGE:
valid_file, condition = check_file_conditions(_file, config.PILLAGE["default"])
if valid_file:
log.info(f"[Pillage] {_file} is valid for DEFAULT condition {condition}")
config_type = "DEFAULT"
return valid_file, config_type
[docs]
def check_file_conditions(_file: Path, conditions: dict[str, list]) -> tuple:
found = False
condition_type = ""
if conditions.get("filenames"):
for cond_fname in conditions["filenames"]:
if _file.name.lower() == cond_fname.lower():
condition_type = "FILENAME"
found = True
break
if not found and conditions.get("extensions"):
for cond_ext in conditions["extensions"]:
if _file.suffix[1:].lower() == cond_ext.lower():
condition_type = "EXTENSION"
found = True
break
# TODO: Check for magic number
return found, condition_type