Source code for peat.modules.sel.sel_telnet
from time import sleep
from peat import datastore
from peat.protocols import Telnet
from .sel_ascii import SELAscii
[docs]
class SELTelnet(SELAscii):
"""
Telnet wrapper for SEL relays.
This is a transport implementation of :class:`~peat.modules.sel.sel_ascii.SELAscii`.
Refer to :class:`~peat.modules.sel.sel_ascii.SELAscii` for functions/commands to run.
.. code-block:: python
:caption: Example usage of SELTelnet with a SEL 451 on the PEAT testing rack
>>> from peat.modules.sel.sel_telnet import SELTelnet
>>> tn = SELTelnet('192.0.2.123') # doctest: +SKIP
>>> tn.elevate(1) # doctest: +SKIP
True
>>> tn.list_files() # doctest: +SKIP
['CFG.TXT', 'CFG.XML', 'EVENTS', 'REPORTS', 'SETTINGS', 'SWCFG.ZIP', 'SYNCHROPHASORS']
>>> tn.disconnect() # doctest: +SKIP
"""
def __init__(self, ip: str, port: int = 23, timeout: float = 5.0) -> None:
super().__init__(ip, timeout) # Initialize SELAscii
self.ip: str = ip # hack to allow agnostic use between SELTelnet and FTP
self.port: int = port
self.telnet_download_capable: bool | None = None
self._comm: Telnet | None = None
self.log.trace(f"Initialized {repr(self)}")
@property
def comm(self) -> Telnet:
if not self._comm:
self._comm = Telnet(self.address, self.port, self.timeout)
# TODO: hack to copy attributes since Telnet isn't a superclass
self._comm.ENCODING = self.ENCODING
self._comm.PRE_WRITE_SLEEP = self.PRE_WRITE_SLEEP
self._comm.POST_WRITE_SLEEP = self.POST_WRITE_SLEEP
self._comm.READ_DELAY = self.READ_DELAY
return self._comm
@comm.setter
def comm(self, obj: Telnet) -> None:
self._comm = obj
[docs]
def test_connection(self) -> bool:
return self.comm.test_connection()
[docs]
def disconnect(self) -> None:
if self._comm and self._comm.connected:
try:
if self.priv_level == 0:
# Attempt to capture useful output from "quit"
# while at level 0, such as RID, TID, and time.
self.write("quit")
self.write("exit")
else:
self.write("exit")
except Exception:
pass
# Ensure anything left in Telnet buffer gets saved to all_output
try:
self.read()
# Ignore errors from telnetlib about connection being closed
except Exception:
pass
self.priv_level = 0
self._comm.disconnect()
self._comm = None
[docs]
def read(self, delay: float | None = None, strip_whitespace: bool = True) -> str:
data = self.comm.read(delay, strip_whitespace=False)
self.all_output.append(data)
if getattr(self.comm, "raw_output", None):
self.raw_output.append(self.comm.raw_output[-1])
if strip_whitespace:
data = data.strip()
return data
[docs]
def read_until(
self,
until: bytes | str,
strip_whitespace: bool = True,
) -> str:
data = self.comm.read_until(until)
self.all_output.append(data)
if getattr(self.comm, "raw_output", None):
self.raw_output.append(self.comm.raw_output[-1])
if strip_whitespace:
data = data.strip()
return data
[docs]
def can_download_files(self) -> bool:
"""
If file downloads are possible via Telnet.
"""
if self.telnet_download_capable is not None:
return self.telnet_download_capable
self.log.info(f"Testing if {self.address} can download files via Telnet")
self._ensure_priv("can_download_files", level=1)
self.read() # Flush any output from previous operations
self.write("fil show")
sleep(self.READ_DELAY)
result = self.read()
if not result:
self.log.error(
f"No response from {self.address} for command 'fil show'. "
f"Marking as unable to download files."
)
self.telnet_download_capable = False
elif "command requires" not in result.lower():
self.log.error(
f"{self.address} does not support the 'file show' command. "
f"It is likely an older model such as a 451 or 2032 "
f"using an older version of SEL's ASCII protocol."
)
self.telnet_download_capable = False
else:
self.telnet_download_capable = True
return self.telnet_download_capable
[docs]
def download_binary(self, filename: str, save_to_file: bool = True) -> str | None:
"""
Download a file from the device.
.. warning::
File downloads require level 1 (``acc``) permissions on all devices
we've seen. Ensure you have elevated the login level with
:meth:`~peat.modules.sel.sel_telnet.SELTelnet.elevate`
before calling this method!
.. note::
Older devices, notably the 451 and 2032 on the PEAT rack, don't
support the ``file show`` command and therefore CANNOT download
files via Telnet.
.. note::
The name ``download_binary`` is used for compatibility with
code that calls the same method on instances of
:class:`~peat.protocols.ftp.FTP`. The interfaces
are functionally identical.
Args:
filename: *Case-insensitive* name of the file to download or the
directory and file pair, separated by a space.
save_to_file: If the data should be automatically written to a file
in the output directory for the device matching ``self.address``.
Returns:
The contents of the file, or :obj:`None` if there was an error
Raises:
DeviceError: Automatic privilege elevation failed
"""
# TODO: figure out if it's possible to download files via telnet
# on devices that don't support "show". I tried using a Ymodem
# library with the Telnet socket and object to pull after executing
# a 'fil read' command but no dice. I suspect 'fil read' executes
# a file transfer over a modem line or some other interface and
# not the telnet connection itself.
if not self.can_download_files():
return None
self.log.info(f"Reading file '{filename}' from {self.address}")
self._ensure_priv("download_binary", level=1)
self.read() # Flush any output from previous operations
# "show" supported: 700G, 351S, 351,
# "read": 451, 2032 (Requires Ymodem to transfer the file?)
self.write(f"fil show {filename}")
sleep(self.READ_DELAY) # Give tn time to load file
raw_file = ""
# ensure command and 0x02 character doesn't appear in chunk
first_parts = self.read(strip_whitespace=False).splitlines()
if not first_parts:
self.log.error(f"No data for file {filename} from {self.address}")
return None
index = 0
while "fil show" in first_parts[index] or (
"=>" in first_parts[index] and index < len(first_parts)
):
index += 1
chunk = "\r\n".join(first_parts[index:])
raw_file += chunk
while chunk and "\x03" not in chunk:
chunk = self.read(strip_whitespace=False)
raw_file += chunk
# 0x02: START OF TEXT
# 0x03: END OF TEXT
match = self.DATA_REGEX.search(raw_file)
if not match:
self.log.error(
f"Failed to match the data section read of file "
f"'{filename}' from {self.address} via Telnet. It may"
f"be a text file or something we haven't seen yet. "
f"Returning the raw data."
)
return raw_file
self.log.info(f"Finished reading file '{filename}' from {self.address}")
data = match.groups()[0]
if data and save_to_file:
datastore.get(self.address).write_file(data, filename)
if data:
data = data.replace("\x02", "").replace("\x03", "")
return data
__all__ = ["SELTelnet"]