Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_message(self):
message = PAExecSettingsMsg()
message['msg_id'] = PAExecMsgId.MSGID_SETTINGS
message['xor_val'] = b"\x01\x02\x03\x04"
message['unique_id'] = 1
buffer = PAExecSettingsBuffer()
buffer['processors'] = [1, 2]
buffer['interactive'] = True
buffer['password'] = "pass".encode('utf-16-le')
buffer['username'] = "user".encode('utf-16-le')
buffer['executable'] = "a.exe".encode('utf-16-le')
buffer['arguments'] = "arg1".encode('utf-16-le')
buffer['src_dir'] = "C:\\source".encode('utf-16-le')
buffer['dest_dir'] = "C:\\target".encode('utf-16-le')
src_file_info1 = PAExecFileInfo()
src_file_info1['filename'] = "src1".encode('utf-16-le')
src_file_info1['file_last_write'] = datetime.utcfromtimestamp(0)
def test_create_message(self):
message = PAExecMsg()
message['msg_id'] = PAExecMsgId.MSGID_OK
message['unique_id'] = 1234
message['buffer'] = b"\x01\x02\x03\x04"
expected = b"\x04\x00" \
b"\xd2\x04\x00\x00" \
b"\x04\x00\x00\x00" \
b"\x01\x02\x03\x04"
actual = message.pack()
assert len(message) == 14
assert actual == expected
def test_parse_message_fail_response(self):
actual = PAExecMsg()
data = b"\x06\x00" \
b"\xd2\x04\x00\x00" \
b"\x08\x00\x00\x00" \
b"\x04\x00\x00\x00\x68\x00\x69\x00"
actual.unpack(data)
with pytest.raises(PAExecException) as exc:
actual.check_resp()
assert str(exc.value) == "Received exception from remote PAExec " \
"service: hi"
assert exc.value.msg_id == PAExecMsgId.MSGID_FAILED
assert exc.value.buffer == b"\x04\x00\x00\x00\x68\x00\x69\x00"
def check_resp(self):
msg_id = self['msg_id'].get_value()
if msg_id != PAExecMsgId.MSGID_OK:
raise PAExecException(msg_id, self['buffer'].get_value())
def __init__(self):
self.fields = OrderedDict([
('msg_id', EnumField(
size=2,
default=PAExecMsgId.MSGID_SETTINGS,
enum_type=PAExecMsgId
)),
('xor_val', IntField(
size=4,
default=os.urandom(4)
)),
('unique_id', IntField(size=4)),
('buffer_len', IntField(size=4)),
('buffer', StructureField(
structure_type=PAExecSettingsBuffer
))
])
super(PAExecSettingsMsg, self).__init__()
break
log.info("Writing PAExecSettingsMsg to the main PAExec pipe")
log.info(str(input_data))
main_pipe.write(input_data.pack(), 0)
log.info("Reading PAExecMsg from the PAExec pipe")
settings_resp_raw = main_pipe.read(0, 1024)
settings_resp = PAExecMsg()
settings_resp.unpack(settings_resp_raw)
log.debug(str(settings_resp))
settings_resp.check_resp()
# start the process now
start_msg = PAExecMsg()
start_msg['msg_id'] = PAExecMsgId.MSGID_START_APP
start_msg['unique_id'] = self._unique_id
start_msg['buffer'] = PAExecStartBuffer()
start_buffer = PAExecStartBuffer()
start_buffer['process_id'] = self.pid
start_buffer['comp_name'] = self.current_host.encode('utf-16-le')
start_msg['buffer'] = start_buffer
log.info("Writing PAExecMsg with PAExecStartBuffer to start the "
"remote process")
log.debug(str(start_msg))
main_pipe.write(start_msg.pack(), 0)
if not interactive and not asynchronous:
# create a pipe for stdout, stderr, and stdin and run in a separate
# thread
log.info("Connecting to remote pipes to retrieve output")
def __init__(self):
self.fields = OrderedDict([
('msg_id', EnumField(
size=2,
enum_type=PAExecMsgId
)),
('unique_id', IntField(size=4)),
('buffer_length', IntField(
size=4,
default=lambda s: len(s['buffer'])
)),
('buffer', BytesField(
size=lambda s: s['buffer_length'].get_value()
))
])
super(PAExecMsg, self).__init__()
FilePipePrinterAccessMask.GENERIC_WRITE | \
FilePipePrinterAccessMask.FILE_APPEND_DATA | \
FilePipePrinterAccessMask.READ_CONTROL | \
FilePipePrinterAccessMask.SYNCHRONIZE
# connect to the main pipe and read the output
main_pipe = create_pipe(tree, main_name, main_access_mask)
main_pipe.write(input_data, 0)
main_out = main_pipe.read(0, 1024, wait=True)
main_out_resp = PAExecMsg()
main_out_resp.unpack(main_out)
main_out_resp.check_resp()
# send the start process
start_msg = PAExecMsg()
start_msg['msg_id'] = PAExecMsgId.MSGID_START_APP
start_msg['unique_id'] = paexec_id
start_msg_buffer = PAExecStartBuffer()
start_msg_buffer['process_id'] = pid
start_msg_buffer['comp_name'] = current_host.encode('utf-16-le')
start_msg['buffer'] = start_msg_buffer
start_msg_b = start_msg.pack()
main_pipe.write(start_msg_b, 0)
out_access_mask = FilePipePrinterAccessMask.FILE_READ_DATA | \
FilePipePrinterAccessMask.FILE_READ_ATTRIBUTES | \
FilePipePrinterAccessMask.FILE_READ_EA | \
FilePipePrinterAccessMask.READ_CONTROL | \
FilePipePrinterAccessMask.SYNCHRONIZE
ioctl_pipe(tree, stdout_name)