# Source code for peat.modules.schneider.m340.m340_pull

"""
Methods to pull from Schneider Modicon M340 PLCs.

This includes process logic, configuration, and firmware.

Authors

- Christopher Goes
- Mark Woodard
- Patrica Schulz
"""

import binascii
import socket

from peat import CommError, utils
from peat import log as peat_logger
from peat.protocols import FTP, SNMP

from .umas_packets import UMASResponse, send_umas_packet

# TODO: use Schneider MIB and proper object names instead of raw OIDs
# The MIB just needs to be compiled like we did for the SIPROTEC MIBs
# misc/m340/SchneiderTFE-V01-04.mib


# TODO: what parts of this are status, and what are config?
def pull_network_config(ip: str, timeout: float = 1.0, snmp_community: str = "public") -> dict:
    """
    Pull configuration information using SNMP, FTP, and Modbus/TCP.

    Each service is queried independently. If a service fails, the failure
    is logged and the remaining services are still attempted, so the result
    may contain data from only a subset of the services.

    Args:
        ip: IP address of the device
        timeout: Time to wait for a service to respond, in seconds
        snmp_community: SNMPv1 Community String to use

    Returns:
        The device configuration information
    """
    log = peat_logger.bind(target=ip)
    log.debug(f"Pulling configuration using network services for Schneider device {ip}...")

    device_info = {}

    # Get configuration information from network sources
    # TODO: verified_services
    # NOTE: device_info is still empty at this point, so the default service
    # list below is always the one that gets used.
    # NOTE(review): "FTP" runs before "SNMP" in the default order, so the
    # model-name matching done while merging FTP data never sees any SNMP
    # module entries. Confirm whether SNMP was meant to run first.
    for service in device_info.get("verified_services", ["FTP", "SNMP", "Modbus/TCP"]):
        # TODO: snmp "verification" may "fail" due to a incorrect community string
        # We can get a correct one from the project file config
        # and re-verify using updated string if we find it in the config
        if service == "SNMP":
            try:
                # Pull data from service and add to info
                snmp_module, system = _get_snmp_metadata(
                    ip, community=snmp_community, timeout=timeout
                )
            except Exception as err:
                log.exception(f"Could not pull SNMP metadata: {err}")
                continue  # Skip to next service if there is an error

            # Add the overall system info straight into the device info
            device_info.update(system)

            # Add the information for each module to its corresponding dict.
            # The slot query can fail without the whole SNMP pull failing, so
            # fall back to a placeholder instead of raising KeyError and
            # losing everything that was collected so far.
            slot = snmp_module.get("slot", "unknown")
            module_name = f"module_{slot}"  # module_<slot #>
            if module_name in device_info:
                # If module exists, add the info
                device_info[module_name].update(snmp_module)
            else:
                # If it doesn't, create it using the info
                device_info[module_name] = snmp_module

        elif service == "FTP":
            try:
                # Pull and parse data from service
                ftp_info = _get_ftp_metadata(ip, timeout=timeout)
            except Exception as err:
                log.exception(f"Could not pull FTP metadata: {err}")
                continue  # Skip to next service if there is an error

            _merge_ftp_metadata(device_info, ftp_info, ip, log)

        elif service == "Modbus/TCP":
            try:
                # Pull data from service and add to info
                device_info.update(_get_modbus_metadata(ip, timeout=timeout))
            except Exception as ex:
                log.error(f"Could not pull Modbus/TCP metadata. Error: {ex}")
                continue  # Skip to next service if there is an error

        else:
            log.error(f"Unknown service for device {ip}: {service!s}")

    log.debug(
        f"Finished pulling configuration using network services for Schneider device {ip}",
    )

    return device_info


