Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / lanxi_stream.py: 31%

235 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild 

2# flake8: noqa # pylint: skip-file 

3 

4from packaging.version import Version 

5import kaitaistruct 

6from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO 

7from enum import Enum 

8 

9 

10class OpenapiHeader(KaitaiStruct): 

11 

12 class EMessageType(Enum): 

13 e_signal_data = 1 

14 e_data_quality = 2 

15 e_interpretation = 8 

16 e_aux_sequence_data = 11 

17 

18 def __init__(self, _io, _parent=None, _root=None): 

19 self._io = _io 

20 self._parent = _parent 

21 self._root = _root if _root else self 

22 self._read() 

23 

24 def _read(self): 

25 self.magic = self._io.read_bytes(2) 

26 if not self.magic == b"\x42\x4b": 

27 raise kaitaistruct.ValidationNotEqualError( 

28 b"\x42\x4b", self.magic, self._io, "/types/header/seq/0" 

29 ) 

30 self.header_length = self._io.read_u2le() 

31 self.message_type = KaitaiStream.resolve_enum( 

32 OpenapiMessage.Header.EMessageType, self._io.read_u2le() 

33 ) 

34 self.reserved1 = self._io.read_u2le() 

35 self.reserved2 = self._io.read_u4le() 

36 self.time = OpenapiMessage.Time(self._io, self, self._root) 

37 self.message_length = self._io.read_u4le() 

38 

39 

40if Version(kaitaistruct.__version__) < Version("0.9"): 

41 raise Exception( 

42 "Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" 

43 % (kaitaistruct.__version__) 

44 ) 

45 

46 

47class OpenapiMessage(KaitaiStruct): 

48 def __init__(self, _io, _parent=None, _root=None): 

49 self._io = _io 

50 self._parent = _parent 

51 self._root = _root if _root else self 

52 self._read() 

53 

54 def _read(self): 

55 self.header = OpenapiMessage.Header(self._io, self, self._root) 

56 _on = self.header.message_type 

57 if _on == OpenapiMessage.Header.EMessageType.e_signal_data: 

58 self._raw_message = self._io.read_bytes(self.header.message_length) 

59 _io__raw_message = KaitaiStream(BytesIO(self._raw_message)) 

60 self.message = OpenapiMessage.SignalData(_io__raw_message, self, self._root) 

61 elif _on == OpenapiMessage.Header.EMessageType.e_data_quality: 

62 self._raw_message = self._io.read_bytes(self.header.message_length) 

63 _io__raw_message = KaitaiStream(BytesIO(self._raw_message)) 

64 self.message = OpenapiMessage.DataQuality(_io__raw_message, self, self._root) 

65 elif _on == OpenapiMessage.Header.EMessageType.e_interpretation: 

66 self._raw_message = self._io.read_bytes(self.header.message_length) 

67 _io__raw_message = KaitaiStream(BytesIO(self._raw_message)) 

68 self.message = OpenapiMessage.Interpretations(_io__raw_message, self, self._root) 

69 else: 

70 self.message = self._io.read_bytes(self.header.message_length) 

71 

72 class Interpretations(KaitaiStruct): 

73 def __init__(self, _io, _parent=None, _root=None): 

74 self._io = _io 

75 self._parent = _parent 

76 self._root = _root if _root else self 

77 self._read() 

78 

79 def _read(self): 

80 self.interpretations = [] 

81 i = 0 

82 while not self._io.is_eof(): 

83 self.interpretations.append( 

84 OpenapiMessage.Interpretation(self._io, self, self._root) 

85 ) 

86 i += 1 

87 

88 class DataQuality(KaitaiStruct): 

89 def __init__(self, _io, _parent=None, _root=None): 

90 self._io = _io 

91 self._parent = _parent 

92 self._root = _root if _root else self 

93 self._read() 

94 

95 def _read(self): 

96 self.number_of_signals = self._io.read_u2le() 

97 self.qualities = [None] * (self.number_of_signals) 

98 for i in range(self.number_of_signals): 

99 self.qualities[i] = OpenapiMessage.DataQualityBlock(self._io, self, self._root) 

