Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from glucometerutils import common, driver
from glucometerutils.support import (
construct_extras,
lifescan,
lifescan_binary_protocol,
serial,
)
_PACKET = lifescan_binary_protocol.LifeScanPacket(True)
_INVALID_RECORD = 501
_COMMAND_SUCCESS = construct.Const(b"\x05\x06")
_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02")
_VERSION_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
"version" / construct.PascalString(construct.Byte, encoding="ascii"),
)
_SERIAL_NUMBER_REQUEST = construct.Const(
b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
_SERIAL_NUMBER_RESPONSE = construct.Struct(
_COMMAND_SUCCESS, "serial_number" / construct.GreedyString(encoding="ascii"),
)
_DATETIME_REQUEST = construct.Struct(
construct.Const(b"\x05\x20"), # 0x20 is the datetime
"month" / Int8ub,
"day" / Int8ub,
"hour" / Int8ub,
"minute" / Int8ub,
"_not_used1" / Padding(23),
"source_id" / Default(CommunicationSourceIDEnum, 1),
"user_high" / Default(Int8ub, 0),
"user_low" / Default(Int8ub, 0),
)),
"checksum" / PacketChecksum(Bytes(1)))
SetTimeDateResponse = Struct(
"fields" / RawCopy(
Struct(
"po" / BitStruct(
"command" / Const(0x3, Nibble),
"status" / Struct(
"reserved" / Flag,
"alarm_reporting_pending" / Flag,
"Winload_connected" / Flag,
"NeWare_connected" / Flag)),
"_not_used0" / Padding(35),
)),
"checksum" / PacketChecksum(Bytes(1)))
PerformAction = Struct(
"fields" / RawCopy(
Struct(
"po" / Struct(
"command" / Const(0x40, Int8ub)),
"_not_used0" / Padding(1),
"action" / Enum(
_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85")
_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15")
_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14")
_FREESTYLE_MESSAGE = construct.Struct(
"hid_report" / construct.Const(0, construct.Byte),
"message_type" / construct.Byte,
"command"
/ construct.Padded(
63, # command can only be up to 62 bytes, but one is used for length.
construct.Prefixed(construct.Byte, construct.GreedyBytes),
),
)
_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct(
"hid_report" / construct.Const(0, construct.Byte),
"message_type" / construct.Byte,
"command"
/ construct.Padded(
63, # command can only be up to 62 bytes, but one is used for length.
construct.GreedyBytes,
),
)
_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)")
_TEXT_REPLY_FORMAT = re.compile(
b"^(?P.*)CKSM:(?P[0-9A-F]{8})\r\n"
b"CMD (?POK|Fail!)\r\n$",
re.DOTALL,
)
_MULTIRECORDS_FORMAT = re.compile(
),
"dwFourCC" / Default(Int32ul, 0),
"dwRGBBitCount" / Default(Int32ul, 0),
"dwRBitMask" / Default(Int32ul, 0),
"dwGBitMask" / Default(Int32ul, 0),
"dwBBitMask" / Default(Int32ul, 0),
"dwABitMask" / Default(Int32ul, 0),
)
"""The ``DDS_PIXELFORMAT`` structure.
**Reference**:
`Microsoft `__
"""
DDS_HEADER = Struct(
"dwSize" / Const(124, Int32ul),
"dwFlags"
/ FlagsEnum(
Int32ul,
DDSD_CAPS=0x00000001,
DDSD_HEIGHT=0x00000002,
DDSD_WIDTH=0x00000004,
DDSD_PITCH=0x00000008,
DDSD_PIXELFORMAT=0x00001000,
DDSD_MIPMAPCOUNT=0x00020000,
DDSD_LINEARSIZE=0x00080000,
DDSD_DEPTH=0x00800000,
),
"dwHeight" / Int32ul,
"dwWidth" / Int32ul,
"dwPitchOrLinearSize" / Int32ul,
"dwDepth" / Default(Int32ul, 0),
AnlzTagQuantize2 = Struct(
Padding(4),
"u1" / Const(0x01000002, Int32ub), # maybe this encodes the count of "bpm" objects below
Padding(4),
"bpm" / Array(2, AnlzQuantizeTick),
"entry_count" / Int32ub, # 680
"u3" / Int32ub,
"u4" / Int32ub,
"u5" / Int32ub,
"u6" / Int32ub,
Padding(8)
)
AnlzTagWaveform = Struct(
"payload_size" / Int32ub, # is 0 for some tag types
"unknown" / Const(0x10000, Int32ub),
"entries" / Array(this.payload_size, Int8ub)
)
AnlzTagBigWaveform = Struct(
"u1" / Const(1, Int32ub),
"payload_size" / Int32ub,
"u2" / Const(0x960000, Int32ub),
"entries" / Array(this.payload_size, Int8ub)
)
AnlzTagColorWaveform = Struct(
"payload_word_size" / Const(0x06, Int32ub),
"payload_size" / Int32ub,
"unknown" / Const(0x00, Int32ub),
"entries" / Array(this.payload_word_size * this.payload_size, Int8sb), # See doubts about signed in ColorPreviewWaveformWidget
)
AnlzTagColorBigWaveform = Struct(
"payload_word_size" / Const(0x02, Int32ub),
) -> construct.Struct: # pylint: disable=invalid-name
if include_link_control:
link_control_construct = _LINK_CONTROL
else:
link_control_construct = construct.Const(b"\x00")
return construct.Struct(
"data"
/ construct.RawCopy(
construct.Struct(
construct.Const(b"\x02"), # stx
"length"
/ construct.Rebuild(construct.Byte, lambda this: len(this.message) + 6),
"link_control" / link_control_construct,
"message" / construct.Bytes(lambda this: this.length - 6),
construct.Const(b"\x03"), # etx
),
),
"checksum"
/ construct.Checksum(
construct.Int16ul, lifescan.crc_ccitt, construct.this.data.data
),
def InfiniPacket(name, identifier, subconstruct):
"""
Common header structure for packets.
This is possibly not the best way to go about building these kinds of
things.
"""
header = Struct("header",
# XXX Should this be Magic(chr(identifier))?
Const(UBInt8("identifier"), identifier),
UBInt8("flags"),
UBInt32("length"),
)
return Struct(name, header, subconstruct)
_DATETIME_STRUCT = construct.Struct(
"day" / construct.Int16ul, "minute" / construct.Byte, "hour" / construct.Byte,
)
_DAY_BITSTRUCT = construct.BitStruct(
"year" / construct.BitsInteger(7),
"month" / construct.BitsInteger(4),
"day" / construct.BitsInteger(5),
)
_READING_COUNT_STRUCT = construct.Struct(
"count" / construct.Int16ul, construct.Int16ul,
)
_READING_SELECTION_STRUCT = construct.Struct(
"record_id" / construct.Int16ul, construct.Const(b"\x00\x00"),
)
_MEAL_FLAG = {
common.Meal.NONE: 0x00,
common.Meal.BEFORE: 0x40,
common.Meal.AFTER: 0x80,
}
_READING_VALUE_STRUCT = construct.Struct(
"value" / construct.Int16ul,
construct.Const(b"\x06"),
"meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
)
def _make_packet(
from .artist import Artist
from .album import Album
from .playlist import Playlist
from .playlist_map import PlaylistMap
from .artwork import Artwork
from .color import Color
from .genre import Genre
from .key import Key
from .label import Label
# a strange page exists for every (?) page type, header.u9 is 1004 and page is filled with 0xf8ffff1f
StrangePage = Struct(
"strange_header" / Struct(
"index" / Int32ul, # page index (same as header?)
"next_index" / Int32ul, # index of next page containing real data or 0x3ffffff if next page empty
Const(0x3fffffff, Int32ul),
Padding(4),
"entry_count" / Int16ul, # number of 4-byte values
"u2" / Int16ul, # always 8191?
),
Array(1004, Int32ul),
Padding(20)
)
ReverseIndexedEntry = FocusedSeq("entry",
"entry_offset" / Int16ul,
"entry" / Pointer(this._._.entries_start+this.entry_offset,
Switch(lambda ctx: "strange" if ctx._._.is_strange_page else ctx._._.page_type, {
"block_tracks": Track,
"block_artists": Artist,
"block_albums": Album,
"block_playlists": Playlist,
_GLUCOSE_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
"unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
construct.Padding(3),
)
_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A")
_READING_COUNT_RESPONSE = construct.Struct(
construct.Const(b"\x0f"), "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
construct.Const(b"\x05\x1f"), "record_id" / construct.Int16ul,
)
_READING_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
"timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore
"value" / construct.Int32ul,
)
def _make_packet(
message: bytes,
sequence_number: int,
expect_receive: bool,
acknowledge: bool,
disconnect: bool,
):