Source code for caida.load_plugin

import json
from pprint import pprint

from netaddr import IPNetwork
from base_objects import Switch, VMEndpoint
from generic_vm_objects import GenericRouter

from firewheel.control.experiment_graph import Vertex, AbstractPlugin


[docs] class Load(AbstractPlugin): """ Load a CAIDA network topology from a JSON file and process its vertices. """
[docs] def run(self, filename): """ Load a CAIDA network topology from a JSON file and process its vertices. The JSON file should be created via the :ref:`caida.save_mc`. Populates the `self.switches` and `self.routers` dictionaries per the topology and configures BGP for routers in a second pass over the data. Args: filename (str): The path to the JSON file containing the topology data. Raises: TypeError: If the filename is not provided. """ if not filename: raise TypeError( "Must provide a filename to ``caida.load`` for the JSON input." "This should be the output of ``caida.save``." ) with open(filename, "r", encoding="utf-8") as json_topology: topology = json.load(json_topology) topology = topology["vertices"] self.switches = {} self.routers = {} for vtx in topology: if "name" not in vtx: print("Unable to load JSON vertex with no name:") pprint(vtx) continue if "type" not in vtx: print("Unable to load JSON vertex with no type:") pprint(vtx) continue if vtx["type"] == "host": self.handle_host(vtx) elif vtx["type"] == "router": self.handle_router(vtx) else: print(f"No load handler for vertex of type: {vtx['name']}") pprint(vtx) # Need all routers created before BGP is configured. # This forces a second pass over the data for vtx in topology: if "type" in vtx and vtx["type"] == "router": self.handle_bgp(vtx)
[docs] def handle_host(self, vtx): """ Creates a host vertex in the graph and adds its interfaces. Args: vtx (dict): The vertex dictionary containing host information. """ host = Vertex(self.g, vtx["name"]) host.decorate(VMEndpoint) try: self.add_interfaces(host, vtx["interfaces"]) except KeyError: print(f"No interfaces for host: {host.name}")
[docs] def add_interfaces(self, vertex, interfaces): """ Connects the vertex to switches in the graph based on the interface data. Args: vertex (Vertex): The vertex to which interfaces will be added. interfaces (list): A list of interface dictionaries. """ if not interfaces: print(f"No interfaces found for vertex: {vertex.name}") print("NOT able to connect it into the graph.") for interface in interfaces: switch = self.get_switch(interface["switch"]) vertex.connect(switch, interface["address"], interface["netmask"])
[docs] def handle_router(self, vtx): """ Creates a router vertex in the graph, adds its interfaces, and sets up BGP if specified. This method currently does not handle OSPF links, which will need to happen prior to BGP so that it can properly advertise the BGP links. Args: vtx (dict): The vertex dictionary containing router information. """ router = Vertex(self.g, vtx["name"]) router.decorate(GenericRouter) self.routers[router.name] = router self.add_interfaces(router, vtx["interfaces"]) # Can't link BGP routers without AS set, so it has to be # done in the first pass if "routing" not in vtx or "bgp" not in vtx["routing"]: return if ( "parameters" not in vtx["routing"]["bgp"] or "router-as" not in vtx["routing"]["bgp"]["parameters"] ): print(f"BGP Router does not have AS number: {router.name}") return router.set_bgp_as(vtx["routing"]["bgp"]["parameters"]["router-as"])
[docs] def handle_bgp(self, vtx): """ Sets up BGP networks and neighbors for a router vertex. Args: vtx (dict): The vertex dictionary containing BGP configuration. """ if vtx["name"] not in self.routers: print(f"No router named: {vtx['name']}") return router = self.routers[vtx["name"]] try: bgp = vtx["routing"]["bgp"] except KeyError: return try: for network in bgp["networks"]: router.add_bgp_network(IPNetwork(network)) except KeyError: # Not all BGP routers advertise their own networks pass try: for peer in bgp["neighbors"]: switch = self.get_switch(bgp["neighbors"][peer]) if peer not in self.routers: print(f"Cannot find BGP peer: {peer}") continue router.link_bgp(self.routers[peer], switch) except KeyError: pass
[docs] def get_switch(self, switch_name): """ Adds the switch to the `self.switches` dictionary if it is newly created. Args: switch_name (str): The name of the switch. Returns: Vertex: The switch vertex. """ if switch_name not in self.switches: # If it isn't in the cache, then it is unlikely # that it is in the graph, but in case this plugin # gets chained in unexpected ways, be safe and check switch = self.g.find_vertex(switch_name) if not switch: switch = Vertex(self.g, switch_name) switch.decorate(Switch) self.switches[switch_name] = switch return switch return self.switches[switch_name]