100 

101 class DataQualityBlock(KaitaiStruct): 

102 def __init__(self, _io, _parent=None, _root=None): 

103 self._io = _io 

104 self._parent = _parent 

105 self._root = _root if _root else self 

106 self._read() 

107 

108 def _read(self): 

109 self.signal_id = self._io.read_u2le() 

110 self.validity_flags = OpenapiMessage.ValidityFlags(self._io, self, self._root) 

111 self.reserved = self._io.read_u2le() 

112 

113 class Interpretation(KaitaiStruct): 

114 

115 class EDescriptorType(Enum): 

116 data_type = 1 

117 scale_factor = 2 

118 offset = 3 

119 period_time = 4 

120 unit = 5 

121 vector_length = 6 

122 channel_type = 7 

123 

124 def __init__(self, _io, _parent=None, _root=None): 

125 self._io = _io 

126 self._parent = _parent 

127 self._root = _root if _root else self 

128 self._read() 

129 

130 def _read(self): 

131 self.signal_id = self._io.read_u2le() 

132 self.descriptor_type = KaitaiStream.resolve_enum( 

133 OpenapiMessage.Interpretation.EDescriptorType, self._io.read_u2le() 

134 ) 

135 self.reserved = self._io.read_u2le() 

136 self.value_length = self._io.read_u2le() 

137 _on = self.descriptor_type 

138 if _on == OpenapiMessage.Interpretation.EDescriptorType.data_type: 

139 self.value = self._io.read_u2le() 

140 elif _on == OpenapiMessage.Interpretation.EDescriptorType.scale_factor: 

141 self.value = self._io.read_f8le() 

142 elif _on == OpenapiMessage.Interpretation.EDescriptorType.unit: 

143 self.value = OpenapiMessage.String(self._io, self, self._root) 

144 elif _on == OpenapiMessage.Interpretation.EDescriptorType.vector_length: 

145 self.value = self._io.read_u2le() 

146 elif _on == OpenapiMessage.Interpretation.EDescriptorType.period_time: 

147 self.value = OpenapiMessage.Time(self._io, self, self._root) 

148 elif _on == OpenapiMessage.Interpretation.EDescriptorType.offset: 

149 self.value = self._io.read_f8le() 

150 elif _on == OpenapiMessage.Interpretation.EDescriptorType.channel_type: 

151 self.value = self._io.read_u2le() 

152 self.padding = [None] * (((4 - (self._io.pos() % 4)) & 3)) 

153 for i in range(((4 - (self._io.pos() % 4)) & 3)): 

154 self.padding[i] = self._io.read_u1() 

155 

156 class String(KaitaiStruct): 

157 def __init__(self, _io, _parent=None, _root=None): 

158 self._io = _io 

159 self._parent = _parent 

160 self._root = _root if _root else self 

161 self._read() 

162 

163 def _read(self): 

164 self.count = self._io.read_u2le() 

165 self.data = (self._io.read_bytes(self.count)).decode("UTF-8") 

166 

167 class TimeFamily(KaitaiStruct): 

168 def __init__(self, _io, _parent=None, _root=None): 

169 self._io = _io 

170 self._parent = _parent 

171 self._root = _root if _root else self 

172 self._read() 

173 

174 def _read(self): 

175 self.k = self._io.read_u1() 

176 self.l = self._io.read_u1() 

177 self.m = self._io.read_u1() 

178 self.n = self._io.read_u1() 

179 

180 class ValidityFlags(KaitaiStruct): 

181 def __init__(self, _io, _parent=None, _root=None): 

182 self._io = _io 

183 self._parent = _parent 

184 self._root = _root if _root else self 

185 self._read() 

186 

187 def _read(self): 

188 self.f = self._io.read_u2le() 

189 

190 @property 

191 def overload(self): 

192 if hasattr(self, "_m_overload"): 

193 return self._m_overload if hasattr(self, "_m_overload") else None 

194 

195 self._m_overload = (self.f & 2) != 0 

196 return self._m_overload if hasattr(self, "_m_overload") else None 

197 

198 @property 

