Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_round_trip(self):
    """
    A Subscribe payload that is deserialised and then serialised again
    must come back as the header bytes followed by the original payload.
    """
    payload = (b"\x00\x01\x00\x0b\x66\x6f\x6f\x2f\x62\x61\x72\x2f\x62\x61"
               b"\x7a\x00")
    # The serialised form is expected to start with these two bytes,
    # which are not part of the payload handed to deserialise().
    expected = b"\x82\x10" + payload
    flags = (False, False, True, False)
    message = Subscribe.deserialise(flags, BitStream(bytes=payload))
    self.assertEqual(message.serialise(), expected)
def to_bytes(self):
    '''
    Serialise the properties: four 16-bit header fields (source port,
    destination port, length, checksum) followed by the payload bytes
    '''
    # Validate the properties before anything is encoded
    self.sanitize()

    # Source and destination ports, 16 bits each
    ports = (self.source_port, self.destination_port)
    header = BitStream('uint:16=%d, uint:16=%d' % ports)

    # Length field: payload size plus the 8 bytes taken by the header
    body = bytes(self.payload)
    header += BitStream('uint:16=%d' % (len(body) + 8))

    # Checksum, taken as-is from the instance
    header += BitStream('uint:16=%d' % self.checksum)

    return header.bytes + body
def encodeRect(self, data, p1, p2):
    """
    Encode ``data`` into the media, visiting the values produced by
    ``self.iterateValues(p1, p2, self.direction)``.

    Args:
        data: bytes-like payload; wrapped in a BitStream and handed to
            ``self.encodeValue`` for every visited value.
        p1, p2: coordinate sequences, one entry per media dimension.

    Raises:
        Exception: if ``p1``/``p2`` do not have as many entries as
            ``self.media.dim``, or if any coordinate lies outside the
            media space.
    """
    dims = self.media.dim
    if len(dims) != len(p1) or len(p1) != len(p2):
        raise Exception("Dimension of the specified coordinates does not match dimensions of the space of the media")

    # NOTE(review): the original bounds test was not valid Python
    # (`lambda x,y: x=0` mapped over a single sequence). It is rebuilt
    # here as 0 <= coordinate < media dimension -- TODO confirm that the
    # upper bound is meant to be exclusive for p2 as well.
    def in_range(point):
        return all(0 <= coord < size for coord, size in zip(point, dims))

    if not in_range(p1) or not in_range(p2):
        raise Exception("Coordinates out of the media space range {}-{}x{}".format(p1,p2,self.media.dim))

    self.reset()
    bits = BitStream(bytes=data)
    for c_val in self.iterateValues(p1, p2, self.direction):
        # The encoded value is stored at c_val without its last element.
        vvv = self.encodeValue(c_val, bits)
        self.media.set(vvv, c_val[:-1])
def main():
    """
    Command-line entry point: parse the arguments, compute the digest of
    the input binary and write it, DER-encoded, to the output file.

    processCOFFBinary is tried first; if its 'result' is False the input
    is handed to processMachoBinary instead.

    Raises:
        Exception: "Invalid File" when both processors report failure.
    """
    cli = argparse.ArgumentParser(description='PE/COFF Signer')
    # (flag, keyword arguments) pairs, registered in this order
    options = (
        ('-input', {'action': ExpandPath, 'required': True,
                    'help': "File to parse."}),
        ('-output', {'action': ExpandPath, 'required': True,
                     'help': "File to write to."}),
        ('-openssl_path', {'action': ExpandPath,
                           'help': "Path to OpenSSL to create signed voucher"}),
        ('-signer_pfx', {'action': ExpandPath,
                         'help': "Path to certificate to use to sign voucher. Must contain full certificate chain."}),
        ('-password_service', {'help': "Name of Keyring/Wallet service/host"}),
        ('-password_user', {'help': "Name of Keyring/Wallet user name"}),
        ('-verbose', {'action': 'store_true', 'help': "Verbose output."}),
    )
    for flag, settings in options:
        cli.add_argument(flag, **settings)
    args = cli.parse_args()

    # to simplify relocation handling we use a mutable BitStream so we can
    # remove the BaseAddress from each relocation
    binary = bitstring.BitStream(filename=args.input)

    outcome = processCOFFBinary(binary)
    if outcome['result'] == False:
        # Second attempt works from the file path, not the stream
        outcome = processMachoBinary(args.input)
        if outcome['result'] == False:
            raise Exception("Invalid File")

    encoded_digest = der_encoder.encode(outcome['digest'])
    with open(args.output, 'wb') as out_file:
        out_file.write(encoded_digest)
