import traceback
from time import sleep
from typing import Any
import peat
from peat import CommError, log
from . import forked_telnetlib as telnetlib
[docs]
class Telnet:
"""
Telnet functionality for interacting with devices.
Added features:
- Improved error handling
- Improved cleanup of connections on exit, even if exceptions happen
- Logging messages
- Records all commands and responses and saves to a file
- Simpler function calls
"""
# These will vary by device and should be overridden as needed
ENCODING: str = "utf-8"
PRE_WRITE_SLEEP: float = 0.15
POST_WRITE_SLEEP: float = 0.1
READ_DELAY: float = 0.15
LINE_TERMINATOR: str = "\r\n"
def __init__(self, ip: str, port: int = 23, timeout: float = 5.0) -> None:
self.ip: str = ip
self.port: int = port
self.timeout: float = timeout
self.log = log.bind(
classname=self.__class__.__name__,
target=f"{self.ip}:{self.port}",
)
self.connected: bool = False
self.all_output: list[str] = []
self.raw_output: list[bytes] = []
self.info: dict[str, Any] = {}
self.successful_creds: tuple[str, str] | None = None
self._comm: telnetlib.Telnet = telnetlib.Telnet()
self.log.trace2(f"Initialized {repr(self)}")
def __enter__(self) -> "Telnet":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.disconnect()
if exc_type:
self.log.debug(f"Unhandled exception while exiting - {exc_type.__name__}: {exc_val}")
self.log.trace(
f"Exception traceback\n"
f"{''.join(traceback.format_tb(exc_tb))}"
f"{exc_type.__name__}: {exc_val}"
)
def __str__(self) -> str:
return self.ip
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.ip}, {self.port}, {self.timeout})"
@property
def comm(self) -> telnetlib.Telnet:
"""
Python Telnet instance used for interacting with the device.
"""
if not self.connected:
try:
self._comm.open(self.ip, self.port, self.timeout)
self.connected = True
self.log.info(f"Connected to {self.ip}:{self.port}")
except Exception as ex:
raise CommError(f"Failed to connect to {self.ip}:{self.port}: {ex}") from ex
return self._comm
@comm.setter
def comm(self, obj: telnetlib.Telnet) -> None:
self._comm = obj
[docs]
def test_connection(self) -> bool:
"""
Test connection to the device.
"""
try:
if self.comm:
return True
return False
except CommError:
return False
[docs]
def disconnect(self) -> None:
"""
Cleanly disconnect from the device.
"""
if self.connected and self._comm:
self._comm.close()
self.connected = False
self.log.debug(f"Disconnected from {self.ip}")
if self.all_output:
# Save the raw output to disk as an artifact
try:
dev = peat.data.datastore.get(self.ip)
dev.write_file(self.all_output, "raw-telnet-output.json", merge_existing=True)
except Exception as ex:
self.log.warning(f"Failed to write raw output to file: {ex}")
[docs]
def write(self, command: bytes | str | int, flush: bool = False) -> None:
"""
Send a Telnet command to the device.
Args:
command: Command to send (this will be automatically encoded)
flush: If the responses to the command should be dropped
"""
self.log.debug(f"Writing command: {command}")
# If it's an int, first convert to a string, which will then be encoded
if isinstance(command, int):
command = str(command)
# If it's a string, encode to bytes with the device's encoding
if isinstance(command, str):
command = command.encode(self.ENCODING)
# Append the line terminator (encode here to ensure proper encoding)
command += self.LINE_TERMINATOR.encode(self.ENCODING)
# NOTE(cegoes): If we don't sleep, things get REALLY wonky.
# Wasted an hour figuring out why the exact same telnet commands
# worked in the REPL, but not the script...well, it was the pauses
# between sending and reading commands. JustTelnetThings.
sleep(self.PRE_WRITE_SLEEP)
self.comm.write(command)
sleep(self.POST_WRITE_SLEEP)
if flush:
self.comm.read_very_eager()
[docs]
def login(self, user: str, passwd: str) -> dict | None:
"""
Login to the telnet device.
"""
pass
[docs]
def read(self, delay: float | None = None, strip_whitespace: bool = True) -> str:
"""
Read all data currently in the telnet response buffer.
This is a stateful operation, so calling this method again will
not result in the same information. The results are saved in the
``all_output`` class attribute for future access.
Args:
delay: Seconds to sleep before querying for data
strip_whitespace: If ``str.strip()`` should be called on the results
Returns:
Decoded data read from the Telnet receive stream
"""
if delay is None:
delay = self.READ_DELAY
sleep(delay)
return self._add_data(self.comm.read_very_eager(), strip_whitespace)
[docs]
def read_until(
self,
until: bytes | str,
delay: float = 0.15,
timeout: float | None = None,
strip_whitespace: bool = True,
) -> str:
"""
Read the telnet response buffer until the specified string is reached.
This is a stateful operation, so calling this method again will
not result in the same information. The results are saved in the
all_output class attribute for future access.
Args:
until: String to read all data up to
delay: Seconds to sleep before querying for data
timeout: Seconds to wait for the string before timing out
(if None, this defaults to the Telnet class's timeout configuration)
strip_whitespace: If ``strip()`` should be called on the results
Returns:
Decoded data read from the telnet receive stream
"""
if isinstance(until, str):
until = until.encode(self.ENCODING)
if timeout is None:
timeout = self.timeout
sleep(delay)
return self._add_data(self.comm.read_until(until, timeout), strip_whitespace)
[docs]
def _add_data(self, raw_data: bytes, strip_whitespace: bool = True) -> str:
"""
Decodes and saves responses from device.
"""
self.raw_output.append(raw_data)
data = raw_data.decode(self.ENCODING)
self.all_output.append(data)
if "Goodbye" in data:
self.log.debug("Device said goodbye")
self.disconnect()
if strip_whitespace:
data = data.strip()
return data
# Public API of this module: the only name exported by a star-import
__all__ = ["Telnet"]