import os
import sys
import json
import subprocess
from time import sleep
from base_objects import VMEndpoint
from minimega.emulated_entities import MinimegaEmulatedEntity
from firewheel.config import config as fw_config
from firewheel.lib.minimega.api import minimegaAPI
from firewheel.lib.discovery.api import discoveryAPI
from firewheel.control.experiment_graph import AbstractPlugin
class Plugin(AbstractPlugin):
"""This Plugin parses the experiment graph to build necessary data structures which
can be passed to `Discovery <https://github.com/sandia-minimega/discovery>`__.
Discovery will then generate the necessary minimega commands and this Plugin will
launch the experiment.
"""
def insert_vm_endpoint(self, vme_conf):
"""
For each VM, we will create a dictionary containing all information required to
launch it. Then pass this to discovery's
:py:meth:`insert <firewheel.lib.discovery.api.discoveryAPI.insert_endpoint>` endpoint.
The NIC/interface information is not included as it will be handled with
:py:meth:`make_endpoint_connections <minimega.parse_experiment_graph_plugin.Plugin.make_endpoint_connections>`.
Args:
vme_conf (dict): The generated minimega configuration dictionary created by
:ref:`minimega.emulated_entities_mc`.

        Raises:
RuntimeError: If minimega tags are used and a tag name conflicts with
a required configuration key.

        Returns:
dict: The newly created endpoint in the discovery graph.
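
        Example:
            A minimal sketch of the dictionary built here and passed to discovery.
            The keys mirror the assignments below; the values shown are purely
            illustrative::

                data = {
                    "architecture": "x86_64",
                    "name": "host.root.net",
                    "type": "qemu",
                    "cpu_model": "host",
                    "memory": "4096",
                    "vcpus": "4",
                    "disks": "/images/vm.qcow2,virtio,writeback",
                    "virtio_ports": "org.qemu.guest_agent.0",
                }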
"""
data = {}
data["architecture"] = vme_conf["vm"]["architecture"]
data["name"] = vme_conf["vm"]["name"]
data["uuid"] = vme_conf["vm"]["uuid"]
try:
data["control_ip"] = vme_conf["aux"]["control_ip"]
except KeyError:
# A control IP is not necessary
data["control_ip"] = ""
data["type"] = "qemu"
data["cpu_model"] = "host"
data["memory"] = vme_conf["vm"]["memory"]
# Setting various CPU values
for smp_key in ["smp_sockets", "smp_cores", "smp_threads"]:
data[smp_key] = str(vme_conf["vm"][smp_key])
        # We need to manually set the total vCPU count (QEMU's maxcpus) to
        # override the default of 1.
vcpus = (
int(data["smp_sockets"]) * int(data["smp_cores"]) * int(data["smp_threads"])
)
data["vcpus"] = str(vcpus)
data["vga_model"] = vme_conf["vm"]["vga_model"]
# NOTE: we assume the first disk is the image
data["image"] = vme_conf["aux"]["disks"][0]["file"]
disk_strs = []
for drive in vme_conf["aux"]["disks"]:
disk_str = ",".join([drive["path"], drive["interface"], drive["cache"]])
disk_strs.append(disk_str)
data["disks"] = " ".join(disk_strs)
        # Currently, only a num_serial option is supported, but we also need a
        # virtio port for the QEMU guest agent (QGA) socket.
data["virtio_ports"] = "org.qemu.guest_agent.0"
# Get all the minimega tags
for tag_key, tag_value in vme_conf["tags"].items():
if tag_key in data:
raise RuntimeError(
f"Tag {tag_key} conflicts with required config key."
"Please rename your tag."
)
data[tag_key] = tag_value
# Add any other QEMU options here.
data["qemu_append"] = vme_conf["aux"]["qemu_append_str"]
# If there's any coscheduling parameters, propagate them
data["coschedule"] = str(vme_conf["coschedule"])
# Create the endpoint in the discovery graph.
mm_endpoint = self.discovery_api.insert_endpoint(mm_node_properties=data)[0]
return mm_endpoint
def make_endpoint_connections(self, mm_endpoint, vme_conf, switch_name_to_nid):
"""This method informs discovery about all the connections that need to happen to launch
the experiment. Due to some limitations with discovery, we are unable to add an edge
between the VM and a switch containing all of the necessary information in a single
        discovery API call. Instead, we need to do the following:

        1. Create an edge between the VM and the switch via discovery's connect endpoint.
This returns the updated VM endpoint, which has an empty dictionary added
to its list of edges.
2. Create a dictionary containing the edge's attributes. We store this
dictionary in a local list so that we can update all of the edges for
this VM in a single update endpoint API call.
3. Once all edges are initialized in the discovery graph, we update the edges
contained in our VM endpoint dictionary with the ones from our local edge list.
We update the edges in discovery by passing this updated VM dictionary to
discovery's update endpoint.

        Args:
mm_endpoint (dict): The discovery endpoint dictionary created by
:py:meth:`insert_vm_endpoint <minimega.parse_experiment_graph_plugin.Plugin.insert_vm_endpoint>`.
vme_conf (dict): The generated minimega configuration dictionary created by
:ref:`minimega.emulated_entities_mc`.
            switch_name_to_nid (dict): A dictionary mapping switch names to the
                network IDs used by discovery.

        Returns:
            dict: The newly updated endpoint in the discovery graph, or ``None``
            if the VM has no NICs to connect.
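
        Example:
            A sketch of the per-NIC edge dictionary built in step 2 (the values
            shown are illustrative)::

                {
                    "N": 4,
                    "D": {
                        "mac": "00:00:00:ff:00:01",
                        "driver": "e1000",
                        "bridge": "mm_bridge",
                    },
                }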
"""
        # VMs without any NICs have no "nic" key; there is nothing to connect.
        try:
            assert isinstance(vme_conf["aux"]["nic"], list)
        except KeyError:
            return None
edges = []
for nic in vme_conf["aux"]["nic"]:
# Create an edge between the VM and the switch using the network identifier
# for that switch that we created earlier with the insert network API call.
switch_name = nic["switch_name"]
switch_network_id = switch_name_to_nid[switch_name]
mm_endpoint = self.discovery_api.connect_endpoint(
mm_endpoint["NID"], switch_network_id
)
# Create a dictionary containing this edge's attributes and hold on to it until
# we have finished inserting all edges.
edge_data = {"mac": nic["mac"], "driver": nic["driver"]}
edge_data["bridge"] = fw_config["minimega"]["control_bridge"]
# QoS attributes e.g. loss, delay, rate
for qos_key, qos_value in nic["qos"].items():
if qos_value:
edge_data[qos_key] = str(qos_value)
edge = {"N": switch_network_id, "D": edge_data}
edges.append(edge)
# Now that we have (1) initialized all of the edges and (2) created a local list of updated
# edges, we can send an updated dictionary for our endpoint containing the updated list
# of edges to the discovery's update endpoint.
for eid, edge in enumerate(edges):
assert mm_endpoint["Edges"][eid]["N"] == edge["N"]
mm_endpoint["Edges"][eid]["D"] = edge["D"]
mm_endpoint = self.discovery_api.update_endpoint(mm_node_properties=mm_endpoint)
return mm_endpoint
def run(self):
"""This method contains the primary logic to launch an experiment.
        It has several objectives:

        #. Add all VMs and connections to Discovery's graph.
#. Invoke discovery to output this data as minimega commands via the ``minemiter`` command.
See the Discovery source code for more information about
`minemiter <https://github.com/sandia-minimega/discovery/tree/master/src/cmds/minemiter>`_.
#. Have minimega read the output to launch the VMs.
#. Finish setting up the control network (if any exists).
#. Launch a :ref:`vm-resource-handler` for each VM using minimega to start the process.
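
        Example:
            The discovery-to-minimega handoff performed below is roughly
            equivalent to running (paths shown are illustrative)::

                minemiter -path <install_dir>/templates -w <output_dir>/fw2mm.mm -server <bind_addr>
                minimega -base=<base_dir> -e read <output_dir>/fw2mm.mm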
"""
self.discovery_api = discoveryAPI()
switch_names = set()
for vertex in self.g.get_vertices():
if vertex.is_decorated_by(MinimegaEmulatedEntity):
if vertex.is_decorated_by(VMEndpoint):
try:
for iface in vertex.interfaces.interfaces:
switch_name = iface["switch"].name
switch_names.add(switch_name)
except AttributeError:
self.log.debug("%s doesn't have any interfaces.", vertex.name)
switch_name_to_nid = {}
original_switch_len = len(switch_names)
        # If there is a control network, insert a discovery network for it first.
try:
assert self.g.control_net["name"]
assert self.g.control_net["host_addr"]
self.control_net = self.g.control_net["name"]
except (AttributeError, KeyError, AssertionError):
self.control_net = False
if self.control_net and self.control_net in switch_names:
network_ids = self.discovery_api.insert_network()
network_id = network_ids[0]["NID"]
switch_name_to_nid[self.control_net] = network_id
switch_names.remove(self.control_net)
assert len(switch_names) == original_switch_len - 1
for sw_name in switch_names:
network_ids = self.discovery_api.insert_network()
network_id = network_ids[0]["NID"]
switch_name_to_nid[sw_name] = network_id
num_vms = 0
vme_confs = {}
for vertex in self.g.get_vertices():
if vertex.is_decorated_by(MinimegaEmulatedEntity):
if vertex.is_decorated_by(VMEndpoint):
# For each VM, we will add it to the discovery graph
# using the insert_endpoint API call.
vme_conf = vertex.generate_minimega_config()
vme_confs[vertex.name] = vme_conf
num_vms += 1
mm_endpoint = self.insert_vm_endpoint(vme_conf)
# Then we will insert edges for each of its NICs.
mm_endpoint = self.make_endpoint_connections(
mm_endpoint, vme_conf, switch_name_to_nid
)
self.discovery_api.set_config("queueing", "true")
minemiter_path = os.path.join(
fw_config["discovery"]["install_dir"], "bin", "minemiter"
)
template_path = os.path.join(fw_config["discovery"]["install_dir"], "templates")
minimega_bin_path = os.path.join(
fw_config["minimega"]["install_dir"], "bin", "minimega"
)
fw2mm_path = os.path.join(fw_config["system"]["default_output_dir"], "fw2mm.mm")
        # Note that this uses the FIREWHEEL configuration to launch a new process.
        # If there are no access controls on this file or on the user accounts,
        # there could be security concerns.
minemiter_ret = subprocess.check_call( # nosec
[
minemiter_path,
"-path",
template_path,
"-w",
fw2mm_path,
"-server",
self.discovery_api.bind_addr,
]
)
self.log.debug("minemiter_ret=%s", minemiter_ret)
        # Note that this uses the FIREWHEEL configuration to launch a new process.
        # If there are no access controls on this file or on the user accounts,
        # there could be security concerns.
launch_vms_ret = subprocess.check_call( # nosec
[
minimega_bin_path,
f"-base={fw_config['minimega']['base_dir']}",
"-e",
"read",
fw2mm_path,
]
)
self.log.debug("launch_vms_ret=%s", launch_vms_ret)
# Wait for all VMs to launch.
vm_map = {}
mm_api = minimegaAPI()
all_vms_launched = False
core_vms = None
for i in range(100):
sleep(0.5)
try:
core_vms = mm_api.mm_vms()
found_vms = len(core_vms)
all_vms_launched = found_vms == num_vms
self.log.debug(
"Iteration num=%s. Waiting for vm configs from minimega: "
"found=%s, expected=%s",
i,
found_vms,
num_vms,
)
# Unknown errors could be returned from minimega
# so catch all possibilities.
except Exception:
self.log.debug(
"Iteration num=%s. Waiting for vm configs from minimega: exception",
i,
)
                self.log.exception("Failed to get VM configurations from minimega.")
continue
if all_vms_launched:
break
        assert all_vms_launched, f"Expected {num_vms} VMs to launch."
# If there is a control_net, tap it
if self.control_net:
mm_network_name = f"network-{switch_name_to_nid[self.control_net]}"
mm_api.mm.tap_create_ip(mm_network_name, self.g.control_net["host_addr"])
for vm_name, vm in core_vms.items():
vm_map[vm_name] = vm["hostname"]
        def update_socket_path(process_config):
            """Point the handler's socket path at the VM's minimega instance directory."""
            original_socket_path = process_config["path"]
            socket_filename = os.path.basename(original_socket_path)
            mm_id = core_vms[process_config["vm_name"]]["id"]
            full_socket_path = os.path.join(mm_api.mm_base, mm_id, socket_filename)
            process_config["path"] = full_socket_path
# Launch the vm_resource_handlers.
launch_cmds = []
for vm_name, vme_conf in vme_confs.items():
try:
process_config = vme_conf["aux"]["handler_process"]
except KeyError:
self.log.debug("no process_config for %s", vm_name)
continue
hostname = vm_map[vm_name]
# Run the VM Resource Handler with the correct python path
binary_name = sys.executable
handler_path = process_config["binary_name"]
update_socket_path(process_config)
handler_args = json.dumps(process_config, separators=(",", ":"))
if mm_api.cluster_head_node == hostname:
# need to escape once on local commands
handler_args = handler_args.replace('"', '\\"')
launch_cmd = f"background {binary_name} {handler_path} '{handler_args}'"
else:
# need to escape twice on mesh send commands
handler_args = handler_args.replace('"', '\\\\"')
launch_cmd = str(
f"mesh send {hostname} background {binary_name} "
f"{handler_path} '{handler_args}'"
)
launch_cmds.append(launch_cmd)
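        # For example (illustrative), a local command in launch_cmds looks like
        #   background /usr/bin/python3 <handler_path> '{\"path\":\"...\"}'
        # while a remote command is prefixed with "mesh send <hostname>" and its
        # JSON quotes are escaped a second time for the extra shell hop.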
launch_cmds_path = os.path.join(
fw_config["system"]["default_output_dir"], "launch_cmds.mm"
)
with open(launch_cmds_path, "w", encoding="UTF-8") as f_hand:
for launch_cmd in launch_cmds:
f_hand.write(launch_cmd + "\n")
        # Note that this uses the FIREWHEEL configuration to launch a new process.
        # If there are no access controls on this file or on the user accounts,
        # there could be security concerns.
launch_handlers_ret = subprocess.check_call( # nosec
[
minimega_bin_path,
f"-base={fw_config['minimega']['base_dir']}",
"-e",
"read",
launch_cmds_path,
]
)
self.log.debug("launch_handlers_ret=%s", launch_handlers_ret)