Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from glucometerutils import common, driver, exceptions
from glucometerutils.support import lifescan, lifescan_binary_protocol
# This device uses SCSI blocks as registers, so every packet is padded out to
# the size of one block.
_REGISTER_SIZE = 512

# A LifeScan binary packet padded to occupy exactly one register.
# NOTE(review): the meaning of the ``False`` argument is defined by
# lifescan_binary_protocol.LifeScanPacket, which is not visible here.
_PACKET = construct.Padded(
    _REGISTER_SIZE, lifescan_binary_protocol.LifeScanPacket(False)
)

# Two-byte marker that opens every response layout defined below.
_COMMAND_SUCCESS = construct.Const(b"\x03\x06")

# Request for one identification string, chosen by ``selector``.
_QUERY_REQUEST = construct.Struct(
    construct.Const(b"\x03\xe6\x02"),
    "selector" / construct.Enum(construct.Byte, serial=0x00, model=0x01, software=0x02),
)

# Reply to a query: the success marker followed by a UTF-16-LE C string.
# The marker is shared with _READ_UNIT_RESPONSE instead of repeating the
# literal bytes, so the two responses cannot drift apart.
_QUERY_RESPONSE = construct.Struct(
    _COMMAND_SUCCESS,
    "value" / construct.CString(encoding="utf-16-le"),
)

# Request to read a device parameter; ``unit`` is the only selector defined.
_READ_PARAMETER_REQUEST = construct.Struct(
    construct.Const(b"\x03"),
    "selector" / construct.Enum(construct.Byte, unit=0x04),
)

