Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_parse_message(self):
    """Unpack a 28-byte request PDU and check the parsed result.

    The input carries no object UUID. After unpacking, the structure must
    report a length of 28, ``unpack`` must have returned an empty byte
    string, and the 'rpc_vers' field must hold 5.
    """
    raw = (
        b"\x05"
        b"\x00"
        b"\x00"
        b"\x00"
        b"\x10\x00\x00\x00"
        b"\x1c\x00"
        b"\x00\x00"
        b"\x04\x00\x00\x00"
        b"\x00\x00\x00\x00"
        b"\x01\x00"
        b"\x0a\x00"
        b"\x01\x02\x03\x04"
    )
    pdu = RequestPDU()
    leftover = pdu.unpack(raw)

    assert len(pdu) == 28
    assert leftover == b""
    assert pdu['rpc_vers'].get_value() == 5
def test_parse_message_with_object(self):
    """Unpack a 44-byte request PDU that includes a 16-byte object value.

    The fourth header byte is 0x80 (presumably the PFC_OBJECT_UUID flag -
    confirm against PFlags) and sixteen 0xff bytes precede the stub data.
    Only the resulting structure length is asserted.
    """
    raw = (
        b"\x05"
        b"\x00"
        b"\x00"
        b"\x80"
        b"\x10\x00\x00\x00"
        b"\x2c\x00"
        b"\x00\x00"
        b"\x04\x00\x00\x00"
        b"\x00\x00\x00\x00"
        b"\x01\x00"
        b"\x0a\x00"
        b"\xff\xff\xff\xff\xff\xff\xff\xff"
        b"\xff\xff\xff\xff\xff\xff\xff\xff"
        b"\x01\x02\x03\x04"
    )
    pdu = RequestPDU()
    # The value returned by unpack() is not checked by this test.
    pdu.unpack(raw)

    assert len(pdu) == 44
def test_create_message_with_object(self):
# Populates a RequestPDU with the PFC_OBJECT_UUID flag set, a 16-byte
# 'object' value of 0xff bytes and 4 bytes of stub data, then begins the
# byte string the packed message is expected to equal.
# NOTE(review): this excerpt is cut off - the `expected` literal ends on a
# dangling line continuation and no pack/compare step is visible here.
message = RequestPDU()
message['pfx_flags'].set_flag(PFlags.PFC_OBJECT_UUID)
message['packed_drep'] = DataRepresentationFormat()
message['call_id'] = 4
message['cont_id'] = 1
message['opnum'] = 10
# 16 bytes, all 0xff
message['object'] = b"\xff" * 16
message['stub_data'] = b"\x01\x02\x03\x04"
expected = b"\x05" \
b"\x00" \
b"\x00" \
b"\x80" \
b"\x10\x00\x00\x00" \
b"\x2c\x00" \
b"\x00\x00" \
b"\x04\x00\x00\x00" \
b"\x00\x00\x00\x00" \
def test_create_message(self):
# Populates a RequestPDU without an object value (call_id 4, cont_id 1,
# opnum 10, 4 bytes of stub data), then begins the byte string the packed
# message is expected to equal.
# NOTE(review): this excerpt is cut off - the `expected` literal ends on a
# dangling line continuation and no pack/compare step is visible here.
message = RequestPDU()
message['packed_drep'] = DataRepresentationFormat()
message['call_id'] = 4
message['cont_id'] = 1
message['opnum'] = 10
message['stub_data'] = b"\x01\x02\x03\x04"
expected = b"\x05" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x10\x00\x00\x00" \
b"\x1c\x00" \
b"\x00\x00" \
b"\x04\x00\x00\x00" \
b"\x00\x00\x00\x00" \
b"\x01\x00" \
b"\x0a\x00" \
def _invoke(self, function_name, opnum, data):
    """
    Builds the request PDU for an RPC call and wraps it in an IOCTL request.

    A RequestPDU flagged as both the first and the last fragment is filled
    with the current call id (self.call_id is incremented afterwards), the
    opnum and the stub data. The PDU is then set as the buffer of an
    SMB2IOCTLRequest that uses FSCTL_PIPE_TRANSCEIVE on self.handle.file_id
    with a max output response of 1024.

    NOTE(review): in the visible body function_name is never used and the
    IOCTL request is neither sent nor returned - the excerpt appears to be
    cut short, confirm against the full file.

    :param function_name: Not referenced in the visible body
    :param opnum: The value stored in the 'opnum' field of the PDU
    :param data: The value stored in the 'stub_data' field of the PDU
    """
    pdu = RequestPDU()
    for frag_flag in (PFlags.PFC_FIRST_FRAG, PFlags.PFC_LAST_FRAG):
        pdu['pfx_flags'].set_flag(frag_flag)
    pdu['packed_drep'] = DataRepresentationFormat()
    pdu['call_id'] = self.call_id
    # Each invocation consumes one call id.
    self.call_id += 1
    pdu['opnum'] = opnum
    pdu['stub_data'] = data

    transceive = SMB2IOCTLRequest()
    ioctl_fields = (
        ('ctl_code', CtlCode.FSCTL_PIPE_TRANSCEIVE),
        ('file_id', self.handle.file_id),
        ('max_output_response', 1024),
        ('flags', IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL),
        ('buffer', pdu),
    )
    for field_name, field_value in ioctl_fields:
        transceive[field_name] = field_value
def parse_pdu(data):
"""
Converts the raw byte string of PDU data into a *PDU() structure. If the
type is invalid or unknown to pypsexec it will throw a PDUException.

:param data: The byte string returned in the buffer of the IOCTL response
:return: *PDU() structure that is dependent on the type being parsed
"""
# NOTE(review): the statement below is truncated in this excerpt - the
# struct format string is never closed and the rest of the function body is
# not visible. Also note that the local name `type` shadows the builtin.
type = struct.unpack("
('call_id', IntField(size=4)),
('alloc_hint', IntField(size=4)),
('cont_id', IntField(size=2)),
('opnum', IntField(size=2)),
('object', BytesField(
size=lambda s:
16 if s['pfx_flags'].has_flag(PFlags.PFC_OBJECT_UUID) else 0
)),
('stub_data', BytesField(
size=lambda s: self._get_stub_data_size(s)
)),
('auth_verifier', BytesField(
size=lambda s: s['auth_length'].get_value()
))
])
super(RequestPDU, self).__init__()