Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# If multiple lines in the output take the last line
prompt = prompt.split(self.RESPONSE_RETURN)[-1]
prompt = prompt.strip()
# Check that ends with a valid terminator character
if not prompt[-1] in (pri_prompt_terminator, alt_prompt_terminator):
raise ValueError("Router prompt not found: {0}".format(prompt))
# Strip off any leading HRP_. characters for USGv5 HA
prompt = re.sub(r"^HRP_.", "", prompt, flags=re.M)
# Strip off leading and trailing terminator
prompt = prompt[1:-1]
prompt = prompt.strip()
self.base_prompt = prompt
log.debug("prompt: {0}".format(self.base_prompt))
return self.base_prompt
"""
output = ""
check_count = 12
while check_count >= 0:
if self.check_config_mode():
self.write_channel(self.normalize_cmd(exit_config))
output += self.read_until_pattern(pattern=pattern)
else:
break
check_count -= 1
# One last check for whether we successfully exited config mode
if self.check_config_mode():
raise ValueError("Failed to exit configuration mode")
log.debug(f"exit_config_mode: {output}")
return output
def disable_paging(self, command="terminal length 0", delay_factor=1):
"""Must be in enable mode to disable paging."""
self.enable()
delay_factor = self.select_delay_factor(delay_factor)
time.sleep(delay_factor * .1)
self.clear_buffer()
command = self.normalize_cmd(command)
log.debug("In disable_paging")
log.debug("Command: {0}".format(command))
self.write_channel(command)
output = self.read_until_prompt()
if self.ansi_escape_codes:
output = self.strip_ansi_escape_codes(output)
log.debug("{0}".format(output))
log.debug("Exiting disable_paging")
return output
def check_config_mode(self, check_string=")#", pattern=""):
"""
Checks if the device is in configuration mode or not.
Arista, unfortunately, does this:
loc1-core01(s1)#
Can also be (s2)
"""
log.debug("pattern: {0}".format(pattern))
self.write_channel(self.RETURN)
output = self.read_until_pattern(pattern=pattern)
log.debug("check_config_mode: {0}".format(repr(output)))
output = output.replace("(s1)", "")
output = output.replace("(s2)", "")
log.debug("check_config_mode: {0}".format(repr(output)))
return check_string in output
def disable_paging(self, command="terminal length 999", delay_factor=1):
"""Disable paging default to a Cisco CLI method."""
delay_factor = self.select_delay_factor(delay_factor)
time.sleep(delay_factor * .1)
self.clear_buffer()
command = self.normalize_cmd(command)
log.debug("In disable_paging")
log.debug("Command: {0}".format(command))
self.write_channel(command)
output = self.read_until_prompt()
if self.ansi_escape_codes:
output = self.strip_ansi_escape_codes(output)
log.debug("{0}".format(output))
log.debug("Exiting disable_paging")
return output
config_commands = (config_commands,)
if not hasattr(config_commands, "__iter__"):
raise ValueError("Invalid argument passed into send_config_set")
# Send config commands
output = ""
for cmd in config_commands:
output += self.send_command(cmd)
if self.fast_cli:
pass
else:
time.sleep(delay_factor * 0.05)
output = self._sanitize_output(output)
log.debug("{}".format(output))
return output
elif not cmd_verify:
for cmd in config_commands:
self.write_channel(self.normalize_cmd(cmd))
time.sleep(delay_factor * 0.05)
else:
for cmd in config_commands:
self.write_channel(self.normalize_cmd(cmd))
time.sleep(delay_factor * 0.05)
output += self.read_until_pattern(pattern=re.escape(cmd))
# Gather output
output += self._read_channel_timing(
delay_factor=delay_factor, max_loops=max_loops
)
output += self._sanitize_output(output)
log.debug(f"{output}")
return output
# Need to send multiple times to test connection
self.remote_conn.sock.sendall(telnetlib.IAC + telnetlib.NOP)
self.remote_conn.sock.sendall(telnetlib.IAC + telnetlib.NOP)
self.remote_conn.sock.sendall(telnetlib.IAC + telnetlib.NOP)
return True
except AttributeError:
return False
else:
# SSH
try:
# Try sending ASCII null byte to maintain the connection alive
log.debug("Sending the NULL byte")
self.write_channel(null)
return self.remote_conn.transport.is_active()
except (socket.error, EOFError):
log.error("Unable to send", exc_info=True)
# If unable to send, we can tell for sure that the connection is unusable
return False
return False
new_output = self.read_channel()
username_pattern = r"(username|login|user name)"
if re.search(username_pattern, new_output, flags=re_flags):
output += new_output
new_output = self.send_command_timing(default_username)
if re.search(pattern, new_output, flags=re_flags):
output += new_output
self.write_channel(self.normalize_cmd(self.secret))
new_output = self._read_channel_timing()
if self.check_enable_mode():
output += new_output
return output
output += new_output
i += 1
log.debug(f"{output}")
self.clear_buffer()
msg = (
"Failed to enter enable mode. Please ensure you pass "
"the 'secret' argument to ConnectHandler."
)
if not self.check_enable_mode():
raise ValueError(msg)
return output
try:
# Try sending IAC + NOP (IAC is telnet way of sending command)
# IAC = Interpret as Command; it comes before the NOP.
log.debug("Sending IAC + NOP")
# Need to send multiple times to test connection
self.remote_conn.sock.sendall(telnetlib.IAC + telnetlib.NOP)
self.remote_conn.sock.sendall(telnetlib.IAC + telnetlib.NOP)
self.remote_conn.sock.sendall(telnetlib.IAC + telnetlib.NOP)
return True
except AttributeError:
return False
else:
# SSH
try:
# Try sending ASCII null byte to maintain the connection alive
log.debug("Sending the NULL byte")
self.write_channel(null)
return self.remote_conn.transport.is_active()
except (socket.error, EOFError):
log.error("Unable to send", exc_info=True)
# If unable to send, we can tell for sure that the connection is unusable
return False
return False