Source code for peat.protocols.ftp

import ftplib
import re
import traceback
from io import BufferedReader, BytesIO, TextIOWrapper
from pathlib import Path, PurePosixPath
from typing import BinaryIO, Optional

import peat  # Avoid circular imports
from peat import CommError, config, log, utils


[docs] class FTP: """ Generic wrapper for File Transfer Protocol (FTP) functionality. """ def __init__(self, ip: str, port: int = 21, timeout: float = 5.0) -> None: self.ip: str = ip self.port: int = port self.timeout: float = timeout self.log = log.bind( classname=self.__class__.__name__, target=f"{self.ip}:{self.port}", ) self.all_output: list[tuple | str] = [] self._ftp: ftplib.FTP | None = None self.log.trace2(f"Initialized {repr(self)}") @property def ftp(self) -> ftplib.FTP: """ :class:`ftplib.FTP` instance used for interacting with the server. """ if not self._ftp: try: # Since "connect" shouldn't be reused on instances, # we create a new instance for every new connection. self.log.debug( f"Attempting connection to {self.ip}:{self.port} (timeout: {self.timeout})" ) self._ftp = ftplib.FTP() self._ftp.connect(self.ip, self.port, self.timeout) except Exception as ex: self._ftp = None raise CommError(f"({self.ip}:{self.port}) {ex}") from ex # TODO: figure out a way to log FTP protocol debugging messages # to a file (like we're doing for peat.protocols.Telnet) if config.DEBUG == 2: # Moderate amount of output, generally a single line per request # NOTE: this output goes directly to stdout! self._ftp.set_debuglevel(1) elif config.DEBUG >= 3: # Maximum output, each line sent/received on the control connection self._ftp.set_debuglevel(2) self.log.info(f"Connected to {self.ip}:{self.port}") return self._ftp def __enter__(self) -> "FTP": return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.disconnect() if exc_type: self.log.debug(f"Unhandled exception while exiting - {exc_type.__name__}: {exc_val}") self.log.trace( f"Exception traceback\n" f"{''.join(traceback.format_tb(exc_tb))}" f"{exc_type.__name__}: {exc_val}" ) def __str__(self) -> str: return self.ip def __repr__(self) -> str: return f"{self.__class__.__name__}({self.ip}, {self.port}, {self.timeout})" @ftp.setter def ftp(self, obj: ftplib.FTP) -> None: self._ftp = obj @property def address(self) -> str: """ Alias for ``ip`` to make code cleaner for some PEAT modules. """ return self.ip
[docs] def disconnect(self) -> None: """ Attempt to cleanly disconnect from the device. """ if self._ftp is not None: try: # Try to do it cleanly self._ftp.quit() except Exception as exquit: self.log.trace2(f"Unclean quit(): {exquit}") try: # If that can't be done, close it normally self._ftp.close() except Exception as exclose: self.log.trace2(f"Unclean close(): {exclose}") self._ftp = None self.log.debug(f"Disconnected from {self.ip}:{self.port}") if self.all_output: # Save the raw output to disk as an artifact try: dev = peat.data.datastore.get(self.ip) dev.write_file(self.all_output, "raw-ftp-output.json", merge_existing=True) except Exception as ex: self.log.warning(f"Failed to write raw output to file: {ex}")
[docs] def login(self, user: str = "", passwd: str = "") -> bool: """ Login to the FTP server. .. note:: This function should be called only once for each instance, per the documentation for :mod:`ftplib`. Args: user: Username to login with (default: ``anonymous``) passwd: Password to login with (default: ``anonymous@``) Returns: If the login was successful """ self.log.debug(f"Attempting login as user '{user}'") try: self.ftp.login(user, passwd) except ftplib.all_errors as ex: err_str = str(ex) if "530" in err_str: err_str = "access was denied or server does not allow anonymous login" self.log.debug(f"Login failed as user '{user}': {err_str}") return False self.log.info(f"Successfully logged in as user '{user}'") return True
[docs] def download_binary( self, filename: str, save_path: Path | None = None, save_to_file: bool = True ) -> bytes: """ Download a binary file from the FTP server. Args: filename: Name or path of the file to download. This path must be valid on the server. Relative paths generally work best. save_path: Path on the local system to save the file to. If not specified then it's saved to the device's standard output directory and filename. save_to_file: If the data should be automatically written to a file as defined by ``save_path`` Returns: The binary file data as :class:`bytes` """ file_obj = BytesIO() cmd = f"RETR {filename}" self.log.trace2(f"download_binary command string: '{cmd}'") # TODO: workaround for odd behavior seen in some SEL relays # from _ssl import _SSLSocket # def retrbinary(obj: ftplib.FTP, command, callback, blocksize=8192, rest=None): # obj.voidcmd('TYPE I') # resp = obj.voidresp() # with obj.transfercmd(command, rest) as conn: # while 1: # data = conn.recv(blocksize) # if not data: # break # callback(data) # # shutdown ssl layer # if _SSLSocket is not None and isinstance(conn, _SSLSocket): # conn.unwrap() # return resp # _old_retr = self.ftp.retrbinary # self.ftp.retrbinary = retrbinary # self.ftp.retrbinary(self.ftp, cmd, file_obj.write) # self.ftp.retrbinary = _old_retr self.ftp.retrbinary(cmd, file_obj.write) data = file_obj.getvalue() if not data: self.log.warning(f"No data for binary file '{filename}' on {self.ip} (command: {cmd})") elif not isinstance(data, bytes): self.log.error( f"download_binary: data has type '{data.__class__.__name__}', not 'bytes'" ) # Save the raw file to disk as an artifact elif data and save_to_file: if save_path: utils.write_file(data, save_path) else: dev = peat.data.datastore.get(self.ip) if filename.startswith("/"): filename = filename[1:] # Prevent files being saved in "/" dev.write_file(data=data, filename=filename, out_dir=dev.get_sub_dir("ftp_files")) dev.related.files.add(filename) return data
[docs] def download_text( self, filename: str, save_path: Path | None = None, save_to_file: bool = True ) -> str: """ Download a text file from the FTP server. Args: filename: Name or path of the file to download. This path must be valid on the server. Relative paths generally work best. save_path: Path on the local system to save the file to. If not specified then it's saved to the device's standard output directory and filename. save_to_file: If the data should be automatically written to a file as defined by ``save_path`` Returns: Text file data as a :class:`str` """ text = "" lines = [] cmd = f"RETR {filename}" self.log.trace2(f"download_text command string: '{cmd}'") self.ftp.retrlines(cmd, lines.append) if lines: text = "\n".join(lines) if save_to_file: # Save the raw file to disk as an artifact if save_path: utils.write_file(text, save_path) else: dev = peat.data.datastore.get(self.ip) if filename.startswith("/"): filename = filename[1:] # Prevent files being saved in "/" dev.write_file( data=text, filename=filename, out_dir=dev.get_sub_dir("ftp_files"), ) dev.related.files.add(filename) else: self.log.warning(f"No data for text file '{filename}' on {self.ip} (command: {cmd})") return text
[docs] def find_file(self, check_for: str, ext: str, directory: str | None = None) -> str | None: for filename in self.nlst_files(directory): if check_for in filename and filename.endswith(ext): return filename self.log.debug( f"Could not find file {check_for} ending with {ext} " f"in the output of the 'NLST' command" ) return None
[docs] def upload_text(self, filename: str, content: str | bytes | TextIOWrapper | BinaryIO) -> None: # We make a new variable to avoid over-writing the argument reference if isinstance(content, str): file_obj = BytesIO(content.encode("utf-8")) # str to bytes elif isinstance(content, bytes): file_obj = BytesIO(content) else: file_obj = content self.ftp.storlines(f"STOR {filename}", file_obj)
[docs] def upload_binary(self, filename: str, content: bytes | BufferedReader) -> None: # We make a new variable to avoid over-writing the argument reference if isinstance(content, bytes): file_obj = BytesIO(content) else: file_obj = content self.ftp.storbinary(f"STOR {filename}", file_obj)
[docs] def cmd(self, command: str) -> str | None: """Execute a raw FTP command.""" if isinstance(command, bytes): command = command.decode("utf-8") try: resp = self.ftp.sendcmd(command) self.all_output.append((command, resp)) return resp except ftplib.all_errors as ex: self.log.debug(f"Command '{command}' failed: {ex}") return None
[docs] def cd(self, directory: str) -> bool: """Change directory on the server (``cwd``).""" self.log.debug(f"Changing directory to '{directory}'") return True if self._do("cwd", directory) is not None else False
[docs] def cwd(self, directory: str) -> bool: return self.cd(directory)
[docs] def pwd(self) -> str | None: """Get the current working directory on the server.""" return self._do("pwd")
[docs] def mkdir(self, directory: str) -> bool: """Create a directory on the server.""" return True if self._do("mkd", directory) is not None else False
[docs] def rmdir(self, directory: str) -> bool: """Remove a directory on the server.""" return True if self._do("rmd", directory) is not None else False
[docs] def file_size(self, filename: str) -> int | None: """Get the size of a file on the server.""" return self._do("size", filename)
[docs] def file_delete(self, filename: str) -> int | None: """Remove a file from the server.""" return self._do("delete", filename)
[docs] def file_rename(self, filename: str, new_name: str) -> int | None: """Rename a file on the server.""" return self._do("rename", filename, new_name)
[docs] def dir(self, directory: str | None = None) -> tuple[list[str], list[dict]] | None: """ List files on the FTP server, including file metadata (``dir`` command). This returns two objects: - List of filenames - List of dicts with detailed information about each file, including type of file, modification time, and size. Returns: :class:`tuple` with list of filenames and list of dicts with file metadata, or :obj:`None` if the command failed. """ file_names = [] file_metadata = [] # Approximate header structure: # TYPE PERMS ? USER GROUP SIZE DATE-MODIFIED FILENAME # # Examples: # '-rwxrwxrwx 1 0 0 437824 JAN 21 2011 FILE.SYS' # '---------- 0 0 0 0 JAN 01 1970 null' # 'drwxrwxrwx 5 0 0 8192 JAN 01 1970 OS' def _append_func(line: str) -> None: """ Python's ftplib takes a function when calling LIST, and will call the function with the result of each line of data returned from LIST. """ self.all_output.append(line) parts = [x.strip() for x in line.strip().split(" ") if x.strip()] name = " ".join(parts[8:]) # skip "." and ".." since they're not files if name in [".", ".."]: return file_names.append(name) if directory: path = str(PurePosixPath(directory, name)) parent = directory if not parent.endswith("/"): parent += "/" else: # Relative to current directory (directory arg is None) path = str(PurePosixPath(name)) # NOTE: it's impossible to know parent without running # "pwd" if directory is None (a relative dir command). parent = "" metadata = { "type": "dir" if parts[0][0] == "d" else "file", "permissions": parts[0][1:], "size": int(parts[4]), "modified": utils.parse_date(" ".join(parts[5:8])), "name": name, "path": path, "parent": parent, } file_metadata.append(metadata) try: if directory is None: self.ftp.dir(_append_func) else: self.ftp.dir(directory, _append_func) except ftplib.all_errors as ex: self.log.warning(f"'dir' failed for {directory}: {ex}") return None return file_names, file_metadata
[docs] def rdir( self, directory: str | None = None, _paths_done: set | None = None, ) -> tuple[list[str], list[dict]] | None: """ Recursively lists files on the server and parses metadata about those files (the ``dir`` command). This calls recursively calls :func:`~peat.protocols.FTP.dir` on any directories, and returns the consolidated output of the calls. Refer to that method's docstring for further details about the returned data. Returns: Tuple with list of filenames and list of dicts with file metadata, or :obj:`None` if the command failed. """ dir_result = self.dir(directory) if not dir_result: return None # this is all filenames and directories, without path filenames = dir_result[0] metadata = dir_result[1] if _paths_done is None: _paths_done = set() for file_dict in metadata: if file_dict["type"] == "dir": # Don't do the same directory twice if file_dict["path"] in _paths_done: continue _paths_done.add(file_dict["path"]) sub_result = self.rdir(file_dict["path"], _paths_done) if not sub_result: continue # Sanity checks for duplicates for filename in sub_result[0]: if filename not in filenames: filenames.append(filename) # The _paths_done checks should prevent this from ever happening, # but better to be safe than sorry like in the past... for meta in sub_result[1]: if meta not in metadata: metadata.append(meta) else: self.log.warning(f"Duplicate metadata: {meta}") return filenames, metadata
[docs] def download_files(self, local_dir: Path, files: list[dict]): """ Download files from a device to a directory on the local system. The file structure of the local files will match that on the device, if possible. .. warning:: On Windows, there are restrictions on characters allowed in paths, so the paths may vary on that platform. Args: local_dir: Path to local directory to save downloaded files to. files: Listing of files to download, as returned from :func:`~peat.protocols.FTP.dir` or :func:`~peat.protocols.FTP.rdir`. """ to_download = [f for f in files if f["type"] == "file"] self.log.info(f"Downloading {len(to_download)} files to '{local_dir}'") if not local_dir.exists(): local_dir.mkdir(parents=True, exist_ok=True) good = 0 failed = 0 for entry in to_download: save_path = Path(local_dir, entry["path"].lstrip("/")) if save_path.exists(): self.log.warning(f"{save_path} already exists, new file: {entry}") try: self.log.debug(f"Downloading {entry['path']}") self.download_binary(entry["path"], save_path) good += 1 except Exception as ex: self.log.warning(f"Failed to download '{entry['path']}': {ex}") failed += 1 self.log.info( f"Finished downloading {len(to_download)} files to '{local_dir}'. " f"{good} downloads were successful, {failed} downloads failed." )
[docs] def nlst(self, directory: str | None = None) -> list[str] | None: """``nlst`` command to list files on the server.""" if directory is None: return self._do("nlst") else: return self._do("nlst", directory)
[docs] def nlst_files(self, directory: str | None = None) -> list[str]: """ List files in a directory on the device using NLST. This is a wrapper around :func:`~peat.protocols.FTP.nlst`. Args: directory: Case-insensitive name of directory to list the contents of. If :obj:`None` or empty string, the current directory is listed. Returns: List of names of files in the directory. """ files = self.nlst(directory) if files is None: return [] return files
[docs] def list_command(self, directory: str) -> list[str]: """ Get list of files with file type, permission, and timestamp information. This uses the LIST FTP command directly, instead of ``nlst`` or ``dir``. Args: directory: Case-insensitive name of directory to list the contents of. Returns: List of names of files in the directory, or empty list if the command failed. """ listings = [] self._do("retrlines", f"LIST {directory}", listings.append) return listings
[docs] def getwelcome(self) -> str | None: """Return the welcome message sent by the server.""" return self._do("getwelcome")
[docs] def process_vxworks_ftp_welcome( self, welcome: str, dev: Optional["peat.data.models.DeviceData"] = None ) -> str | None: """ Extract the VxWorks version from the FTP welcome message. Args: welcome: FTP welcome message string to parse dev: DeviceData object to annotate with extracted information If :obj:`None`, no information will be annotated, the version will be returned and nothing else will happen. Returns: The version number as a string, or :obj:`None` if the parse failed. """ if dev: dev.extra["ftp_welcome"] = welcome lower_welcome = welcome.strip().lower() if "vxworks" in lower_welcome: result = re.search(r"\(vxworks\s+(?:vxworks|)([\d\.]+)\)", lower_welcome) elif "wind river" in lower_welcome: result = re.search(r"wind river ftp server ([\d\.]+)\s+", lower_welcome) else: self.log.warning(f"Unknown format for FTP welcome '{welcome}'") return None if result: version = result.groups()[0] if dev: if not dev.os.version: dev.os.version = version else: dev.extra["vxworks_version_from_ftp"] = version if not dev.os.name: dev.os.name = "VxWorks" if not dev.os.vendor.name: dev.os.vendor.name = "Wind River Systems" if not dev.os.vendor.id: dev.os.vendor.id = "WindRiver" return version else: self.log.warning(f"Failed to parse FTP welcome '{welcome}' from {dev.ip}") return None
[docs] def _do(self, func: str, *args) -> list | str | None: try: resp = getattr(self.ftp, func)(*args) self.all_output.append((func, resp)) return resp except ftplib.all_errors as ex: self.log.debug(f"Action '{func}' failed: {ex}") return None
__all__ = ["FTP"]