Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# initiate SSH connection
try:
self.remote_conn_pre.connect(**ssh_connect_params)
except socket.error:
self.paramiko_cleanup()
msg = "Connection to device timed-out: {device_type} {ip}:{port}".format(
device_type=self.device_type, ip=self.host, port=self.port
)
raise NetmikoTimeoutException(msg)
except paramiko.ssh_exception.AuthenticationException as auth_err:
self.paramiko_cleanup()
msg = "Authentication failure: unable to connect {device_type} {ip}:{port}".format(
device_type=self.device_type, ip=self.host, port=self.port
)
msg += self.RETURN + str(auth_err)
raise NetmikoAuthenticationException(msg)
if self.verbose:
print(f"SSH connection established to {self.host}:{self.port}")
# Use invoke_shell to establish an 'interactive session'
if width and height:
self.remote_conn = self.remote_conn_pre.invoke_shell(
term="vt100", width=width, height=height
)
else:
self.remote_conn = self.remote_conn_pre.invoke_shell()
self.remote_conn.settimeout(self.blocking_timeout)
if self.keepalive:
self.remote_conn.transport.set_keepalive(self.keepalive)
self.special_login_handler()
) or re.search(alt_prompt_terminator, output, flags=re.M):
return return_msg
# Check if proper data received
if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
alt_prompt_terminator, output, flags=re.M
):
return return_msg
self.write_channel(self.TELNET_RETURN)
time.sleep(0.5 * delay_factor)
i += 1
except EOFError:
self.remote_conn.close()
msg = f"Login failed: {self.host}"
raise NetmikoAuthenticationException(msg)
# Last try to see if we already logged in
self.write_channel(self.TELNET_RETURN)
time.sleep(0.5 * delay_factor)
output = self.read_channel()
return_msg += output
if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
alt_prompt_terminator, output, flags=re.M
):
return return_msg
msg = f"Login failed: {self.host}"
self.remote_conn.close()
raise NetmikoAuthenticationException(msg)
msg = f"Login failed: {self.host}"
raise NetmikoAuthenticationException(msg)
# Last try to see if we already logged in
self.write_channel(self.TELNET_RETURN)
time.sleep(0.5 * delay_factor)
output = self.read_channel()
return_msg += output
if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
alt_prompt_terminator, output, flags=re.M
):
return return_msg
msg = f"Login failed: {self.host}"
self.remote_conn.close()
raise NetmikoAuthenticationException(msg)
def _test_channel_read(self, count=40, pattern=""):
    """Read from the channel and fail fast if the device rejected the login.

    Since Keymile NOS always returns True on paramiko.connect() we
    check the output for substring Login incorrect after connecting.

    :param count: forwarded unchanged to the parent class's
        ``_test_channel_read``.
    :param pattern: forwarded unchanged to the parent class's
        ``_test_channel_read``.
    :return: the output returned by the parent implementation when it does
        not contain "Login incorrect".
    :raises NetmikoAuthenticationException: when the output contains
        "Login incorrect"; ``paramiko_cleanup()`` is called before raising.
    """
    output = super()._test_channel_read(count=count, pattern=pattern)
    # Use a literal here instead of rebinding the caller-supplied ``pattern``.
    if re.search(r"Login incorrect", output):
        self.paramiko_cleanup()
        # The trailing space after "connect" keeps the device type from being
        # fused onto the preceding word in the rendered message.
        msg = (
            "Authentication failure: unable to connect "
            f"{self.device_type} {self.host}:{self.port}"
        )
        msg += self.RESPONSE_RETURN + "Login incorrect"
        raise NetmikoAuthenticationException(msg)
    return output
def session_preparation(self):
    """
    Get the freshly established session ready for use.

    Steps, in order: test that the channel can be read, determine the base
    prompt, disable paging (Cisco WLC uses "config paging disable"), then
    pause briefly and clear the read buffer.

    :raises NetmikoAuthenticationException: when ``set_base_prompt`` raises
        ``ValueError``.
    """
    self._test_channel_read()

    try:
        self.set_base_prompt()
    except ValueError:
        raise NetmikoAuthenticationException(f"Authentication failed: {self.host}")

    self.disable_paging(command="config paging disable")

    # Pause (scaled by the global delay factor) before discarding whatever
    # is still sitting in the read buffer.
    settle_delay = 0.3 * self.global_delay_factor
    time.sleep(settle_delay)
    self.clear_buffer()