Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
_READ_RECORD_COUNT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS, "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
construct.Const(b"\x03\x21"), "record_id" / construct.Int16ul,
)
_MEAL_FLAG = {
common.Meal.NONE: 0x00,
common.Meal.BEFORE: 0x01,
common.Meal.AFTER: 0x02,
}
_READING_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
"timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
"value" / construct.Int16ul,
"control_test" / construct.Flag,
"meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Padding(2), # unknown
)
class Device(serial.SerialDevice, driver.GlucometerDevice):
BAUDRATE = 38400
DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x
TIMEOUT = 0.5
def __init__(self, device: Optional[str]) -> None:
super().__init__(device)
UBInt8("unknown2"),
),
2: Struct("handshake",
AlphaString("username"),
),
3: Struct("chat",
AlphaString("message"),
),
4: Struct("time",
UBInt64("timestamp"),
),
5: Struct("inventory",
SBInt32("unknown1"),
UBInt16("length"),
MetaArray(lambda context: context["length"],
Struct("items",
SBInt16("id"),
If(lambda context: context["id"] >= 0,
Embed(Struct("item_information",
UBInt8("count"),
UBInt16("damage"),
)),
),
),
),
),
6: Struct("spawn",
SBInt32("x"),
SBInt32("y"),
SBInt32("z"),
),
10: flying,
construct.ULInt32("filesize"),
construct.Bytes("dos_date", 2),
construct.Bytes("dos_time", 2),
construct.ULInt16("file_attr"),
construct.CString("filename")
)
parse_res = itempos_struct.parse_stream(itempos_io)
if itempos_io.pos % 2 == 1:
itempos_io.read(1)
ext_struct = construct.Struct("ext",
construct.ULInt16("ext_size"),
construct.ULInt16("ext_version")
)
parse_ext = ext_struct.parse_stream(itempos_io)
if parse_ext["ext_version"] >= 0x3:
itempos2_struct = construct.Struct("itempos2",
construct.Padding(2), # 0004
construct.Padding(2), # BEEF
construct.Bytes("creation_dos_date", 2),
construct.Bytes("creation_dos_time", 2),
construct.Bytes("access_dos_date", 2),
construct.Bytes("access_dos_time", 2),
construct.Padding(4)
)
parse_res2 = itempos2_struct.parse_stream(itempos_io)
unicode_filename = ""
if parse_ext["ext_version"] >= 0x7:
itempos3_struct = construct.Struct("itempos3",
construct.ULInt64("file_ref"),
construct.Padding(8),
construct.Padding(2)
)
omegadot : double
Rate of right ascension
w : double
Argument of perigee
inc : double
Inclination
af0 : double
Polynomial clock correction coefficient (clock bias)
af1 : double
Polynomial clock correction coefficient (clock drift)
sender : int
Optional sender ID, defaults to SENDER_ID (see sbp/msg.py).
"""
_parser = construct.Struct(
'common' / construct.Struct(AlmanacCommonContent._parser),
'm0' / construct.Float64l,
'ecc' / construct.Float64l,
'sqrta' / construct.Float64l,
'omega0' / construct.Float64l,
'omegadot' / construct.Float64l,
'w' / construct.Float64l,
'inc' / construct.Float64l,
'af0' / construct.Float64l,
'af1' / construct.Float64l,)
__slots__ = [
'common',
'm0',
'ecc',
'sqrta',
'omega0',
'omegadot',
Please see ICD-GPS-200 (Chapter 20.3.3.5.1.4) for more details.
Parameters
----------
sbp : SBP
SBP parent object to inherit from.
t_nmct : GPSTimeSec
Navigation Message Correction Table Valitidy Time
l2c_mask : int
L2C capability mask, SV32 bit being MSB, SV1 bit being LSB
sender : int
Optional sender ID, defaults to SENDER_ID (see sbp/msg.py).
"""
_parser = construct.Struct(
't_nmct' / construct.Struct(GPSTimeSec._parser),
'l2c_mask' / construct.Int32ul,)
__slots__ = [
't_nmct',
'l2c_mask',
]
def __init__(self, sbp=None, **kwargs):
if sbp:
super( MsgSvConfigurationGPS,
self).__init__(sbp.msg_type, sbp.sender, sbp.length,
sbp.payload, sbp.crc)
self.from_binary(sbp.payload)
else:
super( MsgSvConfigurationGPS, self).__init__()
self.msg_type = SBP_MSG_SV_CONFIGURATION_GPS
PT_VAP_STATUS_REQUEST = 0x14
PT_SET_TX_POLICY = 0x15
PT_DEL_TX_POLICY = 0x16
PT_TX_POLICY_STATUS_RESPONSE = 0x17
PT_TX_POLICY_STATUS_REQUEST = 0x18
PT_SET_SLICE = 0x19
PT_DEL_SLICE = 0x1A
PT_SLICE_STATUS_RESPONSE = 0x1B
PT_SLICE_STATUS_REQUEST = 0x1C
PT_IGMP_REPORT = 0xE0
PT_INCOMING_MCAST_ADDR = 0xE1
HEADER = Struct(
"version" / Int8ub,
"type" / Int8ub,
"length" / Int32ub,
"seq" / Int32ub,
"xid" / Int32ub,
"device" / Bytes(6),
)
HEADER.name = "header"
HELLO_REQUEST = Struct(
"version" / Int8ub,
"type" / Int8ub,
"length" / Int32ub,
"seq" / Int32ub,
"xid" / Int32ub,
"device" / Bytes(6),
"size" / Int16ul,
"flags" / Int16ul,
"version" / Int32ul
)
filename_table = Struct(
# move to start of filename table
Seek(this._.data_start_offset + this._.filename_table_offset, 0),
# here we can find offset table: (total_files * 8 byte) size
# but we cheat, just read total_files * cstrings
"first_filename_offset" / Int32ul,
Seek(this._.data_start_offset + this.first_filename_offset),
"filenames" / Array(this._.files_count, CString(encoding="utf8"))
)
file_data_record = Struct(
"file_data_offset" / Int32ul,
"file_data_offset" / Computed(this.file_data_offset + this._._.data_start_offset),
"file_data_size" / Int32ul,
"unknown" / Hex(Bytes(8)),
"next_file_data_record" / Tell,
# move to file data offset
Seek(this.file_data_offset, 0),
# read file data
"data" / Bytes(this.file_data_size),
# return to next file data record in table
Seek(this.next_file_data_record, 0)
)
file_data_table = Struct(
# move to location of file data table offset
Seek(this._.data_start_offset + this._.filedata_table_offset, 0),
Padding(16),
"u7" / Default(Int32ub, 0),
"u8" / Default(Int32ub, 0),
"u9" / Default(Int32ub, 0),
Padding(8)
),
"load_cmd_reply": Struct(
Padding(2)
),
"link_query": Struct(
Padding(3),
"remote_player_number" / Int8ub,
Padding(3),
"slot" / PlayerSlot
),
"link_reply": Struct(
Padding(3),
"source_player_number" / Int8ub,
Padding(3),
"slot" / PlayerSlot,
"name" / PaddedString(64, encoding="utf-16-be"),
"date" / PaddedString(24, encoding="utf-16-be"),
"u5" / PaddedString(32, encoding="utf-16-be"), # "1000" as string? model?
"track_count" / Int32ub,
"u6" / Default(Int16ub, 0), # also seen 0x200
"u7" / Default(Int16ub, 0x101),
"playlist_count" / Int32ub,
"bytes_total" / Int64ub,
"bytes_free" / Int64ub
),
"rekordbox_hello": Pass,
"rekordbox_reply": Struct(
ProtocolVersion,
Random,
SessionID,
Bytes("cipher_suite", 2),
UBInt8("compression_method"),
Extensions,
)
ClientCertificateType = Struct(
"certificate_types",
SizeAtLeast(UBInt8("length"), min_size=1),
Array(lambda ctx: ctx.length, UBInt8("certificate_types")),
)
DistinguishedName = Struct(
"certificate_authorities",
UBInt16("length"),
Bytes("certificate_authorities", lambda ctx: ctx.length),
)
CertificateRequest = Struct(
"CertificateRequest",
ClientCertificateType,
SupportedSignatureAlgorithms,
DistinguishedName,
)
ServerDHParams = Struct(
"ServerDHParams",
PrefixedBytes("dh_p", UBInt16("dh_p_length")),
PrefixedBytes("dh_g", UBInt16("dh_g_length")),
construct.String("ChannelName", 10),
construct.String("Azimuth", 10),
construct.String("Inclination", 10),
construct.String("XCoordinate", 10),
construct.String("YCoordinate", 10),
construct.String("ZCoordinate", 10),
construct.String("XYUnits", 4),
construct.String("ZUnits", 4),
construct.String("PreampGain", 4),
construct.String("SensorModel", 12),
construct.String(
"SensorSerial", 12),
construct.String("Comments", 40),
construct.String(
"AdjustedNominalBitWeight", 8)),
construct.Struct("ChanInfo3",
construct.String("Channel", 2),
construct.String("ChannelName", 10),
construct.String("Azimuth", 10),
construct.String("Inclination", 10),
construct.String("XCoordinate", 10),
construct.String("YCoordinate", 10),
construct.String("ZCoordinate", 10),
construct.String("XYUnits", 4),
construct.String("ZUnits", 4),
construct.String("PreampGain", 4),
construct.String("SensorModel", 12),
construct.String(
"SensorSerial", 12),
construct.String("Comments", 40),
construct.String(
"AdjustedNominalBitWeight", 8)),