Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testNotAligned(self):
a = CBS('0b00111001001010011011')
a.pos = 1
self.assertEqual(a.readto('0b00'), '0b011100')
self.assertEqual(a.readto('0b110'), '0b10010100110')
self.assertRaises(ValueError, a.readto, '')
def test_afi_0_without_prefixlen(self):
'''
Test en/decoding of empty AFI addresses
'''
afi_address_hex = '0000'
bitstream = ConstBitStream(hex=afi_address_hex)
address = afi.read_afi_address_from_bitstream(bitstream)
self.assertIsNone(address, 'wrong address')
self.assertEqual(bitstream.pos, bitstream.len,
'unprocessed bits remaining in bitstream')
new_bitstream = afi.get_bitstream_for_afi_address(address)
self.assertEqual(new_bitstream.tobytes(), bitstream.tobytes())
def getbits_bit(f):
# bit w/ header
buff = f.read()
buff = buff[0x76:]
return bitstring.ConstBitStream(bytes=buff)
def from_bytes(cls, bitstream, decode_payload=True):
'''
Parse the given packet and update properties accordingly
'''
packet = cls()
# Convert to ConstBitStream (if not already provided)
if not isinstance(bitstream, ConstBitStream):
if isinstance(bitstream, Bits):
bitstream = ConstBitStream(auto=bitstream)
else:
bitstream = ConstBitStream(bytes=bitstream)
# Read the version
version = bitstream.read('uint:4')
if version != packet.version:
raise ValueError('Provided bytes do not contain an IPv6 packet')
# Read the traffic class
packet.traffic_class = bitstream.read('uint:8')
# Read the flow label
packet.flow_label = bitstream.read('uint:20')
def load_entries_to_memory(self):
file_name = 'raw_data/etl2_entries.obj'
try:
file_handler = open(file_name, 'rb')
entries = pickle.load(file_handler)
print('restored pickled etl2 data')
return entries
except:
print('processing raw etl2 data')
entries = []
image_size = (60, 60)
bits_per_pixel = 6
for file_directory, num_items in self.files:
file = bitstring.ConstBitStream(filename=file_directory)
# loop through the items in each file
for item_index in range(num_items):
file.pos = item_index * 6 * 3660
item_data = file.readlist('int:36,uint:6,pad:30,6*uint:6,6*uint:6,pad:24,2*uint:6,pad:180,bytes:2700')
# specifications about each item's data
# http://etlcdb.db.aist.go.jp/?page_id=1721
# 0 -> serial index
# 1 -> source, in T56
# 2:8 -> name of type of character, kanji, kana, in T56
# 8:14 -> name of font type, in T56
# 14:16 -> label
# 16 -> image bits
# print item_data[0], T56(r[1]), "".join(map(T56, item_data[2:8])), "".join(map(T56, r[8:14])), CO59[tuple(r[14:16])])
def from_bytes(cls, bitstream):
'''
Parse the given packet and update properties accordingly
'''
packet = cls()
# Convert to ConstBitStream (if not already provided)
if not isinstance(bitstream, ConstBitStream):
if isinstance(bitstream, Bits):
bitstream = ConstBitStream(auto=bitstream)
else:
bitstream = ConstBitStream(bytes=bitstream)
# Read the source and destination ports
(packet.source_port,
packet.destination_port) = bitstream.readlist('2*uint:16')
# Store the length
length = bitstream.read('uint:16')
if length < 8:
raise ValueError('Invalid UDP length')
# Read the checksum
packet.checksum = bitstream.read('uint:16')
def bitarray_to_u5(barr):
assert barr.len % 5 == 0
ret = []
s = bitstring.ConstBitStream(barr)
while s.pos != s.len:
ret.append(s.read(5).uint)
return ret
def from_bytes(cls, bitstream):
packet = cls()
# Convert to ConstBitStream (if not already provided)
if not isinstance(bitstream, ConstBitStream):
if isinstance(bitstream, Bits):
bitstream = ConstBitStream(auto=bitstream)
else:
bitstream = ConstBitStream(bytes=bitstream)
# Read the next header type
packet.next_header = bitstream.read('uint:8')
# Read the header length, given in multiples of 8 octets
header_length = bitstream.read('uint:8') + 1
# Read the options
options_length = (header_length * 8) - 2
packet.options = bitstream.read('bytes:%d' % options_length)
# And the rest is payload
remaining = bitstream[bitstream.pos:]
packet.payload = remaining.bytes
payload_class = protocol_registry.get_type_class(packet.next_header)
def move(self, ra, dec):
msize = '0x1800'
mtype = '0x0000'
aux_format_str = 'int:64=%r' % time()
localtime = ConstBitStream(aux_format_str.replace('.', ''))
sdata = ConstBitStream(msize) + ConstBitStream(mtype)
sdata += ConstBitStream(intle=localtime.intle, length=64) + ConstBitStream(uintle=ra, length=32)
sdata += ConstBitStream(intle=dec, length=32) + ConstBitStream(intle=0, length=32)
self.buffer = sdata
self.is_writable = True
self.handle_write()
def _get_file():
return ConstBitStream(filename=_Strings.filename)