Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_throw_pypsexec_exception(self):
with pytest.raises(PypsexecException) as exc:
raise PypsexecException("hi")
assert str(exc.value) == "hi"
def test_proc_stdin_and_interactive(self):
client = Client("username", "password", "server")
with pytest.raises(PypsexecException) as exc:
client.run_executable("whoami",
interactive=True,
stdin=b"")
assert str(exc.value) == "Cannot send stdin data on an interactive " \
"or asynchronous process"
def test_open_service_missing(self, session):
service = Service("missing-service", session)
service.open()
assert service._handle is None
with pytest.raises(PypsexecException) as exc:
service.start()
assert str(exc.value) == "Cannot start service missing-service as " \
"it does not exist"
with pytest.raises(PypsexecException) as exc:
service.stop()
assert str(exc.value) == "Cannot stop service missing-service as " \
"it does not exist"
service.close()
def test_throw_pypsexec_exception(self):
with pytest.raises(PypsexecException) as exc:
raise PypsexecException("hi")
assert str(exc.value) == "hi"
def test_proc_both_elevated_and_limited_error(self):
client = Client("username", "password", "server")
with pytest.raises(PypsexecException) as exc:
client.run_executable("whoami",
run_elevated=True,
run_limited=True)
assert str(exc.value) == "Both run_elevated and run_limited are " \
"set, only 1 of these can be true"
def stop(self):
self._open_service()
if self._handle is None:
raise PypsexecException("Cannot stop service %s as it does not "
"exist" % self.name)
try:
self._scmr.control_service(self._handle,
ControlCode.SERVICE_CONTROL_STOP)
except SCMRException as exc:
if exc.return_code != ScmrReturnValues.ERROR_SERVICE_NOT_ACTIVE:
raise exc
default is pipe.OutputPipeBytes which returns a byte string of the
stderr
:param stdin: Either a byte string of generator that yields multiple
byte strings to send over the stdin pipe.
:param wow64: Set to True to run the executable as a 32-bit process.
:return: Tuple(stdout, stderr, rc)
stdout: (Bytes) The stdout.get_bytes() return result
stderr: (Bytes) The stderr.get_bytes() return result
rc: (Int) The return code of the process (The pid of the async
process when async=True)
"""
if run_elevated and run_limited:
raise PypsexecException("Both run_elevated and run_limited are "
"set, only 1 of these can be true")
if stdin is not None and (asynchronous or interactive):
raise PypsexecException("Cannot send stdin data on an interactive "
"or asynchronous process")
log.debug("Making sure PAExec service is running")
self._service.start()
smb_tree = TreeConnect(self.session,
r"\\%s\IPC$" % self.connection.server_name)
log.info("Connecting to SMB Tree %s" % smb_tree.share_name)
smb_tree.connect()
settings = PAExecSettingsBuffer()
settings['processors'] = processors if processors else []
settings['asynchronous'] = asynchronous
settings['dont_load_profile'] = not load_profile
settings['interactive_session'] = interactive_session
settings['interactive'] = interactive
def start(self):
self._open_service()
if self._handle is None:
raise PypsexecException("Cannot start service %s as it does not "
"exist" % self.name)
try:
self._scmr.start_service_w(self._handle)
except SCMRException as exc:
if exc.return_code != \
ScmrReturnValues.ERROR_SERVICE_ALREADY_RUNNING:
raise exc
@property
def buffer(self):
return self.args[1]
@property
def message(self):
error_length = struct.unpack("
# write the settings to the main PAExec pipe
pipe_access_mask = FilePipePrinterAccessMask.GENERIC_READ | \
FilePipePrinterAccessMask.GENERIC_WRITE | \
FilePipePrinterAccessMask.FILE_APPEND_DATA | \
FilePipePrinterAccessMask.READ_CONTROL | \
FilePipePrinterAccessMask.SYNCHRONIZE
for i in range(0, 3):
try:
main_pipe = open_pipe(smb_tree, self._exe_file,
pipe_access_mask)
except SMBResponseException as exc:
if exc.status != NtStatus.STATUS_OBJECT_NAME_NOT_FOUND:
raise exc
elif i == 2:
raise PypsexecException("Failed to open main PAExec pipe "
"%s, no more attempts remaining"
% self._exe_file)
log.warning("Main pipe %s does not exist yet on attempt %d. "
"Trying again in 5 seconds"
% (self._exe_file, i + 1))
time.sleep(5)
else:
break
log.info("Writing PAExecSettingsMsg to the main PAExec pipe")
log.info(str(input_data))
main_pipe.write(input_data.pack(), 0)
log.info("Reading PAExecMsg from the PAExec pipe")
settings_resp_raw = main_pipe.read(0, 1024)
settings_resp = PAExecMsg()