Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_message(self):
message = BindPDU()
message['pfx_flags'].set_flag(PFlags.PFC_MAYBE)
packed_drep = DataRepresentationFormat()
packed_drep['integer_character'] = \
IntegerCharacterRepresentation.ASCII_LITTLE_ENDIAN
packed_drep['floating_point'] = FloatingPointRepresentation.IEEE
message['packed_drep'] = packed_drep
message['call_id'] = 4
message['assoc_group_id'] = 2
con_elem = ContextElement()
con_elem['context_id'] = 1
syntax = SyntaxIdElement()
syntax['uuid'] = uuid.UUID(bytes=b"\xff" * 16)
con_elem['abstract_syntax'] = syntax
con_elem['transfer_syntaxes'] = [syntax]
message['context_elems'] = [con_elem]
expected = b"\x05" \
b"\x00" \
b"\x0b" \
b"\x40" \
b"\x10" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x48\x00" \
b"\x00\x00" \
def test_parse_message(self):
actual = ContextElement()
data = b"\x04\x00" \
b"\x02" \
b"\x00" \
b"\xff\xff\xff\xff\xff\xff\xff\xff" \
b"\xff\xff\xff\xff\xff\xff\xff\xff" \
b"\x00\x00\x00\x00" \
b"\xee\xee\xee\xee\xee\xee\xee\xee" \
b"\xee\xee\xee\xee\xee\xee\xee\xee" \
b"\x00\x00\x00\x00" \
b"\xdd\xdd\xdd\xdd\xdd\xdd\xdd\xdd" \
b"\xdd\xdd\xdd\xdd\xdd\xdd\xdd\xdd" \
b"\x00\x00\x00\x00"
data = actual.unpack(data)
assert len(actual) == 64
assert data == b""
assert actual['context_id'].get_value() == 4
def test_create_message(self):
message = ContextElement()
message['context_id'] = 4
syntax1 = SyntaxIdElement()
syntax1['uuid'] = uuid.UUID(bytes=b"\xff" * 16)
syntax2 = SyntaxIdElement()
syntax2['uuid'] = uuid.UUID(bytes=b"\xee" * 16)
syntax3 = SyntaxIdElement()
syntax3['uuid'] = uuid.UUID(bytes=b"\xdd" * 16)
message['abstract_syntax'] = syntax1
message['transfer_syntaxes'] = [syntax2, syntax3]
expected = b"\x04\x00" \
b"\x02" \
b"\x00" \
b"\xff\xff\xff\xff\xff\xff\xff\xff" \
b"\xff\xff\xff\xff\xff\xff\xff\xff" \
b"\x00\x00\x00\x00" \
b"\xee\xee\xee\xee\xee\xee\xee\xee" \
context_ndr = ContextElement()
context_ndr['context_id'] = 0
context_ndr['abstract_syntax'] = SyntaxIdElement()
context_ndr['abstract_syntax']['uuid'] = \
uuid.UUID("367ABB81-9844-35F1-AD32-98F038001003")
context_ndr['abstract_syntax']['version'] = 2
# https://msdn.microsoft.com/en-us/library/cc243843.aspx
ndr_syntax = SyntaxIdElement()
ndr_syntax['uuid'] = uuid.UUID("8a885d04-1ceb-11c9-9fe8-08002b104860")
ndr_syntax['version'] = 2
context_ndr['transfer_syntaxes'] = [
ndr_syntax
]
context_bind = ContextElement()
context_bind['context_id'] = 1
context_bind['abstract_syntax'] = SyntaxIdElement()
context_bind['abstract_syntax']['uuid'] = \
uuid.UUID("367ABB81-9844-35F1-AD32-98F038001003")
context_bind['abstract_syntax']['version'] = 2
# https://msdn.microsoft.com/en-us/library/cc243715.aspx
# uuid prefix = 6CB71C2C-9812-4540
# uuid prefix bytes = b'\x2c\x1c\xb7\x6c\x12\x98\x40\x45'
# BindTimeFeatureNegotiateBitmask
# https://msdn.microsoft.com/en-us/library/cc243884.aspx
# SecurityContextMultiplexingSupported = 0x01
# KeepConnectionOnOrphanSupported = 0x02
# version number is 1
bind_syntax = SyntaxIdElement()
bind_syntax['uuid'] = b'\x2c\x1c\xb7\x6c\x12\x98\x40\x45' \
default=lambda s: len(s['transfer_syntaxes'].get_value())
)),
('reserved', IntField(size=1)),
('abstract_syntax', StructureField(
structure_type=SyntaxIdElement
)),
('transfer_syntaxes', ListField(
list_type=StructureField(
size=20,
structure_type=SyntaxIdElement
),
list_count=lambda s: s['n_transfer_syn'].get_value(),
size=lambda s: s['n_transfer_syn'].get_value() * 20
)),
])
super(ContextElement, self).__init__()
0,
ShareAccess.FILE_SHARE_READ |
ShareAccess.FILE_SHARE_WRITE |
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_NON_DIRECTORY_FILE)
# we need to bind svcctl to SCManagerW over DCE/RPC
bind = BindPDU()
bind['pfx_flags'].set_flag(PFlags.PFC_FIRST_FRAG)
bind['pfx_flags'].set_flag(PFlags.PFC_LAST_FRAG)
bind['packed_drep'] = DataRepresentationFormat()
bind['call_id'] = self.call_id
self.call_id += 1
context_ndr = ContextElement()
context_ndr['context_id'] = 0
context_ndr['abstract_syntax'] = SyntaxIdElement()
context_ndr['abstract_syntax']['uuid'] = \
uuid.UUID("367ABB81-9844-35F1-AD32-98F038001003")
context_ndr['abstract_syntax']['version'] = 2
# https://msdn.microsoft.com/en-us/library/cc243843.aspx
ndr_syntax = SyntaxIdElement()
ndr_syntax['uuid'] = uuid.UUID("8a885d04-1ceb-11c9-9fe8-08002b104860")
ndr_syntax['version'] = 2
context_ndr['transfer_syntaxes'] = [
ndr_syntax
]
context_bind = ContextElement()
context_bind['context_id'] = 1
def _unpack_context_elems(self, structure, data):
context_elems = []
while data != b"":
context_elem = ContextElement()
data = context_elem.unpack(data)
context_elems.append(context_elem)
return context_elems
)),
('max_recv_frag', IntField(
size=2,
default=4280
)),
('assoc_group_id', IntField(size=4)),
# p_context_list_t
('n_context_elem', IntField(
size=1,
default=lambda s: len(s['context_elems'].get_value())
)),
('reserved', IntField(size=1)),
('reserved2', IntField(size=2)),
('context_elems', ListField(
list_count=lambda s: s['n_context_elem'].get_value(),
list_type=StructureField(structure_type=ContextElement),
unpack_func=lambda s, d: self._unpack_context_elems(s, d)
)),
('auth_verifier', BytesField(
size=lambda s: s['auth_length'].get_value()
))
])
super(BindPDU, self).__init__()