Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_message(self):
    # Build a settings buffer populated with sample values.
    message = PAExecSettingsBuffer()
    message['processors'] = [1, 2]
    message['interactive'] = True

    # Text fields are stored as UTF-16-LE bytes; the assignment order is
    # the same as the field order listed here.
    text_fields = (
        ('password', "pass"),
        ('username', "user"),
        ('executable', "a.exe"),
        ('arguments', "arg1"),
        ('src_dir', "C:\\source"),
        ('dest_dir', "C:\\target"),
    )
    for field_name, text in text_fields:
        message[field_name] = text.encode('utf-16-le')

    # Two source-file entries that differ only in their filename.
    src_files = []
    for filename in ("src1", "src2"):
        file_info = PAExecFileInfo()
        file_info['filename'] = filename.encode('utf-16-le')
        file_info['file_last_write'] = datetime.utcfromtimestamp(0)
        src_files.append(file_info)
    message['src_files'] = src_files
def test_create_message(self):
    # Message header with fixed sample values.
    message = PAExecSettingsMsg()
    header_values = (
        ('msg_id', PAExecMsgId.MSGID_SETTINGS),
        ('xor_val', b"\x01\x02\x03\x04"),
        ('unique_id', 1),
    )
    for field_name, value in header_values:
        message[field_name] = value

    # Settings buffer populated with sample values.
    buffer = PAExecSettingsBuffer()
    buffer['processors'] = [1, 2]
    buffer['interactive'] = True

    # Text fields are stored as UTF-16-LE bytes; the assignment order is
    # the same as the field order listed here.
    text_fields = (
        ('password', "pass"),
        ('username', "user"),
        ('executable', "a.exe"),
        ('arguments', "arg1"),
        ('src_dir', "C:\\source"),
        ('dest_dir', "C:\\target"),
    )
    for field_name, text in text_fields:
        buffer[field_name] = text.encode('utf-16-le')

    # Two source-file entries that differ only in their filename.
    src_files = []
    for filename in ("src1", "src2"):
        file_info = PAExecFileInfo()
        file_info['filename'] = filename.encode('utf-16-le')
        file_info['file_last_write'] = datetime.utcfromtimestamp(0)
        src_files.append(file_info)
    buffer['src_files'] = src_files
