Source code for peat.heat.umas_extractor

"""
HEAT protocol extractor for the Schneider UMAS protocol.

UMAS packets in Elasticsearch have the 'type' field set to 'modbus', with the
UMAS-specific data nested under 'umas.*' fields (see the example document
sketch following this docstring).

UMAS protocol fields

- mbtcp.transaction_identifier
- umas.connection_id
- umas.function_code
- umas.function_name
- umas.function_description
- umas.direction
- umas.data
- umas.block_id (only if umas.function_name is UPLOAD_BLOCK or DOWNLOAD_BLOCK)
- umas.block_len (only if umas.function_name is UPLOAD_BLOCK)
- umas.max_packet_size (only if umas.function_name is INITIALIZE_DOWNLOAD)
- umas.blocks_transferred (only if umas.function_name is END_UPLOAD or END_DOWNLOAD)

Authors

- Christopher Goes
- John Jacobellis
- Ryan Adams
"""

import binascii
import itertools
from collections import defaultdict

from peat import Interface, Service, config, datastore, log, state, utils
from peat.protocols.common import mac_to_vendor

from .heat_classes import HeatArtifact, HeatProtocol


class UmasExtractor(HeatProtocol):
    """HEAT protocol extractor for the Schneider UMAS protocol."""

    START_FUNCS = {"INITIALIZE_DOWNLOAD", "INITIALIZE_UPLOAD"}
    BLOCK_FUNCS = {"DOWNLOAD_BLOCK", "UPLOAD_BLOCK"}
    END_FUNCS = {"END_DOWNLOAD", "END_UPLOAD"}
    RESP_FUNCS = {"RESPONSE_OK", "RESPONSE_ERROR"}
    FUNCS = [*START_FUNCS, *BLOCK_FUNCS, *END_FUNCS, *RESP_FUNCS]

    def get_data(self) -> None:
        """Query Elasticsearch for UMAS file-transfer packets."""
        query = {
            "bool": {
                "must": [{"term": {"type": "modbus"}}],
                "filter": [{"terms": {"umas.function_name": self.FUNCS}}],
            }
        }
        body = {
            "query": query,
            # Sort in ascending order by Modbus transaction ID
            "sort": [{"mbtcp.transaction_identifier": "asc"}],
        }
        self.elastic_data = self._search_es(body)
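
    # NOTE (assumption, not from the source): `_search_es()` is provided by
    # the HeatProtocol base class and is taken here to return the matching
    # documents as a list of dicts (the `_source` of each hit), which is the
    # shape that `extract_blocks()` indexes into below.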

    def extract_blocks(self) -> None:
        """Group packets by device IP and bucket them into per-transfer artifacts."""
        log.info("Extracting artifact blocks...")
        ip_groups = defaultdict(list)
        for packet in self.elastic_data:
            # Key on the device IP
            if packet["umas"]["direction"].lower() == "request":
                key = packet["destination"]["ip"]
            else:
                key = packet["source"]["ip"]
            ip_groups[key].append(packet)

        # Extract artifacts from the groups
        artifact_buckets = defaultdict(list)
        for ip, group_pkts in ip_groups.items():
            log.info(f"Processing {len(group_pkts)} packets for {ip}")

            # Bucket artifacts by start and end block to handle
            # multiple artifacts from the same device
            start_locations = [
                loc
                for loc, pkt in enumerate(group_pkts)
                if pkt["umas"]["function_name"] in self.START_FUNCS
            ]
            stop_locations = [
                loc
                for loc, pkt in enumerate(group_pkts)
                if pkt["umas"]["function_name"] in self.END_FUNCS
            ]
            log.debug(
                f"[{ip}] {len(start_locations)} start locations, "
                f"{len(stop_locations)} stop locations"
            )
            if len(start_locations) != len(stop_locations):
                log.error(
                    f"[{ip}] Lengths of start locations and stop locations "
                    f"don't match. This indicates either a start or stop "
                    f"packet for the upload/download is missing. Skipping all "
                    f"packets for {ip} ({len(group_pkts)} packets)."
                )
                continue

            for start_loc, stop_loc in zip(start_locations, stop_locations, strict=False):
                # bucket[0]: INITIALIZE_DOWNLOAD or INITIALIZE_UPLOAD
                # bucket[-1]: END_DOWNLOAD or END_UPLOAD (hence 'stop_loc + 1')
                bkt_iter = itertools.islice(group_pkts, start_loc, stop_loc + 1)
                bucket = list(bkt_iter)
                if not bucket:
                    log.warning(
                        f"Empty artifact packets for {ip}. start: {start_loc}, stop: {stop_loc}"
                    )
                    continue
                artifact_buckets[ip].append(bucket)

                # Print debugging info
                log.trace(f"[{ip}] Bucket size: {len(bucket)}")
                if config.DEBUG >= 2:
                    for bkt in bucket:
                        log.trace2(
                            f"\t({bkt['@timestamp']}) "
                            f"{bkt['mbtcp']['transaction_identifier']} - "
                            f"{bkt['umas']['function_name']}"
                        )
        log.info(f"Done with {len(ip_groups)} IP groups")

        for ip, artifact_bucket in artifact_buckets.items():
            for packets in artifact_bucket:
                # The partially constructed artifact + metadata
                # TODO: begin constructing the artifact in an earlier for loop?
                #       Don't bucket by IP. Instead, bucket by artifact.
                r_pkt = next(
                    (p for p in packets if p["umas"]["function_name"] in self.RESP_FUNCS),
                    None,
                )
                if not r_pkt:
                    log.warning(f"No response packets found for {ip}")
                    continue
                start_time = utils.parse_date(packets[0]["@timestamp"])
                end_time = utils.parse_date(packets[-1]["@timestamp"])
                artifact = HeatArtifact(
                    packets=packets,
                    device_ip=r_pkt["source"]["ip"],
                    device_mac=r_pkt["source"].get("mac", ""),
                    device_oui=_cleanup_oui(r_pkt["source"].get("vendor", "")),
                    station_ip=r_pkt["destination"]["ip"],
                    station_mac=r_pkt["destination"].get("mac", ""),
                    station_oui=_cleanup_oui(r_pkt["destination"].get("vendor", "")),
                    start_time=start_time,
                    end_time=end_time,
                    duration=(end_time - start_time).total_seconds(),
                )

                # TODO: cleanup/deduplicate this logic
                # TODO: add vendor_id (manuf) and vendor_name (manuf_long)
                if artifact.device_mac:
                    dev_vend = mac_to_vendor(artifact.device_mac)
                    if dev_vend:
                        if dev_vend.manuf_long:
                            artifact.device_oui = dev_vend.manuf_long
                        elif dev_vend.manuf and not artifact.device_oui:
                            artifact.device_oui = dev_vend.manuf
                if artifact.station_mac:
                    station_vend = mac_to_vendor(artifact.station_mac)
                    if station_vend:
                        if station_vend.manuf_long:
                            artifact.station_oui = station_vend.manuf_long
                        elif station_vend.manuf and not artifact.station_oui:
                            artifact.station_oui = station_vend.manuf

                response_data = {}
                for pkt in packets:
                    transaction_id = pkt["mbtcp"]["transaction_identifier"]
                    if pkt["umas"]["function_name"] == "RESPONSE_ERROR":
                        log.warning(
                            f"[{ip}] RESPONSE_ERROR (transaction "
                            f"ID: {transaction_id}, timestamp: "
                            f"{pkt['@timestamp']})"
                        )
                    # Extract data from the RESPONSE_OK packets
                    elif pkt["umas"]["function_name"] == "RESPONSE_OK":
                        response_data[transaction_id] = pkt["umas"].get("data", "")

                # Associate each block packet with the data from its response
                for pkt in packets:
                    block_id: int | None = pkt["umas"].get("block_id")
                    func: str = pkt["umas"]["function_name"]
                    if func in self.END_FUNCS:
                        artifact.expected_blocks = int(pkt["umas"]["blocks_transferred"])
                        continue

                    # Skip packets without block data (e.g. INITIALIZE_DOWNLOAD)
                    if func not in self.BLOCK_FUNCS:
                        continue

                    # Check for null/non-positive block IDs (should never happen)
                    # NOTE: block IDs always start at 1, not 0
                    if not block_id:
                        log.error(f"[{artifact.id}] Bad {func} ID: {block_id}")
                        continue

                    # Set the transfer type for the artifact (DOWNLOAD or UPLOAD)
                    if not artifact.direction:
                        artifact.direction = func.split("_", maxsplit=1)[0]

                    # Prevent duplicate blocks
                    block_id = int(block_id)
                    if block_id in artifact.block_ids:
                        # The first block of an upload is sent twice, and this
                        # can likely happen with a download as well. So, don't
                        # warn if it's the first block ID.
                        if block_id != 1:
                            log.warning(
                                f"[{artifact.id}] Duplicate block ID for {func}: {block_id}"
                            )
                        continue
                    artifact.block_ids.add(block_id)

                    if func == "DOWNLOAD_BLOCK":
                        raw = response_data.get(pkt["mbtcp"]["transaction_identifier"])
                        if raw is None:
                            # No RESPONSE_OK for this transaction (e.g. it got
                            # a RESPONSE_ERROR); skip the block instead of
                            # raising a KeyError
                            log.error(
                                f"[{artifact.id}] No response data for {func} "
                                f"block {block_id}"
                            )
                            continue
                        # !! hack to exclude first 4 bytes (zero pad + len) !!
                        data = raw.replace(":", "")
                        if len(data) >= 8:
                            data = data[8:]  # 4 bytes => 8 hex characters (nibbles)
                    else:  # UPLOAD_BLOCK
                        data = pkt["umas"]["data"].replace(":", "")

                    block = {
                        "block_id": block_id,
                        "data": data,
                    }
                    artifact.blocks.append(block)

                log.trace(
                    f"[{artifact.id}] {artifact.direction}: expected "
                    f"{artifact.expected_blocks} blocks, got "
                    f"{len(artifact.blocks)} blocks",
                )
                log.trace2(
                    f"[{artifact.id}] Block IDs for {artifact.direction}: {artifact.block_ids}",
                )

                # Verify the number of blocks transferred matches the number
                # of blocks expected, as indicated in the END_DOWNLOAD or
                # END_UPLOAD packet.
                if not artifact.expected_blocks:
                    log.warning(
                        f"[{artifact.id}] Unable to find END_DOWNLOAD or "
                        f"END_UPLOAD, expected number of blocks is unknown"
                    )
                elif artifact.expected_blocks != len(artifact.blocks):
                    log.warning(
                        f"[{artifact.id}] Number of blocks extracted does not "
                        f"match the number of blocks expected from the END "
                        f"packet! Got {len(artifact.blocks)}, expected "
                        f"{artifact.expected_blocks}"
                    )

                # Sort by block ID
                artifact.blocks.sort(key=lambda x: x["block_id"])
                self.artifacts.append(artifact)

        log.info("Finished artifact block extraction for all devices")
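
    # Worked example of the bucketing above (function names illustrative):
    # given a per-IP packet sequence
    #   [INITIALIZE_UPLOAD, UPLOAD_BLOCK, UPLOAD_BLOCK, END_UPLOAD,
    #    INITIALIZE_DOWNLOAD, DOWNLOAD_BLOCK, END_DOWNLOAD]
    # start_locations == [0, 4] and stop_locations == [3, 6], so zip() pairs
    # them into two buckets: packets 0-3 (the upload) and packets 4-6 (the
    # download), each spanning INITIALIZE_* through END_* inclusive.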

    def assemble_artifacts(self) -> None:
        """Reassemble each artifact's raw bytes from its hex-encoded blocks."""
        log.info(f"Assembling {len(self.artifacts)} artifacts...")
        for artifact in self.artifacts:
            chunks = bytearray()
            for block in artifact.blocks:
                chunks.extend(binascii.unhexlify(block["data"]))
            artifact.reconstructed_artifact = bytes(chunks)
            start = artifact.start_time.strftime("%Y-%m-%d_%H-%M-%S")
            artifact.file_name = (
                f"{artifact.device_ip}_{artifact.station_ip}_"
                f"{artifact.direction}_{start}+{int(artifact.duration)}.apx"
            )
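
    # Example of the reassembly step above, runnable in a REPL (block data is
    # hypothetical; real blocks carry hex-encoded project file content):
    #   >>> import binascii
    #   >>> blocks = [{"block_id": 1, "data": "dead"}, {"block_id": 2, "data": "beef"}]
    #   >>> b"".join(binascii.unhexlify(b["data"]) for b in blocks)
    #   b'\xde\xad\xbe\xef'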

    def export_artifacts(self) -> None:
        """Write each reconstructed artifact to the HEAT artifacts directory."""
        log.info(f"Exporting {len(self.artifacts)} artifacts...")
        for artifact in self.artifacts:
            log.info(f"[{artifact.id}] Exporting to {artifact.file_name}")
            artifact.file_path = config.HEAT_ARTIFACTS_DIR / artifact.file_name
            utils.write_file(
                artifact.reconstructed_artifact,
                artifact.file_path,
                overwrite_existing=False,
            )

    def parse_artifacts(self) -> None:
        """Parse the reconstructed artifacts with PEAT."""
        log.info(f"Parsing {len(self.artifacts)} artifacts using PEAT...")
        # Don't look up IPs in the host's DNS; it's pointless and can leak information
        config.RESOLVE_HOSTNAME = False
        config.RESOLVE_IP = False
        config.RESOLVE_MAC = False
        for artifact in self.artifacts:
            self._parse_artifact(artifact)

    def _parse_artifact(self, artifact: HeatArtifact) -> None:
        """Parse a single artifact and export the device and station data."""
        log.info(f"Parsing artifact {artifact.id}")

        # Device (the PLC)
        # NOTE: it's possible to have multiple project files for the same IP!
        # Therefore, we use .add() instead of .get() to avoid annotating
        # the same DeviceData object.
        dev = datastore.create(artifact.device_ip, "ip")
        dev._is_verified = True
        device_iface = Interface(
            type="ethernet",
            mac=artifact.device_mac,
            ip=artifact.device_ip,
        )
        dev.store("interface", device_iface)
        dev.populate_fields()
        mb_svc = Service(
            port=502,
            # TODO: add an "application" field to the service model for "umas"?
            protocol="modbus_tcp",
            transport="tcp",
            status="verified",
        )
        dev.store("service", mb_svc, interface_lookup={"ip": artifact.device_ip})
        dev.populate_fields()

        # Import here so m340 isn't required for basic artifact extraction
        from peat import M340

        try:
            if config.HEAT_ARTIFACTS_DIR and artifact.file_path:
                M340.parse(to_parse=artifact.file_path, dev=dev)
            else:
                M340.parse(to_parse=artifact.reconstructed_artifact, dev=dev)
        except Exception:
            log.exception(
                f"[{artifact.id}] Failed to parse artifact due to an unhandled exception"
            )
            state.error = True
        dev.related.ip.add(artifact.station_ip)
        if dev.logic.author:
            dev.related.user.add(dev.logic.author)
        M340.update_dev(dev)

        # The station that programs the device, usually Unity Pro (or PEAT).
        # Generally an engineering workstation or a SCADA system.
        # TODO: merge data for station (use datastore.get()),
        #       since it's likely the same device?
        station = datastore.create(artifact.station_ip, "ip")
        station_iface = Interface(
            type="ethernet",
            mac=artifact.station_mac,
            ip=artifact.station_ip,
        )
        station.store("interface", station_iface)
        # TODO: set station vendor ID to the short manuf string
        #       (e.g. "Dell" instead of "Dell, Inc.")
        station.description.vendor.name = artifact.station_oui
        uv = dev.extra.get("project_file_metadata", {}).get("unity_version", "")
        if not uv:
            uv = " or similar software"
        station.description.description = (
            f"Host that programmed the device at {artifact.device_ip}. "
            f"Likely an engineering workstation or SCADA server running "
            f"Unity Pro {uv}."
        )
        station.type = "PC"
        station.related.ip.add(artifact.device_ip)
        if dev.logic.author:
            station.related.user.add(dev.logic.author)
        station.populate_fields()
        if config.DEVICE_DIR:
            station.export_to_files()

        # TODO: "heat_results" file with all results, keyed by file?
        # Export parsed data to Elasticsearch
        if state.elastic:
            dev.export_to_elastic()
            station.export_to_elastic()

        # Generate an OpenPLC project for every unique device IP
        if config.HEAT_ARTIFACTS_DIR:
            if not dev.logic.formats.get("tc6"):
                log.warning(
                    f"[{artifact.id}] No TC6 was generated, skipping generation of OpenPLC project"
                )
            else:
                dir_name = (
                    f"openplc-project_{dev.logic.name.replace(' ', '-')}_"
                    f"{dev.ip}_{artifact.end_time.timestamp()}"
                )
                dir_path = config.HEAT_ARTIFACTS_DIR / dir_name
                dev.options["m340"]["generate_openplc_project"] = dir_path
                proj_path = M340.generate_openplc_project(dev)
                if not proj_path:
                    log.error(f"[{artifact.id}] Failed OpenPLC project generation")

def _cleanup_oui(mac: str) -> str:
    """Strip the trailing MAC octets from tshark-style vendor names.

    Older tshark (3.0.14) makes vendor names like "HewlettP_00:00:00";
    modern tshark (3.2.3) emits names like "Hewlett Packard", which are
    returned unchanged.
    """
    if mac.count("_") == 1 and mac.count(":") == 2:
        return mac.split("_", maxsplit=1)[0]
    return mac


__all__ = ["UmasExtractor"]
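
# Usage sketch (hypothetical driver code; the actual orchestration lives in
# the HEAT framework and is assumed to invoke the steps in this order):
#
#   extractor = UmasExtractor()
#   extractor.get_data()            # pull UMAS file-transfer packets from Elasticsearch
#   extractor.extract_blocks()      # bucket packets into per-transfer artifacts
#   extractor.assemble_artifacts()  # reassemble raw bytes from the hex blocks
#   extractor.export_artifacts()    # write reconstructed .apx files to disk
#   extractor.parse_artifacts()     # parse each artifact with PEAT's M340 module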