def manchester_decode(symbols):
    """
    Decode ``symbols`` two bits at a time.

    Each pair equal to '0b01' yields a 0 and each pair equal to '0b10'
    yields a 1. Returns a BitStream built from the decoded bits, or None
    as soon as any other pair is encountered.
    """
    decoded = []
    for pair in symbols.cut(2):
        if pair == '0b01':
            decoded.append(0)
            continue
        if pair == '0b10':
            decoded.append(1)
            continue
        # Neither of the two valid pairs: give up on the whole input
        return None
    return bitstring.BitStream(decoded)
# Write the flags
bitstream += BitStream('bool=False, bool=%d, '
'bool=%d' % (self.dont_fragment,
self.more_fragments))
# Write the fragment offset
bitstream += BitStream('uint:13=%d' % self.fragment_offset)
# Write the TTL
bitstream += BitStream('uint:8=%d' % self.ttl)
# Write the protocol number
bitstream += BitStream('uint:8=%d' % self.protocol)
# Write the header checksum as 0 for now, we calculate it later
bitstream += BitStream('uint:16=0')
# Write the source and destination addresses
bitstream += BitStream('uint:32=%d, '
'uint:32=%d' % (int(self.source),
int(self.destination)))
# Add the options
bitstream += BitStream(bytes=self.options)
padding_len = (4 - (len(self.options) % 4)) % 4
bitstream += BitStream(padding_len * 8)
# Calculate the header checksum and fill it in
my_checksum = checksum.ones_complement(bitstream.bytes)
bitstream[80:96] = BitStream('uint:16=%d' % my_checksum)
return bitstream.bytes + payload_bytes
raise ValueError('Reserved flag must be 0')
# Read the fragment offset
packet.fragment_offset = bitstream.read('uint:13')
# Read the TTL
packet.ttl = bitstream.read('uint:8')
# Read the protocol number
packet.protocol = bitstream.read('uint:8')
# Read the header checksum
header_checksum = bitstream.read('uint:16')
# Set the checksum bits in the header to 0 and re-calculate
header[80:96] = BitStream(16)
my_checksum = checksum.ones_complement(header.bytes)
if my_checksum != header_checksum:
raise ValueError('Header checksum does not match')
# Read the source and destination addresses
packet.source = IPv4Address(bitstream.read('uint:32'))
packet.destination = IPv4Address(bitstream.read('uint:32'))
# Read the options
option_len = (ihl - 5) * 4
packet.options = bitstream.read('bytes:%d' % option_len)
# And the rest is payload
payload_bytes = (total_length) - (ihl * 4)
packet.payload = bitstream.read('bytes:%d' % payload_bytes)
verbose: whether to print out NAL structure and fields
"""
if use_bitstream:
# testing the parser in a bitstream
self.file = None
self.stream = BitStream(use_bitstream)
else:
fn, ext = os.path.splitext(os.path.basename(f))
valid_input_ext = ['.264', '.h264']
# TODO: extend for H.265
# valid_input_ext = ['.264', 'h264', '.265', '.h265']
if not ext in valid_input_ext:
raise RuntimeError("Valid input types: " + str(valid_input_ext))
bitstream_file = f
self.file = bitstream_file
self.stream = BitStream(filename=bitstream_file)
self.verbose = verbose
self.callbacks = {}