Source code for layer2.tap_plugin

from netaddr import IPNetwork
from layer2.tap import Tap
from base_objects import Switch, FalseEdge, VMEndpoint

from firewheel.control.experiment_graph import Vertex, AbstractPlugin


class InsertTaps(AbstractPlugin):
    """
    This plugin inserts a passive tap on designated edges and tunnels all
    mirrored traffic to the "collector" (i.e. Splunk, Bro, etc) specified on the
    :py:class:`Edge <firewheel.control.experiment_graph.Edge>`.

    Each "collector" gets an additional IP network in order to have the
    mirrored traffic from the taps GRE tunneled to it. Each tunnel gets its own
    interface of the form ``tap<integer>`` where integer is the GRE key.
    For example, if there is a tunnel between the "collector" and a tap using
    a GRE key of 1000 then the "collector" will have an interface named
    ``tap1000``. The ``tapX`` interfaces should then be listened on by the
    collecting software (i.e Bro).
    """

    def run(self, collector_network="10.100.0.0/16"):
        """
        Walk the graph and drop in passive taps on links that have been
        specified to be tapped.

        An edge is considered "tapped" when it carries a truthy ``tap``
        attribute; the value of that attribute is the collector (or list of
        collectors) that should receive the mirrored traffic.

        Args:
            collector_network (str, optional): IP space to pull subnets from.
                One ``/24`` subnet is taken from it per tapped edge and shared
                by the tap and its collectors in order to tunnel mirrored
                traffic to the collector. Defaults to ``'10.100.0.0/16'``.

        Raises:
            RuntimeError: If the collector specified on the
                :py:class:`Edge <firewheel.control.experiment_graph.Edge>`
                is not a name of the collector
                :py:class:`Vertex <firewheel.control.experiment_graph.Vertex>`
                nor the actual
                :py:class:`Vertex <firewheel.control.experiment_graph.Vertex>`
                object, or if ``collector_network`` cannot supply a ``/24``
                subnet for every tapped edge.
        """
        collector_networks = IPNetwork(collector_network)

        # Pull /24 subnets out of the larger network
        subnets = collector_networks.subnet(24)

        def _has_collectors(edge):
            # Collectors will be saved as the `tap` attribute of the edge
            return bool(getattr(edge, "tap", False))

        # Snapshot all original edges with collectors *before* modifying any
        # edges; tapping an edge adds new edges to the graph.
        tapped_edges = [
            edge for edge in self.g.get_edges() if _has_collectors(edge)
        ]

        for edge in tapped_edges:
            try:
                network = next(subnets)
            except StopIteration:
                # Surface a meaningful error rather than a bare StopIteration
                # when the address space is too small for the number of taps.
                raise RuntimeError(
                    f"The collector network {collector_network} cannot provide "
                    f"a /24 subnet for each of the {len(tapped_edges)} tapped "
                    "edge(s)."
                ) from None
            _EdgeTapper(edge, network).tap_edge(edge.tap)
