Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_psrp_run_protocol_version(self, wsman_conn):
    """Invoke a begin/process/end script with errors merged into output.

    When the connection's transport class is named ``TransportFake``, the
    pool's ``protocol_version`` is also compared with the text after the
    last underscore in the transport's ``_test_name``.
    """
    script = '''begin {
$DebugPreference = 'Continue'
Write-Debug "Start Block"
Write-Error "error"
}
process {
$input
}
end {
Write-Debug "End Block"
}'''
    pipeline_input = ["message 1", 2, ["3", 3]]

    with RunspacePool(wsman_conn) as pool:
        transport = wsman_conn.transport
        if type(transport).__name__ == "TransportFake":
            expected_version = transport._test_name.rsplit("_", 1)[-1]
            actual_version = pool.protocol_version
            assert actual_version == expected_version

        pipeline = PowerShell(pool)
        pipeline.add_script(script)
        # this tests the merge logic works on v2.1
        pipeline.merge_error("output")
        results = pipeline.invoke(pipeline_input)

    assert len(results) == 4
def test_psrp_receive_failure(self, wsman_conn):
# Marks a fresh pipeline as RUNNING and sets its command id to the
# pipeline's own id without invoking anything, then checks that
# end_invoke() raises WSManFaultError with the expected reason text.
# NOTE(review): the expected-reason literal is cut off in this excerpt (the
# last line ends in a continuation backslash) - restore the remainder from
# the full test before relying on this block.
with RunspacePool(wsman_conn) as pool:
ps = PowerShell(pool)
ps.state = PSInvocationState.RUNNING
ps._command_id = ps.id
with pytest.raises(WSManFaultError) as err:
ps.end_invoke()
assert str(err.value.reason) == \
"The Windows Remote Shell received a request to perform an " \
"operation on a command identifier that does not exist. " \
def test_psrp_create_nested_invalid_state(self):
    """A nested pipeline cannot be created from one that is not running."""
    pipeline = PowerShell(RSPoolTest())

    with pytest.raises(InvalidPipelineStateError) as exc_info:
        pipeline.create_nested_power_shell()

    failure = exc_info.value
    expected_message = (
        "Cannot 'create a nested PowerShell pipeline' on the current "
        "state 'NotStarted', expecting state(s): 'Running'"
    )
    assert failure.action == "create a nested PowerShell pipeline"
    assert failure.current_state == PSInvocationState.NOT_STARTED
    assert failure.expected_state == PSInvocationState.RUNNING
    assert str(failure) == expected_message
def test_psrp_being_invoke_no_commands(self):
    """begin_invoke() is rejected when no commands have been added."""
    pipeline = PowerShell(RSPoolTest())

    with pytest.raises(InvalidPSRPOperation) as exc_info:
        pipeline.begin_invoke()

    expected_message = (
        "Cannot invoke PowerShell without any "
        "commands being set"
    )
    assert str(exc_info.value) == expected_message