def test_parse_message(self):
# Empty settings structure; presumably the raw bytes below are unpacked
# into it, but that call is not visible in this excerpt.
actual = PAExecSettingsBuffer()
# Raw input bytes for the test. The two 8-byte runs near the end decode as
# UTF-16-LE "pass" and "user", and each is preceded by a 4-byte
# little-endian value of 4 (their length in characters).
# NOTE(review): the mapping of the earlier bytes to individual fields
# cannot be confirmed from here - check against the structure definition.
# NOTE(review): the literal ends on a dangling line continuation; the rest
# of the buffer is missing from this listing.
data = b"\x01\x00\x00\x00" \
b"\x02\x00\x00\x00" \
b"\x01\x00\x00\x00\x02\x00\x00\x00" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x00\x00\x00\x00" \
b"\x01" \
b"\x00" \
b"\x00" \
b"\x04\x00\x00\x00" \
b"\x70\x00\x61\x00\x73\x00\x73\x00" \
b"\x04\x00\x00\x00" \
b"\x75\x00\x73\x00\x65\x00\x72\x00" \
def unpack(self, data):
# need to de-xor the buffer to get human readable values
xor_value = struct.unpack("
def __init__(self):
    # Declare the message fields one at a time; the OrderedDict keeps them
    # in declaration order.
    fields = OrderedDict()
    fields['msg_id'] = EnumField(
        size=2,
        default=PAExecMsgId.MSGID_SETTINGS,
        enum_type=PAExecMsgId,
    )
    # os.urandom(4) runs on every construction, so each instance gets its
    # own random default for xor_val.
    fields['xor_val'] = IntField(size=4, default=os.urandom(4))
    fields['unique_id'] = IntField(size=4)
    fields['buffer_len'] = IntField(size=4)
    fields['buffer'] = StructureField(structure_type=PAExecSettingsBuffer)

    # The field table is in place before the base initialiser is called.
    self.fields = fields
    super(PAExecSettingsMsg, self).__init__()
None,
None,
None)[1]
# start the new service
scmr_api.start_service_w(service_handle)
finally:
scmr_api.close_service_handle_w(scm_handle)
finally:
scmr_api.close()
# connect to named pipe of the service
# The tree connected here is the server's IPC$ share, addressed by the
# session's server name.
tree = TreeConnect(session, r"\\%s\IPC$" % session.connection.server_name)
tree.connect()
# Settings for the remote process; each text value is stored as
# UTF-16-LE encoded bytes.
settings = PAExecSettingsBuffer()
settings['username'] = username.encode('utf-16-le')
settings['password'] = password.encode('utf-16-le')
settings['executable'] = exe.encode('utf-16-le')
settings['arguments'] = arguments.encode('utf-16-le')
# Wrap the settings in a PAExecSettingsMsg tagged with paexec_id, then pack
# the whole message into input_data.
input_data_struct = PAExecSettingsMsg()
input_data_struct['unique_id'] = paexec_id
input_data_struct['buffer'] = settings
input_data = input_data_struct.pack()
# NOTE(review): presumably collects pipes that need closing later - its use
# is outside this excerpt.
cleanup_pipes = []
try:
# create pipes and connect to them
# The pipe names are built from the service name, current_host and pid.
main_name = "%s.exe" % svc_name
stdout_name = "PaExecOut%s%d" % (current_host, pid)
stderr_name = "PaExecErr%s%d" % (current_host, pid)
# run_elevated and run_limited are mutually exclusive options.
if run_elevated and run_limited:
raise PypsexecException("Both run_elevated and run_limited are "
"set, only 1 of these can be true")
# stdin data is rejected when the process is interactive or asynchronous.
if stdin is not None and (asynchronous or interactive):
raise PypsexecException("Cannot send stdin data on an interactive "
"or asynchronous process")
log.debug("Making sure PAExec service is running")
self._service.start()
# Connect to the IPC$ share of the server this connection points at.
smb_tree = TreeConnect(self.session,
r"\\%s\IPC$" % self.connection.server_name)
log.info("Connecting to SMB Tree %s" % smb_tree.share_name)
smb_tree.connect()
# Copy the caller's options into the settings buffer. String values are
# passed through self._encode_string before being stored.
settings = PAExecSettingsBuffer()
# A falsy processors value is replaced with an empty list.
settings['processors'] = processors if processors else []
settings['asynchronous'] = asynchronous
# The buffer stores the inverse of the caller-facing load_profile flag.
settings['dont_load_profile'] = not load_profile
settings['interactive_session'] = interactive_session
settings['interactive'] = interactive
settings['run_elevated'] = run_elevated
settings['run_limited'] = run_limited
settings['username'] = self._encode_string(username)
settings['password'] = self._encode_string(password)
settings['use_system_account'] = use_system_account
settings['working_dir'] = self._encode_string(working_dir)
settings['show_ui_on_win_logon'] = show_ui_on_win_logon
settings['priority'] = priority
settings['executable'] = self._encode_string(executable)
settings['arguments'] = self._encode_string(arguments)
settings['remote_log_path'] = self._encode_string(remote_log_path)
unpack_func=lambda s, d:
self._unpack_file_list(s, d, 'num_src_files')
)),
('num_dest_files', IntField(
size=4,
default=lambda s: len(s['dest_files'].get_value())
)),
('dest_files', ListField(
list_count=lambda s: s['num_dest_files'].get_value(),
list_type=StructureField(structure_type=PAExecFileInfo),
unpack_func=lambda s, d:
self._unpack_file_list(s, d, 'num_dest_files')
)),
('timeout_seconds', IntField(size=4))
])
super(PAExecSettingsBuffer, self).__init__()