class _EdgeTapper:
    """
    A transient object used to tap an
    :py:class:`Edge <firewheel.control.experiment_graph.Edge>`.

    One instance is created per tapped edge. The GRE key counter lives on the
    class (not the instance) so that keys, and therefore the ``tap<key>``
    interface names created on collectors, stay unique across all tapped edges.
    """

    # Next GRE tunnel key to hand out; shared by every instance.
    _gre_key = 1000

    def __init__(self, edge, network):
        """Initialize the Object.

        Arguments:
            edge (Edge): The edge to tap.
            network (netaddr.IPNetwork): The network used by the tap VM and the
                associated collectors.

        Attributes:
            _gre_key (int): Class-level counter holding the next GRE tunnel key
                to use. Starts at ``1000``.
            _g (ExperimentGraph): The graph for the given edge (taken from the
                edge's source vertex).
            tapped_edge (Edge): The edge to tap.
            network (netaddr.IPNetwork): The network used by the tap VM and the
                associated collectors.
            _ips (iter): An iterator for all the host IP addresses in
                ``network``.
            _bridge_name (str): The default name of the bridge. Initially
                ``"br0"``.
            _tunnel_params (list): ``(collector_ip, gre_key)`` tuples collected
                for the GRE tunnels of this tap.
        """
        self._g = edge.source.g
        self.tapped_edge = edge
        self.network = network
        self._ips = network.iter_hosts()

        # Add network info for the tap
        self._bridge_name = "br0"
        self._tunnel_params = []

    def tap_edge(self, collectors):
        """
        Tap the edge using all of the collectors.

        For each tapped edge, break the current link and drop in the passive
        tap. Then hook up the link through the tap. Each tap then mirrors the
        traffic through a GRE tunnel back to the collector that was specified
        on the edge.

        Args:
            collectors (list): The collector(s) designated to tap the edge.
                May be a single collector or a list/tuple of them; each entry
                is either a ``Vertex`` object or the name of one.

        Raises:
            RuntimeError: If a collector cannot be resolved to a VM endpoint,
                or if no interface on the endpoint matches the tapped edge.
        """
        # Ensure that the collectors argument is a list of `Vertex` objects / VM endpoints
        if not isinstance(collectors, (list, tuple)):
            collectors = [collectors]
        collectors = [
            self._validate_collector(collector) for collector in collectors
        ]

        # Determine the original switch and endpoint to be tapped
        orig_switch, endpoint = self._determine_edge_switch_and_endpoint()

        # Create the passive tap (requiring an extra switch, since the
        # original link is being broken into two separate links)
        tap = self._create_tap(f"tap-{endpoint.name}")
        tap_switch = self._create_switch(f"{tap.name}.switch")
        tap_collector_switch = self._create_switch(f"{tap.name}-collectors.switch")

        # Reconstruct the physical connections to go through the tap
        self._reconstruct_edge(endpoint, orig_switch, tap, tap_switch)

        # Assign IP addresses and mirror traffic to the collectors.
        # The tap takes the first host address; collectors follow in order.
        tap_ip = next(self._ips)
        collector_ips = {collector: next(self._ips) for collector in collectors}
        self._mirror_traffic(tap, tap_ip, tap_collector_switch, collector_ips)

    def _validate_collector(self, collector):
        """
        Ensure that a given collector is a VM endpoint (or look it up).

        Args:
            collector (Vertex): A collector to be validated (or, if the
                collector is provided as a name, find the corresponding
                :py:class:`Vertex <firewheel.control.experiment_graph.Vertex>`).

        Returns:
            Vertex: The validated collector vertex.

        Raises:
            RuntimeError: If the collector specified is not a name of the
                collector
                :py:class:`Vertex <firewheel.control.experiment_graph.Vertex>`
                nor the actual
                :py:class:`Vertex <firewheel.control.experiment_graph.Vertex>`
                object.
        """
        if isinstance(collector, str):
            collector = self._g.find_vertex(collector)

        # A name lookup that yields nothing usable (e.g. ``None``) or an object
        # that is not a vertex has no ``is_decorated_by``; report that with the
        # documented RuntimeError instead of leaking an AttributeError.
        is_decorated_by = getattr(collector, "is_decorated_by", None)
        if is_decorated_by is None or not is_decorated_by(VMEndpoint):
            raise RuntimeError(
                "The collector specified on an `Edge` must be either the "
                "name of the collector vertex or the `Vertex` object."
            )
        return collector

    def _determine_edge_switch_and_endpoint(self):
        """
        Determine the switch and the endpoint of the edge.

        Returns:
            tuple(Switch, Vertex): A :py:data:`tuple` that contains the
            :py:class:`Edge's <firewheel.control.experiment_graph.Edge>`
            terminal
            :py:class:`Vertex <firewheel.control.experiment_graph.Vertex>`
            that is a :py:class:`base_objects.Switch` and the
            :py:class:`Edge's <firewheel.control.experiment_graph.Edge>`
            terminal
            :py:class:`Vertex <firewheel.control.experiment_graph.Vertex>`
            that is a VM.
        """
        # If the source is not a switch, the destination is taken to be one.
        if self.tapped_edge.source.is_decorated_by(Switch):
            switch = self.tapped_edge.source
            endpoint = self.tapped_edge.destination
        else:
            switch = self.tapped_edge.destination
            endpoint = self.tapped_edge.source
        return switch, endpoint

    def _create_tap(self, tap_name):
        """Create a new node that is a :py:class:`layer2.tap.Tap`.

        Args:
            tap_name (str): The name of the new :py:class:`layer2.tap.Tap`.

        Returns:
            Tap: The newly created :py:class:`layer2.tap.Tap`.
        """
        # Create the tap with the given name
        tap = Vertex(self._g, tap_name)
        tap.decorate(Tap)
        return tap

    def _create_switch(self, switch_name):
        """Create a new :py:class:`base_objects.Switch`.

        Args:
            switch_name (str): The name of the new
                :py:class:`base_objects.Switch`.

        Returns:
            base_objects.Switch: The new :py:class:`base_objects.Switch`.
        """
        # Create a tap switch with the given name.
        switch = Vertex(self._g, switch_name)
        switch.decorate(Switch)
        return switch

    def _reconstruct_edge(self, endpoint, orig_switch, tap, tap_switch):
        """
        Reconstruct the original edge.

        Using the new tap switch, reconstruct the original edge so that it now
        connects the original switch to the endpoint via the tap and the tap
        switch.

        Args:
            endpoint (Vertex): The VM endpoint of the edge to be reconstructed.
            orig_switch (Vertex): The original switch terminating the edge to
                be reconstructed.
            tap (Vertex): The new VM endpoint to add into the reconstructed
                edge segments.
            tap_switch (Vertex): The new switch to add into the reconstructed
                edge segments.

        Raises:
            RuntimeError: If an interface cannot be found for tapping the
                endpoint.
        """
        _, tap_to_orig_switch_edge = tap.l2_connect(orig_switch)
        _, tap_to_tap_switch_edge = tap.l2_connect(tap_switch)
        new_edge = self._refresh_endpoint_interface(endpoint, tap_switch)

        # Copy over any qos details
        new_edge.qos = getattr(self.tapped_edge, "qos", None)

        # Make the new edges "False"
        tap_to_orig_switch_edge.decorate(FalseEdge)
        tap_to_tap_switch_edge.decorate(FalseEdge)
        new_edge.decorate(FalseEdge)

    def _refresh_endpoint_interface(self, endpoint, tap_switch):
        """
        Refresh the endpoint interface.

        The endpoint interface whose address equals the tapped edge's
        ``dst_ip`` is removed and re-created against the tap switch, keeping
        its original address, netmask and name.

        Args:
            endpoint (Vertex): The VM endpoint to have its interface refreshed.
            tap_switch (Vertex): The new tap switch now connected to the VM
                endpoint.

        Returns:
            Edge: The new edge created by connecting the endpoint to the tap
            switch.

        Raises:
            RuntimeError: If an interface cannot be found for tapping the
                endpoint.
        """
        for endpoint_interface in endpoint.interfaces.interfaces:
            if endpoint_interface.get("address") == self.tapped_edge.dst_ip:
                interface = endpoint_interface
                break
        else:
            # The loop finished without a match
            raise RuntimeError(
                f"Could not find interface for tapping on endpoint: {endpoint.name}"
            )

        # Find the original interface, delete it, re-add with info here
        endpoint.interfaces.del_interface(interface["name"])
        new_interface_name, new_edge = endpoint.connect(
            tap_switch,
            interface["address"],
            interface["netmask"],
        )

        # Keep the original interface name. This keeps other dictionaries
        # in the vertex that depend on interface names consistent
        new_interface = endpoint.interfaces.get_interface(new_interface_name)
        new_interface["name"] = interface["name"]
        return new_edge

    def _mirror_traffic(self, tap, tap_ip, tap_collector_switch, collector_ips):
        """
        Mirror traffic along the original tapped edge to the collectors.

        Connect the tap to a "monitor" network so that mirrored traffic can be
        tunneled to the collector. In theory this could go over the same
        network that already exists, but you run the risk of tapping other
        mirrored traffic at upstream taps, therefore it's best to isolate
        mirrored traffic to its own network.

        Args:
            tap (Vertex): The tap VM from which traffic is mirrored.
            tap_ip (netaddr.IPAddress): The IP address of the tap VM on the
                collector subnet.
            tap_collector_switch (Vertex): The switch connecting the tap VM to
                the collectors.
            collector_ips (dict): A dictionary mapping collector vertices to
                their IP address in the subnet defined for this tap.
        """
        tap.l2_mitm(self._bridge_name)
        tap.connect(tap_collector_switch, tap_ip, self.network.netmask)

        # Connect each collector into the specified tap subnet.
        # Start from a clean parameter list so only this call's tunnels are
        # handed to the tap.
        self._tunnel_params = []
        for collector, collector_ip in collector_ips.items():
            collector.connect(tap_collector_switch, collector_ip, self.network.netmask)
            # Set up the GRE tunnel endpoint on the collector
            self._set_up_gre_tunnel_endpoint(collector, collector_ip, tap_ip)
        tap.mirror_traffic(self._bridge_name, *self._tunnel_params)

    def _set_up_gre_tunnel_endpoint(self, collector, collector_ip, tap_ip):
        """
        Add the tap via the GRE tunnel endpoint.

        Schedules three ``ip`` commands on the collector that create the
        ``tap<key>`` gretap interface, bring it up and put it in promiscuous
        mode, then records ``(collector_ip, key)`` in ``_tunnel_params``.

        Args:
            collector (Vertex): The collector on which to set the tap.
            collector_ip (netaddr.IPAddress): The IP of the collector vertex in
                the subnet defined for this tap.
            tap_ip (netaddr.IPAddress): The IP of the tapping VM in the subnet
                defined for this tap.
        """
        # Claim the next key from the class-level counter. Incrementing via
        # ``self`` would only create an instance attribute, so every tapped
        # edge would start again at the same key and collectors shared between
        # edges would be asked to create duplicate ``tap<key>`` interfaces.
        gre_key = _EdgeTapper._gre_key
        _EdgeTapper._gre_key = gre_key + 1

        collector.run_executable(
            -100,
            "ip",
            f"link add tap{gre_key} type gretap key {gre_key} "
            f"local {collector_ip} remote {tap_ip} ttl 255",
        )
        collector.run_executable(-99, "ip", f"link set dev tap{gre_key} up")
        collector.run_executable(-98, "ip", f"link set tap{gre_key} promisc on")
        self._tunnel_params.append((collector_ip, gre_key))