Coverage for /opt/hostedtoolcache/Python/3.11.14/x64/lib/python3.11/site-packages/rattlesnake/components/lanxi_stream.py: 31%
235 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-27 18:22 +0000
1# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild
2# flake8: noqa # pylint: skip-file
4from packaging.version import Version
5import kaitaistruct
6from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO
7from enum import Enum
class OpenapiHeader(KaitaiStruct):
    """Stand-alone parser for a message header.

    Fields are read in this order: a two-byte magic that must be ASCII
    "BK", ``header_length`` (u2), ``message_type`` (u2, mapped to an enum),
    ``reserved1`` (u2), ``reserved2`` (u4), ``time`` and ``message_length``
    (u4).  All integers are little-endian.

    The message type is resolved against
    ``OpenapiMessage.Header.EMessageType`` and the time stamp is parsed with
    ``OpenapiMessage.Time``; the ``EMessageType`` enum nested in this class
    lists the same members but is not the one ``_read`` uses.
    """

    class EMessageType(Enum):
        """Numeric message type codes."""

        e_signal_data = 1
        e_data_quality = 2
        e_interpretation = 8
        e_aux_sequence_data = 11

    def __init__(self, _io, _parent=None, _root=None):
        """Store the stream and tree links, then parse immediately.

        Args:
            _io: Stream the header is read from.
            _parent: Enclosing parsed object, if any.
            _root: Root of the parse tree; this object is used when the
                argument is falsy.
        """
        self._io = _io
        self._parent = _parent
        self._root = _root or self
        self._read()

    def _read(self):
        """Read every header field from the stream.

        Raises:
            kaitaistruct.ValidationNotEqualError: If the first two bytes
                are not the expected magic.
        """
        stream = self._io
        expected_magic = b"\x42\x4b"

        self.magic = stream.read_bytes(2)
        if self.magic != expected_magic:
            raise kaitaistruct.ValidationNotEqualError(
                expected_magic, self.magic, stream, "/types/header/seq/0"
            )

        self.header_length = stream.read_u2le()
        # resolve_enum receives the raw code together with the enum class
        # declared on OpenapiMessage.Header (not the one nested here).
        self.message_type = KaitaiStream.resolve_enum(
            OpenapiMessage.Header.EMessageType, stream.read_u2le()
        )
        self.reserved1 = stream.read_u2le()
        self.reserved2 = stream.read_u4le()
        self.time = OpenapiMessage.Time(stream, self, self._root)
        self.message_length = stream.read_u4le()
# Import-time guard: this parser needs the 0.9 (or newer) Kaitai Struct
# Python runtime API, so refuse to load against anything older.
if not Version(kaitaistruct.__version__) >= Version("0.9"):
    raise Exception(
        "Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s"
        % (kaitaistruct.__version__,)
    )
