Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
FilePipePrinterAccessMask.FILE_WRITE_DATA,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_READ,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_NON_DIRECTORY_FILE)
paexec_file.write(pkgutil.get_data('pypsexec', 'paexec.exe'), 0)
paexec_file.close(get_attributes=False)
yield session
finally:
paexec_file.create(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.DELETE,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_DELETE_ON_CLOSE)
paexec_file.close(get_attributes=False)
connection.disconnect(True)
else:
pytest.skip("PYPSEXEC_SERVER, PYPSEXEC_USERNAME, PYPSEXEC_PASSWORD"
" environment variables were not set. Integration "
smb_tree = TreeConnect(client.session,
r"\\%s\ADMIN$" % client.connection.server_name)
smb_tree.connect()
share = Open(smb_tree, "")
share.create(ImpersonationLevel.Impersonation,
DirectoryAccessMask.FILE_READ_ATTRIBUTES |
DirectoryAccessMask.SYNCHRONIZE |
DirectoryAccessMask.FILE_LIST_DIRECTORY,
FileAttributes.FILE_ATTRIBUTE_DIRECTORY,
ShareAccess.FILE_SHARE_READ |
ShareAccess.FILE_SHARE_WRITE |
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_DIRECTORY_FILE)
try:
paexec_files = share.query_directory("PAExec-*.exe",
FileInformationClass.
FILE_NAMES_INFORMATION)
except SMBResponseException as exc:
if exc.status != NtStatus.STATUS_NO_SUCH_FILE:
raise exc
paexec_files = []
return client, paexec_services, paexec_files
def create_pipe(tree, name, access_mask):
pipe = Open(tree, name)
pipe.open(ImpersonationLevel.Impersonation,
access_mask,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_READ |
ShareAccess.FILE_SHARE_WRITE |
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_NON_DIRECTORY_FILE |
CreateOptions.FILE_SYNCHRONOUS_IO_NONALERT)
return pipe
wait_pipe,
sid=tree.session.session_id,
tid=tree.tree_connect_id
)
log.info("Receiving FSCTL_PIPE_WAIT response for pipe: %s"
% name)
tree.session.connection.receive(request)
pipe.create(ImpersonationLevel.Impersonation,
access_mask,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
0,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_NON_DIRECTORY_FILE |
CreateOptions.FILE_SYNCHRONOUS_IO_NONALERT)
return pipe
def create_pipe(tree, name, access_mask):
pipe = Open(tree, name)
pipe.open(ImpersonationLevel.Impersonation,
access_mask,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_READ |
ShareAccess.FILE_SHARE_WRITE |
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_NON_DIRECTORY_FILE |
CreateOptions.FILE_SYNCHRONOUS_IO_NONALERT)
return pipe