Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Populate a RequestPDU (call_id 4, cont_id 1, opnum 10, four bytes of stub
# data) and spell out the byte string it is meant to be compared against.
# NOTE(review): this snippet stops after `expected`; the pack()/assert steps
# are not visible here.
def test_create_message(self):
message = RequestPDU()
# A default-constructed DataRepresentationFormat is used for packed_drep.
message['packed_drep'] = DataRepresentationFormat()
message['call_id'] = 4
message['cont_id'] = 1
message['opnum'] = 10
message['stub_data'] = b"\x01\x02\x03\x04"
# The literal below is 28 bytes long, which equals the b"\x1c\x00" pair read
# as a little-endian integer (presumably the fragment length - TODO confirm).
# The values set above (4, 1, 10 and the stub bytes) appear, little-endian,
# in the tail of the literal.
expected = b"\x05" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x10\x00\x00\x00" \
b"\x1c\x00" \
b"\x00\x00" \
b"\x04\x00\x00\x00" \
b"\x00\x00\x00\x00" \
b"\x01\x00" \
b"\x0a\x00" \
b"\x01\x02\x03\x04"
# Populate a BindPDU with the PFC_MAYBE flag, an explicit data representation,
# call_id 4, assoc_group_id 2 and a single context element, then start the
# byte string it is meant to be compared against.
# NOTE(review): this snippet is cut off inside the `expected` literal (the
# last visible line ends with a line continuation); the remaining bytes and
# the pack()/assert steps are not visible here.
def test_create_message(self):
message = BindPDU()
message['pfx_flags'].set_flag(PFlags.PFC_MAYBE)
# Both representation fields are set explicitly rather than left at whatever
# the structure's defaults are.
packed_drep = DataRepresentationFormat()
packed_drep['integer_character'] = \
IntegerCharacterRepresentation.ASCII_LITTLE_ENDIAN
packed_drep['floating_point'] = FloatingPointRepresentation.IEEE
message['packed_drep'] = packed_drep
message['call_id'] = 4
message['assoc_group_id'] = 2
# One context element (context_id 1); the same SyntaxIdElement, whose UUID is
# sixteen 0xff bytes, serves as the abstract syntax and as the only entry in
# the transfer syntax list.
con_elem = ContextElement()
con_elem['context_id'] = 1
syntax = SyntaxIdElement()
syntax['uuid'] = uuid.UUID(bytes=b"\xff" * 16)
con_elem['abstract_syntax'] = syntax
con_elem['transfer_syntaxes'] = [syntax]
message['context_elems'] = [con_elem]
expected = b"\x05" \
b"\x00" \
b"\x0b" \
def test_parse_message(self):
    """Unpack four bytes into a DataRepresentationFormat and verify it.

    The whole buffer must be consumed and each of the four fields must
    hold the value encoded in the input.
    """
    wire = b"\x10\x00\x00\x00"
    drep = DataRepresentationFormat()
    remaining = drep.unpack(wire)

    # Structure size and leftover bytes first, then the individual fields.
    assert len(drep) == 4
    assert remaining == b""

    int_char = drep['integer_character'].get_value()
    assert int_char == IntegerCharacterRepresentation.ASCII_LITTLE_ENDIAN
    floating = drep['floating_point'].get_value()
    assert floating == FloatingPointRepresentation.IEEE
    for reserved in ('reserved1', 'reserved2'):
        assert drep[reserved].get_value() == 0
def test_parse_pdu_fine(self):
    """_parse_pdu hands back the stub data of a packed ResponsePDU."""
    conn = Connection(uuid.uuid4(), "server", 445)
    api = SCMRApi(Session(conn, "user", "password"))

    pdu = ResponsePDU()
    pdu['packed_drep'] = DataRepresentationFormat()
    pdu['stub_data'] = b"\x01\x02\x03\x04"

    # Parsed as the response to opnum 10.
    parsed = api._parse_pdu(pdu.pack(), 10)
    assert parsed == b"\x01\x02\x03\x04"
def test_create_message(self):
    """Pack a DataRepresentationFormat and compare it with its 4 wire bytes."""
    drep = DataRepresentationFormat()
    drep['floating_point'] = FloatingPointRepresentation.IEEE

    packed = drep.pack()

    assert len(drep) == 4
    assert packed == b"\x10\x00\x00\x00"
# Populate a RequestPDU that has the PFC_OBJECT_UUID flag set and carries a
# 16-byte object value in addition to call_id 4, cont_id 1, opnum 10 and four
# bytes of stub data, then start the byte string it is meant to be compared
# against.
# NOTE(review): this snippet is cut off inside the `expected` literal (the
# last visible line ends with a line continuation); the remaining bytes and
# the pack()/assert steps are not visible here.
def test_create_message_with_object(self):
message = RequestPDU()
message['pfx_flags'].set_flag(PFlags.PFC_OBJECT_UUID)
message['packed_drep'] = DataRepresentationFormat()
message['call_id'] = 4
message['cont_id'] = 1
message['opnum'] = 10
message['object'] = b"\xff" * 16
message['stub_data'] = b"\x01\x02\x03\x04"
# b"\x2c\x00" read little-endian is 44. The visible pieces add up to 24
# bytes, so the missing tail is presumably the 16-byte object followed by the
# 4-byte stub - TODO confirm against the full test.
expected = b"\x05" \
b"\x00" \
b"\x00" \
b"\x80" \
b"\x10\x00\x00\x00" \
b"\x2c\x00" \
b"\x00\x00" \
b"\x04\x00\x00\x00" \
b"\x00\x00\x00\x00" \
b"\x01\x00" \
b"\x0a\x00" \
def test_parse_pdu_failure(self):
    """_parse_pdu raises PDUException when given a packed FaultPDU."""
    conn = Connection(uuid.uuid4(), "server", 445)
    api = SCMRApi(Session(conn, "user", "password"))

    pdu = FaultPDU()
    pdu['packed_drep'] = DataRepresentationFormat()

    with pytest.raises(PDUException) as raised:
        api._parse_pdu(pdu.pack(), 10)

    # The error text must name both the expected and the received PDU type.
    wanted = ("Expecting ResponsePDU for opnum 10 response but got: "
              "FaultPDU")
    assert wanted in str(raised.value)
size=1,
default=5
)),
('rpc_vers_minor', IntField(size=1)),
('ptype', EnumField(
size=1,
enum_type=PType,
default=PType.RESPONSE
)),
('pfx_flags', FlagField(
size=1,
flag_type=PFlags
)),
('packed_drep', StructureField(
size=4,
structure_type=DataRepresentationFormat
)),
('frag_length', IntField(
size=2,
default=lambda s: len(s)
)),
('auth_length', IntField(
size=2,
default=lambda s: len(s['auth_verifier'])
)),
('call_id', IntField(size=4)),
('alloc_hint', IntField(size=4)),
('cont_id', IntField(size=2)),
('cancel_count', IntField(size=1)),
('reserved', IntField(size=1)),
('stub_data', BytesField(
size=lambda s: self._get_stub_data_size(s)
# Build a RequestPDU for `opnum` carrying `data` as its stub data, wrap it in
# an SMB2 IOCTL request and log that the call named `function_name` is being
# sent.
# NOTE(review): this snippet stops right after the log call; the code that
# actually sends the request and handles the reply is not visible here.
def _invoke(self, function_name, opnum, data):
req = RequestPDU()
# Both the first-fragment and last-fragment flags are set on the same PDU.
req['pfx_flags'].set_flag(PFlags.PFC_FIRST_FRAG)
req['pfx_flags'].set_flag(PFlags.PFC_LAST_FRAG)
req['packed_drep'] = DataRepresentationFormat()
# Use the current call id for this request, then advance the counter so the
# next invocation gets a different one.
req['call_id'] = self.call_id
self.call_id += 1
req['opnum'] = opnum
req['stub_data'] = data
# The RPC request becomes the buffer of an FSCTL_PIPE_TRANSCEIVE IOCTL issued
# against this object's open handle.
ioctl_request = SMB2IOCTLRequest()
ioctl_request['ctl_code'] = CtlCode.FSCTL_PIPE_TRANSCEIVE
ioctl_request['file_id'] = self.handle.file_id
ioctl_request['max_output_response'] = 1024
ioctl_request['flags'] = IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL
ioctl_request['buffer'] = req
# Session and tree ids are read from self.tree; they are not used in the
# visible part of this method (presumably needed for the send - TODO confirm).
session_id = self.tree.session.session_id
tree_id = self.tree.tree_connect_id
log.info("Sending svcctl RPC request for %s" % function_name)
def __init__(self):
    """Declare the four one-byte fields of the structure, in wire order.

    ``integer_character`` and ``floating_point`` are enum fields that
    default to ASCII_LITTLE_ENDIAN and IEEE respectively; the two
    reserved fields are plain one-byte integers.
    """
    integer_character = EnumField(
        size=1,
        enum_type=IntegerCharacterRepresentation,
        default=IntegerCharacterRepresentation.ASCII_LITTLE_ENDIAN
    )
    floating_point = EnumField(
        size=1,
        enum_type=FloatingPointRepresentation,
        default=FloatingPointRepresentation.IEEE
    )
    # Insertion order is the field order, so keep the list in this sequence.
    layout = [
        ('integer_character', integer_character),
        ('floating_point', floating_point),
        ('reserved1', IntField(size=1)),
        ('reserved2', IntField(size=1)),
    ]
    self.fields = OrderedDict(layout)
    super(DataRepresentationFormat, self).__init__()