[docs]classPlugin(AbstractPlugin):"""This Plugin parses the experiment graph to build necessary data structures which can be passed to `Discovery <https://github.com/sandia-minimega/discovery>`__. Discovery will then generate the necessary minimega commands and this Plugin will launch the experiment. """
[docs]definsert_vm_endpoint(self,vme_conf):""" For each VM, we will create a dictionary containing all information required to launch it. Then pass this to discovery's :py:meth:`insert <firewheel.lib.discovery.api.discoveryAPI.insert_endpoint>` endpoint. The NIC/interface information is not included as it will be handled with :py:meth:`make_endpoint_connections <minimega.parse_experiment_graph_plugin.Plugin.make_endpoint_connections>`. Args: vme_conf (dict): The generated minimega configuration dictionary created by :ref:`minimega.emulated_entities_mc`. Raises: RuntimeError: If minimega tags are used and a tag name conflicts with a required configuration key. Returns: dict: The newly created endpoint in the discovery graph. """data={}data["architecture"]=vme_conf["vm"]["architecture"]data["name"]=vme_conf["vm"]["name"]data["uuid"]=vme_conf["vm"]["uuid"]try:data["control_ip"]=vme_conf["aux"]["control_ip"]exceptKeyError:# A control IP is not necessarydata["control_ip"]=""data["type"]="qemu"data["cpu_model"]="host"data["memory"]=vme_conf["vm"]["memory"]# Setting various CPU valuesforsmp_keyin["smp_sockets","smp_cores","smp_threads"]:data[smp_key]=str(vme_conf["vm"][smp_key])# We need to manually set maxcpus to override default of 1vcpus=(int(data["smp_sockets"])*int(data["smp_cores"])*int(data["smp_threads"]))data["vcpus"]=str(vcpus)data["vga_model"]=vme_conf["vm"]["vga_model"]# NOTE: we assume the first disk is the imagedata["image"]=vme_conf["aux"]["disks"][0]["file"]disk_strs=[]fordriveinvme_conf["aux"]["disks"]:disk_str=",".join([drive["path"],drive["interface"],drive["cache"]])disk_strs.append(disk_str)data["disks"]=" ".join(disk_strs)# Currently, there is only support for a num_serial option# we need one for the qga socketdata["virtio_ports"]="org.qemu.guest_agent.0"# Get all the minimega tagsfortag_key,tag_valueinvme_conf["tags"].items():iftag_keyindata:raiseRuntimeError(f"Tag {tag_key} conflicts with required config key.""Please rename your tag.")data[tag_key]=tag_value# Add any other QEMU options here.data["qemu_append"]=vme_conf["aux"]["qemu_append_str"]# If there's any coscheduling parameters, propagate themdata["coschedule"]=str(vme_conf["coschedule"])# Create the endpoint in the discovery graph.mm_endpoint=self.discovery_api.insert_endpoint(mm_node_properties=data)[0]returnmm_endpoint
[docs]defmake_endpoint_connections(self,mm_endpoint,vme_conf,switch_name_to_nid):"""This method informs discovery about all the connections that need to happen to launch the experiment. Due to some limitations with discovery, we are unable to add an edge between the VM and a switch containing all of the necessary information in a single discovery API call. Instead, we need to do the following: 1. Create an edge between the VM and the switch via discovery's connect endpoint. This returns the updated VM endpoint, which has an empty dictionary added to its list of edges. 2. Create a dictionary containing the edge's attributes. We store this dictionary in a local list so that we can update all of the edges for this VM in a single update endpoint API call. 3. Once all edges are initialized in the discovery graph, we update the edges contained in our VM endpoint dictionary with the ones from our local edge list. We update the edges in discovery by passing this updated VM dictionary to discovery's update endpoint. Args: mm_endpoint (dict): The discovery endpoint dictionary created by :py:meth:`insert_vm_endpoint <minimega.parse_experiment_graph_plugin.Plugin.insert_vm_endpoint>`. vme_conf (dict): The generated minimega configuration dictionary created by :ref:`minimega.emulated_entities_mc`. switch_name_to_nid (dict): A dictionary of network IDs which are used by discovery. Returns: dict: The newly updated endpoint in the discovery graph. """try:assertisinstance(vme_conf["aux"]["nic"],list)exceptKeyError:returnNoneedges=[]fornicinvme_conf["aux"]["nic"]:# Create an edge between the VM and the switch using the network identifier# for that switch that we created earlier with the insert network API call.switch_name=nic["switch_name"]switch_network_id=switch_name_to_nid[switch_name]mm_endpoint=self.discovery_api.connect_endpoint(mm_endpoint["NID"],switch_network_id)# Create a dictionary containing this edge's attributes and hold on to it until# we have finished inserting all edges.edge_data={"mac":nic["mac"],"driver":nic["driver"]}edge_data["bridge"]=fw_config["minimega"]["control_bridge"]# QoS attributes e.g. loss, delay, rateforqos_key,qos_valueinnic["qos"].items():ifqos_value:edge_data[qos_key]=str(qos_value)edge={"N":switch_network_id,"D":edge_data}edges.append(edge)# Now that we have (1) initialized all of the edges and (2) created a local list of updated# edges, we can send an updated dictionary for our endpoint containing the updated list# of edges to the discovery's update endpoint.foreid,edgeinenumerate(edges):assertmm_endpoint["Edges"][eid]["N"]==edge["N"]mm_endpoint["Edges"][eid]["D"]=edge["D"]mm_endpoint=self.discovery_api.update_endpoint(mm_node_properties=mm_endpoint)returnmm_endpoint
[docs]defrun(self):"""This method contains the primary logic to launch an experiment. It has several objectives: #. Add all VMs and connections to Discovery's graph. #. Invoke discovery to output this data as minimega commands via the ``minemiter`` command. See the Discovery source code for more information about `minemiter <https://github.com/sandia-minimega/discovery/tree/master/src/cmds/minemiter>`_. #. Have minimega read the output to launch the VMs. #. Finish setting up the control network (if any exists). #. Launch a :ref:`vm-resource-handler` for each VM using minimega to start the process. """self.discovery_api=discoveryAPI()switch_names=set()forvertexinself.g.get_vertices():ifvertex.is_decorated_by(MinimegaEmulatedEntity):ifvertex.is_decorated_by(VMEndpoint):try:forifaceinvertex.interfaces.interfaces:switch_name=iface["switch"].nameswitch_names.add(switch_name)exceptAttributeError:self.log.debug("%s doesn't have any interfaces.",vertex.name)switch_name_to_nid={}switch_names=set(switch_names)original_switch_len=len(switch_names)# if there is a control network, insert onetry:assertself.g.control_net["name"]assertself.g.control_net["host_addr"]self.control_net=self.g.control_net["name"]except(AttributeError,KeyError,AssertionError):self.control_net=Falseifself.control_netandself.control_netinswitch_names:network_ids=self.discovery_api.insert_network()network_id=network_ids[0]["NID"]switch_name_to_nid[self.control_net]=network_idswitch_names.remove(self.control_net)assertlen(switch_names)==original_switch_len-1forsw_nameinswitch_names:network_ids=self.discovery_api.insert_network()network_id=network_ids[0]["NID"]switch_name_to_nid[sw_name]=network_idnum_vms=0vme_confs={}forvertexinself.g.get_vertices():ifvertex.is_decorated_by(MinimegaEmulatedEntity):ifvertex.is_decorated_by(VMEndpoint):# For each VM, we will add it to the discovery graph# using the insert_endpoint API call.vme_conf=vertex.generate_minimega_config()vme_confs[vertex.name]=vme_confnum_vms+=1mm_endpoint=self.insert_vm_endpoint(vme_conf)# Then we will insert edges for each of its NICs.mm_endpoint=self.make_endpoint_connections(mm_endpoint,vme_conf,switch_name_to_nid)self.discovery_api.set_config("queueing","true")minemiter_path=os.path.join(fw_config["discovery"]["install_dir"],"bin","minemiter")template_path=os.path.join(fw_config["discovery"]["install_dir"],"templates")minimega_bin_path=os.path.join(fw_config["minimega"]["install_dir"],"bin","minimega")fw2mm_path=os.path.join(fw_config["system"]["default_output_dir"],"fw2mm.mm")# Note that this uses the FIREWHEEL configuration to launch a new process# if there are not access controls on this file or the user accounts there# could be security concerns.minemiter_ret=subprocess.check_call(# nosec[minemiter_path,"-path",template_path,"-w",fw2mm_path,"-server",self.discovery_api.bind_addr,])self.log.debug("minemiter_ret=%s",minemiter_ret)# Note that this uses the FIREWHEEL configuration to launch a new process# if there are not access controls on this file or the user accounts there# could be security concerns.launch_vms_ret=subprocess.check_call(# nosec[minimega_bin_path,f"-base={fw_config['minimega']['base_dir']}","-e","read",fw2mm_path,])self.log.debug("launch_vms_ret=%s",launch_vms_ret)# Wait for all VMs to launch.vm_map={}mm_api=minimegaAPI()all_vms_launched=Falsecore_vms=Noneforiinrange(100):sleep(0.5)try:core_vms=mm_api.mm_vms()found_vms=len(core_vms)all_vms_launched=found_vms==num_vmsself.log.debug("Iteration num=%s. Waiting for vm configs from minimega: ""found=%s, expected=%s",i,found_vms,num_vms,)# Unknown errors could be returned from minimega# so catch all possibilities.exceptException:self.log.debug("Iteration num=%s. Waiting for vm configs from minimega: exception",i,)self.log.exception()continueifall_vms_launched:breakassertall_vms_launched# If there is a control_net, tap itifself.control_net:mm_network_name=f"network-{switch_name_to_nid[self.control_net]}"mm_api.mm.tap_create_ip(mm_network_name,self.g.control_net["host_addr"])forvm_name,vmincore_vms.items():vm_map[vm_name]=vm["hostname"]defupdate_socket_path(process_config):original_socket_path=process_config["path"]socket_filename=os.path.basename(original_socket_path)mm_id=core_vms[process_config["vm_name"]]["id"]full_socket_path=os.path.join(mm_api.mm_base,mm_id,socket_filename)process_config["path"]=full_socket_path# Launch the vm_resource_handlers.launch_cmds=[]forvm_name,vme_confinvme_confs.items():try:process_config=vme_conf["aux"]["handler_process"]exceptKeyError:self.log.debug("no process_config for %s",vm_name)continuehostname=vm_map[vm_name]# Run the VM Resource Handler with the correct python pathbinary_name=sys.executablehandler_path=process_config["binary_name"]update_socket_path(process_config)handler_args=json.dumps(process_config,separators=(",",":"))ifmm_api.cluster_head_node==hostname:# need to escape once on local commandshandler_args=handler_args.replace('"','\\"')launch_cmd=f"background {binary_name}{handler_path} '{handler_args}'"else:# need to escape twice on mesh send commandshandler_args=handler_args.replace('"','\\\\"')launch_cmd=str(f"mesh send {hostname} background {binary_name} "f"{handler_path} '{handler_args}'")launch_cmds.append(launch_cmd)launch_cmds_path=os.path.join(fw_config["system"]["default_output_dir"],"launch_cmds.mm")withopen(launch_cmds_path,"w",encoding="UTF-8")asf_hand:forlaunch_cmdinlaunch_cmds:f_hand.write(launch_cmd+"\n")# Note that this uses the FIREWHEEL configuration to launch a new process# if there are not access controls on this file or the user accounts there# could be security concerns.launch_handlers_ret=subprocess.check_call(# nosec[minimega_bin_path,f"-base={fw_config['minimega']['base_dir']}","-e","read",launch_cmds_path,])self.log.debug("launch_handlers_ret=%s",launch_handlers_ret)