Source code for firewheel.vm_resource_manager.vm_resource_handler
#!/usr/bin/env python"""This module contains the class enable the ``vm_resource_handler`` torun. This runs as a process for each VM that is launched with FIREWHEEL andcontrols the interaction with the QEMU Guest Agent."""importosimportsysimportjsonimporttimeimportrandomimportsocketimportasyncioimportinspectimportcontextlibimportimportlib.utilfromqueueimportQueue,PriorityQueuefrompathlibimportPath,PureWindowsPathfromdatetimeimportdatetime,timedeltafromthreadingimportTimer,Thread,Conditionfromfirewheel.configimportconfigasglobal_configfromfirewheel.lib.logimportUTCLogfromfirewheel.lib.minimega.apiimportminimegaAPIfromfirewheel.vm_resource_managerimportapi,utilsfromfirewheel.control.repository_dbimportRepositoryDbfromfirewheel.vm_resource_manager.vm_mappingimportVMMappingfromfirewheel.vm_resource_manager.schedule_dbimportScheduleDbfromfirewheel.vm_resource_manager.schedule_eventimport(ScheduleEvent,ScheduleEventType,)fromfirewheel.vm_resource_manager.abstract_driverimportAbstractDriverfromfirewheel.vm_resource_manager.schedule_updaterimportScheduleUpdaterfromfirewheel.vm_resource_manager.vm_resource_storeimportVmResourceStore
class VMResourceHandler:
    """
    Main class for communicating with a VM.

    Kicks off the ScheduleUpdater thread and handles all ScheduleEvents
    and their side effects. One handler process runs per launched VM.
    """

    # Number of times the handler asks the VM resource store for a file
    # before giving up on a schedule entry.
    _MAX_FETCH_ATTEMPTS = 10

    def __init__(self, config, check_interval=10):
        """
        Set up logging, database handles, the schedule updater, and the
        driver connection to the VM.

        Args:
            config (dict): VM configuration. The keys read by this class are
                ``vm_name``, ``vm_uuid``, ``path`` and ``engine``.
            check_interval (int): Seconds between checks for an updated
                schedule. This gets passed to the ScheduleUpdater.
                Defaults to 10.
        """
        # Class variable initialization
        self.log_directory = (
            Path(global_config["logging"]["root_dir"])
            / global_config["logging"]["vmr_log_dir"]
        )
        self.driver_directory = Path(__file__).resolve().parent / "drivers"
        self.driver = None
        self.experiment_start_time = None
        self.current_time = None
        self.state = None

        # Make sure the directory for logs exists
        self.log_directory.mkdir(exist_ok=True, parents=True)

        self.config = config

        # Set up the logging files
        self.log_filename = self.log_directory / f"{self.config['vm_name']}.log"
        self.json_log_filename = str(
            self.log_directory / f"{self.config['vm_name']}.json"
        )

        # We want VM Resource logs to be in UTC
        log_format = "[%(asctime)s %(levelname)s] %(message)s"
        log_level = global_config["logging"]["level"]
        self.log = UTCLog(
            "VMResourceHandler",
            log_file=self.log_filename,
            log_format=log_format,
            log_level=log_level,
        ).log
        self.json_log = UTCLog(
            "VMResourceHandlerJSON",
            log_file=self.json_log_filename,
            log_format="%(message)s",
            log_level=log_level,
        ).log

        self.log.info("Starting RESOURCE HANDLER")
        self.log.info("Using Python %s", sys.version)
        self.log.info(self.config)

        # Start the current time to be the most negative
        self.initial_time = -(sys.maxsize - 1)
        self.current_time = self.initial_time

        # Handles to the vm_resources store and the supporting databases
        self.vm_resource_store = VmResourceStore()
        self.schedule_db = ScheduleDb(log=self.log)
        self.vm_mapping = VMMapping()
        self.repository_db = RepositoryDb()

        # Priority Queue to hold on to ScheduleEvents
        self.prior_q = PriorityQueue()

        self.mma = minimegaAPI()
        # Every sleep in this class is scaled by this factor.
        self.load_balance_factor = self.mma.get_cpu_commit_ratio() + 1
        self.log.info("Using load_balance_factor of %s", self.load_balance_factor)

        # Create the schedule updater thread that periodically checks for
        # schedule updates and the experiment start time. Updates get
        # placed in the priority queue.
        self.condition = Condition()
        self.schedule_updater = ScheduleUpdater(
            self.config,
            self.prior_q,
            self.condition,
            self.vm_resource_store,
            self.schedule_db,
            self.repository_db,
            self.log,
            self.log_filename,
            self.load_balance_factor,
            check_interval,
        )

        # Make sure the path is available. This is a single check: a missing
        # path only delays start-up once, and the driver connection below
        # keeps retrying on its own.
        socket_path = Path(self.config["path"])
        try:
            os.stat(socket_path)
        except FileNotFoundError:
            self.log.debug("Waiting for path: %s", socket_path)
            time.sleep(self.load_balance_factor * 1)
        except PermissionError:
            self.log.info(
                "PermissionError: Trying to update permissions for %s through minimega",
                socket_path,
            )
            self.mma.set_group_perms(socket_path.parent)
        self.log.debug("Found path")

        # Load the driver for the virtualization engine. Without a driver
        # class ``connect_to_driver`` can never succeed (it would retry on
        # ``AttributeError`` forever), so treat a failed import as fatal.
        try:
            self.driver_class = self.import_driver()
        except Exception as exp:  # noqa: BLE001
            self.log.exception(exp)
            sys.exit(1)

        if self.connect_to_driver():
            self.set_state("configuring")
        else:
            sys.exit(1)

        # Don't kick off the updater until we're connected to the VM
        self.schedule_updater.start()

        # Grab the target OS
        self.target_os = self.driver.get_os()

        # Windows allows QGA connections before the system is actually fully
        # functional. Because the resource handler starts executing tasks on
        # connect, it is important that the VM is fully functional first.
        # Without this, Windows VMs will not be ready to have processes
        # modifying disk and issues could arise.
        if "Windows" in self.target_os:
            time.sleep(self.load_balance_factor * 10)

        # Set the time in the VM
        self.log.info("Setting time")
        self.driver.set_time()
        self.log.info("Done setting time")

    def connect_to_driver(self):
        """
        Instantiate (or reconnect) the driver used to communicate with the VM.

        Any existing driver is closed first. The connection attempt is then
        retried, with a randomized back-off, until it succeeds.

        Returns:
            bool: Always ``True``; the method does not return until the
            driver is connected.
        """
        if self.driver:
            self.driver.close()

        while True:
            try:
                if not self.driver:
                    # Establish connectivity to the VM
                    self.log.info("New driver connection")
                    self.driver = self.driver_class(self.config, self.log)
                else:
                    # Reestablish connectivity to the VM
                    self.log.info("Resetting driver connection")
                    sync = self.driver.connect()
                    self.log.info("Synced: %s", sync)
                return True
            except Exception as exp:  # noqa: BLE001
                # Sleep another timeout amount of time before retrying
                self.log.exception(exp)
                time.sleep(
                    self.load_balance_factor * random.SystemRandom().randint(3, 10)
                )

    def run(self):
        """
        Run the VMResourceHandler.

        Wraps :py:meth:`_run` so that any exception is written to the log
        before the handler exits.
        """
        try:
            self.log.info("VmResourceHandler: Starting the _run function")
            self._run()
            self.log.info("VmResourceHandler: Finished the _run function.")
        except Exception as exp:  # noqa: BLE001
            self.log.info("VmResourceHandler: Stopping due to an exception.")
            self.log.exception(exp)
        finally:
            self.log.info("VmResourceHandler: Exiting.")

    def _run(self):
        """
        This is the main processing loop for the VMResourceHandler.

        It preloads files into the VM, then repeatedly pulls eligible events
        from the priority queue and starts threads to run the individual
        vm_resources.
        """
        self.preload_files()

        while True:
            # This function will block if there are no events
            # in the priority queue
            events = self.get_events()

            threads = []
            reboot_queue = Queue()
            for event in events:
                event_type = event.get_type()
                if event_type == ScheduleEventType.EXPERIMENT_START_TIME_SET:
                    self.log.debug("PROCESSING EXPERIMENT START EVENT")
                    # Set the experiment start time
                    self.experiment_start_time = event.get_data()
                elif event_type == ScheduleEventType.EMPTY_SCHEDULE:
                    # If there is an empty schedule returned by the downloader
                    # at the beginning of the downloaded ordered schedule list,
                    # then we have no negative time vm_resources at this point
                    # and need to pass the global barrier
                    self.log.debug("PROCESSING NO SCHEDULE EVENT")
                    self.current_time = 0
                    self.set_state("configured")
                elif event_type == ScheduleEventType.NEW_ITEM:
                    self.log.debug("PROCESSING NEW ITEM EVENT")
                    self._handle_new_item(event.get_data(), threads, reboot_queue)
                elif event_type == ScheduleEventType.TRANSFER:
                    self._handle_transfer(event.get_data())
                elif event_type == ScheduleEventType.EXIT:
                    self.stop(event.get_data())

            # Wait for all negative time threads to finish
            for thread in threads:
                thread.join()

            # Check for reboot requests before moving on. Don't need to lock
            # on reboot_queue since all threads have finished by this point.
            if not reboot_queue.empty():
                self.log.debug("Reboot has been requested")
                self.reboot()

                # Do need to lock on the main priority queue of events
                with self.condition:
                    while not reboot_queue.empty():
                        schedule_entry = reboot_queue.get()
                        self.log.debug(
                            "Putting vm_resource back in event queue: %s",
                            schedule_entry.executable,
                        )
                        event = ScheduleEvent(
                            ScheduleEventType.NEW_ITEM, schedule_entry
                        )
                        # recycle vm_resource into the main processing queue
                        self.prior_q.put((schedule_entry.start_time, event))

    def _seconds_until(self, start_time):
        """
        Compute how long to wait before a positive time entry should fire.

        Args:
            start_time (int): Offset, in seconds, from the experiment start.

        Returns:
            float: Seconds from now until ``experiment_start_time + start_time``.
            The value is negative when that moment has already passed.
        """
        curtime = datetime.utcnow()
        delay = (
            self.experiment_start_time + timedelta(seconds=start_time) - curtime
        ).total_seconds()
        start_seconds = (self.experiment_start_time - curtime).total_seconds()
        self.log.debug("Experiment will start in %s seconds", start_seconds)
        return delay

    def _handle_new_item(self, schedule_entry, threads, reboot_queue):
        """
        Load a schedule entry's files into the VM and schedule its execution.

        Args:
            schedule_entry (ScheduleEntry): The entry taken from a
                ``NEW_ITEM`` event.
            threads (list): Collector for negative time threads; the main
                loop joins these before handling reboots.
            reboot_queue (Queue): Queue that negative time vm_resources use
                to request a reboot.
        """
        # Determine the paths inside the VM for this vm_resource
        try:
            self.driver.create_paths(schedule_entry)
        except OSError as exp:
            self.log.exception(exp)

        if not self.load_files_in_target(schedule_entry):
            self.log.error("Unable to load files into the VM: %s", schedule_entry)
            # Ignoring a failure typically will happen by a helper
            # (e.g. push file) where we don't want the VM resource
            # handler to exit on failure
            if not schedule_entry.ignore_failure:
                self.set_state("FAILED")
                sys.exit(1)

        if not schedule_entry.executable:
            # No executable means that we're done
            # once the data is loaded into the VM
            return

        if schedule_entry.start_time < 0:
            # Handle negative time vm_resources by kicking them off
            # immediately, rate limited by some small random time
            time.sleep(
                self.load_balance_factor * random.SystemRandom().randint(1, 5)
            )
            thread = Thread(
                target=self.run_vm_resource,
                kwargs={"schedule_entry": schedule_entry, "queue": reboot_queue},
            )
            # Keep track of negative time vm_resource threads
            threads.append(thread)
            thread.start()
        elif schedule_entry.start_time > 0:
            if not self.experiment_start_time:
                self.log.error(
                    "Processing positive time vm_resource "
                    "but no experiment start time!"
                )
                return

            # Determine when to fire the timer
            delay = self._seconds_until(schedule_entry.start_time)
            self.log.debug(
                "The `ScheduleEntry` '%s' with start time %s will start in %s seconds",
                schedule_entry.executable,
                schedule_entry.start_time,
                delay,
            )
            # Positive time vm_resources don't get held onto since
            # they can be long running
            Timer(delay, self.run_vm_resource, args=(schedule_entry,)).start()

    def _handle_transfer(self, schedule_entry):
        """
        Start (or schedule) the file transfers described by a schedule entry.

        Args:
            schedule_entry (ScheduleEntry): The entry taken from a
                ``TRANSFER`` event. Each element of its ``data`` is used as
                the keyword arguments of :py:meth:`transfer_data`.
        """
        delay = None
        if schedule_entry.start_time >= 0:
            if not self.experiment_start_time:
                self.log.error(
                    "Processing positive time file transfer "
                    "but no experiment start time!"
                )
                return

            # Determine when to fire the timer
            delay = self._seconds_until(schedule_entry.start_time)
            self.log.debug(
                "Transfer of `%s` with start time %s will start in %s seconds.",
                schedule_entry.executable,
                schedule_entry.start_time,
                delay,
            )

        for entry in schedule_entry.data:
            entry["name"] = self.config["vm_name"]
            if delay is None:
                # Negative time transfers start right away
                worker = Thread(target=self.transfer_data, kwargs=entry)
            else:
                worker = Timer(delay, self.transfer_data, kwargs=entry)
            worker.start()

    def stop(self, exitcode):
        """
        Stop the vm resource handler. This is used for unit testing.

        Args:
            exitcode (int): exit code to use when exiting the program
        """
        self.schedule_updater.stop_thread()
        self.log.debug("Wait for join")
        self.schedule_updater.join()
        self.driver.close()
        self.log.debug("Exiting: %d", exitcode)
        sys.exit(exitcode)

    def run_vm_resource(self, schedule_entry, queue=None):
        """
        Wrapper around the logic of running a vm_resource.

        The new thread calls this wrapper, which can output errors to the
        log file. This gives a convenient way to pass information back to
        the user via the log (with few good alternatives).

        Args:
            schedule_entry (ScheduleEntry): ``ScheduleEntry`` object
                specifying the VM resource to run.
            queue (Queue): Queue to pass messages, specifically reboot
                requests, back to the main processing loop.
                Defaults to ``None``.
        """
        try:
            self._run_vm_resource(schedule_entry, queue)
        except Exception as exp:  # noqa: BLE001
            self.log.exception(exp)

    def _run_vm_resource(self, schedule_entry, queue=None):
        """
        Function to handle starting a VM resource.

        Args:
            schedule_entry (ScheduleEntry): ``ScheduleEntry`` object
                specifying the VM resource to run.
            queue (Queue): Queue to pass messages, specifically reboot
                requests, back to the main processing loop.
                Defaults to ``None``.
        """
        if not hasattr(schedule_entry, "reboot"):
            schedule_entry.reboot = False

        if schedule_entry.reboot:
            # The entry is being re-run after a reboot it requested
            schedule_entry.reboot = False
            self._delete_reboot_file(schedule_entry)
        else:
            # Create the call arguments script in the VM
            self._write_call_arguments(schedule_entry)

        exitcode = self._execute(schedule_entry)

        # Determine if a reboot is required
        if exitcode == 10:
            self.log.info("Rebooting based on exit code")
            need_reboot = True
        else:
            need_reboot = self.check_for_reboot(str(schedule_entry.reboot_file))

        # Request the reboot if required
        if need_reboot:
            schedule_entry.reboot = True
            if queue is None:
                self.log.error(
                    "Can not handle reboots since the Queue"
                    " was not passed to the vm_resource runner"
                )
                return
            queue.put(schedule_entry)

    def _write_call_arguments(self, schedule_entry):
        """
        Write the call arguments script into the VM unless it was preloaded.

        Args:
            schedule_entry (ScheduleEntry): The entry about to be executed.
        """
        if getattr(schedule_entry, "preloaded", False):
            return

        working_dir = str(schedule_entry.working_dir)
        call_args_filename = str(schedule_entry.call_args_filename)

        if not schedule_entry.data:
            # If there isn't data then the CWD hasn't been
            # created inside the VM, so create it here
            self.log.info("FILES NOT PRELOADED, CREATING DIRS")
            while not self.driver.create_directories(working_dir):
                self.log.error("Unable to create directories to write call arguments")
                self.connect_to_driver()

        written = False
        while not written:
            self.log.info("FILES NOT PRELOADED, WRITING CALL ARGS")
            self.driver.create_directories(working_dir)
            written = self.driver.write(
                call_args_filename, schedule_entry.call_arguments
            )
            if not written:
                self.log.error("WRITE FAILED WHEN WRITING CALL ARGS")
                time.sleep(self.load_balance_factor * 1)

        self.driver.make_file_executable(call_args_filename)

    def _delete_reboot_file(self, schedule_entry):
        """
        Delete the reboot marker file in the VM, retrying until it is gone.

        Args:
            schedule_entry (ScheduleEntry): The entry whose ``reboot_file``
                should be removed.
        """
        while True:
            ret = self.driver.delete_file(str(schedule_entry.reboot_file))
            if ret is True:
                return
            if ret is False:
                self.log.error("Unable to delete reboot file")
            # Reconnect on any non-success result so a broken connection
            # does not turn this into a busy loop.
            self.connect_to_driver()

    def _execute(self, schedule_entry):
        """
        Run the call arguments script in the VM and wait for it to finish.

        The command is retried when no PID is returned, when the exit code
        cannot be read, and when a PowerShell command finishes suspiciously
        fast.

        Args:
            schedule_entry (ScheduleEntry): The entry to execute.

        Returns:
            int: The exit code reported by the driver.
        """
        call_args_filename = str(schedule_entry.call_args_filename)
        while True:
            start = time.time()
            self.log.info("CALL ARGS: %s", schedule_entry.call_args_filename)
            pid = self.driver.async_exec(call_args_filename)
            self.log.info("PID returned: %s", pid)
            if not pid:
                self.log.info("No PID, resetting driver")
                self.connect_to_driver()
                continue

            # Wait for the VM resource to finish
            try:
                exitcode = self.driver.get_exitcode(pid)
                while exitcode is None:
                    # Print streaming output while the command is running
                    self.print_output(schedule_entry, pid)
                    time.sleep(self.load_balance_factor * 2)
                    exitcode = self.driver.get_exitcode(pid)
            # Retry on errors and timeouts (`TimeoutError` is a subclass of
            # `OSError`)
            # - NOTE: Python 3.11 aliases `asyncio.TimeoutError` to
            #   `TimeoutError` which will eventually make the second check
            #   redundant
            except (OSError, asyncio.TimeoutError):
                self.log.error(
                    "FAILED: Unable to get exitcode of running process; retry command"
                )
                self.connect_to_driver()
                continue

            elapsed = time.time() - start
            report = self.log.warning if exitcode != 0 else self.log.debug
            report(
                "%s (%s) exited after %05f seconds with code: %s",
                schedule_entry.executable,
                pid,
                elapsed,
                exitcode,
            )

            if "powershell" in schedule_entry.executable.lower() and elapsed < 2:
                self.log.error(
                    "Powershell took less than two seconds, this is most likely"
                    " an error, retrying"
                )
                time.sleep(self.load_balance_factor * 5)
                continue

            # Handle stdout and stderr
            self.print_output(schedule_entry, pid)
            return exitcode

    def check_for_reboot(self, reboot_filepath):
        """
        Check if the reboot file exists and a reboot is needed.

        A ``None`` answer from the driver is treated as an error: the driver
        is reconnected and the check is repeated.

        Args:
            reboot_filepath (str): The path to the reboot file on the VM.

        Returns:
            bool: True if the reboot file exists and a reboot is required,
            otherwise False.
        """
        need_reboot = self.driver.file_exists(reboot_filepath)
        while need_reboot is None:
            self.log.error("Unable to check existence of the reboot file")
            self.connect_to_driver()
            time.sleep(self.load_balance_factor * 1)
            need_reboot = self.driver.file_exists(reboot_filepath)
        return need_reboot

    def transfer_data(self, name, location, interval=None, destination=None):
        """
        Transfer data from the VM to the physical host based on the input
        parameters. Any exception is written to the log instead of raised.

        Args:
            name (str): The name of file to transfer.
            location (str): The full absolute path of where the file is
                located on the VM.
            interval (int): How often to transfer the data in seconds.
            destination (str): The full, absolute path of where to put the
                file on the physical host.
        """
        try:
            self._transfer_data(name, location, interval, destination)
        except Exception as exp:  # noqa: BLE001
            self.log.exception(exp)

    def _transfer_data(
        self, name, location, interval=None, destination=None
    ):  # noqa: DOC503
        """
        The helper function which transfers data from the VM to the physical
        host based on the input parameters.

        Without a usable ``interval`` the transfer is attempted once;
        otherwise it repeats every ``interval`` seconds.

        Args:
            name (str): The name of file to transfer.
            location (str): The full absolute path of where the file is
                located on the VM.
            interval (int): How often to transfer the data in seconds.
            destination (str): The full, absolute path of where to put the
                file on the physical host.

        Raises:
            RuntimeError: If the transfer path is not absolute.
            RuntimeError: If it is unable to list files at the location.
        """
        if destination is not None:
            destination = Path(destination)
            local_path = destination / name
        else:
            destination = Path(global_config["logging"]["root_dir"])
            local_path = destination / "transfers" / name

        is_windows = "Windows" in self.target_os
        if is_windows:
            target_path = PureWindowsPath(location)
            absolute = target_path.is_absolute() or Path(location).is_absolute()
        else:
            target_path = Path(location)
            absolute = target_path.is_absolute()
        if not absolute:
            raise RuntimeError(
                f"Transfer paths must be absolute! Cannot transfer: {location}"
            )

        # Only a truthy integer interval makes this a repeating transfer
        repeating = bool(interval) and isinstance(interval, int)
        local_time = None

        while True:
            result = self.driver.file_exists(str(target_path))
            if result is None:
                if not repeating:
                    return
                # An error occurred, reconnect to driver and try again
                self.log.debug(
                    "An error occurred checking if the file=%s exists. "
                    "Reconnecting to the driver, sleeping for %s seconds, and retrying.",
                    str(target_path),
                    interval,
                )
                self.connect_to_driver()
                time.sleep(interval)
                continue

            self.log.debug("The file '%s' exists: %s", target_path, result)
            if result is False:
                if not repeating:
                    return
                self.log.debug(
                    "The file=%s was not found, sleeping for %s seconds and retrying.",
                    str(target_path),
                    interval,
                )
                time.sleep(interval)
                continue

            # Get eligible file names
            filenames = self.driver.get_files(str(target_path), local_time)
            if filenames is None:
                raise RuntimeError(f"Unable to list files at location: {target_path}")

            self.log.debug("Found the following filenames for transfer: %s", filenames)
            if filenames:
                self.log.debug("Getting files: %s", filenames)

            for filename in filenames:
                if is_windows:
                    # Make a legitimate posix path using the windows path
                    fname = local_path / PureWindowsPath(filename).as_posix()
                else:
                    fname = local_path / filename.strip("/")

                # Only pull files if they've changed
                success = self.driver.read_file(filename, fname)
                if not success or not fname.exists():
                    continue

                self._open_permissions(fname, destination)

            if not repeating:
                return

            # Update the local VM time to only grab
            # updated files next time
            local_time = self.driver.get_time()
            self.log.debug(
                "New local_time is %s -- pausing for %d seconds before next file transfer",
                str(datetime.fromtimestamp(local_time)),
                int(interval),
            )
            time.sleep(interval)

    def _open_permissions(self, fname, destination):
        """
        Make a transferred file and its parent directories world accessible.

        Permissions are set on all the directories and files that have been
        pulled from the VM so the user doesn't have to be root to read them.
        The walk goes upward from ``fname`` and stops below ``destination``.

        Args:
            fname (pathlib.Path): The file that was just written on the host.
            destination (pathlib.Path): The directory at which to stop.
        """
        touched = fname
        while str(touched) != "/":
            if touched.is_dir():
                self.log.debug("Changing permissions for '%s' to 777.", touched)
                touched.chmod(0o777)
            else:
                self.log.debug("Changing permissions for '%s' to 666.", touched)
                touched.chmod(0o666)

            if touched.parent == destination:
                self.log.debug("Finished changing permissions")
                break
            touched = touched.parent

    def log_json(self, content):
        """
        Print vm_resource output to the JSON log.

        A dictionary is logged directly. Text is split into lines; a line
        that parses to a JSON object is logged as that object, any other
        line is wrapped as ``{"msg": <line>}``. Every record gets a
        ``timestamp`` key.

        Args:
            content (str or bytes or dict): Buffer from agent output, or a
                dictionary produced by the handler itself.
        """
        # Handle content that is already a dictionary, i.e. from the handler
        if isinstance(content, dict):
            try:
                content["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                self.json_log.info(json.dumps(content))
            except TypeError:
                self.log.debug("Could not parse '%s' into JSON formatting.", content)
            return

        try:
            # Buffer can contain multiple lines of output.
            split_lines = [line.strip() for line in content.splitlines()]
        except AttributeError:
            return

        for line in split_lines:
            # Agent buffers may arrive as bytes; never let one undecodable
            # byte discard the rest of the output.
            text = line.decode(errors="replace") if isinstance(line, bytes) else line

            try:
                data = json.loads(text)
            except (json.JSONDecodeError, TypeError):
                data = None

            # Valid JSON that is not an object (number, list, string...)
            # cannot carry a timestamp key, so log it as a plain message.
            if not isinstance(data, dict):
                data = {"msg": text}

            data["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.json_log.info(json.dumps(data))

    def print_output(self, schedule_entry, pid):
        """
        Print any output from a vm_resource to the log.

        Args:
            schedule_entry (ScheduleEntry): ``ScheduleEntry`` object
                specifying the VM resource to run.
            pid (int): The PID of the VM resource process within the VM.
        """
        output = {"name": schedule_entry.executable, "pid": pid}

        # NOTE(review): ``_print_stream`` is not defined in the visible part
        # of this class -- confirm it is provided elsewhere.
        for stream_name, reader in (
            ("stdout", self.driver.get_stdout),
            ("stderr", self.driver.get_stderr),
        ):
            try:
                stream = reader(pid)
            except OSError:
                stream = None
            if stream:
                self._print_stream(output, stream, stream_name)

    def preload_files(self):
        """
        This method loads all VM Resources into the VM before the schedule
        is executed. This minimizes the number of after-boot disk
        alterations. This is particularly important for Windows VMs as, in
        our experience, Windows does not appear to appreciate having files
        modified or created on disk immediately after a reboot.
        """
        temp_q = Queue()

        with self.condition:
            if self.prior_q.empty():
                self.condition.wait()

            while not self.prior_q.empty():
                start_time, event = self.prior_q.get()
                if event.get_type() == ScheduleEventType.NEW_ITEM:
                    if not self._preload_entry(event.get_data()):
                        # A timeout occurred; the connection was reset, so
                        # put the event back and try it again.
                        self.prior_q.put((start_time, event))
                        continue

                temp_q.put((start_time, event))

            # Reload the event queue
            while not temp_q.empty():
                self.prior_q.put(temp_q.get())

        self.log.info("Done preloading files")

    def _preload_entry(self, schedule_entry):
        """
        Load one schedule entry's files and call arguments into the VM.

        Args:
            schedule_entry (ScheduleEntry): The entry to preload.

        Returns:
            bool: ``False`` if a timeout interrupted the preload and the
            entry must be retried, ``True`` otherwise.
        """
        timeout_msg = (
            "There was a timeout when loading in a schedule entry. "
            "Will reset the connection to the driver and try again. "
            "The `ScheduleEntry` was %s"
        )

        try:
            self.driver.create_paths(schedule_entry)
        except socket.timeout:
            self.log.warning(timeout_msg, schedule_entry)
            self.connect_to_driver()
            return False
        except OSError as exp:
            self.log.error(
                "There was an error when loading in a schedule entry. "
                "The `ScheduleEntry` was %s",
                schedule_entry,
            )
            self.log.exception(exp)

        try:
            self.load_files_in_target(schedule_entry)
        except socket.timeout:
            self.log.warning(timeout_msg, schedule_entry)
            self.connect_to_driver()
            return False

        if schedule_entry.executable:
            # Handle preloading the call arguments if this SE
            # is meant to be executed
            working_dir = str(schedule_entry.working_dir)
            call_args_filename = str(schedule_entry.call_args_filename)

            if not schedule_entry.data:
                # If there isn't data then the CWD hasn't been
                # created inside the VM, so create it here
                self.log.info("creating directory since no file data")
                while not self.driver.create_directories(working_dir):
                    self.log.error(
                        "Unable to create directories to write call arguments"
                    )
                    time.sleep(self.load_balance_factor * 2)
                self.log.info("done creating directory")

            while not self.driver.write(
                call_args_filename, schedule_entry.call_arguments
            ):
                self.log.error("WRITE FAILED WHEN WRITING CALL ARGS")
                self.connect_to_driver()

            made_executable = False
            while not made_executable:
                try:
                    made_executable = self.driver.make_file_executable(
                        call_args_filename
                    )
                except OSError:
                    made_executable = False
                    self.connect_to_driver()

        schedule_entry.preloaded = True
        return True

    def get_events(self):
        """
        Get all eligible events from the priority queue.

        Blocks on the shared condition while the queue is empty, and while
        a positive time event is waiting for the experiment start time.

        Returns:
            list (ScheduleEvent): List of events that are ready to be
            processed.
        """
        events = []
        time_updated = False
        immediate_types = (
            ScheduleEventType.EXPERIMENT_START_TIME_SET,
            ScheduleEventType.EMPTY_SCHEDULE,
        )

        # Hold the lock since this function modifies the queue
        with self.condition:
            # If there isn't anything in the schedule, then just wait
            # until something shows up
            if self.prior_q.empty():
                # If the schedule is all negative time vm_resources then when
                # the schedule is empty, the VM is configured
                if self.initial_time < self.current_time < 0:
                    self.current_time = 0
                    self.set_current_time(self.current_time)
                    self.set_state("configured")

                self.log.debug("Event queue is empty, WAITING")
                self.condition.wait()

            # Work through eligible events
            while not self.prior_q.empty():
                # Get event off the queue
                start_time, event = self.prior_q.get()

                # EXPERIMENT_START_TIME and EMPTY_SCHEDULE events always
                # get processed immediately and shouldn't update current time
                if event.get_type() in immediate_types:
                    events.append(event)
                    continue

                if not time_updated and start_time > self.current_time:
                    self.current_time = start_time
                    time_updated = True
                    self.set_current_time(self.current_time)

                if start_time > self.current_time:
                    # Event's start time is past the current time
                    # so break and return eligible events
                    self.log.debug(
                        "Putting event back: %s, current time: %s",
                        start_time,
                        self.current_time,
                    )
                    self.prior_q.put((start_time, event))
                    break

                # Wait for experiment start time before passing on
                # positive time events
                if start_time > 0 and not self.experiment_start_time:
                    self.log.debug("WAITING FOR START TIME")
                    # Put the event back since we can't process it yet
                    self.prior_q.put((start_time, event))

                    # All negative time vm_resources have been handled
                    self.set_state("configured")
                    self.current_time = 0
                    self.set_current_time(self.current_time)

                    # This will force a sleep until notified by the
                    # ScheduleUpdater that a new event has been enqueued
                    self.condition.wait()
                    continue

                # If positive time, then return all events in the queue
                if self.current_time > 0:
                    events.append(event)
                    continue

                # If all checks passed, then time to process this event
                time_updated = True
                events.append(event)

        return events

    def load_files_in_target(self, schedule_entry):
        """
        Copy the vm_resource file to the VM's local cache so that it is
        available when the vm_resource gets called.

        Nothing is copied for an entry without data, an entry that is being
        re-run after a reboot, or an entry that was already preloaded.

        Args:
            schedule_entry (ScheduleEntry): The schedule entry to be copied
                into the VM.

        Returns:
            bool: Success or failure of loading files into the VM.
        """
        if not schedule_entry.data:
            return True
        if getattr(schedule_entry, "reboot", False) is True:
            return True
        if getattr(schedule_entry, "preloaded", False):
            return True

        self._create_working_dir(schedule_entry)

        for data in schedule_entry.data:
            # Check if this file needs to be placed in a relative path
            # or an absolute path in the VM. If we do not have the right
            # key, we shouldn't load anything into the VM.
            try:
                target_path = Path(data["location"])
            except KeyError:
                self.log.warning(
                    "Processed event %s that may not have the right keys", data
                )
                continue

            if not target_path.is_absolute():
                if not schedule_entry.executable:
                    self.log.error(
                        "Files require absolute paths unless they are "
                        "for an vm_resource"
                    )
                    return False
                target_path = schedule_entry.working_dir / target_path

            if data.get("filename"):
                local_path = self._fetch_local_path(data["filename"])
                if local_path is None:
                    self.log.error(
                        "Attempted 10 times to get file: %s", data["filename"]
                    )
                    return False
                if not self._write_file_from_host(target_path, local_path):
                    return False
            elif "content" in data and isinstance(data["content"], str):
                if not self._write_content(target_path, data["content"]):
                    return False
            else:
                self.log.error("Data entry for schedule entry is not a file or content")
                self.log.error(schedule_entry)
                return False

            # Make the executable file executable
            if "executable" in data:
                self.driver.make_file_executable(str(target_path))

        return True

    def _create_working_dir(self, schedule_entry):
        """
        Create the entry's working directory in the VM, retrying on failure.

        Args:
            schedule_entry (ScheduleEntry): The entry being loaded. An entry
                without the needed attribute is skipped with a log message.
        """
        try:
            while True:
                try:
                    if self.driver.create_directories(str(schedule_entry.working_dir)):
                        break
                except OSError as exp:
                    self.log.error("There was an issue creating directories: %s", exp)

                self.log.error(
                    "Unable to create directories while loading files into VM"
                )
                self.connect_to_driver()
                # Kept from the original flow: one attempt right after the
                # reconnect. It is outside the inner ``try``, so an
                # ``OSError`` here propagates to the caller.
                self.driver.create_directories(str(schedule_entry.working_dir))
        except AttributeError:
            self.log.info("Not creating working directory")

    def _fetch_local_path(self, filename):
        """
        Ask the VM resource store for the host-side path of a cached file.

        Args:
            filename (str): Name of the file in the VM resource store.

        Returns:
            pathlib.Path: The local path, or ``None`` if every attempt
            failed.
        """
        for _ in range(self._MAX_FETCH_ATTEMPTS):
            try:
                cached = self.vm_resource_store.get_path(filename)
            except (FileNotFoundError, RuntimeError) as exp:
                self.log.exception(exp)
                cached = None

            # Test the raw return value: a ``Path`` object is always truthy,
            # so wrapping first would hide an empty result.
            if cached:
                return Path(cached)

            self.log.error("Unable to get file: %s", filename)
            time.sleep(self.load_balance_factor * 2)
        return None

    def _write_file_from_host(self, target_path, local_path):
        """
        Write a host file into the VM and confirm that it exists there.

        Args:
            target_path (pathlib.Path): Destination path inside the VM.
            local_path (pathlib.Path): Source path on the physical host.

        Returns:
            bool: ``True`` once the file exists in the VM, ``False`` after
            repeated ``OSError`` failures.
        """
        attempts = 1
        while attempts < 10:
            try:
                if self.driver.file_exists(str(target_path)):
                    return True

                self.log.debug("Writing file from: %s to %s", local_path, target_path)
                while not self.driver.write_from_file(
                    str(target_path), str(local_path)
                ):
                    self.log.error("UNABLE TO WRITE FILE")
                # Loop again so the existence check confirms the write.
            except OSError:
                self.log.error(
                    "Unable to connect to the driver, reconnecting and trying again."
                )
                attempts += 1
                time.sleep(self.load_balance_factor * 2)
                self.connect_to_driver()
        return False

    def _write_content(self, target_path, content):
        """
        Write inline content to a file in the VM unless it already exists.

        Args:
            target_path (pathlib.Path): Destination path inside the VM.
            content (str): The text to write.

        Returns:
            bool: ``False`` if the parent directory could not be created,
            ``True`` otherwise.
        """
        if self.driver.file_exists(str(target_path)):
            return True

        # Check that the parent directories exist
        parent = str(target_path.parent)
        if not self.driver.file_exists(parent):
            if not self.driver.create_directories(parent):
                self.log.error("Unable to create directory: %s", target_path.parent)
                return False

        if "Windows" in self.target_os:
            # Change newlines to windows newlines. ``str.replace`` returns a
            # new string, so the result must be kept; normalizing first
            # avoids doubling carriage returns that are already present.
            content = content.replace("\r\n", "\n").replace("\n", "\r\n")

        self.log.debug("Writing content to %s", target_path)
        while not self.driver.write(str(target_path), content):
            self.log.error("UNABLE TO WRITE CONTENT")
        return True

    def reboot(self):
        """
        Reboot the VM. Function doesn't return until the driver can
        communicate with the VM again.
        """
        self.log.debug("Rebooting")
        try:
            self.driver.reboot()
        except Exception as exp:  # noqa: BLE001
            self.log.exception(exp)

        # Sleep a variable amount of time (25 - 45 seconds, scaled by the
        # load balance factor) after asking for the reboot so the guest
        # agent is not contacted too soon.
        sleep_time = self.load_balance_factor * random.SystemRandom().randint(25, 45)
        time.sleep(sleep_time)

        # Windows VMs get a second, equally sized pause
        if "Windows" in self.target_os:
            sleep_time = (
                self.load_balance_factor * random.SystemRandom().randint(25, 45)
            )
            self.log.info("Windows sleep: %s seconds", sleep_time)
            time.sleep(sleep_time)

        self.connect_to_driver()

    def import_driver(self):
        """
        Walk through all the available drivers and find the one that
        matches the type of VM that has been booted. The process exits with
        status 1 when no driver matches.

        Returns:
            object: The driver class that matches the VM's type
        """
        drivers = self._import_drivers()
        if not drivers:
            self.log.error(
                "Unable to find driver to communicate with VM: %s",
                self.config["vm_name"],
            )
            # If there is no driver then we can't talk to the VM.
            # Nothing left to do so exit
            sys.exit(1)

        # Walk the drivers looking for the one that has an engine
        # that matches the config's type (i.e. QemuVM)
        for driver in drivers:
            if driver.get_engine() == self.config["engine"]:
                return driver

        self.log.error(
            "Unable to find driver for type: %s for VM: %s",
            self.config["engine"],
            self.config["vm_name"],
        )
        # No driver found. Can't communicate with the VM so exit.
        sys.exit(1)

    def _import_drivers(self):
        """
        Import available drivers from the driver directory.

        Walk the drivers directory looking for all available drivers that
        can talk to a booted VM. Module members are filtered with
        ``self._check_driver``.

        Returns:
            set: A set of available driver classes.
        """
        drivers = set()
        for module_path in self.driver_directory.rglob("*.py"):
            spec = importlib.util.spec_from_file_location(
                module_path.name, str(module_path)
            )
            if not spec:
                continue

            module = importlib.util.module_from_spec(spec)
            try:
                spec.loader.exec_module(module)
            except (FileNotFoundError, SyntaxError):
                self.log.debug("Could not load module '%s'. Continuing", module)
                # A module that failed to execute has nothing usable in it
                continue

            # NOTE(review): ``_check_driver`` is not defined in the visible
            # part of this class -- confirm it is provided elsewhere.
            for _, driver_cls in inspect.getmembers(module, self._check_driver):
                drivers.add(driver_cls)
        return drivers

    def set_state(self, state):
        """
        Tell the infrastructure the state of the VM.

        If the state is 'configured', then check to see if this VM is the
        last VM to be configured. If it is the last VM to be configured,
        then set the experiment start time.

        Args:
            state (str): State of the VM
        """
        if self.state == state:
            return

        try:
            self.log.debug("SETTING STATE: %s", state)
            utils.set_vm_state(
                self.config["vm_uuid"], state, mapping=self.vm_mapping, log=self.log
            )
            self.state = state
        except RuntimeError as exp:
            self.log.error("Error setting VM state. Can not set state to: %s", state)
            self.log.exception(exp)

        # Check to see if the experiment start time can be set
        if state != "configured" or self.experiment_start_time:
            return

        not_ready_count = utils.get_vm_count_not_ready(
            mapping=self.vm_mapping, log=self.log
        )
        # All VMs are configured, so set the start time.
        if not_ready_count == 0:
            try:
                self.log.debug("SETTING EXPERIMENT START TIME")
                api.add_experiment_start_time()
            except Exception as exp:  # noqa: BLE001
                self.log.error("Unable to set the start time")
                self.log.exception(exp)

    def set_current_time(self, cur_time):
        """
        Tell the infrastructure the current configuration time of the VM.
        Failures are logged and otherwise ignored.

        Args:
            cur_time (int): Current VM schedule time
        """
        try:
            utils.set_vm_time(
                self.config["vm_uuid"], cur_time, mapping=self.vm_mapping, log=self.log
            )
        except Exception:  # noqa: BLE001
            self.log.error("Error setting VM time. Can not set time to: %s", cur_time)
# ---------------------------------- __main__ ----------------------------------
if __name__ == "__main__":
    # The VM configuration is passed as a JSON document in the first
    # command line argument.
    vm_config = json.loads(sys.argv[1])

    # Build the handler for this VM and hand control to its main loop.
    resource_handler = VMResourceHandler(vm_config)
    resource_handler.run()