Source code for caida.save_plugin

import json
from pprint import pprint

from base_objects import VMEndpoint

from firewheel.control.experiment_graph import AbstractPlugin


[docs] class Save(AbstractPlugin): """ Save the current experiment network topology to a JSON file. """
[docs] def run(self, filename): """ Save the current experiment network topology to a JSON file. Switches are not included in the output as they can be inferred from :py:class:`base_objects.VMEndpoint` interfaces. Args: filename (str): The path to the JSON file where the topology data will be saved. Raises: TypeError: If the filename is not provided. """ if not filename: raise TypeError( "Must provide a filename to ``caida.save`` for the JSON output." ) output = {"vertices": []} for vertex in self.g.get_vertices(): if not vertex.is_decorated_by(VMEndpoint): # Don't need switches since they can be backed out from # VMEndpoint interfaces continue attributes = {} for obj in vertex.__dict__: if obj in {"valid", "skip_list", "graph_id"}: continue if self.is_jsonable(vertex.__dict__[obj]): attributes[obj] = vertex.__dict__[obj] try: attributes["interfaces"] = [] for interface in vertex.interfaces.interfaces: iface = {} for key in interface: if key == "switch": iface[key] = interface[key].name else: iface[key] = str(interface[key]) attributes["interfaces"].append(iface) except KeyError: pass try: # Trigger KeyError to skip if this vertex # isn't configured for BGP routing vertex.routing["bgp"] # Pick up the routing dictionary that was # skipped above due to unserializable values attributes["routing"] = vertex.routing try: networks = [] for network in attributes["routing"]["bgp"]["networks"]: networks.append(str(network)) attributes["routing"]["bgp"]["networks"] = networks except KeyError: # Not every BGP router advertises its own networks pass try: neighbors = {} for n in attributes["routing"]["bgp"]["neighbors"]: neighbor = self.find_router_by_as(n["remote-as"]) if not neighbor: print( "Could not find neighbor with AS: %s" % n["remote-as"] ) continue switch = self.find_switch(vertex, neighbor) if not switch: print( "Could not find switch between: %s <-> %s" % (vertex.name, neighbor.name) ) continue neighbors[neighbor.name] = switch attributes["routing"]["bgp"]["neighbors"] = neighbors except KeyError: print("BGP router has no neighbors: %s" % vertex.name) except AttributeError: # This is an out if this vertex isn't a router and therefore # does not have a routing attribute pass except KeyError: # This is an out for routers that don't have BGP configured pass except Exception: # noqa: BLE001 print("Could not handle routing parameters:") pprint(vertex.routing) del attributes["routing"] output["vertices"].append(attributes) with open(filename, "w", encoding="utf-8") as f: json.dump(output, f, indent=4)
[docs] def is_jsonable(self, obj): """ Check if an object can be serialized to JSON. Args: obj (Vertex): The object to check. Returns: bool: :py:data:`True` if the object can be serialized to JSON, :py:data:`False` otherwise. """ try: json.dumps(obj) return True except TypeError: return False
[docs] def find_router_by_as(self, bgp_as): """ Find a router vertex by its BGP AS number. Args: bgp_as (int): The BGP AS number to search for. Returns: Vertex: The router vertex with the specified BGP AS number, or None if not found. """ for v in self.g.get_vertices(): if v.type != "router": continue try: remote_as = v.routing["bgp"]["parameters"]["router-as"] if remote_as == bgp_as: return v except AttributeError: # Handle routers with no routing information continue except KeyError: # Handle non-BGP routers continue return None
[docs] def find_switch(self, v1, v2): """ Find a switch vertex that connects two router vertices. If multiple switches are found, a warning is printed and only one the switches is returned. Args: v1 (Vertex): The first router vertex. v2 (Vertex): The second router vertex. Returns: Vertex: The switch vertex that connects the two routers, or None if not found. """ v1_switches = set() for interface in v1.interfaces.interfaces: v1_switches.add(interface["switch"].name) v2_switches = set() for interface in v2.interfaces.interfaces: v2_switches.add(interface["switch"].name) result = v1_switches & v2_switches if not result: return None if len(result) > 1: print( "Found multiple switches between routers: %s <-> %s" % (v1.name, v2.name) ) # only return a single switch return result.pop()