def _merge_ftp_metadata(device_info: dict, ftp_info: dict, ip: str, log) -> None:
    """
    Merge metadata parsed from FTP into the overall device info, in place.

    FTP entries describing a device are merged into the first existing
    module dict whose ``model_name`` equals the FTP ``device`` value. Entries
    with no matching module, and non-device entries, are stored under their
    own FTP key.

    Args:
        device_info: Overall device information, modified in place
        ftp_info: Metadata returned by the FTP pull
        ip: IP address of the device (used for log messages)
        log: Logger bound to the device
    """
    for index, ftp_value in ftp_info.items():
        # TODO: check slot number
        if not (isinstance(ftp_value, dict) and "device" in ftp_value):
            # Non-devices (e.g Loader status)
            device_info[index] = ftp_value
            continue

        # Only values of nested dicts are modified inside this loop, so the
        # size of device_info does not change while it is being iterated.
        for module_key, info_value in device_info.items():
            if not isinstance(info_value, dict):
                continue
            if info_value.get("model_name") != ftp_value["device"]:
                continue

            # Check if MAC address found via FTP differs
            if (
                "mac_address" in info_value
                and "mac_address" in ftp_value
                and info_value["mac_address"] != ftp_value["mac_address"]
            ):
                log.warning(
                    f"Conflicting MAC addresses found for module {module_key!s} in device {ip}"
                )
                # Preserve the previous value before the merge overwrites it
                info_value["conflicted_mac_address"] = info_value["mac_address"]

            # Merge the module dicts
            info_value.update(ftp_value)
            break
        else:
            # For things without a module
            device_info[index] = ftp_value
def _snmp_query(snmp, oid_pair: tuple):
    """
    Query a single OID and convert the returned value.

    Args:
        snmp: SNMP client object to use for the query
        oid_pair: ``(oid_suffix, converter)``; the suffix is appended
            to ``1.3.6.1.``

    Returns:
        The converted value, or :obj:`None` if there was no response
    """
    response = snmp.get(f"1.3.6.1.{oid_pair[0]}")
    if not response:
        return None
    return oid_pair[1](response[0]["value_string"])  # Convert data


