"""
PEAT module for Fortinet Fortigate devices.
It has been tested with the Fortigate FG100F firewall.
Types of files that can be parsed:
- Config files (.conf)
- Diagnostic output ("Debug Logs")
- Events from Fortianalyzer
- Memory events
Authors
- Christopher Goes
- Danyelle Loffredo
- Juan Dorantes Cardenas
"""
from pathlib import Path
from scp import SCPClient
from peat import DeviceData, DeviceModule, datastore, exit_handler
from peat.api.identify_methods import IPMethod
from peat.protocols import HTTP, SSH
from .fortigate_conf import fg_conf_to_dict, process_fg_conf
from .fortigate_dbl import parse_fg_debug_log, process_fg_debug_log
from .fortigate_events import parse_fg_events, process_fg_events
[docs]
class Fortigate(DeviceModule):
"""
Fortinet Fortigate firewalls.
This module supports the FG100F firewall.
"""
device_type = "Firewall"
vendor_id = "Fortinet"
vendor_name = "Fortinet, Inc."
brand = "FortiGate"
# TODO: file fingerprinting by reading contents of text file
filename_patterns = [
"*Fortigate*.conf",
"*ortigate*.conf",
"FG100F*.log",
"fortianalyzer-event*.log",
"memory-event-*.log",
"sys_config",
]
can_parse_dir: bool = True
module_aliases = ["fg100", "fg"]
default_options = {
"fortigate": {
"pull_methods": [
"ssh",
"https",
],
"log_pull_timeout": 30.0,
},
"ssh": {"user": "", "pass": ""},
"web": {"user": "", "pass": ""},
}
@classmethod
def _parse(cls, file: Path, dev: DeviceData | None = None) -> DeviceData | None:
# Ideally, the user would pass a directory of files. Recurse the tree and parse
# all the fortigate files there, including log files. Use the .conf file to
# annotate info such as device name, etc.
# If a directory is NOT passed, then parse the file piecemeal.
if dev is None:
dev = DeviceData()
datastore.objects.append(dev)
# parse directory as a whole, conf first, to properly annotate events
if file.is_dir():
# Find all the files recursively
# Parse the config file(s) first
# then parse the log files
conf_files = file.rglob("*.conf")
log_files = file.rglob("*.log")
for c_path in conf_files:
cls._fg_parse_file(c_path, dev)
for l_path in log_files:
cls._fg_parse_file(l_path, dev)
# Single File
else:
cls._fg_parse_file(file, dev)
cls.update_dev(dev)
return dev
@classmethod
def _fg_parse_file(cls, path: Path, dev: DeviceData):
raw_data = path.read_text(encoding="utf-8")
if path.suffix == ".conf":
res = fg_conf_to_dict(raw_data)
process_fg_conf(res, dev)
dev.write_file(res, f"parsed_config_{path.stem}.json")
# TODO: peek at file contents to determine if it should be parsed
elif path.suffix == ".log" and "debug" in path.stem:
res = parse_fg_debug_log(raw_data)
process_fg_debug_log(res, dev)
dev.write_file(res, f"parsed_debug_log_{path.stem}.json")
# TODO: peek at file contents to determine if it should be parsed
elif path.suffix == ".log" and "memory-event-" in path.stem:
res = parse_fg_events(raw_data)
process_fg_events(res, dev)
dev.write_file(res, f"parsed_events_{path.stem}.json")
else:
cls.log.warning(f"Unknown file type for '{path}'")
cls.update_dev(dev)
@classmethod
def _pull(cls, dev: DeviceData) -> bool:
# Sanity checks in case users messed up config (since there isn't config validation yet)
if not dev.options["fortigate"]["pull_methods"]:
cls.log.error(f"The 'fortigate.pull_methods' option is empty or null for {dev.ip}")
return False
for method in dev.options["fortigate"]["pull_methods"]:
if method not in cls.default_options["fortigate"]["pull_methods"]:
cls.log.error(
f"Invalid 'fortigate.pull_methods' method '{method}' for {dev.ip}, it must "
f"be one of {cls.default_options['fortigate']['pull_methods']}"
)
return False
# Notify user about methods that were skipped
for method in cls.default_options["fortigate"]["pull_methods"]:
if method not in dev.options["fortigate"]["pull_methods"]:
cls.log.debug(
f"Skipping {method.upper()} pull from {dev.ip}, "
f"'{method}' not in 'fortigate.pull_methods'"
)
pull_results = {}
for method in dev.options["fortigate"]["pull_methods"]:
if dev.service_status({"protocol": method}) == "closed":
cls.log.error(
f"Failed to pull {method.upper()} on {dev.ip}: {method} port is closed"
)
pull_results[method] = False
continue
if not dev._is_verified:
verified = False
if method == "https" and cls._verify_https(dev):
verified = True
elif method == "ssh" and cls._verify_ssh(dev):
verified = True
if verified:
dev._is_verified = True
else:
cls.log.warning(
f"Failed to pull {method.upper()} on {dev.ip}: verification failed"
)
continue
cls.log.info(f"Pulling data via {method.upper()} from {dev.ip}")
if method == "https":
pull_results[method] = cls.pull_https(dev)
elif method == "ssh":
pull_results[method] = cls.pull_ssh(dev)
if not pull_results[method]:
cls.log.error(f"{method.upper()} pull failed on {dev.ip}")
else:
cls.log.debug(f"{method.upper()} pull successful for {dev.ip}")
return all(bool(result) for result in pull_results.values())
[docs]
@classmethod
def pull_ssh(cls, dev: DeviceData) -> bool:
"""
Retrieve the sys_config via SCP over SSH from ``/config/sys_config``.
.. warning::
SSH pull will only work if ``admin-scp`` option is enabled on the device.
SSH to device, then run ``config system global``, ``set admin-scp enable``,
``end``, and ``exit``.
"""
try:
if not dev._cache.get("ssh_session"):
ssh = SSH(
ip=dev.ip,
port=dev.options["ssh"]["port"],
timeout=dev.options["ssh"]["timeout"],
username=dev.options["ssh"]["user"],
password=dev.options["ssh"]["pass"],
)
else:
ssh = dev._cache["ssh_session"]
local_path = dev.get_out_dir() / "sys_config"
if not local_path.parent.exists():
local_path.parent.mkdir(exist_ok=True, parents=True)
remote_path = "/config/sys_config" # location of config file on machine
cls.log.info(f"Transferring sys_config from '{remote_path}' to '{local_path}'")
# Download the file
with SCPClient(ssh.comm.get_transport()) as scp_client:
scp_client.get(remote_path, str(local_path), preserve_times=True)
cls.log.info(f"Successfully copied '{remote_path}' to '{local_path}'")
# TODO: run CLI commands that are run by the debug log, e.g. "get hardware cpu"
ssh.disconnect()
exit_handler.unregister(ssh.disconnect, "CONNECTION")
# parse the result
cls.log.info(f"Parsing pulled config: {local_path.name}")
res = fg_conf_to_dict(local_path.read_text(encoding="utf-8"))
process_fg_conf(res, dev)
dev.write_file(res, "parsed_sys_config.json")
return True
except Exception as ex:
cls.log.exception(f"SSH pull failed from {dev.ip}: {ex}")
if "permission denied" in str(ex).lower():
cls.log.error(
"SSH pull likely failed due to SCP not being enabled. "
"To enable this, SSH into your device as an administrator, "
"run the following commands, then re-run the PEAT pull:"
"\n\tconfig system global\n\tset admin-scp enable"
"\n\tend\n\texit"
)
return False
[docs]
@classmethod
def pull_https(cls, dev: DeviceData) -> bool:
"""
Pull configuration and other data from the relay via HTTPS.
"""
if not dev._cache.get("https_session"):
http = HTTP(
ip=dev.ip,
port=dev.options["https"]["port"],
timeout=dev.options["https"]["timeout"],
dev=dev,
protocol="https",
)
else:
http = dev._cache["https_session"]
cls.log.info(f"Logging in to web interface on {dev.ip}:{http.port}")
# TODO: use http.url attribute instead of manually creating
login_resp = http.post(
url=f"https://{dev.ip}:{http.port}/logincheck",
dev=dev,
data={
"ajax": 1,
"username": dev.options["web"]["user"],
"secretkey": dev.options["web"]["pass"],
},
headers={
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
),
"If-Modified-Since": "Sat, 1 Jan 2000 00:00:00 GMT",
},
)
# Expected response.text: '1document.location="/prompt?viewOnly&redir=%2F";'
if "location" not in login_resp.text or login_resp.status_code != 200:
cls.log.warning(f"Web login failed to {dev.ip}:{http.port}")
return False
pull_timeout = dev.options["fortigate"]["log_pull_timeout"] # type: float
# Memory events. This is a ~9+MB file, needs time to download
cls.log.info(
f"Downloading all events from 'memory' log location on {dev.ip}, "
f"this may take up to {pull_timeout} seconds"
)
me_resp = http.get("/api/v2/log/memory/event/system/raw", timeout=pull_timeout)
if not me_resp:
cls.log.error(
f"Failed to pull events from 'memory' location on {dev.ip}. "
f"Try increasing the timeout via the 'fortigate.log_pull_timeout' "
f"YAML config option, current timeout is set to {pull_timeout}."
)
return False
me_parsed = parse_fg_events(me_resp.text)
process_fg_events(me_parsed, dev)
# Move the raw file to results dir, and write parsed config to JSON
me_files = list(dev.get_sub_dir("http_files").rglob("memory-event-*.log"))
if me_files:
me_path = me_files[0]
me_path.rename(dev.get_out_dir() / me_path.name)
dev.write_file(me_parsed, f"parsed_events_{me_path.stem}.json")
# Debug log. This is also a large file, needs time to download, hence timeout
cls.log.info(
f"Downloading 'debug' log from {dev.ip}, this may take up to {pull_timeout} seconds"
)
db_resp = http.get("/api/v2/monitor/system/debug/download", timeout=pull_timeout)
if not db_resp:
cls.log.error(
f"Failed to pull debug log from {dev.ip}. "
f"Try increasing the timeout via the 'fortigate.log_pull_timeout' "
f"YAML config option, current timeout is set to {pull_timeout}."
)
return False
db_parsed = parse_fg_debug_log(db_resp.text)
process_fg_debug_log(db_parsed, dev)
# Move the raw file to results dir, and write parsed config to JSON
db_files = list(dev.get_sub_dir("http_files").rglob("*_debug.log"))
if db_files:
db_path = db_files[0]
db_path.rename(dev.get_out_dir() / db_path.name)
dev.write_file(db_parsed, f"parsed_debug_log_{db_path.stem}.json")
cls.log.info(f"Finished web interface pull from {dev.ip}:{http.port}")
return True
@classmethod
def _verify_ssh(cls, dev: DeviceData) -> bool:
ssh = SSH(
ip=dev.ip,
port=dev.options["ssh"]["port"],
timeout=dev.options["ssh"]["timeout"],
username=dev.options["ssh"]["user"],
password=dev.options["ssh"]["pass"],
)
if ssh.comm:
dev._cache["ssh_session"] = ssh
exit_handler.register(dev._cache["ssh_session"].disconnect, "CONNECTION")
# TODO: actually fingerprint this with a command
return True
else:
ssh.disconnect()
return False
@classmethod
def _verify_https(cls, dev: DeviceData) -> bool:
port = dev.options["https"]["port"]
timeout = dev.options["https"]["timeout"]
cls.log.debug(f"Verifying FortiGate HTTPS for {dev.ip}:{port} (timeout: {timeout})")
http = HTTP(
ip=dev.ip,
port=port,
timeout=timeout,
dev=dev,
protocol="https",
)
resp = http.get("/")
if resp and "fortigate" in resp.text.lower():
dev._cache["https_session"] = http
return True
http.disconnect()
cls.log.debug(f"FortiGate HTTPS verification failed for {dev.ip}:{port}")
return False
Fortigate.ip_methods = [
IPMethod(
name="Fortigate SSH login",
description=str(Fortigate._verify_ssh.__doc__).strip(),
type="unicast_ip",
identify_function=Fortigate._verify_ssh,
reliability=6,
protocol="ssh",
transport="tcp",
default_port=22,
),
IPMethod(
name="Fortigate web page check",
description=str(Fortigate._verify_ssh.__doc__).strip(),
type="unicast_ip",
identify_function=Fortigate._verify_https,
reliability=8,
protocol="https",
transport="tcp",
default_port=443,
),
]
__all__ = ["Fortigate"]