Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def session(self):
# NOTE(review): this snippet has lost its indentation and is cut off after the
# write call below (the `try:` has no visible except/finally), so only the
# steps that are visible here are described.
# Read the target server and credentials from PYPSEXEC_* environment variables.
server = os.environ.get('PYPSEXEC_SERVER', None)
username = os.environ.get('PYPSEXEC_USERNAME', None)
password = os.environ.get('PYPSEXEC_PASSWORD', None)
# Only proceed when all three values are set (non-empty).
if server and username and password:
# Build the SMB objects: a connection to port 445, a session using the
# credentials, a tree connect for the server's ADMIN$ share and a handle
# object for the file PAExec.exe on that tree.
connection = Connection(uuid.uuid4(), server, 445)
session = Session(connection, username, password)
tree = TreeConnect(session, r"\\%s\ADMIN$" % server)
paexec_file = Open(tree, "PAExec.exe")
connection.connect()
try:
session.connect()
tree.connect()
# Open PAExec.exe with write-data access; FILE_OVERWRITE_IF is the requested
# create disposition and FILE_NON_DIRECTORY_FILE the create option.
paexec_file.create(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.FILE_WRITE_DATA,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_READ,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_NON_DIRECTORY_FILE)
# Upload the paexec.exe resource bundled in the 'pypsexec' package; the
# trailing 0 is presumably the write offset - confirm against Open.write.
paexec_file.write(pkgutil.get_data('pypsexec', 'paexec.exe'), 0)
def test_parse_pdu_fine(self):
    """_parse_pdu should return the stub data of a packed ResponsePDU."""
    smb_session = Session(Connection(uuid.uuid4(), "server", 445),
                          "user", "password")
    scmr = SCMRApi(smb_session)

    stub = b"\x01\x02\x03\x04"
    pdu = ResponsePDU()
    pdu['packed_drep'] = DataRepresentationFormat()
    pdu['stub_data'] = stub

    # The stub bytes placed in the PDU must come back unchanged.
    assert scmr._parse_pdu(pdu.pack(), 10) == stub
def test_parse_error_unknown(self):
    """Code 999 should raise SCMRException with an ERROR_UNKNOWN message."""
    smb_session = Session(Connection(uuid.uuid4(), "server", 445),
                          "user", "password")
    scmr = SCMRApi(smb_session)
    wanted = "Exception calling function_name. Code: 999, Msg: ERROR_UNKNOWN"

    with pytest.raises(SCMRException) as raised:
        scmr._parse_error(999, "function_name")

    # Checked after the context manager exits so the exception is captured.
    assert str(raised.value) == wanted
def test_marshal_string_none(self):
    """Marshalling None is expected to produce four zero bytes."""
    smb_session = Session(Connection(uuid.uuid4(), "server", 445),
                          "user", "password")
    scmr = SCMRApi(smb_session)

    packed = scmr._marshal_string(None)

    assert packed == b"\x00\x00\x00\x00"
def test_parse_error(self):
    """Code 5 should raise SCMRException naming ERROR_ACCESS_DENIED."""
    smb_session = Session(Connection(uuid.uuid4(), "server", 445),
                          "user", "password")
    scmr = SCMRApi(smb_session)
    wanted = ("Exception calling function_name. Code: 5"
              ", Msg: ERROR_ACCESS_DENIED")

    with pytest.raises(SCMRException) as raised:
        scmr._parse_error(5, "function_name")

    # Checked after the context manager exits so the exception is captured.
    assert str(raised.value) == wanted
def test_marshal_string_no_padding(self):
    """Marshalling "h" should give exactly the 16 bytes spelled out below."""
    smb_session = Session(Connection(uuid.uuid4(), "server", 445),
                          "user", "password")
    scmr = SCMRApi(smb_session)

    # Four 4-byte groups; nothing follows the final group.
    wanted = (
        b"\x02\x00\x00\x00"
        b"\x00\x00\x00\x00"
        b"\x02\x00\x00\x00"
        b"\x68\x00\x00\x00"
    )

    packed = scmr._marshal_string("h")

    assert packed == wanted