def _get_snmp_metadata(
    ip: str, community: str = "public", timeout: float = 0.5, max_failures: int = 7
) -> tuple[dict, dict]:
    """
    Gets as much device metadata as possible over SNMP.

    Args:
        ip: IPv4 address of the device
        community: SNMP Community string to use
        timeout: SNMP protocol timeout, in seconds
        max_failures: max number of failed requests before raising an exception.
            Only failures while querying the module OIDs are counted.

    Returns:
        Info for the module hosting SNMP and the overall system, respectively

    Raises:
        CommError: if ``max_failures`` module OID queries got no response
    """
    log = peat_logger.bind(target=ip)
    log.info(f"Pulling configuration information for device {ip} over SNMP...")

    module_info = {}
    system_info = {}

    # TODO: make these dicts module-level global constants, mark as typing.Final
    # SNMP OIDs to pull from device.
    # Format: "info_returned": "fully-qualified_oid"
    # Refer to the .MIB file for details on each OID.
    #
    # Since these OIDs are non-mandatory, the requests to make
    # depend on communicationServices results.
    #
    # OIDs for information specific to the module running the SNMP server.
    # First key in tuple is OID, second is type.
    module_oids = {
        # Slot of the CPU
        "slot": ("4.1.3833.1.7.10.0", int),
        # Full name and version of hardware
        "model_name": ("4.1.3833.1.7.1.0", str),
        "module_description": ("2.1.1.1.0", str),
        # Status of module (1=bad, 2=good)
        "module_status": ("4.1.3833.1.7.4.0", int),
        "mac_address": ("4.1.3833.1.7.18.0", str),
        "configured_ipv4_address": ("4.1.3833.1.7.15.0", str),
        "configured_ipv4_netmask": ("4.1.3833.1.7.16.0", str),
        "configured_ipv4_gateway": ("4.1.3833.1.7.17.0", str),
        # Static or DHCP
        "ip_config_mode": ("4.1.3833.1.7.5.0", str),
        # Number of network interfaces present in the module, regardless of state
        "num_network_interfaces": ("2.1.2.1.0", int),
        "bandwidth_management": ("4.1.3833.1.7.7.0", int),
        "communication_services": ("4.1.3833.1.7.3.0", int),
    }

    # OIDs with SMTP email server configuration and status information
    smtp_oids = {
        "service_status": ("4.1.3833.1.9.1.1.1.2.0", int),
        "ipv4_address": ("4.1.3833.1.9.1.1.1.3.0", str),
        "mail_sent_count": ("4.1.3833.1.9.1.1.1.4.0", int),
        "error_count": ("4.1.3833.1.9.1.1.1.5.0", int),
        "last_error": ("4.1.3833.1.9.1.1.1.6.0", int),
        "last_mail_elapsed_time": ("4.1.3833.1.9.1.1.1.7.0", int),
        # Must be int: the value is compared against the integer 2 below
        "link_service_status": ("4.1.3833.1.9.1.1.1.8.0", int),
        # The number of times the link to the SMTP server has been detected down.
        "server_check_fail_count": ("4.1.3833.1.9.1.1.1.9.0", int),
    }

    # OIDs with Port 502 service configuration and status information
    port_502_oids = {
        # The protocols supported by the Port502 Messaging service:
        #   modbusThroughGateway(1) -- MODBUS protocol through UNITE Gateway
        #   modbusDirect (2) -- MODBUS direct
        #   unite(4) -- UNITE protocol only
        #   modbusThroughGatewayAndUnite(5) -- MODBUS through Gateway and UNITE protocol
        #   modbusDirectAndUnite(6) -- MODBUS direct and UNITE protocol
        "supported_protocol": ("4.1.3833.1.2.2.0", int),
        # Must be int: the value is compared against the integer 2 below
        "service_status": ("4.1.3833.1.2.1.0", int),
        # 1=disabled, 2=enabled
        "port_security": ("4.1.3833.1.2.3.0", int),
        "max_connections": ("4.1.3833.1.2.4.0", int),
        "total_messages_received": ("4.1.3833.1.2.9.0", int),
        "total_messages_sent": ("4.1.3833.1.2.10.0", int),
        "total_errors_sent": ("4.1.3833.1.2.11.0", int),
        # TODO: enumerate through all entries in IPSecurityTable (4.1.3833.1.2.7)
        # Basically, the offending IP address, and how many attempts they made
    }

    # OIDs with Web server configuration and status information
    web_oids = {
        "service_status": ("4.1.3833.1.5.1.0", int),
        # 1=disabled, 2=enabled
        "password_status": ("4.1.3833.1.5.2.0", int),
        "successful_attempts": ("4.1.3833.1.5.3.0", int),
        "failed_attempts": ("4.1.3833.1.5.4.0", int),
    }

    # OIDs with system-wide information
    system_oids = {
        # profileCPUType
        "full_model": ("4.1.3833.1.7.11.0", str),
        "system_uptime": ("2.1.31.1.5.0", int),
        "implementation_class": ("4.1.3833.1.7.19.0", str),
        "firmware_version": ("4.1.3833.1.7.2.0", str),
    }

    # Create SNMP object to use for pulling data
    snmp = SNMP(ip=ip, timeout=timeout, community=community)
    num_failures = 0

    # Pull information for the module running the SNMP server
    # TODO: threading would help a lot here
    log.debug("Pulling SNMP device modules info...")
    for name, oid_pair in module_oids.items():
        data = _snmp_query(snmp, oid_pair)
        if data is None:
            num_failures += 1
            if num_failures >= max_failures:
                raise CommError(
                    f"Number of failed SNMP query attempts exceeded max of {max_failures}"
                )
            continue
        if data == "":
            continue

        if name == "mac_address":
            # Handle MAC address being represented as
            # raw bytes and not text characters.
            # NOTE(review): encoding as UTF-8 expands any character above
            # 0x7F into multiple bytes; confirm how the SNMP wrapper decodes
            # raw octets into "value_string".
            mac = binascii.b2a_hex(bytes(data, "utf-8")).decode().upper()
            module_info["mac_address"] = ":".join(
                a + b for a, b in zip(mac[::2], mac[1::2], strict=False)
            )
        elif name == "module_status":
            module_info[name] = "ok" if data == 2 else "nok"
        elif name == "bandwidth_management":
            module_info[name] = "enabled" if data == 2 else "disabled"
        elif name == "communication_services":
            # One flag per service, read from the most significant digit of
            # the 7-digit binary representation to the least significant.
            # NOTE(review): verify this bit order against the Schneider MIB.
            service_names = (
                "port_502_messaging",
                "io_scanning",
                "global_data",
                "web",
                "address_server",
                "time_management",
                "email",
            )
            bits = f"{int(data):07b}"
            module_info[name] = {
                service: "supported" if bit == "1" else "unsupported"
                for service, bit in zip(service_names, bits, strict=False)
            }
        else:
            # Includes the "ipv4" values: snmp.get() will generate a IP string
            module_info[name] = data

    # Pull SMTP email server information for the module
    smtp_info = {}
    log.debug("Pulling SNMP email server info...")
    for name, oid_pair in smtp_oids.items():
        data = _snmp_query(snmp, oid_pair)
        if data is None or data == "":
            continue

        if name == "link_service_status":
            # 1 = nok, SMTP server is unreachable
            # 2 = ok, SMTP server can be reached
            smtp_info[name] = "reachable" if data == 2 else "unreachable"
        elif name == "service_status":
            if data == 1:
                smtp_info[name] = "no configuration"
            elif data == 2:
                smtp_info[name] = "operational and running"
            else:
                smtp_info[name] = "stopped"
        else:
            # Includes "ipv4_address": snmp.get() will generate a IP string
            smtp_info[name] = data
    module_info["smtp_server"] = smtp_info

    # Pull Port 502 service information for the module
    supp_protos = {
        1: "modbusThroughGateway",
        2: "modbusDirect",
        # The meaning of value 3 is not known
        4: "unite",
        5: "modbusThroughGatewayAndUnite",
        6: "modbusDirectAndUnite",
    }
    port_502_info = {}
    log.debug("Pulling SNMP Port502 info...")
    for name, oid_pair in port_502_oids.items():
        data = _snmp_query(snmp, oid_pair)
        if data is None or data == "":
            continue

        if name == "port_security":
            port_502_info[name] = "enabled" if data == 2 else "disabled"
        elif name == "supported_protocol":
            # Unlisted values (such as 3) are reported instead of raising KeyError
            port_502_info[name] = supp_protos.get(data, f"unknown ({data})")
        elif name == "service_status":
            port_502_info[name] = "operational" if data == 2 else "no configuration"
        else:
            port_502_info[name] = data
    module_info["port_502"] = port_502_info

    # Pull Web server info
    web_info = {}
    log.debug("Pulling SNMP web server info...")
    for name, oid_pair in web_oids.items():
        data = _snmp_query(snmp, oid_pair)
        if data is None or data == "":
            continue

        if name == "password_status":
            web_info[name] = "enabled" if data == 2 else "disabled"
        elif name == "service_status":
            web_info[name] = "operational" if data == 2 else "no configuration"
        else:
            web_info[name] = data
    module_info["web_server"] = web_info

    # Get information on LEDs on the module and their state
    # TODO: make more generic by dynamically determining
    # how many LEDs there are at runtime.
    led_oids = {
        "name": ("4.1.3833.1.7.9.1.2", str),
        "description": ("4.1.3833.1.7.9.1.3", str),
        "state": ("4.1.3833.1.7.9.1.4", int),
    }
    led_info = {}
    log.debug("Pulling SNMP LED info...")
    for name, oid_pair in led_oids.items():
        # Pull information on each LED over SNMP
        responses = snmp.get(
            identity=f"1.3.6.1.{oid_pair[0]}",
            single_query=False,
            # TODO: manual limit as workaround until MIB import is added
            # Assume there are 5 LEDs
            query_limit=5,
        )
        if not responses:
            continue
        led_info[name] = [oid_pair[1](x["value_encoded"]) for x in responses]

    # Convert from the lists of values per OID to dict of info per LED.
    # The LED count comes from the data actually returned, so a column that
    # failed or came back short does not raise KeyError/IndexError.
    num_leds = max((len(values) for values in led_info.values()), default=0)
    for i in range(num_leds):
        module_info[f"led_{i}"] = {
            field: values[i] for field, values in led_info.items() if i < len(values)
        }

    # Pull information for the overall system
    log.debug("Pulling SNMP overall system info...")
    for name, oid_pair in system_oids.items():
        data = _snmp_query(snmp, oid_pair)
        if data is None or data == "":
            continue
        system_info[name] = data

    return module_info, system_info


