Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_psrp_disconnect_already_disconnected(self):
    """Call ``disconnect()`` on a pool whose state is already DISCONNECTED.

    The pool is built on a ``WSMan("")`` transport and its state is forced
    to ``RunspacePoolState.DISCONNECTED`` before the call. There is no
    explicit assertion: the test passes when ``disconnect()`` returns
    without raising.
    """
    pool = RunspacePool(WSMan(""))
    pool.state = RunspacePoolState.DISCONNECTED
    pool.disconnect()
def test_psrp_reset_runspace_state_fail(self, wsman_conn):
    """Expect ``reset_runspace_state()`` to raise ``InvalidPSRPOperation``.

    The pool is opened by hand and closed in a ``finally`` clause so that it
    is released even when an assertion fails.

    :param wsman_conn: connection object handed to ``RunspacePool``
        (supplied by the test harness).
    """
    runspace_pool = RunspacePool(wsman_conn)
    runspace_pool.open()
    try:
        with pytest.raises(InvalidPSRPOperation) as exc_info:
            runspace_pool.reset_runspace_state()
        # Checked after the ``with`` block: statements placed behind the
        # raising call inside the block would never execute.
        assert str(exc_info.value) == "Failed to reset runspace state"
    finally:
        runspace_pool.close()
def test_psrp_disconnect_invalid_state(self):
    """Expect ``disconnect()`` on a freshly built pool to be rejected.

    The pool's state is never changed after construction. The raised
    ``InvalidRunspacePoolStateError`` is inspected for its action, current
    state, expected state and rendered message.
    """
    pool = RunspacePool(WSMan(""))
    with pytest.raises(InvalidRunspacePoolStateError) as exc_info:
        pool.disconnect()

    # Inspect the captured exception once the ``with`` block has exited.
    error = exc_info.value
    assert error.action == "disconnect a Runspace Pool"
    assert error.current_state == RunspacePoolState.BEFORE_OPEN
    assert error.expected_state == RunspacePoolState.OPENED
    assert str(error) == (
        "Cannot 'disconnect a Runspace Pool' on the "
        "current state 'BeforeOpen', expecting state(s): 'Opened'"
    )
$host.UI.RawUI.ScrollBufferContents($rectangle, $coordinates, $rectangle, $buffer_cell)''')
actual = ps.invoke()
assert len(actual) == 17
assert str(actual[0]) == "White"
assert str(actual[1]) == "Green"
assert set_foreground_color.call_count == 1
assert isinstance(set_foreground_color.call_args[0][0], RunspacePool)
assert isinstance(set_foreground_color.call_args[0][1], PowerShell)
assert set_foreground_color.call_args[0][2] == Color.GREEN
assert str(actual[2]) == "Blue"
assert str(actual[3]) == "Red"
assert set_background_color.call_count == 1
assert isinstance(set_background_color.call_args[0][0], RunspacePool)
assert isinstance(set_background_color.call_args[0][1], PowerShell)
assert set_background_color.call_args[0][2] == Color.RED
assert str(actual[4]) == "1,2"
assert str(actual[5]) == "11,12"
assert set_cursor_position.call_count == 1
assert isinstance(set_cursor_position.call_args[0][0], RunspacePool)
assert isinstance(set_cursor_position.call_args[0][1], PowerShell)
assert set_cursor_position.call_args[0][2].extended_properties['x'] \
== 11
assert set_cursor_position.call_args[0][2].extended_properties['y'] \
== 12
assert str(actual[6]) == "3,4"
assert str(actual[7]) == "13,14"
assert set_window_position.call_count == 1
def test_psrp_connect_already_opened(self):
    """Call ``connect()`` on a pool whose state is already OPENED.

    The pool is built on a ``WSMan("")`` transport and its state is forced
    to ``RunspacePoolState.OPENED`` before the call. There is no explicit
    assertion: the test passes when ``connect()`` returns without raising.
    """
    pool = RunspacePool(WSMan(""))
    pool.state = RunspacePoolState.OPENED
    pool.connect()
host_ui.WriteLine1 = mock_write_line1
host_ui.WriteLine2 = mock_write_line2
host_ui.WriteLine3 = mock_write_line3
host_ui.WriteErrorLine = mock_write_error
host_ui.WriteDebugLine = mock_write_debug
host_ui.WriteProgress = mock_write_progress
host_ui.WriteVerboseLine = mock_write_verbose
host_ui.WriteWarningLine = mock_write_warning
host_ui.Prompt = mock_prompt
# seems like PS never calls PromptForCredential1 so we will skip that
host_ui.PromptForCredential2 = mock_prompt_credential
host_ui.PromptForChoice = mock_prompt_choice
host = PSHost(None, None, False, None, None, host_ui, None)
with RunspacePool(wsman_conn, host=host) as pool:
pool.exchange_keys()
mock_read_line_as_ss.return_value = pool.serialize(
u"ReadLineAsSecureString response", ObjectMeta("SS")
)
mock_ps_credential = PSCredential(username="username",
password=u"password")
mock_prompt_credential.return_value = mock_ps_credential
ps = PowerShell(pool)
ps.add_script('''$host.UI.ReadLine()
$host.UI.ReadLineAsSecureString()
$host.UI.Write("Write1")
$host.UI.Write([System.ConsoleColor]::Blue, [System.ConsoleColor]::White, "Write2")
$host.UI.WriteLine()
$host.UI.WriteLine("WriteLine2")
$host.UI.WriteLine([System.ConsoleColor]::Gray, [System.ConsoleColor]::Green, "WriteLine3")
def test_psrp_reset_runspace_state(self, wsman_conn):
    """Check that ``Get-History`` returns nothing after a runspace reset.

    A script is first invoked with ``add_to_history=True``, the pool's
    runspace state is reset, and ``Get-History`` is then expected to return
    an empty list.

    :param wsman_conn: connection object handed to ``RunspacePool``
        (supplied by the test harness).
    """
    with RunspacePool(wsman_conn) as pool:
        # Invoke something that is recorded in the history.
        echo_pipeline = PowerShell(pool)
        echo_pipeline.add_script("echo hi")
        echo_pipeline.invoke(add_to_history=True)

        pool.reset_runspace_state()

        history_pipeline = PowerShell(pool)
        history_pipeline.add_cmdlet("Get-History")
        history = history_pipeline.invoke()

        # should be empty after clearing history
        assert history == []