# Reply to a ``unit`` parameter read: success marker, the glucose unit, and
# three bytes of padding.
_READ_UNIT_RESPONSE = construct.Struct(
    _COMMAND_SUCCESS,
    "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
    construct.Padding(3),
)
'midi_list' / FixedSized(_this.flags.len, GreedyRange(Struct(
'delta_time' / If(_this._index > 0, VarInt),
# The "running status" technique means multiple commands may be sent under
# the same status. This condition occurs when, after parsing the current
# commands, we see the next byte is NOT a status byte (MSB is low).
#
# Below, this is accomplished by storing the most recent status byte
# on the global context with the `* remember_last` macro; then using it
# on the `else` branch of the `command_byte` selection.
'__next' / Peek(Int8ub),
'command_byte' / IfThenElse(_this.__next & 0x80,
Byte * remember_last,
Computed(lambda ctx: ctx._root._last_command_byte)
),
'command' / If(_this.command_byte, Enum(Computed(_this.command_byte & 0xf0),
note_on=COMMAND_NOTE_ON,
note_off=COMMAND_NOTE_OFF,
aftertouch=COMMAND_AFTERTOUCH,
control_mode_change=COMMAND_CONTROL_MODE_CHANGE)),
'channel' / If(_this.command_byte, Computed(_this.command_byte & 0x0f)),
'params' / Switch(_this.command, {
'note_on': Struct(
'key' / MIDINote,
'velocity' / Int8ub,
),
'note_off': Struct(
'key' / MIDINote,
'velocity' / Int8ub,
),
'aftertouch': Struct(
),
'Speedmeter' / BitStruct(
'Speedmeter_Model' / Enum(BitsInteger(2),
EXTERNAL = 0b00,
INTERNAL = 0b01,
MOTORPHASE = 0b10
),
'Signals' / BitsInteger(6)
),
'Checksum' / Byte
)
# 53 0b 03 ff ff 64 06 14 0a 19 08 14 14 27
pedal_message = Struct(
'pedal_message' / Const(b"\x53\x0B"),
'pedal_type' / Enum(Byte,
NONE = 0x00,
DH_SENSOR_12 = 0x01,
BB_SENSOR_32 = 0x02,
DOUBLE_SIGNAL_24 = 0x03
),
'designated_assist' / Enum(Byte,
MODE_0 = 0x00,
MODE_1 = 0x01,
MODE_2 = 0x02,
MODE_3 = 0x03,
MODE_4 = 0x04,
MODE_5 = 0x05,
MODE_6 = 0x06,
MODE_7 = 0x07,
MODE_8 = 0x08,
MODE_9 = 0x09,
def _EOSFocusMode(self):
    '''Return the enumeration constructor for the EOS focus mode.

    The numeric code is read through ``self._UInt32`` and ``Pass`` is
    supplied as the enumeration's ``default``.
    '''
    focus_modes = {
        'OneShot': 0,
        'AIServo': 1,
        'AIFocus': 2,
        'Manual': 3,
    }
    return Enum(self._UInt32, default=Pass, **focus_modes)
def _AccessCapability(self):
    '''Return the enumeration constructor for AccessCapability.

    The numeric code is read through ``self._UInt16``, so it follows
    whatever endianness that constructor was created with; ``Pass`` is
    supplied as the enumeration's ``default``.
    '''
    capabilities = {
        'ReadWrite': 0x0000,
        'ReadOnlyWithoutObjectDeletion': 0x0001,
        'ReadOnlyWithObjectDeletion': 0x0002,
    }
    return Enum(self._UInt16, default=Pass, **capabilities)
"""
Binary protocol for DSI; ported to construct 2.8 from the original
rsvpkeyboard app.
"""
from construct import (Array, Bytes, Const, Embedded, Enum, Float32b, Int8ub,
Int16ub, Int32ub, Optional, PascalString, Struct,
Switch, this)
# NOTE(review): presumably the default sampling rate in Hz -- confirm.
DEFAULT_FS = 300
# Default list of 25 channel labels; the final entry is 'TRG'.
DEFAULT_CHANNELS = ['P3', 'C3', 'F3', 'Fz', 'F4', 'C4', 'P4', 'Cz',
'CM', 'A1', 'Fp1', 'Fp2', 'T3', 'T5', 'O1', 'O2',
'X3', 'X2', 'F7', 'F8', 'X1',
'A2', 'T6', 'T4', 'TRG']
# Fixed-size packet header. The integer fields are unsigned big-endian.
header = Struct(
'magic' / Const(b'@ABCD'), # bytes 0-4 (five-byte marker)
'type' / Enum(Int8ub, NULL=0, EEG_DATA=1, EVENT=5), # byte 5
'payload_length' / Int16ub, # bytes 6-7
'number' / Int32ub, # bytes 8-11
)
# Size in bytes of the `header` layout above: 5 + 1 + 2 + 4.
HEADER_LEN = 12
# TODO: Resolve the 'unknown encoding' error for the message when we import
# unicode_literals.
event = Struct(
'event_code' / Enum(Int32ub, # bytes 12-15
VERSION=1,
DATA_START=2,
DATA_STOP=3,
SENSOR_MAP=9,
DATA_RATE=10
),
def _ExposureProgramMode(self):
return Enum(
self._UInt16,
default=Pass,
IntelligentAuto=0x8000,
SuperiorAuto=0x8001,
P=0x2,
A=0x3,
S=0x4,
M=0x1,
MovieP=0x8050,
MovieA=0x8051,
MovieS=0x8052,
MovieM=0x8053,
# Mode=0x8054, # TODO: ??
Panoramic=0x8041,
Portrait=0x7,
SportsAction=0x8011,
# Version constant for the portmap definitions below.
PortmapVersion = 2
# Portmap procedure numbers, carried as unsigned 32-bit big-endian integers.
PortmapProcedure = Enum(Int32ub,
null = 0,
set = 1,
unset = 2,
getport = 3,
dump = 4,
call_result = 5
)
# Transport protocol numbers used in portmap arguments.
# NOTE(review): 6 is the IANA protocol number for TCP (17 is UDP), so the
# member name `ip` looks like a misnomer. It is left unchanged because
# renaming it would alter the value callers see when parsing.
PortmapProtocol = Enum(Int32ub,
ip = 6,
udp = 17
)
# RPC program numbers.
RpcProgram = Enum(Int32ub,
portmap = 100000,
nfs = 100003,
mount = 100005
)
# Arguments of a portmap call: four consecutive 32-bit fields. `port` is
# wrapped in Default(..., 0), so 0 is used when no value is supplied on build.
PortmapArgs = Struct(
"prog" / RpcProgram,
"vers" / Int32ub,
"prot" / PortmapProtocol,
"port" / Default(Int32ub, 0)
)
# A portmap result is a single unsigned 32-bit big-endian integer.
PortmapRes = Int32ub
# Version constant for the NFS definitions that follow.
NfsVersion = 2
NfsProcedure = Enum(Int32ub,
def _encode(self, obj, context):
    """Convert a textual timestamp into whole seconds since the epoch.

    ``obj`` is parsed by ``time.strptime`` using its default format, and
    the resulting time tuple is converted by ``time.mktime``, which
    interprets it as local time. ``context`` is not used.
    """
    parsed = time.strptime(obj)
    seconds = time.mktime(parsed)
    return int(seconds)
# One record of a snoop capture: six unsigned 32-bit big-endian header
# fields (24 bytes in total), then `included_length` bytes of captured data,
# then padding that brings the record up to `record_length` bytes.
packet_record = Struct("packet_record",
UBInt32("original_length"),
UBInt32("included_length"),
UBInt32("record_length"),
UBInt32("cumulative_drops"),
# Seconds part of the timestamp, converted by EpochTimeStampAdapter.
EpochTimeStampAdapter(UBInt32("timestamp_seconds")),
UBInt32("timestamp_microseconds"),
# Captured bytes; the length comes from the `included_length` field above.
HexDumpAdapter(Field("data", lambda ctx: ctx.included_length)),
# 24 being the static length of the packet_record header
Padding(lambda ctx: ctx.record_length - ctx.included_length - 24),
)
# Datalink type code of a capture, stored as an unsigned 32-bit big-endian
# integer named "datalink". Codes 0-10 are mapped to the names below.
datalink_type = Enum(UBInt32("datalink"),
IEEE802dot3 = 0,
IEEE802dot4 = 1,
IEEE802dot5 = 2,
IEEE802dot6 = 3,
ETHERNET = 4,
HDLC = 5,
CHARSYNC = 6,
IBMCHANNEL = 7,
FDDI = 8,
OTHER = 9,
UNASSIGNED = 10,
)
snoop_file = Struct("snoop",
Magic("snoop\x00\x00\x00"),
UBInt32("version"), # snoop v1 is deprecated
def __setup_constructors(self):
'''Set endianness and create transport-specific constructors.'''
# Set endianness of constructors before using them.
self._set_endian('little')
# Length field: unsigned 32-bit little-endian integer.
self.__Length = Int32ul
# Type field: unsigned 16-bit little-endian enumeration, with `Pass`
# supplied as the default.
self.__Type = Enum(
Int16ul,
default=Pass,
Undefined=0x0000,
Command=0x0001,
Data=0x0002,
Response=0x0003,
Event=0x0004,
)
# Code field: unsigned 16-bit little-endian integer.
self.__Code = Int16ul
# This is just a convenience constructor to get the size of a header.
# NOTE(review): self._TransactionID is defined outside this method,
# presumably by the _set_endian call above -- confirm.
self.__Header = Struct(
'Length' / self.__Length,
'Type' / self.__Type,
'Code' / self.__Code,
'TransactionID' / self._TransactionID,
)