Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_calling_ssh_returns_result_parsed_from_command_output(buffer_connection, command_output_and_expected_result):
    """
    Run Ssh against injected device output and check that a result comes back.

    The fixture value is injected as the single remote response; the test then
    verifies the generated command string and that calling the command does
    not return None.
    """
    buffer_connection.remote_inject_response([command_output_and_expected_result])
    ssh_cmd = Ssh(
        connection=buffer_connection.moler_connection,
        login="user",
        password="english",
        host="host.domain.net",
        expected_prompt="host:.*#",
        options=None,
    )
    expected_command = "TERM=xterm-mono ssh -l user host.domain.net"
    assert ssh_cmd.command_string == expected_command
    result = ssh_cmd()
    assert result is not None
def test_ssh_failed_known_hosts(buffer_connection, command_output_failed_known_hosts):
    """
    Check that Ssh raises CommandFailure for the failed-known-hosts output.

    The command is created with known_hosts_on_failure='badvalue' and the
    fixture output is injected as the single remote response.
    """
    buffer_connection.remote_inject_response([command_output_failed_known_hosts])
    ssh_cmd = Ssh(
        connection=buffer_connection.moler_connection,
        login="user",
        password="english",
        prompt="client:~ #",
        host="host.domain.net",
        expected_prompt="host:.*#",
        known_hosts_on_failure='badvalue',
        options=None,
    )
    expected_command = "TERM=xterm-mono ssh -l user host.domain.net"
    assert ssh_cmd.command_string == expected_command
    # Running the command against the injected output must fail.
    with pytest.raises(CommandFailure):
        ssh_cmd()
def test_ssh_returns_proper_command_string(buffer_connection):
    """
    Check the command string Ssh builds from login and host.

    The command is only constructed here, never run, so no device output is
    injected.
    """
    # Pass the moler connection by keyword, as the other Ssh tests in this
    # file do, instead of handing over the buffer_connection fixture itself.
    ssh_cmd = Ssh(
        connection=buffer_connection.moler_connection,
        login="user",
        password="english",
        host="host.domain.net",
        expected_prompt="host:.*#",
        options=None,
    )
    assert "TERM=xterm-mono ssh -l user host.domain.net" == ssh_cmd.command_string
def test_ssh_failed_host_key_verification(buffer_connection, command_output_failed_host_key_verification):
    """
    Check that Ssh raises CommandFailure for the failed-host-key-verification output.

    The fixture output is injected as the single remote response before the
    command is called.
    """
    buffer_connection.remote_inject_response([command_output_failed_host_key_verification])
    ssh_cmd = Ssh(
        connection=buffer_connection.moler_connection,
        login="user",
        password="english",
        host="host.domain.net",
        expected_prompt="server:.*#",
        prompt="host:~ #",
        options=None,
    )
    expected_command = "TERM=xterm-mono ssh -l user host.domain.net"
    assert ssh_cmd.command_string == expected_command
    # Running the command against the injected output must fail.
    with pytest.raises(CommandFailure):
        ssh_cmd()
def push_yes_if_needed(self, line):
    """
    Sends 'yes' once, when line matches the yes/no regex.

    :param line: Line from device.
    :return: Nothing.
    """
    # 'yes' has already been sent - do not even evaluate the regex.
    if self._sent_continue_connecting:
        return
    if self._regex_helper.search_compiled(Ssh._re_yes_no, line):
        # NOTE(review): send() is used here while _id_dsa uses sendline();
        # confirm whether a line terminator is expected by the device.
        self.connection.send('yes')
        self._sent_continue_connecting = True
def is_password_requested(self, line):
    """
    Checks if line matches the password regex.

    :param line: Line from device.
    :return: Result of searching line with Ssh._re_password via the regex helper.
    """
    password_match = self._regex_helper.search_compiled(Ssh._re_password, line)
    return password_match
def is_failure_indication(self, line):
    """
    Checks if line matches the failure-strings regex.

    :param line: Line from device.
    :return: Result of searching line with Ssh._re_failed_strings via the regex helper.
    """
    # NOTE(review): a function with this same name and body is defined again
    # further down in this text; confirm which definition is meant to stay.
    failure_match = self._regex_helper.search_compiled(Ssh._re_failed_strings, line)
    return failure_match
def _get_hosts_file_if_displayed(self, line):
    """
    Stores the hosts file reported in line, if known_hosts_on_failure is set.

    :param line: Line from device.
    :return: Nothing but raises ParsingDone if regex matches.
    """
    # Nothing to collect when known_hosts_on_failure is not set.
    if self.known_hosts_on_failure is None:
        return
    if not self._regex_helper.search_compiled(Ssh._re_host_key, line):
        return
    # Keep the HOSTS_FILE group of the match that was just made.
    self._hosts_file = self._regex_helper.group("HOSTS_FILE")
    raise ParsingDone()
def is_failure_indication(self, line):
    """
    Looks for a failure string in command output.

    :param line: Line from device.
    :return: Match object if matches, None otherwise.
    """
    found = self._regex_helper.search_compiled(Ssh._re_failed_strings, line)
    return found
def _id_dsa(self, line):
    """
    Sends an empty line when line matches the id dsa regex.

    :param line: Line from device.
    :return: Nothing but raises ParsingDone if regex matches.
    """
    if not Ssh._re_id_dsa.search(line):
        return
    # Regex matched: answer with an empty line and stop parsing this line.
    self.connection.sendline("")
    raise ParsingDone()