def _get_ftp_metadata(
    ip: str,
    user: str = "loader",
    passwd: str = "fwdownload",
    port: int = 21,
    timeout: float = 5.0,
) -> dict:
    """
    Gets as much device metadata as possible over FTP.

    Sends the ``LDST``, ``DINF`` and ``FREE`` commands and parses the
    responses into loader status, per-module device info, and free space
    on the SD card.

    Args:
        ip: IPv4 address of the device
        user: FTP username
        passwd: FTP password
        port: TCP port for FTP service
        timeout: Number of seconds to wait before timing out

    Returns:
        The information found, or empty :class:`dict` if the login failed
        or the FTP exchange raised an exception

    Raises:
        ValueError: if a numeric field (``Ir`` or the ``FREE`` value)
            cannot be converted to :class:`int`
    """
    log = peat_logger.bind(target=f"{ip}:{port}")
    log.info(f"Pulling configuration information for {ip}:{port} via FTP...")

    try:
        with FTP(ip, port, timeout) as ftp:
            if not ftp.login(user, passwd):
                log.error(f"Failed FTP login on {ip}:{port}")
                return {}
            ldst_data = ftp.cmd("LDST")
            dinf_data = ftp.cmd("DINF")
            sd_data = ftp.cmd("FREE")
    except Exception as ex:
        log.error(f"Failed to pull FTP data from {ip}: {ex}")
        return {}

    device_info = {}

    # ** Loader status (LDST) **
    loader_status = {}
    ldst_lines = ldst_data.split("\n")
    if len(ldst_lines) > 1:
        # The second line holds the data (the first one is a "200" line).
        # Clean spaces and split into key=value pairs.
        for pair in ldst_lines[1].replace(" ", "").split(","):
            key, sep, value = pair.partition("=")
            if not sep:
                continue  # Not a key=value pair
            loader_status[utils.convert_to_snake_case(key)] = value
    device_info["loader_status"] = loader_status

    # ** Device info (DINF) **
    # Each module is defined by: FwLoc, HwId, FwId, Device, Ir, Desc, Date, MAC
    # Split, Remove "200" lines, recombine
    dinf_data = "".join(dinf_data.split("\n")[1:-1])

    # Everything before the first "FwLoc=" is discarded. Sections are
    # numbered starting from the LAST one in the response, so "device_0"
    # is the final module listed by the device.
    sections = dinf_data.split("FwLoc=")[1:]
    module_number = 0
    for section in reversed(sections):
        # Split into key-values, remove trailing whitespace and single quotes
        module_data = [x.strip().replace("'", "") for x in section.split(",")]
        # Remove empty strings
        module_data = [x for x in module_data if x]
        if not module_data:
            continue  # Empty section, nothing to record

        # The first item is the value that followed "FwLoc="
        module_info = {"fw_loc": module_data[0]}

        # Get the metadata from the remaining key-value pairs
        for pair in module_data[1:]:
            raw_key, sep, value = pair.partition("=")
            if not sep:
                continue  # Not a key=value pair

            if raw_key == "Desc":
                key = "description"
            elif raw_key == "HwId":
                key = "hardware_id"
            else:
                key = utils.convert_to_snake_case(raw_key)

            if key == "ir":
                module_info[key] = int(value)
            # TODO: is "Date" the manufacture date or firmware flash date?
            elif key == "date":
                module_info["timestamp"] = value.replace("#dt", "")
            elif key == "mac":
                module_info["mac_address"] = value.replace("-", ":")
            else:
                module_info[key] = value

        # Save the module info
        device_info[f"device_{module_number}"] = module_info
        module_number += 1

    # ** Free space on the SD card (FREE) **
    device_info["free_space_sd_card"] = int(sd_data.split("=")[-1].strip())

    return device_info