def test_psrp_with_input(self, wsman_conn):
with RunspacePool(wsman_conn) as pool:
ps = PowerShell(pool)
ps.add_script('''begin {
$DebugPreference = 'Continue'
Write-Debug "Start Block"
}
process {
$input
}
end {
Write-Debug "End Block"
}''')
actual = ps.invoke(["1", 2, {"a": "b"}, ["a", "b"]])
assert actual == [
u"1",
2,
def test_psrp_stream_no_output_invocation(self, wsman_conn):
    """Invoke a script writing to several streams with ``remote_stream_options=0``.

    The script enables the debug and verbose preferences and then writes to
    the debug, verbose, error, output, warning and information streams. The
    pipeline is expected to finish in the ``COMPLETED`` state.

    :param wsman_conn: connection object handed to ``RunspacePool``
        (supplied by the test harness).
    """
    # One statement per line, each terminated by a newline.
    script = (
        "$DebugPreference = 'Continue'\n"
        "$VerbosePreference = 'Continue'\n"
        "Write-Debug 'debug stream'\n"
        "Write-Verbose 'verbose stream'\n"
        "Write-Error 'error stream'\n"
        "Write-Output 'output stream'\n"
        "Write-Warning 'warning stream'\n"
        "Write-Information 'information stream'\n"
    )
    with RunspacePool(wsman_conn) as pool:
        ps = PowerShell(pool)
        ps.add_script(script)
        actual = ps.invoke(remote_stream_options=0)
        assert ps.state == PSInvocationState.COMPLETED
% to_native(PYPSRP_IMP_ERR))
super(Connection, self)._connect()
self._build_kwargs()
display.vvv("ESTABLISH PSRP CONNECTION FOR USER: %s ON PORT %s TO %s" %
(self._psrp_user, self._psrp_port, self._psrp_host),
host=self._psrp_host)
if not self.runspace:
connection = WSMan(**self._psrp_conn_kwargs)
# create our pseudo host to capture the exit code and host output
host_ui = PSHostUserInterface()
self.host = PSHost(None, None, False, "Ansible PSRP Host", None,
host_ui, None)
self.runspace = RunspacePool(
connection, host=self.host,
configuration_name=self._psrp_configuration_name
)
display.vvvvv(
"PSRP OPEN RUNSPACE: auth=%s configuration=%s endpoint=%s" %
(self._psrp_auth, self._psrp_configuration_name,
connection.transport.endpoint), host=self._psrp_host
)
try:
self.runspace.open()
except AuthenticationError as e:
raise AnsibleConnectionFailure("failed to authenticate with "
"the server: %s" % to_native(e))
except WinRMError as e:
raise AnsibleConnectionFailure(
"psrp connection failure during runspace open: %s"