# NOTE(review): excerpt from the middle of a larger test - the enclosing def,
# the creation of host_ui and the mock_* objects, and the end of the script
# literal below are not part of this excerpt.
# seems like PS never calls PromptForCredential1 so we will skip that
host_ui.PromptForCredential2 = mock_prompt_credential
host_ui.PromptForChoice = mock_prompt_choice
# Every PSHost argument other than host_ui is None or False here.
host = PSHost(None, None, False, None, None, host_ui, None)
with RunspacePool(wsman_conn, host=host) as pool:
# exchange_keys() runs before the "SS"-tagged value is serialized
# (presumably "SS" marks a secure string - confirm against the serializer).
pool.exchange_keys()
mock_read_line_as_ss.return_value = pool.serialize(
u"ReadLineAsSecureString response", ObjectMeta("SS")
)
mock_ps_credential = PSCredential(username="username",
password=u"password")
mock_prompt_credential.return_value = mock_ps_credential
ps = PowerShell(pool)
# The script calls the $host.UI methods one after another.
ps.add_script('''$host.UI.ReadLine()
$host.UI.ReadLineAsSecureString()
$host.UI.Write("Write1")
$host.UI.Write([System.ConsoleColor]::Blue, [System.ConsoleColor]::White, "Write2")
$host.UI.WriteLine()
$host.UI.WriteLine("WriteLine2")
$host.UI.WriteLine([System.ConsoleColor]::Gray, [System.ConsoleColor]::Green, "WriteLine3")
$host.UI.WriteErrorLine("WriteErrorLine")
$host.UI.WriteDebugLine("WriteDebugLine")
$host.UI.WriteProgress(1, (New-Object -TypeName System.Management.Automation.ProgressRecord -ArgumentList 2, "activity", "description"))
$host.UI.WriteVerboseLine("WriteVerboseLine")
$host.UI.WriteWarningLine("WriteWarningLine")
$prompt_field = New-Object -TypeName System.Management.Automation.Host.FieldDescription -ArgumentList @("prompt field")
$prompt_field.Label = "PromptLabel"
$host.UI.Prompt("Prompt caption", "Prompt message", $prompt_field)
# NOTE(review): excerpt from the middle of a larger test - the enclosing def,
# the creation of host_raw_ui and of the callables assigned below, and the
# end of the script literal are not part of this excerpt.
# Install the test's callables as the raw UI's handlers.
host_raw_ui.SetWindowPosition = set_window_position
host_raw_ui.SetCursorSize = set_cursor_size
host_raw_ui.SetBufferSize = set_buffer_size
host_raw_ui.SetWindowSize = set_window_size
host_raw_ui.SetWindowTitle = set_window_title
host_raw_ui.ReadKey = read_key
host_raw_ui.FlushInputBuffer = flush_input
host_raw_ui.SetBufferContents1 = set_buffer1
host_raw_ui.SetBufferContents2 = set_buffer2
host_raw_ui.ScrollBufferContents = scroll_buffer
# Wrap the raw UI in a PSHostUserInterface and hand it to the pool via PSHost.
host_ui = PSHostUserInterface(host_raw_ui)
host = PSHost(None, None, False, None, None, host_ui, None)
with RunspacePool(wsman_conn, host=host) as pool:
ps = PowerShell(pool)
# For each RawUI property visible here the script reads the value, assigns a
# new one, and reads it again.
ps.add_script('''$host.UI.RawUI.ForegroundColor
$host.UI.RawUI.ForegroundColor = [System.ConsoleColor]::Green
$host.UI.RawUI.ForegroundColor
$host.UI.RawUI.BackgroundColor
$host.UI.RawUI.BackgroundColor = [System.ConsoleColor]::Red
$host.UI.RawUI.BackgroundColor
$host.UI.RawUI.CursorPosition
$host.UI.RawUI.CursorPosition = (New-Object -TypeName System.Management.Automation.Host.Coordinates -ArgumentList 11, 12)
$host.UI.RawUI.CursorPosition
$host.UI.RawUI.WindowPosition
$host.UI.RawUI.WindowPosition = (New-Object -TypeName System.Management.Automation.Host.Coordinates -ArgumentList 13, 14)
$host.UI.RawUI.WindowPosition
def test_psrp_sec_string():
    """Send an "SS"-tagged string to a remote pipeline and run it.

    The pipeline has three statements: ``Set-Variable`` stores the
    serialized value as ``sec_string``, a script converts ``$sec_string``
    back to text through the marshal helpers, and ``ConvertTo-SecureString``
    is run on ``"abc"``.

    ``server``, ``port``, ``username`` and ``password`` are not defined in
    this function, so they must be provided by the enclosing module. The
    function makes no assertions; it only checks that the invocation runs.
    """
    transport = TransportHTTP(server, port, username, password)
    wsman = WSMan(transport, operation_timeout=30)
    runspace_pool = RunspacePool(wsman)
    runspace_pool.open()
    try:
        # Keys are exchanged before the "SS"-tagged value is serialized.
        runspace_pool.exchange_keys()
        ps = PowerShell(runspace_pool)
        sec_string = runspace_pool._serializer.serialize(u"Hello World", ObjectMeta("SS"))

        ps.add_cmdlet("Set-Variable").add_parameters({"Name": "sec_string", "Value": sec_string})
        ps.add_statement()
        ps.add_script("[System.Runtime.InteropServices.marshal]::PtrToStringAuto([System.Runtime.InteropServices.marshal]::SecureStringToBSTR($sec_string))")
        ps.add_statement()
        ps.add_cmdlet("ConvertTo-SecureString").add_parameters({"String": "abc", "AsPlainText": None, "Force": None})

        # The result was never read in the original, so it is not bound to a
        # name; the leftover placeholder assignment after the cleanup is gone.
        ps.invoke()
    finally:
        # Always release the remote pool, even when the invocation fails.
        runspace_pool.close()
def test_connect_async_invalid_state(self):
    """Connecting a pipeline that is not disconnected raises a state error."""
    # NOTE(review): the test name mentions connect_async but the call under
    # test is connect() - confirm which one is intended.
    pipeline = PowerShell(RSPoolTest())

    with pytest.raises(InvalidPipelineStateError) as exc_info:
        pipeline.connect()

    failure = exc_info.value
    expected_message = (
        "Cannot 'connect to a disconnected pipeline' on the current "
        "state 'NotStarted', expecting state(s): 'Disconnected'"
    )
    assert failure.action == "connect to a disconnected pipeline"
    assert failure.current_state == PSInvocationState.NOT_STARTED
    assert failure.expected_state == PSInvocationState.DISCONNECTED
    assert str(failure) == expected_message
def test_psrp_with_history(self, wsman_conn):
    """A pipeline invoked with add_to_history=True is listed by Get-History."""
    command_line = "Write-Output 1; Write-Output 2"

    with RunspacePool(wsman_conn) as pool:
        recorded = PowerShell(pool)
        recorded.add_script(command_line)
        recorded.invoke(add_to_history=True)

        # A second pipeline on the same pool queries the history.
        history_query = PowerShell(pool)
        history_query.add_script("Get-History")
        entries = history_query.invoke()

    assert len(entries) == 1
    entry_props = entries[0].adapted_properties
    assert entry_props['CommandLine'] == command_line
    assert entry_props['ExecutionStatus'] == "Completed"
# NOTE(review): body of a file-copy routine - the enclosing def (which
# supplies self, src, dest and configuration_name) is not part of this excerpt.
# The byte-string form of the local path is used for the size lookup and the reader.
b_src = to_bytes(src)
src_size = os.path.getsize(b_src)
log.info("Copying '%s' to '%s' with a total size of %d"
% (src, dest, src_size))
with RunspacePool(self.wsman, configuration_name=configuration_name) as pool:
# Work out how many raw bytes to read per fragment: start from the max payload size, subtract 82 for the
# fragment, message, and other header info that PSRP adds, then scale by 3/4 to allow for the data being
# base64 encoded.
buffer_size = int((self.wsman.max_payload_size - 82) / 4 * 3)
log.info("Creating file reader with a buffer size of %d" % buffer_size)
# read_buffer gets the source path, its size and the per-read buffer size;
# its result is passed to the pipeline as input below.
read_gen = read_buffer(b_src, src_size, buffer_size)
command = get_pwsh_script('copy.ps1')
log.debug("Starting to send file data to remote process")
powershell = PowerShell(pool)
# The destination path is the script's only argument.
powershell.add_script(command).add_argument(dest)
powershell.invoke(input=read_gen)
log.debug("Finished sending file data to remote process")
# Re-emit each record from the pipeline's warning stream through Python's warnings module.
for warning in powershell.streams.warning:
warnings.warn(str(warning))
# Any error on the pipeline aborts the copy; all error records are joined into one message.
if powershell.had_errors:
errors = powershell.streams.error
error = "\n".join([str(err) for err in errors])
raise WinRMError("Failed to copy file: %s" % error)
# The last output item is used as the resulting path - presumably copy.ps1
# emits it last; confirm against the script.
output_file = to_unicode(powershell.output[-1]).strip()
log.info("Completed file transfer of '%s' to '%s'" % (src, output_file))
return output_file