def _get_modbus_metadata(ip: str, timeout: float = 5.0, port: int = 502) -> dict:
    """
    Gets as much device metadata as possible over Modbus/TCP.

    Currently this retrieves only the memory card model, using a single
    UMAS request.

    Args:
        ip: IPv4 address of the device
        timeout: Number of seconds to wait before timing out
        port: TCP port of the Modbus service

    Returns:
        The information found, or empty :class:`dict` if the TCP
        connection could not be established
    """
    log = peat_logger.bind(target=f"{ip}:{port}")
    log.info(f"Pulling configuration information from {ip} over Modbus/TCP...")

    device_info = {}

    # The context manager closes the socket on every exit path, including
    # exceptions raised while sending the request or reading the response.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        try:
            sock.connect((ip, port))
        except OSError as err:
            log.error(f"Failed to pull Modbus metadata from {ip}: {err}")
            return {}

        # Get the memory card model
        # Source: github.com/digitalbond/Redpoint/blob/master/modicon-info.nse
        # TODO: is there more information from this?
        payload = binascii.unhexlify("01bf00000005005a000606")
        response = send_umas_packet(sock, payload, UMASResponse)

    # Drop the first three bytes and the last byte of the payload
    device_info["memory_card_model"] = bytes(response.payload)[3:-1].decode()

    log.debug(f"Metadata found over Modbus for device {ip}: {device_info!s}")
    return device_info