Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_parse_pdu_known():
    """Pack a populated BindNakPDU and check what parse_pdu returns for it.

    The packed bytes are handed to parse_pdu; the result must be a
    BindNakPDU instance that reports a length of 21.
    """
    source_pdu = BindNakPDU()
    field_values = (
        ('packed_drep', DataRepresentationFormat()),
        ('call_id', 4),
        ('provider_reject_reason', BindNakReason.LOCAL_LIMIT_EXCEEDED),
        ('p_protocols', [5]),
    )
    # Assign the fields in the order they are listed above.
    for field_name, field_value in field_values:
        source_pdu[field_name] = field_value

    parsed = parse_pdu(source_pdu.pack())

    assert isinstance(parsed, BindNakPDU)
    assert len(parsed) == 21
def test_parse_message(self):
    """Unpack a fixed 21-byte buffer into a BindNakPDU and check header fields.

    After unpacking, the PDU must report a length of 21 and expose
    rpc_vers == 5, rpc_vers_minor == 0 and ptype == PType.BIND_NAK.
    """
    # NOTE(review): the per-chunk labels below are inferred from the field
    # names used in the surrounding tests and PDU definition -- confirm
    # against the BindNakPDU field list.
    wire_bytes = (
        b"\x05"              # rpc_vers
        b"\x00"              # rpc_vers_minor
        b"\x0d"              # ptype
        b"\x00"
        b"\x10\x00\x00\x00"  # presumably packed_drep
        b"\x15\x00"          # 0x15 == 21, the total buffer length
        b"\x00\x00"
        b"\x04\x00\x00\x00"  # presumably call_id
        b"\x02\x00"          # presumably provider_reject_reason
        b"\x01"              # presumably n_protocols
        b"\x05\x00"          # presumably p_protocols[0]
    )
    pdu = BindNakPDU()
    # The value returned by unpack() is not inspected by this test.
    pdu.unpack(wire_bytes)

    assert len(pdu) == 21
    header_expectations = (
        ('rpc_vers', 5),
        ('rpc_vers_minor', 0),
        ('ptype', PType.BIND_NAK),
    )
    for field_name, wanted in header_expectations:
        assert pdu[field_name].get_value() == wanted
def test_create_message(self):
    """Pack a populated BindNakPDU and compare it with the expected bytes.

    The PDU is given a data representation, call_id 4, the
    LOCAL_LIMIT_EXCEEDED reject reason and a single protocol entry (5).
    Its packed form must equal the 21-byte reference buffer below.
    """
    message = BindNakPDU()
    message['packed_drep'] = DataRepresentationFormat()
    message['call_id'] = 4
    message['provider_reject_reason'] = BindNakReason.LOCAL_LIMIT_EXCEEDED
    message['p_protocols'] = [5]

    # Reference encoding, one chunk per field.
    expected = (
        b"\x05"
        b"\x00"
        b"\x0d"
        b"\x00"
        b"\x10\x00\x00\x00"
        b"\x15\x00"
        b"\x00\x00"
        b"\x04\x00\x00\x00"
        b"\x02\x00"
        b"\x01"
        b"\x05\x00"
    )
    actual = message.pack()

    # Previously the packed bytes were never compared with the reference,
    # so the test could not fail on a wrong encoding.
    assert actual == expected
def test_parse_pdu_known():
    """Check that parse_pdu turns packed BindNakPDU bytes into a BindNakPDU.

    A BindNakPDU is populated and packed, and the bytes are passed to
    parse_pdu. The parsed object must be a BindNakPDU of length 21.
    """
    original = BindNakPDU()
    original['packed_drep'] = DataRepresentationFormat()
    original['call_id'] = 4
    original['provider_reject_reason'] = BindNakReason.LOCAL_LIMIT_EXCEEDED
    original['p_protocols'] = [5]
    packed = original.pack()

    result = parse_pdu(packed)

    expected_length = 21
    assert isinstance(result, BindNakPDU)
    assert len(result) == expected_length
def parse_pdu(data):
"""
Converts the raw byte string of PDU data into a *PDU() structure. If the
type is invalid or unknown to pypsexec it will throw a PDUException.
:param data: The byte string returned in the buffer of the IOCTL response
:return: *PDU() structure that is dependent on the type being parsed
"""
type = struct.unpack("
('call_id', IntField(size=4)),
('provider_reject_reason', EnumField(
size=2,
enum_type=BindNakReason
)),
# versions
('n_protocols', IntField(
size=1,
default=lambda s: len(s['p_protocols'].get_value())
)),
('p_protocols', ListField(
list_type=IntField(size=2),
list_count=lambda s: s['n_protocols'].get_value()
))
])
super(BindNakPDU, self).__init__()