Source code for peat.heat.ftp_extractor
"""
HEAT protocol extractor for the FTP protocol for SEL devices.
Authors
- Walter Weiffenbach
- Christopher Goes
"""
import json
import os
import re
import shutil
import subprocess
from datetime import datetime
from pathlib import Path
from peat import Elastic, Interface, config, datastore, log, state, utils
from peat.modules.sel.relay_parse import (
parse_cser,
parse_ser,
process_events,
process_info_into_dev,
)
from .heat_classes import FTPHeatArtifact, HeatArtifact, HeatProtocol
class FTPExtractor(HeatProtocol):
"""
HEAT protocol extractor for the FTP protocol for SEL devices.
"""
def __init__(self, es_obj: Elastic):
self.logdirs = []
self.pcap_filenames = []
# find the Zeek binary and, if not found, fall back
# to the hardcoded path.
self.zeek_bin_path = shutil.which("zeek")
if not self.zeek_bin_path:
self.zeek_bin_path = "/opt/zeek/bin/zeek"
super().__init__(es_obj)
def get_log_dir(self, f: str) -> str:
"""get directory path for a pcap's zeek logs."""
return f"{config.ZEEK_LOGDIR}/zeek_{f.replace('.', '_')}"
def get_data(self) -> None:
if not config.NO_RUN_ZEEK:
# get pcap directory from argument
pcaps_dir = config.PCAPS
if not pcaps_dir:
log.warning("No PCAP directory specified (--pcaps), defaulting to './pcaps'")
pcaps_dir = Path("./pcaps").resolve()
if not pcaps_dir.is_dir():
log.error(
f"PCAPS directory {pcaps_dir} doesn't exist or isn't a directory. Aborting..."
)
state.error = True
return
pcap_files = os.listdir(pcaps_dir)
if not pcap_files:
log.error(f"PCAP folder '{pcaps_dir}' is empty. Aborting...")
state.error = True
return
# get pcaps
log.info(f"Reading pcap files from {pcaps_dir}")
for f in pcap_files:
if ".pcap" in f:
# store the pcap information
# we have to use elastic_data here so that the heat_protocol
# superclass does not cause execution to halt.
self.elastic_data.insert(0, str(pcaps_dir) + "/" + f)
self.pcap_filenames.insert(0, f)
self.logdirs.insert(0, self.get_log_dir(f))
log.info(f"Extracting artifacts from {len(self.elastic_data)} PCAP files")
# run zeek to get data logs
# clean zeek log dir
if not config.ZEEK_LOGDIR.exists():
config.ZEEK_LOGDIR.mkdir()
# for each pcap, process it
for i in range(len(self.elastic_data)):
# if the zeek data already exists, don't run zeek because it is expensive
if os.path.exists(self.logdirs[i]):
log.debug(f"Zeek already ran for {self.logdirs[i]}. Skipping...")
continue
# self.elastic_data[i] is the absolute path to the PCAP file
log.info(f"Running Zeek on {self.elastic_data[i]}")
# init the logdir for this pcap
logdir = self.logdirs[i]
os.mkdir(logdir)
# TODO: copy the PCAP file that was parsed to results
# execute zeek
# the zeek script configures file extraction
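# For reference: "-C" tells Zeek to ignore invalid TCP checksums
# (common in captured traffic), "-r" replays the PCAP instead of
# sniffing a live interface, and "LogAscii::use_json=T" emits the
# logs as one JSON object per line so make_json() can load them.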
zeek_result = subprocess.run(
args=[
self.zeek_bin_path,
"-C",
"-r",
self.elastic_data[i],
f"default_logdir={logdir}",
"LogAscii::use_json=T",
"/PEAT/peat/heat/ftp_extractor_zeek_script.zeek",
],
check=False,
capture_output=True,
)
# if the command failed, skip this pcap and error
if zeek_result.returncode != 0:
state.error = True
log.error(
f"Unable to execute Zeek for {self.elastic_data[i]} "
f"(exit code: {zeek_result.returncode})"
)
log.debug(f"zeek stdout: {zeek_result.stdout}")
log.debug(f"zeek stderr: {zeek_result.stderr}")
continue
# if files were extracted, move them into the log dir
if os.path.exists("extract_files"):
shutil.move("extract_files", f"{logdir}/extract_files/")
try:
shutil.move("conn.log", logdir)
shutil.move("files.log", logdir)
shutil.move("ftp.log", logdir)
except FileNotFoundError as e:
log.error(f"Unable to move extracted files to correct dir: {e}")
state.error = True
return
else:
log.error(f"No files were extracted from {self.elastic_data[i]}")
# Cleanup empty zeek directories
utils.clean_empty_dirs(config.ZEEK_LOGDIR)
else:
if not config.ZEEK_DIR:
log.error("ZEEK_DIR (--zeek-dir) not specified. Aborting...")
state.error = True
return
if not config.ZEEK_DIR.is_dir():
log.error(f"{config.ZEEK_DIR} doesn't exist or is not a directory. Aborting...")
state.error = True
return
self.elastic_data.insert(0, config.ZEEK_DIR)
log.info(f"Extracting artifacts from {config.ZEEK_DIR} as Zeek directory")
def nth_index(self, haystack, needle, n) -> int:
# get the nth index of a needle string in a haystack string
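# e.g. nth_index("a.b.c.d", ".", 2) returns 3, the index of the
# second "."; returns -1 if there are fewer than n occurrences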
start = haystack.find(needle)
while start >= 0 and n > 1:
start = haystack.find(needle, start + len(needle))
n -= 1
return start
def make_json(self, file) -> list | None:
# read a Zeek JSON log file into a Python list of dicts
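# Zeek's JSON logs are NDJSON (one object per line), e.g.
# {"ts": 1600000000.0, "uid": "Cabc123", ...}, so each line is
# decoded independently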
if not os.path.exists(file):
log.error("Path does not exist")
return None
log_json = []
with open(file) as log_fp:
for line in log_fp.readlines():
log_json.insert(0, json.loads(line))
return log_json
def extract_blocks(self) -> None:
process_dirs = []
if not config.NO_RUN_ZEEK:
for pcap_file in self.pcap_filenames:
process_dirs.insert(0, self.get_log_dir(pcap_file))
else:
process_dirs.insert(0, config.ZEEK_DIR)
prev_num_artifacts = 0
# for each pcap
for logdir in process_dirs:
# init dirs, files, and log data
files_logfile = f"{logdir}/files.log"
conn_logfile = f"{logdir}/conn.log"
ftp_logfile = f"{logdir}/ftp.log"
files_log = self.make_json(files_logfile)
conn_log = self.make_json(conn_logfile)
ftp_log = self.make_json(ftp_logfile)
# if we successfully got the logs
if files_log is not None and conn_log is not None and ftp_log is not None:
# for each file in the file log
for file_log_json in files_log:
conn_json = None
ftp_json = None
# if the file is from FTP
if file_log_json["source"] == "FTP_DATA":
# search for the ftp.log entry associated with the
# extracted file and get its connection id
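# (Zeek's ftp.log records the fuid of transferred files alongside the
# uid of the control connection; files.log entries carry the same fuid,
# which is what ties an extracted file back to its FTP session)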
conn_id = None
for ftp_log_json in ftp_log:
if (
ftp_log_json["command"] == "RETR"
or ftp_log_json["command"] == "STOR"
) and ftp_log_json["fuid"] == file_log_json["fuid"]:
conn_id = ftp_log_json["uid"]
ftp_json = ftp_log_json
break
if conn_id is None:
# we only care about files from STOR and RETR commands,
# because other extracted files may not actually be
# artifacts we want.
log.debug(
f"No upload or download found for "
f"zeek file {file_log_json['extracted']}. "
f"This may be output from a command other than STOR or RETR."
)
continue
# search for the connection log entry by the connection id
for conn_log_json in conn_log:
if conn_log_json["uid"] == conn_id:
conn_json = conn_log_json
break
# skip to the next file if no matching conn.log entry was found
if conn_json is None:
continue
# assemble the artifact
# TODO: fix OUIs
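# In Zeek's conn.log, the "orig" fields describe the client (the station
# driving the FTP session) and the "resp" fields describe the server
# (the SEL device), which is how the values below are mapped.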
start_time = datetime.fromtimestamp(conn_json["ts"])
end_time = datetime.fromtimestamp(conn_json["ts"] + conn_json["duration"])
dev_mac = None
stat_mac = None
try:
dev_mac = conn_json["resp_l2_addr"]
stat_mac = conn_json["orig_l2_addr"]
except KeyError:
dev_mac = ""
stat_mac = ""
artifact = FTPHeatArtifact(
device_ip=conn_json["id.resp_h"],
device_mac=dev_mac,
device_oui=_cleanup_oui(dev_mac),
station_ip=conn_json["id.orig_h"],
station_mac=stat_mac,
station_oui=_cleanup_oui(stat_mac),
start_time=start_time,
end_time=end_time,
duration=conn_json["duration"],
artifact_name=ftp_json["arg"][
self.nth_index(ftp_json["arg"], ".", 4) + 2 :
],
zeek_name=f"{logdir}/extract_files/{file_log_json['extracted']}",
)
if ftp_json["command"] == "STOR":
artifact.direction = "UPLOAD"
else:
artifact.direction = "DOWNLOAD"
artifact.file_name = (
f"[{artifact.device_ip}_{artifact.station_ip}]_"
f"{artifact.direction}_{str(start_time).replace(' ', '_')}"
f"+{int(artifact.duration)}_"
f"{artifact.artifact_name.replace('/', '_')}"
)
artifact.file_name = str.replace(artifact.file_name, ":", "_")
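# A resulting name looks roughly like (hypothetical values):
# "[10.0.0.5_192.168.1.20]_DOWNLOAD_2021-01-01_12_00_00+3_SET_A1.TXT"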
self.artifacts.insert(0, artifact)
else:
if files_log is None:
log.warning(f"Failed to load files.log from {logdir}")
if conn_log is None:
log.warning(f"Failed to load conn.log from {logdir}")
if ftp_log is None:
log.warning(f"Failed to load ftp.log from {logdir}")
continue
if not config.NO_RUN_ZEEK:
log.info(
f"Found {len(self.artifacts) - prev_num_artifacts} "
f"FTP artifacts in {logdir}"
)
else:
log.info(
f"Found {len(self.artifacts) - prev_num_artifacts} "
f"FTP artifacts in Zeek log directory {config.ZEEK_DIR}"
)
prev_num_artifacts = len(self.artifacts)
def assemble_artifacts(self) -> None:
# since we assemble artifacts in extract_blocks,
# this is unnecessary but must be implemented.
# log.info(f"Assembling {len(self.artifacts)} artifacts...")
return
def export_artifacts(self) -> None:
log.info(f"Exporting {len(self.artifacts)} artifacts...")
if not config.HEAT_ARTIFACTS_DIR.exists():
config.HEAT_ARTIFACTS_DIR.mkdir()
# regex to ensure the file is a txt file
txtRegex = re.compile(r".*\.(txt|TXT)")
# for each artifact
for artifact in self.artifacts:
log.info(f"[{artifact.id}] Exporting to {artifact.file_name}")
artifact.file_path = config.HEAT_ARTIFACTS_DIR / artifact.file_name
# if the file is a text file, use the utils to write the file
if txtRegex.search(artifact.artifact_name):
with open(artifact.zeek_name) as zeek_file:
content = zeek_file.read()
utils.write_file(content, artifact.file_path, overwrite_existing=False)
else:
# otherwise, do the equivalent for a non-text (binary) file
fp = utils.dup_path(artifact.file_path)
if os.path.exists(artifact.file_path):
log.debug(
f"File {artifact.file_path} already exists. Writing to {fp} instead."
)
shutil.copy(artifact.zeek_name, fp)
else:
shutil.copy(artifact.zeek_name, artifact.file_path)
def parse_artifacts(self) -> None:
log.info(f"Parsing {len(self.artifacts)} artifacts using PEAT...")
# Don't lookup IPs in host's DNS, pointless and can leak information
config.RESOLVE_HOSTNAME = False
config.RESOLVE_IP = False
config.RESOLVE_MAC = False
txtRegex = re.compile(r"\.(txt|TXT)")
setorserRegex = re.compile(r"(((SET|set)_.*)|(c?ser|C?SER))\.(txt|TXT)")
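# setorserRegex is meant to match SEL settings and event-report files,
# e.g. "SET_A1.TXT", "SER.TXT", or "CSER.TXT" (hypothetical names);
# other .txt artifacts are logged and skipped below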
for artifact in self.artifacts:
if txtRegex.search(artifact.artifact_name):
if setorserRegex.search(artifact.artifact_name):
self._parse_artifact(artifact)
else:
log.debug(f"Unable to parse non-settings artifact {artifact.artifact_name}")
else:
try:
log.debug(
f"Unable to parse artifact with unsupported "
f"extension: {artifact.file_name}"
)
except ValueError:
log.debug(
f"Unable to parse artifact with unknown format: {artifact.file_name}"
)
def _parse_artifact(self, artifact: HeatArtifact) -> None:
log.info(f"Parsing artifact {artifact.file_name}")
# Device (the relay)
# NOTE: it's possible to have multiple project files for same IP!
# Therefore, we use .add() instead of .get() to avoid annotating
# the same DataManager object.
dev = datastore.create(artifact.device_ip, "ip")
dev._is_verified = True
device_iface = Interface(
type="ethernet",
mac=artifact.device_mac,
ip=artifact.device_ip,
)
dev.store("interface", device_iface)
dev.populate_fields()
serRegex = re.compile(r"(ser|SER)\.(txt|TXT)")
cserRegex = re.compile(r"(cser|CSER)\.(TXT|txt)")
# Do this so SELRelay isn't required for basic artifact extraction
from peat import SELRelay
if cserRegex.search(artifact.artifact_name):
with open(artifact.file_path) as f:
cser_data = f.read().strip()
events, info = parse_cser(cser_data.splitlines())
if not events:
log.warning(f"No CSER.TXT events from {dev.ip}")
if info:
process_info_into_dev(info, dev)
if events:
process_events(events, dev, dataset="cser")
elif serRegex.search(artifact.artifact_name):
with open(artifact.file_path) as f:
ser_data = f.read().strip()
events, info = parse_ser(ser_data.splitlines())
if not events:
log.warning(f"No SER.TXT events from {dev.ip}")
if info:
process_info_into_dev(info, dev)
if events:
process_events(events, dev, dataset="ser")
else:
try:
if config.HEAT_ARTIFACTS_DIR and artifact.file_path:
SELRelay.parse(to_parse=artifact.file_path, dev=dev)
else:
SELRelay.parse(to_parse=artifact.reconstructed_artifact, dev=dev)
except Exception:
log.exception(
f"[{artifact.id}] Failed to parse artifact due to an unhandled exception"
)
state.error = True
dev.related.ip.add(artifact.station_ip)
if dev.logic.author:
dev.related.user.add(dev.logic.author)
SELRelay.update_dev(dev)
# The Station which programs the device
# Generally an engineering workstation or a SCADA system
# TODO: merge data for station (use datastore.get()),
# since it's likely the same device?
station = datastore.create(artifact.station_ip, "ip")
station_iface = Interface(
type="ethernet",
mac=artifact.station_mac,
ip=artifact.station_ip,
)
station.store("interface", station_iface)
# TODO: set station vendor ID to the short manuf string
# (e.g. "Dell" instead of "Dell, Inc.")
station.description.vendor.name = artifact.station_oui
station.description.description = (
f"Host that programmed the device at {artifact.device_ip}. "
f"Likely a engineering workstation or SCADA server."
)
station.type = "PC"
station.related.ip.add(artifact.device_ip)
if dev.logic.author:
station.related.user.add(dev.logic.author)
station.populate_fields()
if config.DEVICE_DIR:
station.export_to_files()
# TODO: "heat_results" file with all results, keyed by file?
# Export parsed data to Elasticsearch
if state.elastic:
dev.export_to_elastic()
station.export_to_elastic()
def _cleanup_oui(mac: str) -> str:
# Older tshark (3.0.14) makes vendor names like "HewlettP_c0:b9:20"
# Modern tshark (3.2.3) is like "Hewlett Packard"
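# e.g. _cleanup_oui("HewlettP_c0:b9:20") -> "HewlettP", while
# "Hewlett Packard" is returned unchanged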
if mac.count("_") == 1 and mac.count(":") == 2:
return mac.split("_", maxsplit=1)[0]
return mac
__all__ = ["FTPExtractor"]