def test_parse_pdu_failure(self):
    """Feeding a packed FaultPDU to _parse_pdu should raise PDUException."""
    smb_session = Session(Connection(uuid.uuid4(), "server", 445),
                          "user", "password")
    scmr = SCMRApi(smb_session)

    fault = FaultPDU()
    fault['packed_drep'] = DataRepresentationFormat()

    with pytest.raises(PDUException) as raised:
        scmr._parse_pdu(fault.pack(), 10)

    # Only a substring match: the message continues after this prefix.
    wanted_fragment = ("Expecting ResponsePDU for opnum 10 response but got: "
                       "FaultPDU")
    assert wanted_fragment in str(raised.value)
def test_marshal_string_as_referent(self):
    """Marshalling "hi" with unique=True should match the bytes below."""
    smb_session = Session(Connection(uuid.uuid4(), "server", 445),
                          "user", "password")
    scmr = SCMRApi(smb_session)

    # With unique=True the expected output starts with an extra 4-byte group
    # (00 00 00 01) and ends with two trailing zero bytes after the data.
    wanted = (
        b"\x00\x00\x00\x01"
        b"\x03\x00\x00\x00"
        b"\x00\x00\x00\x00"
        b"\x03\x00\x00\x00"
        b"\x68\x00\x69\x00\x00\x00"
        b"\x00\x00"
    )

    packed = scmr._marshal_string("hi", unique=True)

    assert packed == wanted
# NOTE(review): fragment of a larger script with its indentation lost. pid,
# current_host, paexec_out_stream, server, port, username and password are
# defined outside this view, and the snippet is cut off mid-expression at the
# end, so only the visible steps are described.
#arguments = "Write-Host hello world; Start-Sleep -Seconds 10; Write-Host another hello; Write-Error hell"
#arguments = "Write-Host hello world; Start-Sleep -Seconds 5; Write-Host another hello; Write-Error hell"
# The service name and executable file name are both derived from the pid and
# the host name.
svc_name = "PAExec-%d-%s" % (pid, current_host)
exe_payload = paexec_out_stream
exe_path = "%s.exe" % svc_name
# Setup SMB connection and session
guid = uuid.uuid4()
connection = Connection(guid, server, port)
try:
connection.connect()
session = Session(connection, username, password)
session.connect()
# open the service manager
scmr_api = SCMRApi(session)
scmr_api.open()
try:
# Access mask requested for the service control manager handle: connect,
# create service and enumerate service rights.
sc_desired_access = DesiredAccess.SC_MANAGER_CONNECT | \
DesiredAccess.SC_MANAGER_CREATE_SERVICE | \
DesiredAccess.SC_MANAGER_ENUMERATE_SERVICE
scm_handle = scmr_api.open_sc_manager_w(server, None, sc_desired_access)
try:
# Access mask for the service itself (the expression is truncated in this view).
svc_desired_access = DesiredAccess.SERVICE_QUERY_STATUS | \
DesiredAccess.SERVICE_START | \
DesiredAccess.SERVICE_STOP | \
def __init__(self, server, username=None, password=None, port=445,
             encrypt=True):
    """
    Create the SMB objects and the per-client names used by this client.

    The constructor builds a Connection, a Session and a Service object and
    derives the service, executable and pipe names from the local process
    id and host name.

    :param server: The server passed to Connection; stored on self.server
    :param username: The username passed to Session (default None)
    :param password: The password passed to Session (default None)
    :param port: The port passed to Connection (default 445)
    :param encrypt: Forwarded to Session as require_encryption
        (default True)
    """
    self.server = server
    self.port = port
    self.pid = os.getpid()
    self.current_host = socket.gethostname()
    self.connection = Connection(uuid.uuid4(), server, port)
    self.session = Session(self.connection, username, password,
                           require_encryption=encrypt)

    # All remote-side names embed the local pid and host name.
    self.service_name = "PAExec-%d-%s" % (self.pid, self.current_host)
    # Pass values as logging arguments so the message is only formatted
    # when the record is actually emitted.
    log.info("Creating PyPsexec Client with unique name: %s",
             self.service_name)
    self._exe_file = "%s.exe" % self.service_name

    host_and_pid = (self.current_host, self.pid)
    self._stdout_pipe_name = "PaExecOut%s%d" % host_and_pid
    self._stderr_pipe_name = "PaExecErr%s%d" % host_and_pid
    self._stdin_pipe_name = "PaExecIn%s%d" % host_and_pid

    self._unique_id = get_unique_id(self.pid, self.current_host)
    log.info("Generated unique ID for PyPsexec Client: %d",
             self._unique_id)
    self._service = Service(self.service_name, self.session)