"""
ControlLogix Common Industrial Protocol (CIP) implementation.
Originally based on code from Agostino Ruscito's pycomm
library and heavily modified.
Authors
- Craig Buchanan
- Christopher Goes
"""
from __future__ import annotations
from typing import Any
from peat import config, log
from peat.protocols.cip import *
from peat.protocols.data_packing import *
from peat.protocols.enip import EnipDriver, EnipSocket
from .clx_const import *
PathType = tuple | tuple[int, Any]
TagData = int | bytes
DataListType = tuple[TagData, int]
FragListType = tuple[TagData, int, bytes]
TemplateType = dict[str, str | dict]
AttrTags = tuple[dict[int, dict], dict]
AttrsType = dict[int, int] # ??
# TODO: figure out the tuple element types, annotate accordingly
# Some potential candidates (will need to determine with a debugger at runtime)
# ClassPath: tuple[int, int]
# InstancePath:
# AttributeList:
# TODO: write debugging data to file (instead of to normal log at debug level 4)
# TODO: generalize response handling code, currently duplicated across multiple methods
[docs]
class ClxCIP:
"""
Common Industrial Protocol (CIP) implementation for
Allen-Bradley ControlLogix devices.
"""
def __init__(self, ip: str, port: int, timeout: float = 5.0, cpu_slot: int = 0):
self.enip_socket = EnipSocket(ip, port, timeout)
self.driver = EnipDriver(self.enip_socket, cpu_slot)
self.log = log.bind(
classname=self.__class__.__name__,
target=f"{ip}[slot {cpu_slot}]",
)
self.log.trace(f"Initialized {repr(self)}")
def __enter__(self) -> ClxCIP:
if not self.open():
raise ConnectionError(f"Failed to connect to {str(self)}")
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
retval = self.close()
if exc_type:
self.log.debug(f"{exc_type.__name__}: {exc_val}")
return retval
def __str__(self) -> str:
return f"{self.enip_socket.ip}:{self.enip_socket.port}"
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}({self.enip_socket.ip}, "
f"{self.enip_socket.port}, {self.enip_socket.timeout}, "
f"{self.driver.cpu_slot})"
)
[docs]
def open(self) -> bool:
"""
Prepares the driver for operation.
"""
if self.driver.open():
self.log.info("CIP connection SUCCESS")
return True
return False
[docs]
def close(self) -> bool:
"""
Cleans up the driver when finished.
"""
return self.driver.close()
[docs]
def get_all_data(self) -> dict[str, dict]:
self.log.info("Pulling all data")
# Template data
temp_attrs, temp_tags = self.get_template_data()
# IO Module Data
io_mod_attrs, io_mod_tags = self.get_io_module_data()
# Symbol Data
symb_attrs = self.get_attributes_multi((CLASS_CODE["Symbol Object"],))
# Program Data
pr_attrs, p_sym_attrs, p_rou_attrs, p_rou_tags = self.get_program_data()
# Map Data
map_attrs, map_cxn_attrs = self.get_map_data()
# Unknown 6e Data
unk_6e_attrs, unk_6e_tags = self.get_unknown6e_data()
# Task Data
task_attrs = self.get_attributes_multi((CLASS_CODE["Task Object"],))
# Consolidate all of the data for the slot
slot_dict = {
"template_attributes": temp_attrs,
"template_tags": temp_tags,
"io_module_attributes": io_mod_attrs,
"io_module_tags": io_mod_tags,
"symbol_attributes": symb_attrs,
"program_attributes": pr_attrs,
"program_symbol_attributes": p_sym_attrs,
"program_routine_attributes": p_rou_attrs,
"program_routine_tags": p_rou_tags,
"map_attributes": map_attrs,
"map_cxn_attributes": map_cxn_attrs,
"unknown6e_attributes": unk_6e_attrs,
"unknown6e_tags": unk_6e_tags,
"task_attributes": task_attrs,
}
self.log.info("Finished pulling all data")
return slot_dict
[docs]
def get_instance_list(self, class_path: PathType) -> list:
"""
Returns the list of instance ids for a given class path.
Args:
class_path: The class to query
Returns:
``[<instance_id>,..]``
"""
tb = self.get_instance_list_tag_buffer(class_path)
instance_list = b"".join(bytes([x]) for x in tb)
return parse_get_instance_list(instance_list)
[docs]
def get_attributes(
self, instance_path: PathType, attribute_list: list[int] | None = None
) -> dict:
"""
Returns attributes for a single instance at a given instance path.
Args:
instance_path: The instance to query
attribute_list: The instance attributes to query
(default: all attributes)
Returns:
``{<attribute_id>: <attribute_value>,..}``
"""
class_code = instance_path[-2]
if (not attribute_list) and (class_code in CLASS_ATTRIBUTE_INFO):
attribute_list = list(CLASS_ATTRIBUTE_INFO[class_code])
elif not attribute_list:
attribute_list = []
attributes = b"".join(
bytes([x])
for x in self.get_attributes_tag_buffer(
instance_path=instance_path, attribute_list=attribute_list
)
)
return parse_get_attributes(attributes, class_code)
[docs]
def get_attributes_multi(
self,
class_path: PathType,
attribute_list: list | None = None,
instance_list: list | None = None,
) -> dict[int, dict]:
"""
Returns attributes for multiple instances at a given class path.
Example: ``attr = self.get_attributes_multi((CLASS_CODE['X Object'],))``
Args:
class_path: The class path to query
attribute_list: The instance attributes to query
(default: all attributes)
instance_list: The instances to query
(default: all instances at class path)
Returns:
``{<instance_id>: {<attribute_id>: <attribute_value>,..},..}``
"""
class_code = class_path[-1]
if (not attribute_list) and (class_code in CLASS_ATTRIBUTE_INFO):
attribute_list = list(CLASS_ATTRIBUTE_INFO[class_code])
elif not attribute_list:
attribute_list = []
if not instance_list:
instance_list = self.get_instance_list(class_path)
target_attributes = {}
for instance_id in instance_list:
instance_path = (*class_path, instance_id)
target_attributes[instance_id] = self.get_attributes(
instance_path=instance_path, attribute_list=attribute_list
)
return target_attributes
[docs]
def get_template_data(self, path: tuple = ()) -> AttrTags:
"""
Returns a tuple (template_attributes, template_tags) for all
template instances at the given path.
Args:
path: Path to the instance containing the template class
Returns:
(attr, tags)
- attr = ``{<tem_id>:{<attr_id>:<attr_val>,..},..}``
- tags = ``{<tem_id>:[tag_data],..}``
"""
class_code = CLASS_CODE["Template Object"]
template_attributes = self.get_attributes_multi((*path, class_code))
template_tags = {}
for instance_id in template_attributes:
template_path = (*path, class_code, instance_id)
template_size = template_attributes[instance_id][0x04] * 4 - 20
template_tag_buffer = self.read_template(
instance_path=template_path, size=template_size
)
template_tags[instance_id] = parse_template(
cip_data=template_tag_buffer,
member_count=template_attributes[instance_id][0x02],
)
return template_attributes, template_tags
[docs]
def get_io_module_data(self, path: tuple = ()) -> AttrTags:
"""
Returns a tuple (io_module_attributes, io_module_tags) for all
io_module instances at a given path.
Args:
path: Path to the instance containing the io module class
Returns:
(attr, tags)
- attr = ``{<iom_id>:{<attr_id>:<attr_val>,..},..}``
- tags = ``{<iom_id>:[tag_data],..}``
"""
self.log.info("Getting module IO data")
class_code = CLASS_CODE["IO Module Object"]
io_module_attributes = self.get_attributes_multi((*path, class_code))
io_module_tags = {}
template_attributes = self.get_attributes_multi(
class_path=(CLASS_CODE["Template Object"],)
)
for instance_id in io_module_attributes:
io_module_path = (*path, class_code, instance_id)
type_id = io_module_attributes[instance_id][0x02]
type_size_d1 = unpack_dint(io_module_attributes[instance_id][1][:4])
type_size_d2 = unpack_dint(io_module_attributes[instance_id][1][4:8])
type_size_d3 = unpack_dint(io_module_attributes[instance_id][1][8:12])
type_size = get_size_of_type(
type_id, template_attributes, type_size_d1, type_size_d2, type_size_d3
)
# TODO: Always receives a privilege violation error. Why??
io_module_tags[instance_id] = self.read_tag_with_size(
instance_path=io_module_path, size=type_size
)
return io_module_attributes, io_module_tags
[docs]
def get_program_data(self, path: tuple = ()) -> tuple[dict, dict, dict, dict]:
"""
Returns a tuple (program_attributes, program_symbol_attributes,
program_routine_attributes, program_routine_tags) for all program
instances at a given path.
Args:
path: Path to the instance containing the program class
Returns:
(attr, sym_attr, rout_attr, rout_tags)
- attr = ``{<prog_id>:{<attr_id>:<attr_value>,..},..}``
- sym_attr = ``{<prog_id>:{<sym_id>:{<attr_id>:<attr_val>,..},..},..}``
- rout_attr = ``{<prog_id>:{<rout_id>:{<attr_id>:<attr_val>,..},..},..}``
- rout_tags = ``{<prog_id>:{<rout_id>:[tag_data],..},..}``
"""
self.log.info("Extracting Program Data....")
class_code = CLASS_CODE["Program Object"]
program_attributes = self.get_attributes_multi((*path, class_code))
symbol_attributes = {}
routine_attributes = {}
routine_tags = {}
for instance_id in program_attributes:
program_path = (*path, class_code, instance_id)
symbol_attributes[instance_id] = self.get_attributes_multi(
class_path=(*program_path, CLASS_CODE["Symbol Object"])
)
(
routine_attributes[instance_id],
routine_tags[instance_id],
) = self.get_routine_data(program_path)
self.log.info("Program extracted")
return (program_attributes, symbol_attributes, routine_attributes, routine_tags)
[docs]
def get_routine_data(self, path: PathType = ()) -> AttrTags:
"""
Returns a :class:`tuple` (routine_attributes, routine_tags)
for all routine instances at a given path.
Args:
path: Path to the instance containing the routine class
Returns:
(attr, tags)
- attr = ``{<rout_id>: {<attr_id>: <attr_value>,..},..}``
- tags = ``{<rout_id>: [tag_data],..}``
"""
class_code = CLASS_CODE["Routine Object"]
routine_attributes = self.get_attributes_multi((*path, class_code))
routine_tags = {}
for instance_id in routine_attributes:
routine_path = (*path, class_code, instance_id)
lang = routine_attributes[instance_id][0x01]
if lang == LANG_RLL:
routine_tags[instance_id] = self.read_tag(routine_path)
elif lang in [LANG_STL, LANG_SFC, LANG_FBD]:
routine_tags[instance_id] = self.read_tag_fragmented(routine_path)
else:
self.log.warning(f"Invalid routine language: {lang}")
return routine_attributes, routine_tags
[docs]
def get_map_data(self, path: PathType = ()) -> tuple[dict, dict]:
"""
Returns a :class:`tuple` (map_attributes, map_cxn_attributes)
for all map instances at a given path.
Args:
path: Path to the instance containing the map class
Returns:
(attr, cxn_attr)
- attr = ``{<map_id>: {<attr_id>: <attr_val>,..},..}``
- cxn_attr = ``{<map_id>: {<cxn_id>: {<attr_id>: <attr_val>,..},..},..}``
"""
self.log.info("Memory Map extraction")
class_code = CLASS_CODE["Map Object"]
map_attributes = self.get_attributes_multi((*path, class_code))
map_cxn_attributes = {}
for instance_id in map_attributes:
map_path = (*path, class_code, instance_id)
map_cxn_attributes[instance_id] = self.get_attributes_multi(
class_path=(*map_path, CLASS_CODE["Cxn Object"])
)
return map_attributes, map_cxn_attributes
[docs]
def get_unknown6e_data(self, path: tuple = ()) -> AttrTags:
"""
Returns a :class:`tuple` (unknown6e_attributes, unknown6e_tags)
for all ``unknown6e`` instances at a given path.
Args:
path: Path to the instance containing the unknown6e class
Returns:
(attr, tags)
- attr = ``{<u6e_id>: {<attr_id>: <attr_val>,..},..},``
- tags = ``{<u6e_id>: [tag_data],..}``
"""
class_code = CLASS_CODE["Unknown 6e"]
unknown6e_attributes = self.get_attributes_multi((*path, class_code))
unknown6e_tags = {}
for instance_id in unknown6e_attributes:
unknown6e_path = (*path, class_code, instance_id)
# TODO: fix tag request to get proper data
unknown6e_tags[instance_id] = self.read_tag(unknown6e_path)
return unknown6e_attributes, unknown6e_tags
[docs]
def get_instance_list_tag_buffer(self, class_path: PathType) -> list:
"""
Returns a data buffer representing a :class:`list` of all
instance IDs of a specified class path.
Args:
class_path: The class to query
Returns:
Data buffer representing a :class:`list` of all instance IDs of a
specified class path
"""
if config.DEBUG >= 4:
self.log.debug(f"get_instance_list_tag_buffer({class_path})")
service = TAG_SERVICES_REQUEST["Get Instance List"]
tag_buffer = []
next_instance = 0
while next_instance != -1:
path_string = path_to_string((*class_path, next_instance))
reply = self.driver.send_connected_command(
service=service, path=path_string, cmd_data=b""
)
reply_data, next_instance = get_instance_list_data_from_reply(reply)
tag_buffer.extend(reply_data)
if config.DEBUG >= 4:
self.log.debug(f"get_instance_list_tag_buffer: DONE. tag_buffer:\n{tag_buffer}")
return tag_buffer
[docs]
def get_attributes_tag_buffer(
self, instance_path: PathType, attribute_list: list[int]
) -> list[TagData]:
"""
Returns a data buffer representing the specified attributes of an
instance at a specified path.
Args:
instance_path: The instance to query
attribute_list: Attributes to query
Returns:
Data buffer (list(attribute_data)) representing the specified
attributes of an instance
"""
if config.DEBUG >= 4:
self.log.debug(f"get_attributes_tag_buffer({instance_path}, {attribute_list})")
service = TAG_SERVICES_REQUEST["Get Attributes"]
tag_buffer = []
attribute_reads = 0
attribute_reads_total = 0
path_string = path_to_string(instance_path)
while attribute_reads != -1:
attribute_reads_total += attribute_reads
attribute_list_remaining = attribute_list[attribute_reads_total:]
rp = [
pack_uint(len(attribute_list_remaining)),
]
rp.extend([pack_uint(attribute) for attribute in attribute_list_remaining])
reply = self.driver.send_connected_command(
service=service, path=path_string, cmd_data=b"".join(rp)
)
reply_data, attribute_reads = get_attributes_data_from_reply(reply)
tag_buffer.extend(reply_data)
if config.DEBUG >= 4:
self.log.debug(f"get_attributes_tag_buffer: DONE. tag_buffer:\n{tag_buffer}")
return tag_buffer
[docs]
def read_tag(self, instance_path: PathType, tag_offset: int = 0) -> list[int]:
"""
Return a data buffer representing the tag data of an instance at a
specified path (at a specified offset).
Args:
instance_path: The instance to query
tag_offset: The tag offset to start reading
Returns:
Data buffer representing the tag data of an instance
"""
if config.DEBUG >= 4:
self.log.debug(f"read_tag({instance_path}, {hex(tag_offset)})")
tag_buffer = []
tag_size = 0
path_string = path_to_string(instance_path)
while tag_size != -1:
reply = self.driver.send_connected_command(
service=TAG_SERVICES_REQUEST["Read Tag"],
path=path_string,
cmd_data=b"\x00\x00\x00\x00" + pack_uint(tag_offset),
)
reply_data, tag_size = read_tag_data_from_reply(reply)
tag_buffer.extend(reply_data)
tag_offset += tag_size
if config.DEBUG >= 4:
self.log.debug(f"read_tag: DONE. tag_buffer:\n{tag_buffer}")
return tag_buffer
[docs]
def read_tag_fragmented(self, instance_path: PathType, tag_offset: int = 0) -> list[int]:
"""
Return a data buffer representing the tag data of an instance at a
specified path (at a specified offset).
Args:
instance_path: The instance to query
tag_offset: The tag offset to start reading
Returns:
Data buffer representing the tag data of an instance
"""
if config.DEBUG >= 4:
self.log.debug(f"read_tag_fragmented({instance_path}, {hex(tag_offset)})")
tag_buffer = []
tag_size = 0
tag_address = b""
path_string = path_to_string(instance_path)
while tag_size != -1:
reply = self.driver.send_connected_command(
service=TAG_SERVICES_REQUEST["Read Tag Fragmented"],
path=path_string,
cmd_data=b"\x00\x00\x00\x00" + pack_uint(tag_offset) + b"\x00\x00\x00\x00",
)
reply_data, tag_size, tag_address = read_tag_fragmented_data_from_reply(reply)
tag_buffer.extend(reply_data)
tag_offset += tag_size
ret = list(tag_address) + tag_buffer
if config.DEBUG >= 4:
self.log.debug(f"read_tag_fragmented: DONE. ret:\n{ret}")
return ret
[docs]
def read_template(self, instance_path: PathType, size: int, offset: int = 0) -> list:
"""
Return a data buffer representing the tag data of a template
instance at a specified path (at a specified offset).
Args:
instance_path: The template instance to query
size: The size of the template instance to query
offset: The tag offset to start reading
Returns:
Data buffer representing the tag data of a template instance
"""
if config.DEBUG >= 4:
self.log.debug(f"read_template({instance_path}, {hex(size)}, {hex(offset)})")
tag_buffer = []
tag_offset = offset
remaining_size = size
tag_size = 0
path_string = path_to_string(instance_path)
while tag_size != -1:
reply = self.driver.send_connected_command(
service=TAG_SERVICES_REQUEST["Read Tag"],
path=path_string,
cmd_data=pack_dint(tag_offset) + pack_uint(remaining_size),
)
reply_data, tag_size = read_tag_data_from_reply(reply)
tag_offset += tag_size
tag_buffer.extend(reply_data)
remaining_size = size - tag_offset
if config.DEBUG >= 4:
self.log.debug(f"read_template: DONE. tag_buffer:\n{tag_buffer}")
return tag_buffer
[docs]
def read_tag_with_size(self, instance_path: PathType, size: int) -> list:
"""
Return a data buffer representing the tag data of an instance at a
specified path (with a specified size).
Args:
instance_path: The instance to query
size: The size of the instance to query
Returns:
Data buffer representing the tag data of an instance
"""
if config.DEBUG >= 4:
self.log.debug(f"read_tag_with_size({instance_path}, {hex(size)})")
tag_buffer = []
path_string = path_to_string(instance_path)
join_string = b"".join([b"\x00", b"\x00", pack_dint(size)])
reply = self.driver.send_connected_command(
service=TAG_SERVICES_REQUEST["Read Tag With Size"],
path=path_string,
cmd_data=join_string,
)
reply_data = read_tag_with_size_data_from_reply(reply)
tag_buffer.extend(reply_data)
if config.DEBUG >= 4:
self.log.debug(f"read_tag_with_size: DONE. tag_buffer:\n{tag_buffer}")
return tag_buffer
[docs]
def get_size_of_type(
type_id: int,
template_attributes: dict,
type_size_d1: int = 0,
type_size_d2: int = 0,
type_size_d3: int = 0,
) -> int:
"""
Returns the size of the given type.
Args:
type_id: the ID of the type
template_attributes: template object attributes
type_size_d1: the size of the first dimension (if array)
type_size_d2: the size of the second dimension (if array)
type_size_d3: the size of the third dimension (if array)
Returns:
Integer value of the size of the type (in bytes)
"""
structure_bit = (type_id & 0x8000) >> 15
array_bits = (type_id & 0x6000) >> 13
type_bits = type_id & 0x0FFF
if structure_bit == 0: # Atomic data
if (type_bits in I_DATA_TYPE) and (I_DATA_TYPE[type_bits] in DATA_FUNCTION_SIZE):
type_size = DATA_FUNCTION_SIZE[I_DATA_TYPE[type_bits]]
else:
log.warning(f"Something went wrong. Unseen type: {type_bits}")
return 0
elif type_bits in template_attributes: # Structure data
# TODO: rewrite the 0x5 to remove this comment block
# template structure size
type_size = template_attributes[type_bits][0x05]
else:
log.warning(f"Something went wrong. Unseen type: {type_bits}")
return 0
# TODO: Figure out why the below checks exist
# (they are all true if array_bits > 2)
if array_bits > 0:
type_size *= type_size_d1
if array_bits > 1:
type_size *= type_size_d2
if array_bits > 2:
type_size *= type_size_d3
return type_size
[docs]
def validate_reply_data(cip_data: bytes, service: int, min_size: int) -> bool:
"""
Validates the reply data.
Args:
cip_data: The CIP data to verify
service: The reply service code
min_size: Used to check if the reply data is too short
Returns:
If validation was successful
"""
if len(cip_data) < min_size:
log.warning(f"CIP reply data too short ({len(cip_data)} < {min_size})")
return False
data_service = unpack_usint(cip_data[:1])
if data_service != service:
log.warning(
f"Wrong service code for this method "
f"(service_code: {hex(data_service)}, expected: {hex(service)})"
)
return False
cip_status = unpack_usint(cip_data[2:3])
if not ((cip_status == 0) or (cip_status in SERVICE_STATUS)):
log.warning(f"Unknown CIP status {hex(cip_status)}")
return False
return True
[docs]
def get_instance_list_data_from_reply(cip_data: bytes) -> DataListType:
"""
Returns a tuple (instance_list_data, next_instance_id) from
get_instance_list reply data.
Args:
cip_data: Reply data at the CIP layer
Returns:
(data, next_instance_id)
- data = ``[instance_list_data]``
- next_instance_id = instance id to read next if insufficient packet
- space (-1 if complete or error)
"""
if config.DEBUG >= 4:
log.debug("get_instance_list: reading reply: starting...")
# validate input data
reply_service = I_TAG_SERVICES_REPLY["Get Instance List"]
if not validate_reply_data(cip_data, reply_service, 4):
return b"", -1
# process input data
cip_status = unpack_usint(cip_data[2:3])
instance_list_data = cip_data[4:]
# check status of input data
if cip_status == SUCCESS:
next_instance = -1
if config.DEBUG >= 4:
log.debug("get_instance_list: reading reply: SUCCESS")
elif SERVICE_STATUS[cip_status] == "Insufficient Packet Space":
next_instance = (
max(
unpack_dint(instance_list_data[i : i + 4])
for i in range(0, len(instance_list_data), 4)
)
+ 1
)
if config.DEBUG >= 4:
log.debug(
f"get_instance_list: reading reply: IN PROGRESS...: "
f"next_index: {hex(next_instance)}"
)
else:
next_instance = -1
log.debug(
f"get_instance_list reply: "
f"error: {SERVICE_STATUS[cip_status]} (status = {hex(cip_status)})"
)
return instance_list_data, next_instance
[docs]
def get_attributes_data_from_reply(cip_data: bytes) -> DataListType:
"""
Returns the tuple (attribute_data, attribute_count) from
get_attributes reply data.
Args:
cip_data: Reply data at the CIP layer
Returns:
(data, count)
- data = ``[instance_attribute_data]``
- count = number of attributes in the reply (-1 if complete or error)
"""
if config.DEBUG >= 4:
log.debug("get_attributes_data_from_reply: reading reply: starting...")
# validate input data
reply_service = I_TAG_SERVICES_REPLY["Get Attributes"]
if not validate_reply_data(cip_data, reply_service, 6):
return b"", -1
# process input data
cip_status = unpack_usint(cip_data[2:3])
attribute_count = unpack_uint(cip_data[4:6])
attribute_data = cip_data[6:]
# check status of input data
if cip_status == SUCCESS:
attribute_count = -1
if config.DEBUG >= 4:
log.debug("get_attributes_data_from_reply: reading reply: SUCCESS")
elif SERVICE_STATUS[cip_status] == "Insufficient Packet Space":
if config.DEBUG >= 4:
log.debug(
f"get_attributes_data_from_reply: reading reply: IN "
f"PROGRESS...: attribute_count: {hex(attribute_count)}"
)
else:
log.debug(
f"get_attributes_data_from_reply: reading reply: "
f"error: {SERVICE_STATUS[cip_status]} (status = {hex(cip_status)})"
)
return attribute_data, attribute_count
[docs]
def read_tag_data_from_reply(cip_data: bytes) -> DataListType:
"""
Return the tuple (tag_data, tag_size) from read_tag reply data.
Args:
cip_data: Reply data at the CIP layer
Returns:
(data, offset)
- data = ``[tag_data]``
- size = size of the data in the reply (-1 if complete or error)
"""
if config.DEBUG >= 4:
log.debug("read_tag_data_from_reply reply: reading reply: starting...")
# validate input data
reply_service = I_TAG_SERVICES_REPLY["Read Tag"]
if not validate_reply_data(cip_data, reply_service, 4):
return b"", -1
# process input data
cip_status = unpack_usint(cip_data[2:3])
tag_data = cip_data[8:]
# check status of input data
if cip_status == SUCCESS:
tag_size = -1
if config.DEBUG >= 4:
log.debug("read_tag_data_from_reply: reading reply: SUCCESS")
elif SERVICE_STATUS[cip_status] == "Insufficient Packet Space":
tag_size = len(tag_data) // 4
if config.DEBUG >= 4:
log.debug(
f"read_tag_data_from_reply: reading reply: IN PROGRESS...:"
f"tag_size: {hex(tag_size)}"
)
else:
tag_size = -1
log.debug(
f"read_tag_data_from_reply: reading reply: "
f"error: {SERVICE_STATUS[cip_status]} (status = {hex(cip_status)})"
)
return tag_data, tag_size
[docs]
def read_tag_fragmented_data_from_reply(cip_data: bytes) -> FragListType:
"""
Return the tuple (tag_data, tag_size, tag_address) from
read_tag_fragmented reply data.
Args:
cip_data: Reply data at the CIP layer
Returns:
(data, size, address)
- data = ``[tag_data]``
- size = size of the data in the reply (-1 if complete or error)
- address = base memory address of the tag in the device
"""
if config.DEBUG >= 4:
log.debug("read_tag_fragmented: reading reply: starting...")
# validate input data
reply_service = I_TAG_SERVICES_REPLY["Read Tag Fragmented"]
if not validate_reply_data(cip_data, reply_service, 12):
return b"", -1, b"\x00\x00\x00\x00\xff\xff\xff\xff"
# process input data
cip_status = unpack_usint(cip_data[2:3])
tag_address = cip_data[4:12]
tag_data = cip_data[12:]
tag_size = -1
# check status of input data
if cip_status == SUCCESS:
if config.DEBUG >= 4:
log.debug("read_tag_fragmented: reading reply: SUCCESS")
elif SERVICE_STATUS[cip_status] == "Insufficient Packet Space":
tag_size = len(tag_data) // 4
if config.DEBUG >= 4:
log.debug(
f"read_tag_fragmented: reading reply: IN PROGRESS...:tag_size: {hex(tag_size)}"
)
else:
log.debug(
f"read_tag_fragmented: reading reply: "
f"error: {SERVICE_STATUS[cip_status]} (status = {hex(cip_status)})"
)
return tag_data, tag_size, tag_address
[docs]
def read_tag_with_size_data_from_reply(cip_data: bytes) -> TagData:
"""
Return the tag_data from ``read_tag_with_size`` reply data.
Args:
cip_data: Reply data at the CIP layer
Returns:
``[tag_data]``
"""
if config.DEBUG >= 4:
log.debug("read_tag_with_size: reading reply: starting...")
# Validate input data
reply_service = I_TAG_SERVICES_REPLY["Read Tag With Size"]
if not validate_reply_data(cip_data, reply_service, 4):
return b""
# Process input data
cip_status = unpack_usint(cip_data[2:3])
tag_data = cip_data[4:]
# Check status of input data
if cip_status == SUCCESS:
if config.DEBUG >= 4:
log.debug("read_tag_with_size: reading reply: SUCCESS")
else:
log.debug(
f"read_tag_with_size: reading reply: "
f"error: {SERVICE_STATUS[cip_status]} (status = {hex(cip_status)})"
)
return tag_data
[docs]
def path_to_string(path: PathType) -> bytes:
"""
Returns a string representing the path from a tuple.
Args:
path: :class:`tuple` representation of the path
Returns:
Byte string representation of the path (:class:`bytes`)
"""
path_string = b""
for idx in range(0, len(path), 2):
class_code = path[idx]
instance_code = path[idx + 1]
if class_code <= 0xFF:
path_string += CLASS_ID["8-bit"]
path_string += pack_usint(class_code)
else:
path_string += CLASS_ID["16-bit"] + b"\x00"
path_string += pack_uint(class_code)
if instance_code <= 0xFF:
path_string += INSTANCE_ID["8-bit"]
path_string += pack_usint(instance_code)
else:
path_string += INSTANCE_ID["16-bit"] + b"\x00"
path_string += pack_uint(instance_code)
full_path = (len(path_string) // 2).to_bytes(1, byteorder="big") + path_string
return full_path
[docs]
def parse_get_instance_list(cip_data: bytes) -> list[int]:
"""
Returns a list of instance ids (int) from raw instance list CIP data.
Args:
cip_data: ``[<instance_list_data>]``
Returns:
``[instance_id0, instance_id1, ...]``
"""
if config.DEBUG >= 4:
log.debug(f"parse_get_instance_list({cip_data})")
if len(cip_data) % 4 == 0:
instance_list = [unpack_dint(cip_data[i : i + 4]) for i in range(0, len(cip_data), 4)]
else:
log.debug("parse_get_instance_list: length of input data not multiple of 4")
instance_list = []
if config.DEBUG >= 4:
log.debug(f"parse_get_instance_list: DONE. instance_list:\n{instance_list}")
return instance_list
[docs]
def parse_get_attributes(cip_data: bytes, cip_class: int) -> dict[int, int]:
"""
Returns a dictionary of attributes from the raw :term:`CIP` data and the
CIP class id (:class:`int`).
Args:
cip_data: ``[<attributes_data>]`` (Represented as :class:`bytes` currently)
cip_class: CIP class of the data
Returns:
Dict with ``{<attr_id>:<attr_val>,..}``
"""
if config.DEBUG >= 4:
log.debug(f"parse_get_attributes({cip_data}, {cip_class})")
instance_attributes = {}
if cip_class not in CLASS_ATTRIBUTE_INFO:
log.warning(f"parse_get_attributes: CIP class undefined (0x{cip_class:0>4x})")
return instance_attributes
class_attribute_info = CLASS_ATTRIBUTE_INFO[cip_class]
idx = 0
while idx < len(cip_data):
if len(cip_data) < (idx + 4):
log.warning(
"parse_get_attributes: ID and status fields too small "
f"(size: 0x{len(cip_data) - idx:0>4x})"
)
break
attribute_id = unpack_uint(cip_data[idx : idx + 2])
attribute_status = unpack_uint(cip_data[idx + 2 : idx + 4])
if config.DEBUG >= 4:
log.debug(
f"parse_get_attributes: ID: 0x{attribute_id:0>4x} "
f"Status: 0x{attribute_status:0>4x}"
)
if attribute_status != SUCCESS:
idx += 4
continue
if attribute_id not in class_attribute_info:
log.warning(
f"parse_get_attributes: Cannot parse attribute "
f"(undefined attribute id: 0x{attribute_id:0>4x} "
f"for CIP class 0x{cip_class:0>4x})"
)
break
attribute_type = class_attribute_info[attribute_id]["type"]
if config.DEBUG >= 4:
log.debug(f"parse_get_attributes: Type: {attribute_type}")
if attribute_type.split(":")[0] == "STRING":
string_size = int(attribute_type.split(":")[1])
if string_size == 2:
attribute_size = unpack_uint(cip_data[idx + 4 : idx + 4 + string_size])
elif string_size == 4:
attribute_size = unpack_dint(cip_data[idx + 4 : idx + 4 + string_size])
else:
log.warning(
f"parse_get_attributes: Cannot parse attribute "
f"(unknown string size: 0x{string_size:0>4x})"
)
break
attribute_value = cip_data[
idx + 4 + string_size : idx + 4 + string_size + attribute_size
]
attribute_size += string_size
elif attribute_type.split(":")[0] == "RAW":
attribute_size = int(attribute_type.split(":")[1])
attribute_value = cip_data[idx + 4 : idx + 4 + attribute_size]
else:
attribute_size = DATA_FUNCTION_SIZE[attribute_type]
attribute_unpack_operation = UNPACK_DATA_FUNCTION[attribute_type]
attribute_value = attribute_unpack_operation(
cip_data[idx + 4 : idx + 4 + attribute_size]
)
if config.DEBUG >= 4:
log.debug(f"parse_get_attributes: Value: {attribute_value}")
instance_attributes[attribute_id] = attribute_value
idx += 4 + attribute_size
if config.DEBUG >= 4:
log.debug(f"parse_get_attributes: DONE. instance_attributes:\n{instance_attributes}")
return instance_attributes
[docs]
def parse_template(cip_data: list, member_count: int) -> TemplateType:
"""
Parses a template tag and returns a data structure
representing the template.
Args:
cip_data: A buffer (list of bytes) of the template tag
member_count: The number of members represented by the template
Returns:
.. code-block:: python
{"Name":<template_name>,
"Structure":{
<member_offset1>:{
"Name":<member_name>,
"Type":<member_type>,
"Info":<member_info>,
}, ..
}
}
"""
if config.DEBUG >= 4:
log.debug(f"parse_template({cip_data}, {member_count})")
name_data = cip_data[member_count * 8 :]
name_list = bytes(name_data).split(b"\x00")
template_name = name_list[0].split(b";")[0]
member_name_list = name_list[1 : member_count + 1]
template_struct = {
"Name": template_name,
"Structure": {},
}
for member_idx in range(member_count):
if len(cip_data) < (member_idx * 8 + 8):
log.warning(
"parse_template: Malformed CIP data (data does "
"not contain as many members as specified)"
)
break
member_data = cip_data[(member_idx * 8) : (member_idx * 8) + 8]
member_info = unpack_uint(bytes(member_data[:2]))
member_type = unpack_uint(bytes(member_data[2:4]))
member_offset = unpack_dint(bytes(member_data[4:8]))
if len(member_name_list) > member_idx:
member_name = member_name_list[member_idx]
else:
log.warning(
f"parse_template: Malformed CIP data (data does not "
f"contain name at index 0x{member_idx:0>4x})"
)
member_name = b""
template_struct["Structure"][member_offset] = {
"Name": member_name,
"Type": member_type,
"Info": member_info,
}
if config.DEBUG >= 4:
log.debug(f"parse_template: DONE. template_struct:\n{template_struct}")
return template_struct