199 def invalid(self): 

200 if hasattr(self, "_m_invalid"): 

201 return self._m_invalid if hasattr(self, "_m_invalid") else None 

202 

203 self._m_invalid = (self.f & 8) != 0 

204 return self._m_invalid if hasattr(self, "_m_invalid") else None 

205 

206 @property 

207 def overrun(self): 

208 if hasattr(self, "_m_overrun"): 

209 return self._m_overrun if hasattr(self, "_m_overrun") else None 

210 

211 self._m_overrun = (self.f & 16) != 0 

212 return self._m_overrun if hasattr(self, "_m_overrun") else None 

213 

214 class SignalData(KaitaiStruct): 

215 def __init__(self, _io, _parent=None, _root=None): 

216 self._io = _io 

217 self._parent = _parent 

218 self._root = _root if _root else self 

219 self._read() 

220 

221 def _read(self): 

222 self.number_of_signals = self._io.read_u2le() 

223 self.reserved = self._io.read_u2le() 

224 self.signals = [None] * (self.number_of_signals) 

225 for i in range(self.number_of_signals): 

226 self.signals[i] = OpenapiMessage.SignalBlock(self._io, self, self._root) 

227 

228 class Header(KaitaiStruct): 

229 

230 class EMessageType(Enum): 

231 e_signal_data = 1 

232 e_data_quality = 2 

233 e_interpretation = 8 

234 e_aux_sequence_data = 11 

235 

236 def __init__(self, _io, _parent=None, _root=None): 

237 self._io = _io 

238 self._parent = _parent 

239 self._root = _root if _root else self 

240 self._read() 

241 

242 def _read(self): 

243 self.magic = self._io.read_bytes(2) 

244 if not self.magic == b"\x42\x4b": 

245 raise kaitaistruct.ValidationNotEqualError( 

246 b"\x42\x4b", self.magic, self._io, "/types/header/seq/0" 

247 ) 

248 self.header_length = self._io.read_u2le() 

249 self.message_type = KaitaiStream.resolve_enum( 

250 OpenapiMessage.Header.EMessageType, self._io.read_u2le() 

251 ) 

252 self.reserved1 = self._io.read_u2le() 

253 self.reserved2 = self._io.read_u4le() 

254 self.time = OpenapiMessage.Time(self._io, self, self._root) 

255 self.message_length = self._io.read_u4le() 

256 

257 class SignalBlock(KaitaiStruct): 

258 def __init__(self, _io, _parent=None, _root=None): 

259 self._io = _io 

260 self._parent = _parent 

261 self._root = _root if _root else self 

262 self._read() 

263 

264 def _read(self): 

265 self.signal_id = self._io.read_s2le() 

266 self.number_of_values = self._io.read_s2le() 

267 self.values = [None] * (self.number_of_values) 

268 for i in range(self.number_of_values): 

269 self.values[i] = OpenapiMessage.Value(self._io, self, self._root) 

270 

271 class Time(KaitaiStruct): 

272 def __init__(self, _io, _parent=None, _root=None): 

273 self._io = _io 

274 self._parent = _parent 

275 self._root = _root if _root else self 

276 self._read() 

277 

278 def _read(self): 

279 self.time_family = OpenapiMessage.TimeFamily(self._io, self, self._root) 

280 self.time_count = self._io.read_u8le() 

281 

282 class Value(KaitaiStruct): 

283 def __init__(self, _io, _parent=None, _root=None): 

284 self._io = _io 

285 self._parent = _parent 

286 self._root = _root if _root else self 

287 self._read() 

288 

289 def _read(self): 

290 self.value1 = self._io.read_u1() 

291 self.value2 = self._io.read_u1() 

292 self.value3 = self._io.read_s1() 

293 

294 @property 

295 def calc_value(self): 

296 if hasattr(self, "_m_calc_value"): 

297 return self._m_calc_value if hasattr(self, "_m_calc_value") else None 

298 

299 self._m_calc_value = (self.value1 + (self.value2 << 8)) + (self.value3 << 16) 

300 return self._m_calc_value if hasattr(self, "_m_calc_value") else None