Source code for peat.modules.camlin.totus
from peat import DeviceData, DeviceModule, IPMethod
from .totus_http import TotusHTTP
# TODO: need to get the OS version somehow
# could use output from "hostnamectl" while ssh'd in
# TODO: see if we can download arbitrary files via an API
# TODO: download export
# Generate and download
# Download existing export if available
# TODO: parse export file/directory using PEAT
# Generate an export
# POST http://localhost/totus/api/1.0/exports
# {"startTime":1602655200000,"endTime":1634191200000,
# "historicalData":true,"pdPatterns":true,"integoRaw":true,
# "tfcData":true,"tcmSyslog":true,"tcmKernelLog":true,
# "tcmCalcStates":true,"dgaLogs":true}
# Get exports
# http://localhost/totus/api/1.0/exports
# Download export
# Get download url from "downloadUrl" for a export in list
# GET http://localhost/totus/api/1.0/exports/1634196496957?compress=1
# Remove the generated export from the device
# DELETE http://localhost/totus/api/1.0/exports/1634196496957
# TODO: Extract IP from login.html ("issuer" parameter in href)
[docs]
class Totus(DeviceModule):
"""PEAT module for the Camlin Totus G9 Dissolved Gas Analyzer (DGA).
Listening services
- SSH (TCP 22)
- HTTP (TCP 80)
Data collected
- Metadata
- Status
- Network configuration
- DNP3 registers
- Modbus registers
Authors
- Christopher Goes
- Thomas Byrd
"""
device_type = "DGA"
vendor_id = "Camlin" # aka Camlin Energy, Camlin Group
vendor_name = "Camlin Ltd"
brand = "Totus"
model = "G9"
default_options = {"http": {"user": "", "pass": ""}}
@classmethod
def _verify_http(cls, dev: DeviceData) -> bool:
"""Verify the device is a DGA via HTTP."""
cls.log.debug(f"Checking {dev.ip} via HTTP")
try:
with TotusHTTP(
ip=dev.ip,
port=dev.options["http"]["port"],
timeout=dev.options["http"]["timeout"],
dev=dev,
) as http:
response = http.get("")
if not response:
cls.log.debug(f"Failed to verify {dev.ip} via HTTP: no response")
return False
page = response.text
if not page:
cls.log.debug(
f"Failed to verify {dev.ip} via HTTP: no page content in response"
)
return False
# Check for Totus-specific strings in the homepage HTML content
if (
"<title>TOTUS</title>".lower() in page.lower()
or "totus-webapp." in page.lower()
):
cls.log.debug(f"Successfully verified {dev.ip} using HTTP")
return True
cls.log.debug(f"Failed to verify {dev.ip} via HTTP")
except Exception:
cls.log.exception(f"failed to verify {dev.ip} via HTTP due to an unhandled exception")
return False
@classmethod
def _pull(cls, dev: DeviceData) -> bool:
cls.log.info(f"Pulling from {dev.ip} via HTTP")
if not dev._cache.get("totus_http_session"):
dev._cache["totus_http_session"] = TotusHTTP(
ip=dev.ip,
port=dev.options["http"]["port"],
timeout=dev.options["http"]["timeout"],
dev=dev,
)
http = dev._cache["totus_http_session"]
if not http.login(
username=dev.options["http"]["user"], password=dev.options["http"]["pass"]
):
cls.log.error(f"Failed to pull from {dev.ip}: http login failed")
return False
dev.related.user.add(dev.options["http"]["user"])
at_least_one_success = http.get_and_process_all(dev)
# If all methods failed, exit with error
if not at_least_one_success:
return False
return True
Totus.ip_methods = [
IPMethod(
name="Totus DGA scrape HTTP homepage",
description=str(Totus._verify_http.__doc__).strip(),
type="unicast_ip",
identify_function=Totus._verify_http,
reliability=6,
protocol="http",
transport="tcp",
default_port=80,
)
]
__all__ = ["Totus"]