# Source code for peat.heat.telnet_extractor
"""
HEAT protocol extractor for SEL relay Telnet protocol.
Telnet packets in Elasticsearch will have the 'type' field set to 'telnet'.
TCP packets in Elasticsearch will have the 'type' field set to 'TCP'.
Authors
- Walter Weiffenbach
"""
import re
from peat import Interface, config, datastore, log, state, utils
from .heat_classes import HeatArtifact, HeatProtocol, TelnetHeatArtifact
def _cleanup_oui(mac: str) -> str:
# Older tshark (3.0.14) makes vendor names like "HewlettP_c0:b9:20"
# Modern tshark (3.2.3) is like "Hewlett Packard"
if mac.count("_") == 1 and mac.count(":") == 2:
return mac.split("_", maxsplit=1)[0]
return mac
class TelnetExtractor(HeatProtocol):
    """
    HEAT protocol extractor for SEL relay Telnet protocol.

    - Step 1: get packet data for each telnet stream
    - Step 2: reconstruct each TCP data stream for each file
    - Step 3: identify commands to indicate the start of a file transfer
    - Step 4: reconstruct each artifact from those byte streams
    - Step 5: parse the artifact
    """

    # Number of buckets requested from each Elasticsearch ``terms``
    # aggregation in get_data(). Without an explicit size Elasticsearch only
    # returns the top 10 terms, which silently dropped pcaps/streams/sources.
    ES_TERMS_SIZE = 10000

    # YMODEM control bytes
    _SOH = 0x01  # starts a 128-byte block
    _STX = 0x02  # starts a 1024-byte block
    _EOT = 0x04  # end of transmission
    # In the captured stream a literal 0xFF byte appears doubled (0xFF 0xFF),
    # consistent with Telnet's IAC-doubling rule (RFC 854).
    _IAC = 0xFF

    # CRC table for YMODEM taken from rbsb.c
    # source: https://stuff.mit.edu/afs/sipb/user/paradis/zmodem/rbsb.c
    # table calculated by Mark G. Mendel, Network Systems Corporation
    crctab = [
        0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
        0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
        0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
        0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
        0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
        0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
        0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
        0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
        0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
        0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
        0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
        0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
        0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
        0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
        0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
        0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
        0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
        0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
        0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
        0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
        0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
        0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
        0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
        0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
        0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
        0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
        0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
        0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
        0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
        0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
        0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
        0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0,
    ]

    def get_data(self) -> None:
        """
        Query Elasticsearch for the Telnet payload packets of every half-stream.

        A half-stream is one (``pcap``, ``tcp.stream``, ``source.ip``)
        combination, i.e. the bytes one endpoint sent over one TCP connection.
        The packets of each half-stream that returned data are stored in
        ``self.elastic_data`` under consecutive integer keys starting at 0.
        """
        # Enumerate half-streams: pcap -> tcp.stream -> source.ip, restricted
        # to Telnet packets, with an explicit bucket count at every level.
        sources_agg = {"terms": {"field": "source.ip", "size": self.ES_TERMS_SIZE}}
        streams_agg = {
            "terms": {"field": "tcp.stream", "size": self.ES_TERMS_SIZE},
            "aggs": {"sources": sources_agg},
        }
        pcaps_agg = {
            "terms": {"field": "pcap", "size": self.ES_TERMS_SIZE},
            "aggs": {"streams": streams_agg},
        }
        body = {
            "size": 0,
            "query": {"term": {"type": "telnet"}},
            "aggs": {"pcaps": pcaps_agg},
        }
        search_args = {"size": 0, "index": "packets-*", "body": body}
        aggregates = self.es_obj.raw_search(search_args)

        # {pcap: {tcp.stream: [source.ip, ...]}}
        pcaps = {}
        num_requests = 0
        for pcap_bucket in aggregates["aggregations"]["pcaps"]["buckets"]:
            streams = {}
            for stream_bucket in pcap_bucket["streams"]["buckets"]:
                # reversed() keeps the source ordering of the previous
                # implementation, which prepended each source to the list
                sources = [
                    source_bucket["key"]
                    for source_bucket in reversed(stream_bucket["sources"]["buckets"])
                ]
                num_requests += len(sources)
                streams[stream_bucket["key"]] = sources
            pcaps[pcap_bucket["key"]] = streams
        log.info(f"Found {num_requests} TELNET streams in Elasticsearch database")

        data = {}
        for pcap, streams in pcaps.items():
            for stream, sources in streams.items():
                for source in sources:
                    body = {
                        "query": {
                            "bool": {
                                "must": [
                                    {"term": {"pcap": pcap}},
                                    {"term": {"type": "telnet"}},
                                    {"term": {"tcp.stream": stream}},
                                    {"term": {"source.ip": source}},
                                    {"exists": {"field": "telnet.data_raw"}},
                                ]
                            }
                        },
                        # NOTE(review): packets are read back through tcp.seq in
                        # _getStream but sorted on "tcp.sequence" here; if that
                        # field is unmapped the sort has no effect. Confirm
                        # against the packets-* index mapping.
                        "sort": [{"tcp.sequence": {"unmapped_type": "long"}}],
                    }
                    # Use the next free key so keys stay contiguous even when a
                    # half-stream returns nothing and is skipped.
                    index = len(data)
                    packets = self._fetch_stream(body, index, pcap, stream, source)
                    if packets:
                        data[index] = packets
        self.elastic_data = data

    def _fetch_stream(self, body: dict, index: int, pcap, stream, source):
        """
        Run one per-half-stream packet query, retrying once if it comes back empty.

        Returns whatever ``self._search_es`` returned on the last attempt; an
        empty result after the retry is logged as an error.
        """
        description = (
            f"Searching for TELNET Stream {index} - PCAP: {pcap}, "
            f"TCP Stream: {stream}, Source IP: {source}"
        )
        log.info(description)
        packets = self._search_es(body)
        if not packets:
            log.warning("Query failed - trying again")
            log.info(description)
            packets = self._search_es(body)
            if not packets:
                log.error(f"Elasticsearch query failed: {body}")
        return packets

    def get_list_text(self, data: list | str) -> bytearray:
        """
        Decode Telnet ``data_raw`` hex text into bytes.

        Args:
            data: a single hex string, or a list of hex strings that are
                decoded and concatenated in order

        Raises:
            ValueError: if any string is not valid hex
        """
        chunks = data if isinstance(data, list) else [data]
        text = bytearray()
        for chunk in chunks:
            text += bytearray.fromhex(chunk)
        return text

    def _getStream(self, i: int) -> bytearray:
        """
        Reassemble the Telnet payload of half-stream ``i`` into one byte string.

        Packets are consumed in the order Elasticsearch returned them. The
        first packet's ``tcp.seq`` seeds the chain; after that a packet is used
        only if its ``tcp.seq`` equals the ``tcp.nxtseq`` of the last packet
        used, so retransmitted or out-of-order packets are skipped.
        """
        packets = self.elastic_data[i]
        stream = bytearray()
        if not packets:
            return stream
        next_seq = int(packets[0]["tcp"]["seq"])
        for packet in packets:
            if int(packet["tcp"]["seq"]) != next_seq:
                continue
            raw = packet.get("telnet", {}).get("data_raw")
            if raw is not None:
                try:
                    # data_raw holds the payload as hex string(s)
                    stream += self.get_list_text(raw)
                except ValueError:
                    log.error(
                        f"Failed to read packet from pcap {packet['pcap']} - "
                        f"data:\tsrc: {packet['source']['ip']}\t"
                        f"TCP stream/sequence number: "
                        f"{packet['tcp']['stream']}/{packet['tcp']['seq']}"
                    )
            next_seq = int(packet["tcp"]["nxtseq"])
        return stream

    def _genArtifact(
        self,
        file_name: str,
        fileData: bytearray,
        selcommand: str,
        startoffset: int,
        stopoffset: int,
        filedirection: str,
        streamid: int,
    ) -> None:
        """
        Build a TelnetHeatArtifact for one reconstructed file and append it to
        ``self.artifacts``.

        Args:
            file_name: file name from the YMODEM header block ("" if none)
            fileData: reconstructed file contents
            selcommand: the SEL command text that started the transfer
            startoffset: index in the byte stream where YMODEM parsing began
            stopoffset: index in the byte stream of the terminating <EOT>
            filedirection: "UPLOAD" (stream source is the station, destination
                is the relay) or "DOWNLOAD" (the reverse)
            streamid: key of the half-stream in ``self.elastic_data``
        """
        packets = self.elastic_data[streamid]
        source = packets[0]["source"]
        destination = packets[0]["destination"]
        start_time = self.start_times[streamid]
        end_time = self.end_times[streamid]
        try:
            content = fileData.decode("ascii")
        except UnicodeDecodeError:
            # Don't abort the whole extraction because one file is binary
            log.warning(
                f"Reconstructed file '{file_name}' from stream {streamid} is not "
                f"pure ASCII - undecodable bytes were replaced with U+FFFD"
            )
            content = fileData.decode("ascii", errors="replace")
        artifact = TelnetHeatArtifact(
            packets=packets,
            source_ip=source["ip"],
            source_mac=source.get("mac", ""),
            source_oui=_cleanup_oui(source.get("vendor", "")),
            dest_ip=destination["ip"],
            dest_mac=destination.get("mac", ""),
            dest_oui=_cleanup_oui(destination.get("vendor", "")),
            start_time=start_time,
            end_time=end_time,
            # end - start (the previous start - end was always negative)
            duration=(end_time - start_time).total_seconds(),
            direction=filedirection,
            bytestream=self.bytestreams[streamid],
            start=startoffset,
            stop=stopoffset,
            command=selcommand,
            reconstructed_artifact=content,
            artifact_file_name=file_name,
        )
        if filedirection == "UPLOAD":
            # station sent the file to the relay
            artifact.device_ip = artifact.dest_ip
            artifact.device_mac = artifact.dest_mac
            artifact.device_oui = artifact.dest_oui
            artifact.station_ip = artifact.source_ip
            artifact.station_mac = artifact.source_mac
            artifact.station_oui = artifact.source_oui
        else:
            # relay sent the file to the station
            artifact.device_ip = artifact.source_ip
            artifact.device_mac = artifact.source_mac
            artifact.device_oui = artifact.source_oui
            artifact.station_ip = artifact.dest_ip
            artifact.station_mac = artifact.dest_mac
            artifact.station_oui = artifact.dest_oui
        self.artifacts.append(artifact)

    def ymodem_crc(self, data: bytearray) -> int:
        """
        Return the CRC-16/XMODEM (polynomial 0x1021, initial value 0) of ``data``.

        Uses the byte-wise table-driven update
        ``crc = (crc << 8) ^ crctab[((crc >> 8) ^ byte) & 0xFF]`` (masked to 16
        bits), which yields the CRC that YMODEM transmits directly. This is the
        non-augmented counterpart of the ``updcrc`` macro from "rbsb.c"
        (``crctab[((crc >> 8) & 255)] ^ (crc << 8) ^ cp``); the macro form needs
        two trailing zero bytes fed through it to produce the same value.
        Example: the check value for ``b"123456789"`` is ``0x31C3``.

        Derived from prior work by Stephen Satchell, Satchell Evaluations and
        Chuck Forsberg, Omen Technology, especially Forsberg's 1988 article
        XMODEM/YMODEM PROTOCOL REFERENCE and Satchell's 1986 article regarding
        updcrc.
        """
        crc = 0
        for cp in data:
            index = ((crc >> 8) ^ (cp & 0xFF)) & 0xFF
            crc = ((crc << 8) ^ self.crctab[index]) & 0xFFFF
        return crc

    @staticmethod
    def _read_command(stream: bytearray, pos: int) -> str:
        """
        Return the text from ``stream[pos]`` through the next CRLF inclusive
        (or to the end of the stream if there is none).

        Decoding with "ascii" + ``errors="replace"`` turns every non-ASCII byte
        into exactly one U+FFFD, so character offsets in the result equal byte
        offsets from ``pos``.
        """
        end = stream.find(b"\r\n", pos)
        end = len(stream) if end == -1 else end + 2
        return stream[pos:end].decode("ascii", errors="replace")

    @classmethod
    def _read_escaped_byte(cls, stream: bytearray, pos: int) -> tuple[int, int]:
        """
        Read one byte at ``pos`` and return ``(value, next_pos)``.

        A 0xFF is assumed to be followed by its doubled copy, which is skipped.
        Raises IndexError if ``pos`` is past the end of ``stream``.
        """
        value = stream[pos]
        return value, pos + (2 if value == cls._IAC else 1)

    @classmethod
    def _read_escaped_bytes(
        cls, stream: bytearray, pos: int, count: int
    ) -> tuple[bytearray, int]:
        """Read ``count`` logical bytes (0xFF un-doubled); return ``(data, next_pos)``."""
        out = bytearray()
        while len(out) < count:
            value, pos = cls._read_escaped_byte(stream, pos)
            out.append(value)
        return out, pos

    def parseStream(self, stream: bytearray, streamid: int) -> None:
        """
        Find SEL file transfers in one reassembled Telnet half-stream and turn
        every complete one into an artifact (see ``_genArtifact``).

        The stream is scanned one CRLF-terminated chunk at a time:

        - ``File Read <name>`` followed by CRLF starts a DOWNLOAD; the YMODEM
          data begins a fixed number of bytes after the command.
        - ``File Write <name>`` followed by CR and <SOH> starts an UPLOAD; the
          echoed command is not CRLF-terminated, so the chunk read runs into
          the transfer and scanning restarts at the <SOH>.
        """
        fileReadRegex = re.compile(r"File? Read [!-~]*\.[a-zA-Z]{3,4}\r\n")
        fileWriteRegex = re.compile(r"File? Write [!-~]*\.[a-zA-Z]{3,4}\r\x01")
        i = 0
        while i < len(stream):
            start = i
            command = self._read_command(stream, start)
            i = start + len(command)
            readMatch = fileReadRegex.search(command)
            writeMatch = fileWriteRegex.search(command)
            # If both match (not expected), the transfer is treated as an UPLOAD.
            if readMatch and not writeMatch:
                # Skip one byte after the command plus the 25-byte
                # "#000 Ready to send file\r\n" banner. NOTE(review): these are
                # fixed offsets, not a search for the first block header.
                i = self._parse_ymodem(
                    stream, i + 1 + 25, "DOWNLOAD", readMatch.group(0), streamid
                )
            elif writeMatch:
                # The match ends with "\r\x01"; the <SOH> is its last character.
                i = self._parse_ymodem(
                    stream,
                    start + writeMatch.end() - 1,
                    "UPLOAD",
                    writeMatch.group(0),
                    streamid,
                )

    def _parse_ymodem(
        self,
        stream: bytearray,
        pos: int,
        direction: str,
        selcommand: str,
        streamid: int,
    ) -> int:
        """
        Decode one YMODEM transfer starting at ``stream[pos]``. If it reaches
        <EOT>, record the file via ``_genArtifact``.

        Returns the index at which Telnet command scanning should resume: one
        past the <EOT>, one past an unexpected block-header byte, or
        ``len(stream)`` if the stream ends mid-transfer.

        Block layout (Chuck Forsberg, XMODEM/YMODEM PROTOCOL REFERENCE,
        http://pauillac.inria.fr/~doligez/zmodem/ymodem.txt)::

            <SOH|STX> <blk#> <255-blk#> <128|1024 data bytes> <CRC hi> <CRC lo>

        <SOH> introduces a 128-byte block and <STX> a 1024-byte block
        (YMODEM-1k). Block 0 is the YMODEM header, whose data starts with the
        NUL-terminated file name. Only the CRC-16 variant is supported because
        SEL uses it by default; a sender that falls back to the 1-byte
        arithmetic checksum will not be decoded correctly.
        TODO: support fallback to arithmetic checksum.

        CRC exchange from the reference above. Each half-stream holds one
        direction only, so just the SENDER column appears in ``stream``::

            SENDER                                  RECEIVER
                                                <---    <C>
            <soh> 01 FE -data- <xxxx>           --->
                                                <---    <ack>
            <soh> 02 FD -data- <xxxx>           --->    (data gets line hit)
                                                <---    <nak>
            <soh> 02 FD -data- <xxxx>           --->
                                                <---    <ack>
            <soh> 03 FC -data- <xxxx>           --->
            (ack gets garbaged)                 <---    <ack>
                                                times out after 10 seconds,
                                                <---    <nak>
            <soh> 03 FC -data- <xxxx>           --->
                                                <---    <ack>
            <eot>                               --->
                                                <---    <ack>
        """
        startoffset = pos
        blocks: list[bytearray] = []
        block_ids: list[int] = []
        try:
            while True:
                header = stream[pos]
                if header not in (self._SOH, self._STX):
                    log.warning(
                        f"Block header does not match spec - "
                        f"skipping file with command {selcommand}"
                    )
                    return pos + 1
                block_id, pos = self._read_escaped_byte(stream, pos + 1)
                block_comp, pos = self._read_escaped_byte(stream, pos)
                if block_comp != 0xFF - block_id:
                    log.warning(
                        f"Block ID does not match 1's Complement - "
                        f"BlockID: 0x{block_id:02x}"
                        f"\t 1sComplement: 0x{block_comp:02x}"
                    )
                size = 1024 if header == self._STX else 128
                data, pos = self._read_escaped_bytes(stream, pos, size)
                crc_hi, pos = self._read_escaped_byte(stream, pos)
                crc_lo, pos = self._read_escaped_byte(stream, pos)
                crc = (crc_hi << 8) | crc_lo
                calc_crc = self.ymodem_crc(data)
                if calc_crc != crc:
                    log.warning(
                        f"CRC16 obtained from transmission ({crc}) does not "
                        f"match calculated CRC16 ({calc_crc}) on YMODEM "
                        f"chunk 0x{block_id:02x}. Expecting next chunk to resend this chunk..."
                    )
                    continue
                if block_ids and block_id == block_ids[-1]:
                    # Sender resent a block we already have (e.g. lost ACK)
                    blocks[-1] = data
                else:
                    # Block numbers are 8-bit and wrap around after 0xFF
                    expected = len(blocks) % 256
                    if block_id != expected:
                        log.warning(
                            f"Block ID is out of order - Expected 0x{expected:02x} "
                            f"but got 0x{block_id:02x} instead"
                        )
                    blocks.append(data)
                    block_ids.append(block_id)
                if stream[pos] == self._EOT:
                    break
        except IndexError:
            log.warning(
                f"Stream {streamid} ended before the YMODEM transfer started by "
                f"command {selcommand!r} completed - skipping file"
            )
            return len(stream)

        file_name, file_data = self._assemble_ymodem_file(blocks, block_ids)
        self._genArtifact(
            file_name,
            file_data,
            selcommand,
            startoffset,
            pos,
            direction,
            streamid,
        )
        return pos + 1

    @staticmethod
    def _assemble_ymodem_file(
        blocks: list[bytearray], block_ids: list[int]
    ) -> tuple[str, bytearray]:
        """
        Join received YMODEM blocks into ``(file_name, file_data)``.

        If the first block is block 0 it is treated as the header and its
        NUL-terminated prefix becomes the file name. Trailing NUL / ^Z (0x1A)
        padding is stripped from the end of the file only, not from every block.
        """
        file_name = ""
        data_blocks = blocks
        if block_ids and block_ids[0] == 0:
            file_name = blocks[0].split(b"\x00", 1)[0].decode("ascii", errors="replace")
            data_blocks = blocks[1:]
        file_data = bytearray(b"".join(data_blocks))
        return file_name, file_data.rstrip(b"\x00").rstrip(b"\x1a")

    def extract_blocks(self) -> None:
        """Extract bytes into ``self.artifacts``."""
        # Per-stream state, keyed the same way as self.elastic_data
        self.bytestreams = {}
        self.blocks = {}
        self.command = {}
        self.direction = {}
        self.files = {}
        self.start_times = {}
        self.end_times = {}
        # Iterate the real keys: they are the stream ids used everywhere else.
        stream_items = (
            self.elastic_data.items()
            if isinstance(self.elastic_data, dict)
            else enumerate(self.elastic_data)
        )
        for i, packets in stream_items:
            if not packets:
                continue
            self.start_times[i] = utils.parse_date(packets[0]["@timestamp"], False)
            self.end_times[i] = utils.parse_date(packets[-1]["@timestamp"], False)
            # build byte stream from packet data
            self.bytestreams[i] = self._getStream(i)
            before = len(self.artifacts)
            self.parseStream(self.bytestreams[i], i)
            log.info(f"Found {len(self.artifacts) - before} artifacts in stream {i}")

    def assemble_artifacts(self) -> None:
        """
        Assign each artifact its output ``file_name``.

        Format (with every ":" then replaced by "_"):
        ``{src}_{dst}_{direction}_{name}_{start}+{duration}_[{start}:{stop}]{ext}``
        where ``ext`` is everything from the first "." of the transferred file
        name ("" if it has none).
        """
        for artifact in self.artifacts:
            start = artifact.start_time.strftime("%Y-%m-%d_%H-%M-%S")
            _, dot, rest = artifact.artifact_file_name.partition(".")
            extension = dot + rest
            file_name = (
                f"{artifact.source_ip}_{artifact.dest_ip}_"
                f"{artifact.direction}_{artifact.artifact_file_name}_"
                f"{start}+{int(artifact.duration)}_"
                f"[{artifact.start}:{artifact.stop}]{extension}"
            )
            artifact.file_name = file_name.replace(":", "_")

    def export_artifacts(self) -> None:
        """Write each artifact's reconstructed text into ``config.HEAT_ARTIFACTS_DIR``."""
        if not config.HEAT_ARTIFACTS_DIR:
            log.warning("HEAT_ARTIFACTS_DIR is not set - skipping artifact export")
            return
        log.info(f"Exporting {len(self.artifacts)} artifacts...")
        for artifact in self.artifacts:
            log.info(f"[{artifact.id}] Exporting to {artifact.file_name}")
            artifact.file_path = config.HEAT_ARTIFACTS_DIR / artifact.file_name
            utils.write_file(
                artifact.reconstructed_artifact,
                artifact.file_path,
                overwrite_existing=False,
            )

    def parse_artifacts(self) -> None:
        """Parse every artifact whose file name contains ".txt"/".TXT" with PEAT."""
        log.info(f"Parsing {len(self.artifacts)} artifacts using PEAT...")
        # Don't lookup IPs in host's DNS, pointless and can leak information
        config.RESOLVE_HOSTNAME = False
        config.RESOLVE_IP = False
        config.RESOLVE_MAC = False
        txtRegex = re.compile(r".*\.(txt|TXT)")
        for artifact in self.artifacts:
            if txtRegex.search(artifact.artifact_file_name):
                self._parse_artifact(artifact)
            else:
                _, dot, rest = artifact.artifact_file_name.partition(".")
                ext = dot + rest
                log.warning(
                    f"Unable to parse file with unsupported extension "
                    f"{ext}: {artifact.artifact_file_name}"
                )

    def _parse_artifact(self, artifact: HeatArtifact) -> None:
        """
        Parse one artifact with SELRelay and record the relay ("device") and
        the host that talked to it ("station") in the datastore.
        """
        log.info(f"Parsing artifact {artifact.id}")
        # Device (the relay)
        # NOTE: it's possible to have multiple project files for same IP!
        # Therefore, we use .create() instead of .get() to avoid annotating
        # the same DataManager object.
        dev = datastore.create(artifact.device_ip, "ip")
        dev._is_verified = True
        device_iface = Interface(
            type="ethernet",
            mac=artifact.device_mac,
            ip=artifact.device_ip,
        )
        dev.store("interface", device_iface)
        dev.populate_fields()
        # Imported here so SELRelay isn't required for basic artifact extraction
        from peat import SELRelay

        try:
            if config.HEAT_ARTIFACTS_DIR and artifact.file_path:
                SELRelay.parse(to_parse=artifact.file_path, dev=dev)
            else:
                SELRelay.parse(to_parse=artifact.reconstructed_artifact, dev=dev)
        except Exception:
            log.exception(
                f"[{artifact.id}] Failed to parse artifact due to an unhandled exception"
            )
            state.error = True
        dev.related.ip.add(artifact.station_ip)
        if dev.logic.author:
            dev.related.user.add(dev.logic.author)
        SELRelay.update_dev(dev)
        # The station which programs the device,
        # generally an engineering workstation or a SCADA system.
        # TODO: merge data for station (use datastore.get()),
        # since it's likely the same device?
        station = datastore.create(artifact.station_ip, "ip")
        station_iface = Interface(
            type="ethernet",
            mac=artifact.station_mac,
            ip=artifact.station_ip,
        )
        station.store("interface", station_iface)
        # TODO: set station vendor ID to the short manuf string
        # (e.g. "Dell" instead of "Dell, Inc.")
        station.description.vendor.name = artifact.station_oui
        station.description.description = (
            f"Host that programmed the device at {artifact.device_ip}. "
            f"Likely a engineering workstation or SCADA server."
        )
        station.type = "PC"
        station.related.ip.add(artifact.device_ip)
        if dev.logic.author:
            station.related.user.add(dev.logic.author)
        station.populate_fields()
        if config.DEVICE_DIR:
            station.export_to_files()
        # TODO: "heat_results" file with all results, keyed by file?
        # Export parsed data to Elasticsearch
        if state.elastic:
            dev.export_to_elastic()
            station.export_to_elastic()