Source code for peat.modules.sel.sel_comms

import timeit
from fnmatch import fnmatch
from pathlib import Path, PurePosixPath

from peat import DeviceData, DeviceError, config, log, utils
from peat.protocols import FTP

from .sel_serial import SELSerial
from .sel_telnet import SELTelnet


[docs] def clean_filenames(filenames: list[bytes | str]) -> list[str]: """ Some file listings contain embedded null bytes. """ clean = [] for filename in filenames: if isinstance(filename, bytes): filename = filename.decode() clean.append(filename.strip().replace("\x00", "")) return clean
def _method_name(comms: FTP | SELTelnet | SELSerial) -> str: if isinstance(comms, FTP): return "FTP" elif isinstance(comms, SELTelnet): return "Telnet" elif isinstance(comms, SELSerial): return "Serial" else: raise DeviceError(f"Invalid comms class: {comms.__class__.__name__}")
[docs] def download_files( files: list[str], comms: FTP | SELTelnet | SELSerial, save_dir: Path | None, handle_download_errors: bool = True, ) -> dict[str, dict[str, bytes | str | Path]]: """ Download a selected list of files from a relay in a comms-agnostic manner. .. code-block:: python { "filename": { "name": filename, "data": file_data, "raw_path": path, "device_path": PurePosixPath(...), "local_path": Path(...), }, ... } """ method = _method_name(comms) log.info(f"Retrieving {len(files)} files from {comms.address} via {method}") if isinstance(comms, SELTelnet) and not comms.can_download_files(): raise DeviceError( f"{comms.address} is not capable of downloading files " f"via Telnet. Terminating Telnet download." ) # filename: {path, data} all_file_data = {} for path in files: # path can be: # - "SET_P1.TXT" # - "SETTINGS SET_P1.TXT" # - "SETTINGS/SET_P1.TXT" (need to confirm) fmt_path = path.split(" ")[-1].replace("\\", "/") if fmt_path[0] != "/": fmt_path = f"/{fmt_path}" path_obj = PurePosixPath(fmt_path) filename = path_obj.name log.info(f"Retrieving '{path}' from {comms.address} via {method}") try: file_data = comms.download_binary(path, save_to_file=False) if file_data is None: log.error(f"Failed to download {filename} from {comms.address} via {method}") continue if isinstance(file_data, bytes) and path.upper().endswith(".TXT"): file_data = file_data.decode("utf-8") file_dict = { "name": filename, "data": file_data, # bytes or str "raw_path": path, "device_path": path_obj, "local_path": "", } if save_dir: local_path = save_dir / filename if isinstance(file_data, str) and file_data.strip() == "=>": log.warning( f"File '{filename}' only has '=>' as contents, " f"probably as a artifact of a Telnet mis-parse. " f"Writing empty data instead." ) utils.write_file("", local_path) else: utils.write_file(file_data, local_path) file_dict["local_path"] = local_path # Check for duplicates. An example of this is on the 700G, # which has EVENTS/HISTORY.TXT and REPORTS/HISTORY.TXT, # which contain the same data (timestamp will differ tho), # but have different parent directories. if filename in all_file_data: ext_count = 1 while f"{filename}.{ext_count}" in all_file_data: ext_count += 1 updated_filename = f"{filename}.{ext_count}" log.warning( f"{filename} already exists in all_file_data! " f"Saving to {updated_filename} instead." ) filename = updated_filename all_file_data[filename] = file_dict except Exception as ex: if handle_download_errors: log.error(f"Failed to retrieve '{path}' from {comms.address} via {method}: {ex}") else: raise ex from None log.info(f"Retrieved {len(files)} files from {comms.address} via {method}") return all_file_data
[docs] def filter_names( paths: list[str], only: list[str] | None = None, never: list[str] | None = None, ) -> list[str]: if not only and not never: # short-circuit common case return sorted(paths) if only is not None and not isinstance(only, list): raise DeviceError( f"'only' filters must be a list, not '{type(only).__name__}' " f"(value provided: {repr(only)})" ) if never is not None and not isinstance(never, list): raise DeviceError( f"'never' filters must be a list, not '{type(never).__name__}'" f"(value provided: {repr(never)})" ) filtered = set() for path in sorted(paths): if only: for allow_pattern in only: if path == allow_pattern or fnmatch(path.lower(), allow_pattern.lower()): filtered.add(path) break elif never: excluded = False for exclude_pattern in never: if path == exclude_pattern or fnmatch(path.lower(), exclude_pattern.lower()): excluded = True break if not excluded: filtered.add(path) return sorted(filtered)
[docs] def pull_files( dev: DeviceData, comms: FTP | SELTelnet | SELSerial ) -> dict[str, dict[str, bytes | str]]: """ Recursively download all files from a relay in a comms-agnostic manner. Args: dev: The device to pull from comms: Communication method to use, as a class instance. It must have the methods ``list_files()`` and ``download_binary``. Returns: The files downloaded, keyed by filename, with value being a dict including the file data, the device path, and the local path (if it was saved locally). """ method = _method_name(comms) start_time = timeit.default_timer() log.info(f"Pulling relay config files from {dev.address} via {method}") # Directory to save raw files to # Format: peat_results/<run-dir>/devices/{address}/relay_files_{timestamp}/* configs_dir = None if config.DEVICE_DIR: configs_dir = dev.get_sub_dir("relay_files") # Determine files to download from root directory and any sub-directories populate_file_listing(dev, comms) log.trace(f"{method} file listing: {dev.extra['file_listing']}") to_download = filter_names( list(dev.extra["file_listing"].keys()), only=dev.options["sel"]["only_download_dirs"], never=dev.options["sel"]["never_download_dirs"], ) log.info(f"Directories to download from {dev.address} via {method}: {to_download}") # Raw and file paths contents of all files downloaded all_files = {} # Download files from each directory for dirname in to_download: # Filter files to download based on config settings filenames = filter_names( dev.extra["file_listing"][dirname], dev.options["sel"]["only_download_files"], dev.options["sel"]["never_download_files"], ) if not filenames: log.warning( f"Skipping directory {dirname} on {dev.address}, " f"it's either empty or there are no files that " f"weren't filtered by the 'only_download_files' or " f"'never_download_files' options" ) continue # Determine file path to use for download if dirname != "/" and not (dev.extra.get("sel_old_ftp") or dev.options["sel"]["old_ftp"]): # Telnet requires space between directory and file name # Technically you can get away with a slash, but some devices don't like this. sep = "/" if isinstance(comms, FTP) else " " paths = [f"{dirname}{sep}{file}" for file in filenames] else: paths = filenames # Save files to a sub-directory matching the structure on the device, # if file saving is enabled (if configs_dir != None). sub_dir = configs_dir if configs_dir and dirname != "/": sub_dir = configs_dir / dirname try: # Change directory if needed for older devices if (dev.extra.get("sel_old_ftp") or dev.options["sel"]["old_ftp"]) and isinstance( comms, FTP ): comms.cd(dirname) # cd <dirname> => retr <file> => cd .. downloaded = download_files( files=paths, comms=comms, save_dir=sub_dir, handle_download_errors=dev.options["sel"].get("handle_download_errors", True), ) all_files.update(downloaded) # Restore directory location if (dev.extra.get("sel_old_ftp") or dev.options["sel"]["old_ftp"]) and isinstance( comms, FTP ): comms.cd("..") except Exception as ex: log.warning( f"Failed to retrieve files from {dirname} on {dev.address} via {method}: {ex}" ) duration = timeit.default_timer() - start_time log.info( f"Pulled {len(all_files)} files from {dev.address} via {method} " f"(duration: {utils.fmt_duration(duration)})" ) log.debug(f"Files pulled from {dev.address} via {method}: {', '.join(all_files.keys())}") # TODO: if "SET_ALL.TXT" not in files and SET_x.TXT in files, # then reconstruct SET_ALL from individual settings files dev._cache["all_files"] = all_files # Add files to related.files, in case they aren't already for file_dict in all_files.values(): dev.related.files.add(file_dict["name"]) return all_files
[docs] def populate_file_listing(dev: DeviceData, comms: FTP | SELTelnet | SELSerial) -> None: # ["DIAGNOSTICS", "EVENTS", "HMI", "REPORTS", "SER", "STATUS", "SETTINGS"] if not dev.extra.get("file_listing"): if isinstance(comms, FTP): slash_listing = comms.nlst_files() else: slash_listing = comms.list_files() dev.extra["file_listing"] = {"/": clean_filenames(slash_listing)} if not dev.extra.get("settings_root_directory"): if "SETTINGS" in dev.extra["file_listing"]["/"]: dev.extra["settings_root_directory"] = "SETTINGS" else: dev.extra["settings_root_directory"] = "/" log.debug( f"Settings root directory for {dev.address}: '{dev.extra['settings_root_directory']}'" ) root_files = [] files_on_dev = set() dirs_on_dev = set() for file in dev.extra["file_listing"]["/"]: # Process sub-directories # Note(cegoes): Files will always have "." as far as I know if "." not in file and file not in dev.extra["file_listing"]: if isinstance(comms, FTP): listing = clean_filenames(comms.nlst_files(file)) else: listing = clean_filenames(comms.list_files(file)) # Handle 351 and older devices that act in a weird way... # 'ls <directory>' doesn't work, you have to cd to the directory, # then run 'ls' to get a proper file listing. # cd <dirname> => ls => cd .. if file in listing and isinstance(comms, FTP): comms.cd(file) listing = clean_filenames(comms.nlst_files()) comms.cd("..") dev.extra["sel_old_ftp"] = True log.trace(f"Files in '{file}' from {dev.address}: {listing}") files_on_dev.update(listing) dirs_on_dev.add(file) # Sets the sub-directory to be a separate part of the file listing # dict from the files in the root directory, e.g. # dev.extra["file_listing"]["/"] = ["CFG.XML", ...] # dev.extra["file_listing"]["SETTINGS"] = ["SET_1.TXT", ...] dev.extra["file_listing"][file] = listing else: root_files.append(file) files_on_dev.add(file) dirs_on_dev.add("/") dev.extra["file_listing"]["/"] = root_files dev.extra["num_directories_on_device"] = len(dirs_on_dev) dev.extra["num_files_on_device"] = len(files_on_dev) dev.extra["all_dirs_on_device"] = sorted(dirs_on_dev) dev.extra["all_files_on_device"] = sorted(files_on_dev) dev.related.files.update(files_on_dev)