Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def extended_header_2_test():
BIN = construct.Struct("BIN",
construct.UBInt32("test_analysis_code"),
construct.UBInt32(
"first_test_oscillator_attenuation"),
construct.UBInt32(
"second_test_oscillator_attenuation"),
construct.UBInt32("start_delay_usec"),
# 00 - No filter, 01 - Apply filter
construct.UBInt32("dc_filter_flag"),
construct.BFloat32("dc_filter_frequency"),
# See page 9 of format spec
construct.UBInt32("preamp_path"),
construct.UBInt32("test_oscillator_signal_type"))
return BIN
def extended_header_3_test():
if sys.byteorder == 'big':
pass
BIN = construct.Struct("BIN",
construct.UBInt32("test_signal_type"),
construct.UBInt32("test_signal_frequency_1"),
construct.UBInt32("test_signal_frequency_2"),
construct.UBInt32("test_signal_amplitude_1"),
construct.UBInt32("test_signal_amplitude_2"),
construct.BFloat32("test_signal_duty_cycle"),
construct.UBInt32("test_signal_active_duration"),
construct.UBInt32("test_signal_active_time")
)
return BIN
def extended_header_2_test():
BIN = construct.Struct("BIN",
construct.UBInt32("test_analysis_code"),
construct.UBInt32(
"first_test_oscillator_attenuation"),
construct.UBInt32(
"second_test_oscillator_attenuation"),
construct.UBInt32("start_delay_usec"),
# 00 - No filter, 01 - Apply filter
construct.UBInt32("dc_filter_flag"),
construct.BFloat32("dc_filter_frequency"),
# See page 9 of format spec
construct.UBInt32("preamp_path"),
construct.UBInt32("test_oscillator_signal_type"))
return BIN
def extended_header_3_test():
if sys.byteorder == 'big':
pass
BIN = construct.Struct("BIN",
construct.UBInt32("test_signal_type"),
construct.UBInt32("test_signal_frequency_1"),
construct.UBInt32("test_signal_frequency_2"),
construct.UBInt32("test_signal_amplitude_1"),
construct.UBInt32("test_signal_amplitude_2"),
construct.BFloat32("test_signal_duty_cycle"),
construct.UBInt32("test_signal_active_duration"),
construct.UBInt32("test_signal_active_time")
)
return BIN
def external_header():
swap = True
if sys.byteorder == 'big':
swap = False
BIN = construct.Struct("BIN",
construct.UBInt32("size"),
construct.UBInt32("receiver_line"),
construct.UBInt32("receiver_point"),
construct.UBInt8("receiver_point_index"),
construct.BitField("reserved01", 18, swapped=swap)
)
return BIN
def __init__(self, u):
self.adapters = Struct("adapters",
UBInt32("count"),
Array(lambda c: c.count,
Struct("adapter",
UBInt32("index"),
Bytes("MAC", 6)))
).parse(u.get_buffer())
file_header_struct.number_of_pages))
print(u'')
page_sizes_data_size = file_header_struct.number_of_pages * 4
page_sizes_data = self._file_object.read(page_sizes_data_size)
if self._debug:
print(u'Page sizes data:')
print(hexdump.Hexdump(page_sizes_data))
try:
page_sizes_array = construct.Array(
file_header_struct.number_of_pages,
construct.UBInt32(u'page_sizes')).parse(page_sizes_data)
except construct.FieldError as exception:
raise IOError(
u'Unable to parse page sizes array with error: {0:s}'.format(
exception))
self._page_sizes = []
for page_index in range(file_header_struct.number_of_pages):
self._page_sizes.append(page_sizes_array[page_index])
if self._debug:
print(u'Page: {0:d} size\t\t\t\t\t\t\t: {1:d}'.format(
page_index, page_sizes_array[page_index]))
if self._debug:
print(u'')
# Bit 3 -- SVSM VLFF error
# Bit 4 -- Invalid Receiver Line/Station
# Bit 5 -- Trace was Zero filled (T2 only)
# Bit 6 -- Battery improperly removed
# Bit 7 -- SVSM Dynamic Offset Filter mode,
# 0 = static
# BIT test type and codes (0 - 28) See FireFly SEG
# Y Ver 3.0 Tech Bulletin
construct.UBInt8("BITTest"),
# Sweep Phase Rotation; 0 if undefined
construct.UBInt16("SweepPhaseRot"),
construct.UBInt8("unass03"), # Unassigned
construct.UBInt8("BoxFun"), # Box function
# Source effort used to generate the trace
# (mantissa)
construct.UBInt32("SourceEffortM"),
# Source effort, (exponent)
construct.UBInt16("SourceEffortE"),
# Source measurement units
construct.UBInt16("SourceUnits"),
# -1 -- Other
# 0 -- Unknown
# 1 -- Joule
# 2 -- Kilowatt
# 3 -- Pascal
# 4 -- Bar
# 5 -- Bar-meter
# 6 -- Kilograms
# 7 -- Pounds
construct.UBInt8("EventType"), # Event type:
# 0x00 -- Zeroed or truncated trace
# 0x40 -- BIT data - Raw Trace
def extended_header_3():
swap = True
if sys.byteorder == 'big':
swap = False
BIN = construct.Struct("BIN",
construct.UBInt32("line_number"),
construct.UBInt32("receiver_point"),
construct.UBInt8("point_index"),
construct.UBInt32("first_shot_line"),
construct.UBInt32("first_shot_point"),
construct.UBInt8("first_shot_point_index"),
construct.UBInt32("last_shot_line"),
construct.UBInt32("last_shot_point"),
construct.UBInt8("last_shot_point_index"),
construct.BitField("reserved01", 5, swapped=swap)
)
return BIN
)
class EpochTimeStampAdapter(Adapter):
""" Convert epoch timestamp <-> localtime """
def _decode(self, obj, context):
return time.ctime(obj)
def _encode(self, obj, context):
return int(time.mktime(time.strptime(obj)))
packet_record = Struct("packet_record",
UBInt32("original_length"),
UBInt32("included_length"),
UBInt32("record_length"),
UBInt32("cumulative_drops"),
EpochTimeStampAdapter(UBInt32("timestamp_seconds")),
UBInt32("timestamp_microseconds"),
HexDumpAdapter(Field("data", lambda ctx: ctx.included_length)),
# 24 being the static length of the packet_record header
Padding(lambda ctx: ctx.record_length - ctx.included_length - 24),
)
datalink_type = Enum(UBInt32("datalink"),
IEEE802dot3 = 0,
IEEE802dot4 = 1,
IEEE802dot5 = 2,
IEEE802dot6 = 3,
ETHERNET = 4,
HDLC = 5,
CHARSYNC = 6,
IBMCHANNEL = 7,
FDDI = 8,