Source code for peat.modules.woodward.micronet

"""
Interact with Woodward MicroNet Plus control systems.

Listening network services

- FTP (TCP 21)

Authors

- Christopher Goes
- Patrica Schulz
- Jessica Robinson
"""

import ftplib
import pathlib
from pathlib import Path

from dateutil import parser

from peat import CommError, DeviceData, DeviceModule, IPMethod
from peat.protocols import FTP

from . import parse_micronet


[docs] class MicroNet(DeviceModule): """Woodward MicroNet Plus.""" device_type = "Control System" vendor_id = "Woodward" vendor_name = vendor_id brand = "MicroNet" model = "Plus" default_options = {"ftp": {"timeout": 10, "user": "ServiceUser", "pass": "ServiceUser"}} @classmethod def _verify_ftp(cls, dev: DeviceData) -> bool: """ Verify that this device is a Micronet. The text in the pulled file Registry.txt should contain the string "Micronet". """ verified = False port = int(dev.options["ftp"]["port"]) timeout = float(dev.options["ftp"]["timeout"]) try: with FTP(dev.ip, port, timeout=timeout) as f: cls.log.info("Attempting to verify") f.login(user=dev.options["ftp"]["user"], passwd=dev.options["ftp"]["pass"]) # attempt to pull boot file via FTP f.cwd("HD1Flash/Registry") filename = "Registry.txt" boot_text = f.download_binary(filename) # check if "micronet" is in the pulled boot file to confirm device if boot_text: boot_text_decoded = boot_text.decode() if "micronet" in boot_text_decoded.lower(): verified = True cls.log.info("Device verified as Micronet") else: cls.log.warning("Unable to verify device via FTP") except (OSError, CommError) as err: cls.log.error(f"Failed to connect to {dev.ip}: {err}") raise err from None return verified @classmethod def _pull(cls, dev: DeviceData) -> bool: """ If there is a file specified in command line args, pull that file only. Otherwise, pull all files """ # TODO: configurable timeout with FTP(dev.ip, dev.options["ftp"]["port"], timeout=100) as ftp: ftp.login(dev.options["ftp"]["user"], dev.options["ftp"]["pass"]) # find the top-level directory last_dir = "" try: # for some reason, f.pwd() isn't working, so we have to use this # approach instead while last_dir != ftp.cmd("PWD").split('"')[1]: try: last_dir = ftp.cmd("PWD").split('"')[1] ftp.cwd("..") except ftplib.all_errors as e: if "No such file or directory" not in str(e): cls.log.exception("Failed FTP pull") except ftplib.all_errors: cls.log.exception("Failed FTP pull") return False # get all the files and folders down the tree cls.pull_all_files(dev, last_dir, dev._out_dir, ftp) cls.get_gap_info(dev) return True
[docs] @classmethod def is_file(cls, filename: str, ftp: FTP) -> bool: """Check if a given file is a file or directory""" try: lst = ftp.nlst(filename) if lst and ftp.file_size(filename): if "total" in lst[0] and lst[1] == filename: return True elif lst[0] == filename: return True else: return False else: return False except ftplib.all_errors: return False
[docs] @classmethod def pull_all_files(cls, dev: DeviceData, directory: str, output_dir: pathlib.Path, ftp: FTP): """ Pull all files available recursively Args: directory: the next directory to investigate (current directory when this function is called) output_dir: the last path (path to direcrtory), which will be added on to each function call """ # skip these special directories if directory and directory not in {".", ".."}: path = "" curr_dir = ftp.cmd("PWD").split('"')[1] # We want to mirror the directory structure of the target # first check if this is the top level dir, # this if just avoids having two path objects with separate anchors if directory != "/": path = output_dir / directory else: path = output_dir # Now pull all the files and make a list of all the folders dir_res = ftp.dir() if not dir_res: cls.log.error("ftp dir failed") return folder_list = [] file_list = dir_res[0] # if there are files in the directory, iterate through them if file_list: for filename in file_list: if cls.is_file(filename, ftp): # if file, download and save locally try: ftp.download_binary(filename, dev.get_out_dir() / path / filename) # get file hashes and store in dev.extra timestamp = ftp.cmd(f"MDTM {filename}").split()[1] date_time = parser.parse(timestamp) parse_micronet.create_artifact( dev, dev.get_out_dir() / path / filename, date_time.isoformat(), ) except ftplib.all_errors: cls.log.error(f"Unable to retrieve {filename}") raise ftplib.all_errors from None else: # this is a directory, store in list of folders try: ftp.cwd(filename) folder_list.append(filename) ftp.cwd(curr_dir) except ftplib.all_errors as err: cls.log.error(f"Failed pull: {err}") # look for files we want to parse if "vxWorks" in filename: cls.log.info("Attempting to extract info from firmware") # get last modified timestamp from ftp server timestamp = ftp.cmd(f"MDTM {filename}").split()[1] date_time = parser.parse(timestamp) cls.log.info(f"Firmware last modified at {date_time}") filepath = Path(dev.get_out_dir(), path, filename) parse_micronet.get_info_firmware(dev, filepath, date_time) if "bootrom" in filename.lower(): cls.log.info("Attempting to extract info from boot firmware") # get last modified timestamp from ftp server timestamp = ftp.cmd(f"MDTM {filename}").split()[1] date_time = parser.parse(timestamp) cls.log.info(f"Boot firmware last modified at {date_time}") filepath = Path(dev.get_out_dir(), path, filename) parse_micronet.get_info_bootrom(dev, filepath, date_time) if "Log.txt" in filename[0:7] and "HD1Flash" in str(path): cls.log.info(f"Attempting to extract info from {filename}") parse_micronet.parse_log(dev, dev.get_out_dir() / path / filename) # Now recurse over the folders for folder in folder_list: ftp.cwd(folder) cls.pull_all_files(dev, folder, path, ftp) ftp.cwd(curr_dir) cls.update_dev(dev)
[docs] @classmethod def get_gap_info(cls, dev: DeviceData) -> None: """ Determine most recent GAP file used from dev.event, Get file info on that GAP file, store in Logic """ for event in reversed(dev.event): if "Loading" in event.message and "gap" in event.message: filename = event.message.split(" ")[1] break # only get most recent event for gap file load # get GAP file info from log file (or just dev.event?) for artifact in dev.extra.get("file_artifacts"): if artifact["filename"] == filename: parse_micronet.get_gap_info( dev, filename, artifact["filepath"], artifact["timestamp"] ) break
MicroNet.ip_methods = [ IPMethod( name="Micronet FTP login", description=str(MicroNet._verify_ftp.__doc__).strip(), type="unicast_ip", identify_function=MicroNet._verify_ftp, reliability=8, protocol="ftp", transport="tcp", default_port=21, ) ] __all__ = ["MicroNet"]