Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_winrs(self, functional_transports):
for wsman in functional_transports:
with WinRS(wsman) as shell:
process = Process(shell, "echo", ["hi"])
process.invoke()
process.signal(SignalCode.CTRL_C)
assert process.rc == 0
assert process.stdout == b"hi\r\n"
assert process.stderr == b""
def test_winrs_extra_opts(self, wsman_conn):
with WinRS(wsman_conn, name="shell 1", lifetime=60, idle_time_out=60,
working_directory="C:\\Windows") as shell:
assert shell.name == "shell 1"
assert shell.lifetime == 60
assert shell.idle_time_out == "PT60.000S"
assert shell.working_directory == "C:\\Windows"
process = Process(shell, "powershell.exe", ["(pwd).Path"])
process.invoke()
process.signal(SignalCode.CTRL_C)
assert process.rc == 0
assert process.stdout == b"C:\\Windows\r\n"
assert process.stderr == b""
def test_winrs_environment(self, wsman_conn):
complex_chars = r'_-(){}[]<>*+-/\?"''!@#$^&|;:i,.`~0'
env_block = OrderedDict([
('env1', 'var1'),
(1234, 5678),
(complex_chars, complex_chars),
])
with WinRS(wsman_conn, environment=env_block) as shell:
process = Process(shell, "cmd.exe", ["/c", "set"])
process.invoke()
process.signal(SignalCode.CTRL_C)
env_list = process.stdout.decode('utf-8').splitlines()
assert process.rc == 0
assert "env1=var1" in env_list
assert "1234=5678" in env_list
assert "%s=%s" % (complex_chars, complex_chars) in env_list
assert process.stderr == b""
def test_winrs_open_already_opened(self, wsman_conn):
with WinRS(wsman_conn) as shell:
shell.open()
shell.close()
def test_winrs_no_cmd_shell(self, wsman_conn):
with WinRS(wsman_conn) as shell:
process = Process(shell, "powershell.exe", ["Write-Host", "hi"],
no_shell=True)
# this will fail as you need to provide the full path when not
# running in cmd shell
with pytest.raises(WSManFaultError) as exc:
process.invoke()
assert exc.value.provider_fault == "The system cannot find the " \
"file specified."
assert exc.value.code == 2147942402
# fix the execute path and invoke again
process.executable = \
r"C:\Windows\system32\WindowsPowerShell\v1.0\powershell.exe"
process.invoke()
process.signal(SignalCode.CTRL_C)
def test_winrs_unicode(self, wsman_conn):
with WinRS(wsman_conn, codepage=65001) as shell:
process = Process(shell, "powershell.exe",
[u"Write-Host こんにちは"])
process.invoke()
process.signal(SignalCode.CTRL_C)
assert process.rc == 0
assert process.stdout.decode('utf-8') == u"こんにちは\n"
assert process.stderr == b""
def test_winrs_stderr_rc(self, wsman_conn):
with WinRS(wsman_conn) as shell:
process = Process(shell, "cmd.exe", ["/c echo out && echo "
"err>&2 && exit 1"])
process.invoke()
process.signal(SignalCode.CTRL_C)
assert process.rc == 1
assert process.stdout == b"out \r\n"
assert process.stderr == b"err \r\n"
ssl = ""
for member in config['members'].split(','):
res = ""
try:
if ssl:
client = WSMan(member, ssl=True, auth="ntlm",
cert_validation=False,
connection_timeout=3,
username=config['user'], password=config['password'])
else:
client = WSMan(member, ssl=False, auth="ntlm",
cert_validation=False,
connection_timeout=3,
username=config['user'], password=config['password'])
with WinRS(client) as shell:
process = Process(shell, REMCMD)
print(process)
stdout, stderr, _rc = process.invoke()
if "decode" in dir(stdout):
res = stdout.decode()
err = stderr.decode()
else:
res = stdout
err = stderr
if err:
print("get_winRS: {} -> err: {}".format(config, err),
file=sys.stderr)
errors += 1
except Exception as e:
print("get_winRS: Connect to {} failed: {}".format(member, e.args[0]),
file=sys.stderr)
hold
:param max_runspaces: The maximum number of runspaces that a pool can
hold
:param session_key_timeout_ms: The maximum time to wait for a session
key transfer from the server
"""
log.info("Initialising RunspacePool object for configuration %s"
% configuration_name)
# The below are defined in some way at
# https://msdn.microsoft.com/en-us/library/ee176015.aspx
self.id = str(uuid.uuid4()).upper()
self.state = RunspacePoolState.BEFORE_OPEN
self.connection = connection
resource_uri = "http://schemas.microsoft.com/powershell/%s" \
% configuration_name
self.shell = WinRS(connection, resource_uri=resource_uri, id=self.id,
input_streams='stdin pr', output_streams='stdout')
self.ci_table = {}
self.pipelines = {}
self.session_key_timeout_ms = session_key_timeout_ms
# Extra properties that are important and can control the RunspacePool
# behaviour
self.apartment_state = apartment_state
self.thread_options = thread_options
self.host = host
self.protocol_version = None
self.ps_version = None
self.serialization_version = None
self.user_events = []
self._application_private_data = None