#!/usr/bin/env python
"""
This module contains the class enable the ``vm_resource_handler`` to
run. This runs as a process for each VM that is launched with FIREWHEEL and
controls the interaction with the QEMU Guest Agent.
"""
import os
import sys
import json
import time
import random
import socket
import asyncio
import inspect
import contextlib
import importlib.util
from queue import Queue, PriorityQueue
from pathlib import Path, PureWindowsPath
from datetime import datetime, timedelta
from threading import Timer, Thread, Condition
from firewheel.config import config as global_config
from firewheel.lib.log import UTCLog
from firewheel.lib.minimega.api import minimegaAPI
from firewheel.vm_resource_manager import api, utils
from firewheel.control.repository_db import RepositoryDb
from firewheel.vm_resource_manager.vm_mapping import VMMapping
from firewheel.vm_resource_manager.schedule_db import ScheduleDb
from firewheel.vm_resource_manager.schedule_event import (
ScheduleEvent,
ScheduleEventType,
)
from firewheel.vm_resource_manager.abstract_driver import AbstractDriver
from firewheel.vm_resource_manager.schedule_updater import ScheduleUpdater
from firewheel.vm_resource_manager.vm_resource_store import VmResourceStore
[docs]
class VMResourceHandler:
"""
Main class for communicating with a VM. Kicks off the ScheduleUpdater thread
and handles all ScheduleEvents and their side effects.
"""
[docs]
def __init__(self, config, check_interval=10):
"""
Args:
config (dict): VM configuration.
check_interval (int): Seconds between checks for an updated schedule
This gets passed to the ScheduleUpdater. Defaults to 10.
"""
# Class variable initialization
self.log_directory = (
Path(global_config["logging"]["root_dir"])
/ global_config["logging"]["vmr_log_dir"]
)
self.driver_directory = Path(__file__).resolve().parent / "drivers"
self.driver = None
self.experiment_start_time = None
self.current_time = None
self.state = None
# Make sure the directory for logs exists
self.log_directory.mkdir(exist_ok=True, parents=True)
self.config = config
# Set up the logging file
self.log_filename = self.log_directory / (f"{self.config['vm_name']}.log")
self.json_log_filename = str(
self.log_directory / (f"{self.config['vm_name']}.json")
)
# We want VM Resource logs to be in UTC
log_format = "[%(asctime)s %(levelname)s] %(message)s"
log_level = global_config["logging"]["level"]
self.log = UTCLog(
"VMResourceHandler",
log_file=self.log_filename,
log_format=log_format,
log_level=log_level,
).log
self.json_log = UTCLog(
"VMResourceHandlerJSON",
log_file=self.json_log_filename,
log_format="%(message)s",
log_level=log_level,
).log
self.log.info("Starting RESOURCE HANDLER")
self.log.info("Using Python %s", sys.version)
self.log.info(self.config)
# Start the current time to be the most negative
self.initial_time = -(sys.maxsize - 1)
self.current_time = self.initial_time
# Get a handle to the vm_resources store
self.vm_resource_store = VmResourceStore()
# Get a handle to the ScheduleDb
self.schedule_db = ScheduleDb(log=self.log)
# Get a handle to vm_mapping
self.vm_mapping = VMMapping()
# Get a handle to repository_db
self.repository_db = RepositoryDb()
# Priority Queue to hold on to ScheduleEvents
self.prior_q = PriorityQueue()
self.mma = minimegaAPI()
self.load_balance_factor = self.mma.get_cpu_commit_ratio() + 1
self.log.info("Using load_balance_factor of %s", self.load_balance_factor)
# Kick off the schedule updater thread that
# periodically checks for schedule updates and
# the experiment start time. Updates get placed
# in the priority queue
self.condition = Condition()
self.schedule_updater = ScheduleUpdater(
self.config,
self.prior_q,
self.condition,
self.vm_resource_store,
self.schedule_db,
self.repository_db,
self.log,
self.log_filename,
self.load_balance_factor,
check_interval,
)
# Make sure the path is available
socket_path = Path(self.config["path"])
try:
os.stat(socket_path)
except FileNotFoundError:
self.log.debug("Waiting for path: %s", socket_path)
time.sleep(self.load_balance_factor * 1)
except PermissionError:
self.log.info(
"PermissionError: Trying to update permissions for %s through minimega",
socket_path,
)
parent_dir = socket_path.parent
self.mma.set_group_perms(parent_dir)
self.log.debug("Found path")
# load the driver for the virtualization engine
try:
self.driver_class = self.import_driver()
except Exception as exp: # noqa: BLE001
self.log.exception(exp)
connected = self.connect_to_driver()
if connected:
self.set_state("configuring")
else:
sys.exit(1)
# Don't kick off the updater until we're connected to the VM
self.schedule_updater.start()
# Grab the target OS
self.target_os = self.driver.get_os()
# Windows allow QGA connections before the system is actually fully functional.
# Because the resource handler starts executing tasks on connect, it is important
# that the VM is fully functional first. Without this, Windows VMs will not
# be ready to have processes modifying disk and issues could arise.
if "Windows" in self.target_os:
time.sleep(self.load_balance_factor * 10)
self.log.info("Setting time")
# Set the time in the VM
self.driver.set_time()
self.log.info("Done setting time")
[docs]
def connect_to_driver(self):
"""
Instantiate the driver class to communicate to the VM
Returns:
bool: True if the driver is connected, False otherwise.
"""
if self.driver:
self.driver.close()
while True:
try:
if not self.driver:
# Establish connectivity to the VM
self.log.info("New driver connection")
self.driver = self.driver_class(self.config, self.log)
else:
# Reestablish connectivity to the VM
self.log.info("Resetting driver connection")
sync = self.driver.connect()
self.log.info("Synced: %s", sync)
return True
except Exception as exp: # noqa: BLE001
# Sleep another timeout amount of time
self.log.exception(exp)
time.sleep(
self.load_balance_factor * random.SystemRandom().randint(3, 10)
)
[docs]
def run(self):
"""
Run the VMResourceHandler.
"""
try:
self.log.info("VmResourceHandler: Starting the _run function")
self._run()
self.log.info("VmResourceHandler: Finished the _run function.")
except Exception as exp: # noqa: BLE001
self.log.info("VmResourceHandler: Stopping due to an exception.")
self.log.exception(exp)
finally:
self.log.info("VmResourceHandler: Exiting.")
[docs]
def _run(self):
"""
This is the main processing loop for the VMResourceHandler.
It kicks off the ScheduleUpdater thread and threads to run the individual
vm_resources.
"""
self.preload_files()
while True:
# This function will block if there are not events
# in the priority queue
events = self.get_events()
threads = []
reboot_queue = Queue()
for event in events:
if event.get_type() == ScheduleEventType.EXPERIMENT_START_TIME_SET:
self.log.debug("PROCESSING EXPERIMENT START EVENT")
# Set the experiment start time
self.experiment_start_time = event.get_data()
elif event.get_type() == ScheduleEventType.EMPTY_SCHEDULE:
# If there is an empty schedule returned by the downloader
# at the beginning of the downloaded ordered schedule list,
# then we have no negative time vm_resources at this point and
# need to pass the global barrier
self.log.debug("PROCESSING NO SCHEDULE EVENT")
self.current_time = 0
self.set_state("configured")
elif event.get_type() == ScheduleEventType.NEW_ITEM:
self.log.debug("PROCESSING NEW ITEM EVENT")
schedule_entry = event.get_data()
# Determine the paths inside the VM for this vm_resource
try:
self.driver.create_paths(schedule_entry)
except OSError as exp:
self.log.exception(exp)
success = self.load_files_in_target(schedule_entry)
if not success:
self.log.error(
"Unable to load files into the VM: %s", schedule_entry
)
# Ignoring a failure typically will happen by a helper
# (e.g. push file) where we don't want the VM resource
# handler to exit on failure
if not schedule_entry.ignore_failure:
self.set_state("FAILED")
sys.exit(1)
if not schedule_entry.executable:
# No executable means that we're done
# once the data is loaded into the VM
continue
# Handle negative time vm_resources by kicking them off
# immediately
if schedule_entry.start_time < 0:
args = {"schedule_entry": schedule_entry, "queue": reboot_queue}
# rate limit by some small random time
time.sleep(
self.load_balance_factor
* random.SystemRandom().randint(1, 5)
)
thread = Thread(target=self.run_vm_resource, kwargs=args)
# Keep track of negative time vm_resource threads
threads.append(thread)
# Start the vm_resource
thread.start()
elif schedule_entry.start_time > 0:
if not self.experiment_start_time:
self.log.error(
"Processing positive time vm_resource "
"but no experiment start time!"
)
continue
# Determine when to fire the timer
runtime = self.experiment_start_time + timedelta(
seconds=schedule_entry.start_time
)
curtime = datetime.utcnow()
delay = (runtime - curtime).total_seconds()
start_seconds = (
self.experiment_start_time - curtime
).total_seconds()
self.log.debug(
"Experiment will start in %s seconds", start_seconds
)
self.log.debug(
"The `ScheduleEntry` '%s' with start time %s will start in %s seconds",
schedule_entry.executable,
schedule_entry.start_time,
delay,
)
# Set a timer to kick off the vm_resource runner with
# the vm_resource
thread = Timer(
delay, self.run_vm_resource, args=(schedule_entry,)
)
# Positive time vm_resources don't get held onto since
# they can be long running
thread.start()
elif event.get_type() == ScheduleEventType.TRANSFER:
schedule_entry = event.get_data()
data = schedule_entry.data
if schedule_entry.start_time < 0:
for entry in data:
entry["name"] = self.config["vm_name"]
thread = Thread(target=self.transfer_data, kwargs=entry)
thread.start()
else:
if not self.experiment_start_time:
self.log.error(
"Processing positive time file transfer "
"but no experiment start time!"
)
continue
# Determine when to fire the timer
runtime = self.experiment_start_time + timedelta(
seconds=schedule_entry.start_time
)
curtime = datetime.utcnow()
delay = (runtime - curtime).total_seconds()
start_seconds = (
self.experiment_start_time - curtime
).total_seconds()
self.log.debug(
"Experiment will start in %s seconds", start_seconds
)
self.log.debug(
"Transfer of `%s` with start time %s will start in %s seconds.",
schedule_entry.executable,
schedule_entry.start_time,
delay,
)
for entry in data:
entry["name"] = self.config["vm_name"]
# Set a timer to kick off the transfer
thread = Timer(delay, self.transfer_data, kwargs=entry)
thread.start()
elif event.get_type() == ScheduleEventType.EXIT:
self.stop(event.get_data())
# Wait for all threads to finish
for thread in threads:
thread.join()
# Check for reboot requests before moving on
# Don't need to lock on reboot_queue since all threads have
# finished by this point.
if not reboot_queue.empty():
self.log.debug("Reboot has been requested")
self.reboot()
# Do need to lock on the main priority queue of events
self.condition.acquire()
while not reboot_queue.empty():
schedule_entry = reboot_queue.get()
self.log.debug(
"Putting vm_resource back in event queue: %s",
schedule_entry.executable,
)
event = ScheduleEvent(ScheduleEventType.NEW_ITEM, schedule_entry)
# recycle vm_resource into the main processing queue
self.prior_q.put((schedule_entry.start_time, event))
self.condition.release()
[docs]
def stop(self, exitcode):
"""
Stop the vm resource handler. This is used for unit testing.
Args:
exitcode (int): exit code to use when exiting the program
"""
self.schedule_updater.stop_thread()
self.log.debug("Wait for join")
self.schedule_updater.join()
self.driver.close()
self.log.debug("Exiting: %d", exitcode)
sys.exit(exitcode)
[docs]
def run_vm_resource(self, schedule_entry, queue=None):
"""
Wrapper around the logic of running an vm_resource.
The new thread calls this wrapper, which can output errors to the
log file. This gives a convenient way to pass information back to
the user via the log (with few good alternatives).
Args:
schedule_entry (ScheduleEntry): ``ScheduleEntry`` object
specifying the VM resource to run.
queue (Queue): Queue to pass messages, specifically reboot
requests, back to the main processing loop. Defaults to
``None``.
"""
try:
self._run_vm_resource(schedule_entry, queue)
except Exception as exp: # noqa: BLE001
self.log.exception(exp)
[docs]
def _run_vm_resource(self, schedule_entry, queue=None):
"""
Function to handle starting a VM resource.
Args:
schedule_entry (ScheduleEntry): ``ScheduleEntry`` object
specifying the VM resource to run.
queue (Queue): Queue to pass messages, specifically reboot
requests, back to the main processing loop. Defaults to
``None``.
"""
if not hasattr(schedule_entry, "reboot"):
schedule_entry.reboot = False
# Create the call arguments script in the VM
if not schedule_entry.reboot:
try:
preload = schedule_entry.preloaded
except AttributeError:
preload = False
if not schedule_entry.data and not preload:
# If there isn't data then the CWD hasn't been
# created inside the VM, so create them here
self.log.info("FILES NOT PRELOADED, CREATING DIRS")
ret_value = self.driver.create_directories(
str(schedule_entry.working_dir)
)
while not ret_value:
self.log.error(
"Unable to create directories to write call arguments"
)
self.connect_to_driver()
ret_value = self.driver.create_directories(
str(schedule_entry.working_dir)
)
ret_value = False
while not ret_value and not preload:
self.log.info("FILES NOT PRELOADED, WRITING CALL ARGS")
ret_value = self.driver.create_directories(
str(schedule_entry.working_dir)
)
ret_value = self.driver.write(
str(schedule_entry.call_args_filename),
schedule_entry.call_arguments,
)
if not ret_value:
self.log.error("WRITE FAILED WHEN WRITING CALL ARGS")
time.sleep(self.load_balance_factor * 1)
if not preload:
self.driver.make_file_executable(str(schedule_entry.call_args_filename))
elif schedule_entry.reboot:
# Clear the reboot flag
schedule_entry.reboot = False
# Delete the reboot file
while True:
ret = self.driver.delete_file(str(schedule_entry.reboot_file))
if ret is True:
break
if ret is False:
self.log.error("Unable to delete reboot file")
self.connect_to_driver()
# Call the call_arguments file to execute the vm_resource
while True:
start = time.time()
self.log.info("CALL ARGS: %s", schedule_entry.call_args_filename)
pid = self.driver.async_exec(str(schedule_entry.call_args_filename))
self.log.info("PID returned: %s", pid)
if not pid:
self.log.info("No PID, resetting driver")
self.connect_to_driver()
continue
# Wait for the VM resource to finish
try:
exitcode = self.driver.get_exitcode(pid)
while exitcode is None:
# Print streaming output while the command is running
self.print_output(schedule_entry, pid)
time.sleep(self.load_balance_factor * 2)
exitcode = self.driver.get_exitcode(pid)
# Retry on errors and timeouts (`TimeoutError` is a subclass of `OSError`)
# - NOTE: Python 3.11 aliases `asyncio.TimeoutError` to `TimeoutError` which
# will eventually make the second check redundant
except (OSError, asyncio.TimeoutError):
self.log.error(
"FAILED: Unable to get exitcode of running process; retry command"
)
self.connect_to_driver()
continue
end = time.time()
if exitcode != 0:
self.log.warning(
"%s (%s) exited after %05f seconds with code: %s",
schedule_entry.executable,
pid,
end - start,
exitcode,
)
else:
self.log.debug(
"%s (%s) exited after %05f seconds with code: %s",
schedule_entry.executable,
pid,
end - start,
exitcode,
)
if "powershell" in schedule_entry.executable.lower() and (end - start) < 2:
self.log.error(
"Powershell took less than two seconds, this is most likely"
" an error, retrying"
)
time.sleep(self.load_balance_factor * 5)
continue
# Handle stdout and stderr
self.print_output(schedule_entry, pid)
break
# Determine if a reboot is required
need_reboot = None
if exitcode == 10:
self.log.info("Rebooting based on exit code")
need_reboot = True
else:
need_reboot = self.check_for_reboot(str(schedule_entry.reboot_file))
# Execute the reboot if required
if need_reboot:
schedule_entry.reboot = True
if not queue:
self.log.error(
"Can not handle reboots since the Queue"
" was not passed to the vm_resource runner"
)
return
queue.put(schedule_entry)
[docs]
def check_for_reboot(self, reboot_filepath):
"""
Check if the reboot file exists and a reboot is needed.
Args:
reboot_filepath (str): The path to the reboot file on the VM.
Returns:
bool: True if the reboot file exists and a reboot is required,
otherwise False.
"""
need_reboot = self.driver.file_exists(reboot_filepath)
while need_reboot is None:
self.log.error("Unable to check existence of the reboot file")
self.connect_to_driver()
time.sleep(self.load_balance_factor * 1)
need_reboot = self.driver.file_exists(reboot_filepath)
return need_reboot
[docs]
def transfer_data(self, name, location, interval=None, destination=None):
"""
Transfer data from the VM to the physical host based on the input
parameters.
Args:
name (str): The name of file to transfer.
location (str): The full absolute path of where the file is located
on the VM.
interval (int): How often to transfer the data in seconds.
destination (str): The full, absolute path of where to put the file
on the physical host.
"""
try:
self._transfer_data(name, location, interval, destination)
except Exception as exp: # noqa: BLE001
self.log.exception(exp)
[docs]
def _transfer_data(self, name, location, interval=None, destination=None): # noqa: DOC503
"""
The helper function which transfers data from the VM to the physical host
based on the input parameters.
Args:
name (str): The name of file to transfer.
location (str): The full absolute path of where the file is located
on the VM.
interval (int): How often to transfer the data in seconds.
destination (str): The full, absolute path of where to put the file
on the physical host.
Raises:
RuntimeError: If the transfer path is not absolute.
RuntimeError: If it is unable to list files at the location.
"""
if destination is not None:
destination = Path(destination)
local_path = Path(destination) / name
else:
destination = Path(global_config["logging"]["root_dir"])
local_path = Path(destination / "transfers" / name)
local_time = None
if "Windows" in self.target_os:
target_path = PureWindowsPath(location)
if not target_path.is_absolute() and not Path(location).is_absolute():
raise RuntimeError(
f"Transfer paths must be absolute! Cannot transfer: {location}"
)
else:
target_path = Path(location)
if not target_path.is_absolute():
raise RuntimeError(
f"Transfer paths must be absolute! Cannot transfer: {location}"
)
while True:
result = self.driver.file_exists(str(target_path))
if result is None:
if not interval or not isinstance(interval, int):
return
# An error occurred, reconnect to driver and try again
self.log.debug(
"An error occurred checking if the file=%s exists."
"Reconnecting to the driver, sleeping for %s seconds, and retrying.",
str(target_path),
interval,
)
self.connect_to_driver()
time.sleep(interval)
continue
self.log.debug("The file '%s' exists: %s", target_path, result)
if result is False:
if not interval or not isinstance(interval, int):
return
self.log.debug(
"The file=%s was not found, sleeping for %s seconds and retrying.",
str(target_path),
interval,
)
time.sleep(interval)
continue
# Get eligible file names
filenames = self.driver.get_files(str(target_path), local_time)
if filenames is None:
raise RuntimeError(f"Unable to list files at location: {target_path}")
self.log.debug("Found the following filenames for transfer: %s", filenames)
if filenames:
self.log.debug("Getting files: %s", filenames)
for filename in filenames:
if "Windows" in self.target_os:
# Make a legitimate posix path using the windows path
fname = Path(local_path) / PureWindowsPath(filename).as_posix()
else:
fname = Path(local_path) / filename.strip("/")
# Only pull files if they've changed
success = self.driver.read_file(filename, fname)
if not success or not fname.exists():
continue
# Set permissions on all the directories and files that
# have been pulled from the VM so the user doesn't have
# to be root to read them
touched = fname
while str(touched) != "/":
if touched.is_dir():
self.log.debug("Changing permissions for '%s' to 777.", touched)
touched.chmod(0o777)
else:
self.log.debug("Changing permissions for '%s' to 666.", touched)
touched.chmod(0o666)
if touched.parent == destination:
self.log.debug("Finished changing permissions")
break
touched = touched.parent
if not interval or not isinstance(interval, int):
return
# Update the local VM time to only grab
# updated files next time
local_time = self.driver.get_time()
self.log.debug(
"New local_time is %s -- pausing for %d seconds before next file transfer",
str(datetime.fromtimestamp(local_time)),
int(interval),
)
time.sleep(interval)
[docs]
def log_json(self, content):
"""
Print any JSON line in vm_resource output to the json log.
Args:
content (str or dict): Buffer from agent output.
"""
# Handle content that is already a dictionary, i.e. from the handler
if isinstance(content, dict):
try:
# Only log a line if it is a JSON object.
content["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.json_log.info(json.dumps(content))
except TypeError:
self.log.debug("Could not parse '%s' into JSON formatting.", content)
else:
try:
# Buffer can contain multiple lines of output.
split_lines = [s.strip() for s in content.splitlines()]
except AttributeError:
return
for line in split_lines:
# Only log a line if it can be decoded
try:
data = json.loads(line.decode())
data["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
except (json.JSONDecodeError, TypeError):
try:
# Convert decoded line into a dict
data = {"msg": line.decode()}
data["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
except TypeError:
return
self.json_log.info(json.dumps(data))
[docs]
def print_output(self, schedule_entry, pid):
"""
Print any output from an vm_resource to the log.
Args:
schedule_entry (ScheduleEntry): ``ScheduleEntry`` object specifying
the VM resource to run.
pid (int): The PID of the VM resource process within the VM.
"""
output = {}
output["name"] = schedule_entry.executable
output["pid"] = pid
# Print stdout output
try:
stdout = self.driver.get_stdout(pid)
except OSError:
stdout = None
if stdout:
self._print_stream(output, stdout, "stdout")
# Print stderr output
try:
stderr = self.driver.get_stderr(pid)
except OSError:
stderr = None
if stderr:
self._print_stream(output, stderr, "stderr")
[docs]
def _print_stream(self, output, stream, stream_name):
stream_text = stream.encode(sys.getdefaultencoding())
output["fd"] = stream_name
output["output"] = rf"{stream_text}"
self.log.info(output["output"])
self.log_json(stream_text)
[docs]
def preload_files(self):
"""
This method loads all VM Resources into the VM before the schedule
is executed. This minimizes the number of after-boot disk alterations.
This is particularly important for Windows VMs as, in our experience,
Windows does not appear to appreciate having files modified or created
on disk immediately after a reboot.
"""
temp_q = Queue()
self.condition.acquire()
if self.prior_q.empty():
self.condition.wait()
while not self.prior_q.empty():
start_time, event = self.prior_q.get()
if event.get_type() == ScheduleEventType.NEW_ITEM:
schedule_entry = event.get_data()
try:
self.driver.create_paths(schedule_entry)
except socket.timeout:
self.log.warning(
"There was a timeout when loading in a schedule entry. "
"Will reset the connection to the driver and try again. "
"The `ScheduleEntry` was %s",
event.get_data(),
)
self.connect_to_driver()
self.prior_q.put((start_time, event))
continue
except OSError as exp:
self.log.error(
"There was an error when loading in a schedule entry. "
"The `ScheduleEntry` was %s",
event.get_data(),
)
self.log.exception(exp)
try:
self.load_files_in_target(schedule_entry)
except socket.timeout:
self.log.warning(
"There was a timeout when loading in a schedule entry. "
"Will reset the connection to the driver and try again. "
"The `ScheduleEntry` was %s",
event.get_data(),
)
self.connect_to_driver()
self.prior_q.put((start_time, event))
continue
if schedule_entry.executable:
# Handle preloading the call arguments if this SE
# is meant to be executed
if not schedule_entry.data:
# If there isn't data then the CWD hasn't been
# created inside the VM, so create them here
self.log.info("creating directory since no file data")
ret_value = self.driver.create_directories(
str(schedule_entry.working_dir)
)
while not ret_value:
self.log.error(
"Unable to create directories to write call arguments"
)
time.sleep(self.load_balance_factor * 2)
ret_value = self.driver.create_directories(
str(schedule_entry.working_dir)
)
self.log.info("done creating directory")
ret_value = False
while not ret_value:
ret_value = self.driver.write(
str(schedule_entry.call_args_filename),
schedule_entry.call_arguments,
)
if not ret_value:
self.log.error("WRITE FAILED WHEN WRITING CALL ARGS")
self.connect_to_driver()
ret_value = False
while not ret_value:
try:
ret_value = self.driver.make_file_executable(
str(schedule_entry.call_args_filename)
)
except OSError:
ret_value = False
self.connect_to_driver()
schedule_entry.preloaded = True
temp_q.put((start_time, event))
# Reload the event queue
while not temp_q.empty():
start_time, event = temp_q.get()
self.prior_q.put((start_time, event))
self.condition.release()
self.log.info("Done preloading files")
[docs]
def get_events(self):
"""
Get all eligible events from the priority queue.
Returns:
list (ScheduleEvent): List of events that are ready to be processed.
"""
events = []
time_updated = False
# Grab the lock since this function modifies the queue
self.condition.acquire()
# If there isn't anything in the schedule, then just wait
# until something shows up
if self.prior_q.empty():
# If the schedule is all negative time vm_resources then when the
# schedule is empty, the VM is configured
if self.current_time < 0 and self.current_time > self.initial_time:
self.current_time = 0
self.set_current_time(self.current_time)
self.set_state("configured")
self.log.debug("Event queue is empty, WAITING")
self.condition.wait()
# Work through eligible events
while not self.prior_q.empty():
# Get event off the queue
start_time, event = self.prior_q.get()
# EXPERIMENT_START_TIME and EMPTY_SCHEDULE events always
# get processed immediately and shouldn't update current time
if (
event.get_type() == ScheduleEventType.EXPERIMENT_START_TIME_SET
or event.get_type() == ScheduleEventType.EMPTY_SCHEDULE
):
events.append(event)
continue
if not time_updated and start_time > self.current_time:
self.current_time = start_time
time_updated = True
self.set_current_time(self.current_time)
if start_time > self.current_time:
# Event's start time is past the current time
# so break and return eligible events
self.log.debug(
"Putting event back: %s, current time: %s",
start_time,
self.current_time,
)
self.prior_q.put((start_time, event))
break
# Wait for experiment start time before passing on
# positive time events
if start_time > 0 and not self.experiment_start_time:
self.log.debug("WAITING FOR START TIME")
# Put the event back since we can't process it yet
self.prior_q.put((start_time, event))
# All negative time vm_resources have been handled
self.set_state("configured")
self.current_time = 0
self.set_current_time(self.current_time)
# This will force a sleep until notified by the ScheduleUpdater
# that a new event has been enqueued
self.condition.wait()
continue
# If positive time, then return all events in the queue
if self.current_time > 0:
events.append(event)
continue
# If all checks passed, then time to process this event
time_updated = True
events.append(event)
# Release the lock
self.condition.release()
return events
[docs]
def load_files_in_target(self, schedule_entry):
"""
Copy the vm_resource file to the VM's local cache so that it is available
when the vm_resource gets called.
Args:
schedule_entry (ScheduleEntry): The schedule entry to be copied into the VM.
Returns:
bool: Success or failure of loading files into the VM.
"""
if not schedule_entry.data:
return True
with contextlib.suppress(AttributeError):
if schedule_entry.reboot is True:
return True
with contextlib.suppress(AttributeError):
if schedule_entry.preloaded:
return True
try:
while True:
try:
ret_value = self.driver.create_directories(
str(schedule_entry.working_dir)
)
if ret_value:
break
except OSError as exp:
self.log.error("There was an issue creating directories: %s", exp)
self.log.error(
"Unable to create directories while loading files into VM"
)
self.connect_to_driver()
ret_value = self.driver.create_directories(
str(schedule_entry.working_dir)
)
except AttributeError:
self.log.info("Not creating working directory")
for data in schedule_entry.data:
# Check if this file needs to be placed in a relative path
# or an absolute path in the VM. If we do not have the right
# key, we shouldn't load anything into the VM.
try:
target_path = Path(data["location"])
except KeyError:
self.log.warning(
"Processed event %s that may not have the right keys", data
)
continue
if not target_path.is_absolute():
if not schedule_entry.executable:
self.log.error(
"Files require absolute paths unless they are "
"for an vm_resource"
)
return False
target_path = schedule_entry.working_dir / target_path
if data.get("filename"):
attempts = 1
local_path = None
while attempts < 10:
try:
local_path = Path(
self.vm_resource_store.get_path(data["filename"])
)
if not local_path:
self.log.error("Unable to get file: %s", data["filename"])
attempts += 1
time.sleep(self.load_balance_factor * 2)
# If there were no errors then the file was successfully cached
else:
break
except (FileNotFoundError, RuntimeError) as exp:
self.log.exception(exp)
self.log.error("Unable to get file: %s", data["filename"])
attempts += 1
time.sleep(self.load_balance_factor * 2)
if not local_path:
self.log.error(
"Attempted 10 times to get file: %s", data["filename"]
)
return False
attempts = 1
while attempts < 10:
try:
if not self.driver.file_exists(str(target_path)):
self.log.debug(
"Writing file from: %s to %s", local_path, target_path
)
ret_value = False
while not ret_value:
ret_value = self.driver.write_from_file(
str(target_path), str(local_path)
)
if not ret_value:
self.log.error("UNABLE TO WRITE FILE")
else:
break
except OSError:
self.log.error(
"Unable to connect to the driver, reconnecting and trying again."
)
attempts += 1
time.sleep(self.load_balance_factor * 2)
self.connect_to_driver()
else:
return False
# Make the executable file executable on machines that aren't windows
if "executable" in data:
self.driver.make_file_executable(str(target_path))
elif "content" in data and isinstance(data["content"], str):
if not self.driver.file_exists(str(target_path)):
# Check that the parent directories exist
if not self.driver.file_exists(str(target_path.parent)):
success = self.driver.create_directories(
str(target_path.parent)
)
if not success:
self.log.error(
"Unable to create directory: %s", target_path.parent
)
return False
# File does not exist so write it
if "Windows" in self.target_os:
# Change newlines to windows newlines
data["content"].replace("\n", "\r\n")
self.log.debug("Writing content to %s", target_path)
ret_value = False
while not ret_value:
ret_value = self.driver.write(str(target_path), data["content"])
if not ret_value:
self.log.error("UNABLE TO WRITE CONTENT")
if "executable" in data:
self.driver.make_file_executable(str(target_path))
else:
self.log.error("Data entry for schedule entry is not a file or content")
self.log.error(schedule_entry)
return False
return True
[docs]
def reboot(self):
"""
Reboot the VM. Function doesn't return until the driver can communicate
with the VM again.
"""
self.log.debug("Rebooting")
try:
self.driver.reboot()
except Exception as exp: # noqa: BLE001
self.log.exception(exp)
# We now need to sleep a variable amount of time (25 - 45 seconds) after
sleep_time = self.load_balance_factor * random.SystemRandom().randint(
25, 45
)
# guest agent reconnects to a Windows VM too soon after being asked to
time.sleep(sleep_time)
if "Windows" in self.target_os:
sleep_time = self.load_balance_factor * random.SystemRandom().randint(
25, 45
)
self.log.info("Windows sleep: %s seconds", sleep_time)
time.sleep(sleep_time)
self.connect_to_driver()
[docs]
def import_driver(self):
"""
Walk through all the available drivers and find the one that
matches the type of VM that has been booted.
Returns:
object: The driver class that matches the VM's type
"""
drivers = self._import_drivers()
if not drivers:
self.log.error(
"Unable to find driver to communicate with VM: %s",
self.config["vm_name"],
)
# If there is no driver then we can't talk to the VM.
# Nothing left to do so exit
sys.exit(1)
# Walk the drivers looking for the one that has an engine
# that matches the config's type (i.e. QemuVM)
for driver in drivers:
if driver.get_engine() == self.config["engine"]:
return driver
self.log.error(
"Unable to find driver for type: %s for VM: %s",
self.config["engine"],
self.config["vm_name"],
)
# No driver found. Can't communicate with the VM so exit.
sys.exit(1)
[docs]
def _import_drivers(self):
"""
Import available drivers from the driver directory.
Walk the drivers directory looking for all available drivers that
can talk to a booted VM. A qualifying driver must be a subclass of
the abstract driver class, implementing all required methods of
the driver interface.
Returns:
set: A set of available driver classes.
"""
drivers = set()
for module_path in self.driver_directory.rglob("*.py"):
spec = importlib.util.spec_from_file_location(
module_path.name, str(module_path)
)
if spec:
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module)
except (FileNotFoundError, SyntaxError):
self.log.debug("Could not load module '%s'. Continuing", module)
for _, driver_cls in inspect.getmembers(module, self._check_driver):
drivers.add(driver_cls)
return drivers
[docs]
@staticmethod
def _check_driver(obj):
if inspect.isclass(obj):
return issubclass(obj, AbstractDriver) and not inspect.isabstract(obj)
return False
[docs]
def set_state(self, state):
"""
Tell the infrastructure the state of the VM.
If the state is 'configured', then check to see
if this VM is the last VM to be configured. If it
is the last VM to be configured, then set the
experiment start time.
Args:
state (str): State of the VM
"""
if self.state == state:
return
try:
self.log.debug("SETTING STATE: %s", state)
utils.set_vm_state(
self.config["vm_uuid"], state, mapping=self.vm_mapping, log=self.log
)
self.state = state
except RuntimeError as exp:
self.log.error("Error setting VM state. Can not set state to: %s", state)
self.log.exception(exp)
# Check to see if the experiment start time can be set
if state == "configured":
if self.experiment_start_time:
return
not_ready_count = utils.get_vm_count_not_ready(
mapping=self.vm_mapping, log=self.log
)
# All VMs are configured, so set the start time.
if not_ready_count == 0:
# Set the experiment start time
try:
self.log.debug("SETTING EXPERIMENT START TIME")
api.add_experiment_start_time()
except Exception as exp: # noqa: BLE001
self.log.error("Unable to set the start time")
self.log.exception(exp)
[docs]
def set_current_time(self, cur_time):
"""
Tell the infrastructure the current configuration time of the VM.
Args:
cur_time (int): Current VM schedule time
"""
try:
utils.set_vm_time(
self.config["vm_uuid"], cur_time, mapping=self.vm_mapping, log=self.log
)
except Exception: # noqa: BLE001
self.log.error("Error setting VM state. Can not set state to: %s", cur_time)
# ---------------------------------- __main__ ----------------------------------
if __name__ == "__main__":
# Get the vm config off the command line
vm_config = sys.argv[1]
vm_config = json.loads(vm_config)
resource_handler = VMResourceHandler(vm_config)
resource_handler.run()