class OpenapiMessage(KaitaiStruct):
    """One streamed message: a ``Header`` followed by a typed body.

    After parsing, ``header`` holds the parsed ``Header`` and ``message``
    holds the body.  Signal-data, data-quality and interpretation messages
    are parsed into ``SignalData``, ``DataQuality`` and ``Interpretations``
    respectively; for any other message type ``message`` is the raw body
    bytes.
    """

    def __init__(self, _io, _parent=None, _root=None):
        """Store the stream and tree links, then parse the message.

        Args:
            _io: Stream the message is read from.
            _parent: Enclosing parsed object, if any.
            _root: Root of the parse tree; this object is used when the
                argument is falsy.
        """
        self._io = _io
        self._parent = _parent
        self._root = _root or self
        self._read()

    def _read(self):
        """Parse the header, then the body selected by its message type."""
        self.header = OpenapiMessage.Header(self._io, self, self._root)

        message_types = OpenapiMessage.Header.EMessageType
        body_parsers = {
            message_types.e_signal_data: OpenapiMessage.SignalData,
            message_types.e_data_quality: OpenapiMessage.DataQuality,
            message_types.e_interpretation: OpenapiMessage.Interpretations,
        }
        body_parser = body_parsers.get(self.header.message_type)

        if body_parser is None:
            # No structured parser for this type: keep the body as bytes.
            self.message = self._io.read_bytes(self.header.message_length)
            return

        # Structured bodies are parsed from their own sub-stream, which
        # holds exactly ``message_length`` bytes.
        self._raw_message = self._io.read_bytes(self.header.message_length)
        body_stream = KaitaiStream(BytesIO(self._raw_message))
        self.message = body_parser(body_stream, self, self._root)

    class Interpretations(KaitaiStruct):
        """``Interpretation`` records repeated until the stream is exhausted."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Collect records into ``interpretations`` until end of stream."""
            records = []
            self.interpretations = records
            while not self._io.is_eof():
                records.append(
                    OpenapiMessage.Interpretation(self._io, self, self._root)
                )

    class DataQuality(KaitaiStruct):
        """A u2 signal count followed by that many ``DataQualityBlock`` items."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read ``number_of_signals`` and then one block per signal."""
            self.number_of_signals = self._io.read_u2le()
            self.qualities = [
                OpenapiMessage.DataQualityBlock(self._io, self, self._root)
                for _ in range(self.number_of_signals)
            ]

    class DataQualityBlock(KaitaiStruct):
        """Quality record: signal id (u2), validity flags, reserved (u2)."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read the three fields of the record in stream order."""
            stream = self._io
            self.signal_id = stream.read_u2le()
            self.validity_flags = OpenapiMessage.ValidityFlags(
                stream, self, self._root
            )
            self.reserved = stream.read_u2le()

    class Interpretation(KaitaiStruct):
        """One descriptor record whose value layout depends on its type.

        The record starts with ``signal_id``, ``descriptor_type``,
        ``reserved`` and ``value_length`` (each u2), followed by ``value``
        and by ``padding`` bytes up to the next 4-byte stream position.
        ``value_length`` is stored but is not used to size the value read.
        """

        class EDescriptorType(Enum):
            """Numeric descriptor type codes."""

            data_type = 1
            scale_factor = 2
            offset = 3
            period_time = 4
            unit = 5
            vector_length = 6
            channel_type = 7

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read the fixed fields, the typed value and the padding.

            For a descriptor type that matches none of the branches below,
            ``value`` is not assigned and no value bytes are consumed.
            """
            stream = self._io
            kinds = OpenapiMessage.Interpretation.EDescriptorType

            self.signal_id = stream.read_u2le()
            self.descriptor_type = KaitaiStream.resolve_enum(
                kinds, stream.read_u2le()
            )
            self.reserved = stream.read_u2le()
            self.value_length = stream.read_u2le()

            kind = self.descriptor_type
            if kind in (kinds.data_type, kinds.vector_length, kinds.channel_type):
                self.value = stream.read_u2le()
            elif kind in (kinds.scale_factor, kinds.offset):
                self.value = stream.read_f8le()
            elif kind == kinds.unit:
                self.value = OpenapiMessage.String(stream, self, self._root)
            elif kind == kinds.period_time:
                self.value = OpenapiMessage.Time(stream, self, self._root)

            # Consume 0-3 bytes so the stream position becomes a multiple
            # of four.
            pad_count = (4 - (stream.pos() % 4)) & 3
            self.padding = [stream.read_u1() for _ in range(pad_count)]

    class String(KaitaiStruct):
        """Length-prefixed text: a u2 byte count, then UTF-8 encoded data."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read ``count`` and decode that many bytes into ``data``."""
            self.count = self._io.read_u2le()
            encoded = self._io.read_bytes(self.count)
            self.data = encoded.decode("UTF-8")

    class TimeFamily(KaitaiStruct):
        """Four consecutive unsigned bytes stored as ``k``, ``l``, ``m``, ``n``."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read the four bytes in stream order."""
            read_byte = self._io.read_u1
            self.k = read_byte()
            self.l = read_byte()
            self.m = read_byte()
            self.n = read_byte()

    class ValidityFlags(KaitaiStruct):
        """A u2 flag word ``f`` with lazily computed boolean views."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read the raw flag word."""
            self.f = self._io.read_u2le()

        @property
        def overload(self):
            """True when mask 2 is set in ``f``; cached in ``_m_overload``."""
            if not hasattr(self, "_m_overload"):
                self._m_overload = (self.f & 2) != 0
            return self._m_overload

        @property
        def invalid(self):
            """True when mask 8 is set in ``f``; cached in ``_m_invalid``."""
            if not hasattr(self, "_m_invalid"):
                self._m_invalid = (self.f & 8) != 0
            return self._m_invalid

        @property
        def overrun(self):
            """True when mask 16 is set in ``f``; cached in ``_m_overrun``."""
            if not hasattr(self, "_m_overrun"):
                self._m_overrun = (self.f & 16) != 0
            return self._m_overrun

    class SignalData(KaitaiStruct):
        """Signal count (u2), reserved (u2), then one ``SignalBlock`` per signal."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read the counts and then ``number_of_signals`` blocks."""
            self.number_of_signals = self._io.read_u2le()
            self.reserved = self._io.read_u2le()
            self.signals = [
                OpenapiMessage.SignalBlock(self._io, self, self._root)
                for _ in range(self.number_of_signals)
            ]

    class Header(KaitaiStruct):
        """Message header.

        Fields are read in this order: a two-byte magic that must be ASCII
        "BK", ``header_length`` (u2), ``message_type`` (u2, mapped to
        ``EMessageType``), ``reserved1`` (u2), ``reserved2`` (u4), ``time``
        (a ``Time``) and ``message_length`` (u4).  All integers are
        little-endian.
        """

        class EMessageType(Enum):
            """Numeric message type codes."""

            e_signal_data = 1
            e_data_quality = 2
            e_interpretation = 8
            e_aux_sequence_data = 11

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read every header field from the stream.

            Raises:
                kaitaistruct.ValidationNotEqualError: If the first two
                    bytes are not the expected magic.
            """
            stream = self._io
            expected_magic = b"\x42\x4b"

            self.magic = stream.read_bytes(2)
            if self.magic != expected_magic:
                raise kaitaistruct.ValidationNotEqualError(
                    expected_magic, self.magic, stream, "/types/header/seq/0"
                )

            self.header_length = stream.read_u2le()
            self.message_type = KaitaiStream.resolve_enum(
                OpenapiMessage.Header.EMessageType, stream.read_u2le()
            )
            self.reserved1 = stream.read_u2le()
            self.reserved2 = stream.read_u4le()
            self.time = OpenapiMessage.Time(stream, self, self._root)
            self.message_length = stream.read_u4le()

    class SignalBlock(KaitaiStruct):
        """Signal id (s2), value count (s2), then that many ``Value`` items."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read the id and count, then ``number_of_values`` samples.

            Both leading fields are read as signed 16-bit integers; a
            non-positive count yields an empty ``values`` list.
            """
            self.signal_id = self._io.read_s2le()
            self.number_of_values = self._io.read_s2le()
            self.values = [
                OpenapiMessage.Value(self._io, self, self._root)
                for _ in range(self.number_of_values)
            ]

    class Time(KaitaiStruct):
        """A ``TimeFamily`` followed by an unsigned 64-bit ``time_count``."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read the family bytes and then the counter."""
            self.time_family = OpenapiMessage.TimeFamily(
                self._io, self, self._root
            )
            self.time_count = self._io.read_u8le()

    class Value(KaitaiStruct):
        """Three-byte sample: two unsigned bytes followed by one signed byte."""

        def __init__(self, _io, _parent=None, _root=None):
            """Store the stream and tree links, then parse immediately."""
            self._io = _io
            self._parent = _parent
            self._root = _root or self
            self._read()

        def _read(self):
            """Read ``value1`` and ``value2`` (u1) and ``value3`` (s1)."""
            self.value1 = self._io.read_u1()
            self.value2 = self._io.read_u1()
            self.value3 = self._io.read_s1()

        @property
        def calc_value(self):
            """Combine the three bytes into one signed integer.

            ``value1`` is the least significant byte and the signed
            ``value3`` the most significant one; the result is cached in
            ``_m_calc_value``.
            """
            if not hasattr(self, "_m_calc_value"):
                low_word = self.value1 + (self.value2 << 8)
                self._m_calc_value = low_word + (self.value3 << 16)
            return self._m_calc_value