Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"ec_curves",
UBInt16("ec_curves_length"),
Array(lambda ctx: ctx.ec_curves_length // 2, UBInt16("named_curves"))
)
ServerName = Struct(
"server_name",
UBInt16("server_name_list_length"),
UBInt8("name_type"),
UBInt16("server_name_length"),
Bytes("server_name", lambda ctx: ctx.server_name_length)
)
Extension = Struct(
"extension",
UBInt16("extension_type"),
UBInt16("extension_length"),
Switch("extension_struct", lambda ctx: ctx.extension_type, {
0: ServerName,
10: ECCurves,
11: ECPointFormat
})
def test_deep_merge():
cs = [Container(a="a"), Container(c="a", d="b"), Container(e="c")]
result = deep_merge(*cs)
assert dict(a="a", c="a", d="b", e="c") == result
super(ServiceASTRM, s).__init__()
s.header_base = construct.BitStruct('ASTRMBaseHeader',
construct.BitField('fmt', 3),
construct.Bit('channel'),
construct.Flag('vibrate'),
construct.Bit('packet_type'),
construct.BitField('seq_id', 10),
construct.BitField('payload_size', 16)
)
s.header_aud = construct.Struct('ASTRMAudioHeader',
construct.ULInt32('timestamp'),
# construct.Array(lambda ctx: ctx.payload_size, construct.UBInt8("data"))
)
s.header_msg = construct.Struct('ASTRMMsgHeader',
# This is kind of a hack, (there are two timestamp fields, which one is used depends on packet_type
construct.ULInt32('timestamp_audio'),
construct.ULInt32('timestamp'),
construct.Array(2, construct.ULInt32('freq_0')), # -> mc_video
construct.Array(2, construct.ULInt32('freq_1')), # -> mc_sync
construct.ULInt8('vid_format'),
construct.Padding(3)
)
s.header = construct.Struct('ASTRMHeader',
construct.Embed(s.header_base),
construct.Switch('format_hdr', lambda ctx: ctx.packet_type,
{
0 : construct.Embed(s.header_aud),
1 : construct.Embed(s.header_msg),
},
default = construct.Pass
)
)
def extended_header_2_test():
BIN = construct.Struct("BIN",
construct.UBInt32("test_analysis_code"),
construct.UBInt32(
"first_test_oscillator_attenuation"),
construct.UBInt32(
"second_test_oscillator_attenuation"),
construct.UBInt32("start_delay_usec"),
# 00 - No filter, 01 - Apply filter
construct.UBInt32("dc_filter_flag"),
construct.BFloat32("dc_filter_frequency"),
# See page 9 of format spec
construct.UBInt32("preamp_path"),
construct.UBInt32("test_oscillator_signal_type"))
return BIN
def extended_header_3_test():
if sys.byteorder == 'big':
pass
BIN = construct.Struct("BIN",
construct.UBInt32("test_signal_type"),
construct.UBInt32("test_signal_frequency_1"),
construct.UBInt32("test_signal_frequency_2"),
construct.UBInt32("test_signal_amplitude_1"),
construct.UBInt32("test_signal_amplitude_2"),
construct.BFloat32("test_signal_duty_cycle"),
construct.UBInt32("test_signal_active_duration"),
construct.UBInt32("test_signal_active_time")
)
return BIN
def extended_header_2_test():
BIN = construct.Struct("BIN",
construct.UBInt32("test_analysis_code"),
construct.UBInt32(
"first_test_oscillator_attenuation"),
construct.UBInt32(
"second_test_oscillator_attenuation"),
construct.UBInt32("start_delay_usec"),
# 00 - No filter, 01 - Apply filter
construct.UBInt32("dc_filter_flag"),
construct.BFloat32("dc_filter_frequency"),
# See page 9 of format spec
construct.UBInt32("preamp_path"),
construct.UBInt32("test_oscillator_signal_type"))
return BIN
def extended_header_3_test():
if sys.byteorder == 'big':
pass
BIN = construct.Struct("BIN",
construct.UBInt32("test_signal_type"),
construct.UBInt32("test_signal_frequency_1"),
construct.UBInt32("test_signal_frequency_2"),
construct.UBInt32("test_signal_amplitude_1"),
construct.UBInt32("test_signal_amplitude_2"),
construct.BFloat32("test_signal_duty_cycle"),
construct.UBInt32("test_signal_active_duration"),
construct.UBInt32("test_signal_active_time")
)
return BIN
_READ_RECORD_COUNT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS, "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
construct.Const(b"\x03\x21"), "record_id" / construct.Int16ul,
)
_MEAL_FLAG = {
common.Meal.NONE: 0x00,
common.Meal.BEFORE: 0x01,
common.Meal.AFTER: 0x02,
}
_READING_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
"timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
"value" / construct.Int16ul,
"control_test" / construct.Flag,
"meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Padding(2), # unknown
)
class Device(serial.SerialDevice, driver.GlucometerDevice):
BAUDRATE = 38400
DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x
TIMEOUT = 0.5
def __init__(self, device: Optional[str]) -> None:
super().__init__(device)
UBInt8("unknown2"),
),
2: Struct("handshake",
AlphaString("username"),
),
3: Struct("chat",
AlphaString("message"),
),
4: Struct("time",
UBInt64("timestamp"),
),
5: Struct("inventory",
SBInt32("unknown1"),
UBInt16("length"),
MetaArray(lambda context: context["length"],
Struct("items",
SBInt16("id"),
If(lambda context: context["id"] >= 0,
Embed(Struct("item_information",
UBInt8("count"),
UBInt16("damage"),
)),
),
),
),
),
6: Struct("spawn",
SBInt32("x"),
SBInt32("y"),
SBInt32("z"),
),
10: flying,
construct.ULInt32("filesize"),
construct.Bytes("dos_date", 2),
construct.Bytes("dos_time", 2),
construct.ULInt16("file_attr"),
construct.CString("filename")
)
parse_res = itempos_struct.parse_stream(itempos_io)
if itempos_io.pos % 2 == 1:
itempos_io.read(1)
ext_struct = construct.Struct("ext",
construct.ULInt16("ext_size"),
construct.ULInt16("ext_version")
)
parse_ext = ext_struct.parse_stream(itempos_io)
if parse_ext["ext_version"] >= 0x3:
itempos2_struct = construct.Struct("itempos2",
construct.Padding(2), # 0004
construct.Padding(2), # BEEF
construct.Bytes("creation_dos_date", 2),
construct.Bytes("creation_dos_time", 2),
construct.Bytes("access_dos_date", 2),
construct.Bytes("access_dos_time", 2),
construct.Padding(4)
)
parse_res2 = itempos2_struct.parse_stream(itempos_io)
unicode_filename = ""
if parse_ext["ext_version"] >= 0x7:
itempos3_struct = construct.Struct("itempos3",
construct.ULInt64("file_ref"),
construct.Padding(8),
construct.Padding(2)
)