Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if sleep_time <= 3:
# Double the sleep_time when it is small
sleep_time *= 2
else:
sleep_time += 1
count += 1
# If multiple lines in the output take the last line
prompt = self.normalize_linefeeds(prompt)
prompt = prompt.split(self.RESPONSE_RETURN)[-1]
prompt = prompt.strip()
if not prompt:
raise ValueError(f"Unable to find prompt: {prompt}")
time.sleep(delay_factor * 0.1)
self.clear_buffer()
log.debug(f"[find_prompt()]: prompt is {prompt}")
return prompt
def set_base_prompt(
    self, pri_prompt_terminator=">", alt_prompt_terminator="]", delay_factor=1
):
    """
    Sets self.base_prompt

    Used as delimiter for stripping of trailing prompt in output.

    Should be set to something that is general and applies in multiple contexts. For Comware
    this will be the router prompt with < > or [ ] stripped off.

    This will be set on logging in, but not when entering system-view

    NOTE(review): the code visible in this block stops after validating the
    prompt; the assignment to self.base_prompt is not shown here -- confirm
    against the complete method.

    :param pri_prompt_terminator: Primary character the prompt is allowed to end with
    :param alt_prompt_terminator: Alternate character the prompt is allowed to end with
    :param delay_factor: Passed through self.select_delay_factor(); the result
        scales the pause between sending RETURN and reading the channel
    :raises ValueError: If the last line read back is empty or does not end
        with one of the two terminator characters
    """
    log.debug("In set_base_prompt")
    delay_factor = self.select_delay_factor(delay_factor)
    self.clear_buffer()
    self.write_channel(self.RETURN)
    time.sleep(0.5 * delay_factor)

    prompt = self.read_channel()
    prompt = self.normalize_linefeeds(prompt)

    # If multiple lines in the output take the last line
    prompt = prompt.split(self.RESPONSE_RETURN)[-1]
    prompt = prompt.strip()

    # Check that ends with a valid terminator character. An empty prompt is
    # rejected here as well, so that indexing prompt[-1] cannot raise a bare
    # IndexError when nothing was read back.
    if not prompt or prompt[-1] not in (
        pri_prompt_terminator,
        alt_prompt_terminator,
    ):
        raise ValueError("Router prompt not found: {0}".format(prompt))
if len(outbuf) == 0:
raise EOFError("Channel stream closed by remote device.")
output += outbuf.decode("utf-8", "ignore")
else:
break
elif self.protocol == "telnet":
output = self.remote_conn.read_very_eager().decode("utf-8", "ignore")
elif self.protocol == "serial":
output = ""
while self.remote_conn.in_waiting > 0:
output += self.remote_conn.read(self.remote_conn.in_waiting).decode(
"utf-8", "ignore"
)
if self.ansi_escape_codes:
output = self.strip_ansi_escape_codes(output)
log.debug(f"read_channel: {output}")
self._write_session_log(output)
return output
def disable_paging(self, command="terminal length 0", delay_factor=1):
    """Send the paging-disable command after entering enable mode.

    :param command: Command written to the channel to turn paging off
    :param delay_factor: Passed through self.select_delay_factor(); the result
        scales the short pause taken before the buffer is cleared
    :return: Output returned by self.read_until_prompt(), run through
        self.strip_ansi_escape_codes() when self.ansi_escape_codes is truthy
    """
    # Must be in enable mode to disable paging.
    self.enable()

    factor = self.select_delay_factor(delay_factor)
    time.sleep(factor * 0.1)
    self.clear_buffer()

    paging_cmd = self.normalize_cmd(command)
    log.debug("In disable_paging")
    log.debug("Command: {0}".format(paging_cmd))
    self.write_channel(paging_cmd)

    response = self.read_until_prompt()
    # Strip ANSI escape codes only when the connection is flagged for it.
    response = (
        self.strip_ansi_escape_codes(response)
        if self.ansi_escape_codes
        else response
    )
    log.debug("{0}".format(response))
    log.debug("Exiting disable_paging")
    return response
ESC[K Erase line from cursor to the end of line
ESC[2K Erase entire line
ESC[1;24r Enable scrolling from start to row end
ESC[?6l Reset mode screen with options 640 x 200 monochrome (graphics)
ESC[?7l Disable line wrapping
ESC[2J Code erase display
ESC[00;32m Color Green (30 to 37 are different colors) more general pattern is
ESC[\d\d;\d\dm and ESC[\d\d;\d\d;\d\dm
ESC[6n Get cursor position
HP ProCurve and Cisco SG300 require this (possible others).
:param string_buffer: The string to be processed to remove ANSI escape codes
:type string_buffer: str
""" # noqa
log.debug("In strip_ansi_escape_codes")
log.debug(f"repr = {repr(string_buffer)}")
code_position_cursor = chr(27) + r"\[\d+;\d+H"
code_show_cursor = chr(27) + r"\[\?25h"
code_next_line = chr(27) + r"E"
code_erase_line_end = chr(27) + r"\[K"
code_erase_line = chr(27) + r"\[2K"
code_erase_start_line = chr(27) + r"\[K"
code_enable_scroll = chr(27) + r"\[\d+;\d+r"
code_form_feed = chr(27) + r"\[1L"
code_carriage_return = chr(27) + r"\[1M"
code_disable_line_wrapping = chr(27) + r"\[\?7l"
code_reset_mode_screen_options = chr(27) + r"\[\?\d+l"
code_reset_graphics_mode = chr(27) + r"\[00m"
code_erase_display = chr(27) + r"\[2J"
code_graphics_mode = chr(27) + r"\[\d\d;\d\dm"
code_graphics_mode4,
code_get_cursor_position,
code_cursor_position,
code_erase_display,
code_attrs_off,
code_reverse,
]
output = string_buffer
for ansi_esc_code in code_set:
output = re.sub(ansi_esc_code, "", output)
# CODE_NEXT_LINE must substitute with return
output = re.sub(code_next_line, self.RETURN, output)
log.debug("Stripping ANSI escape codes")
log.debug(f"new_output = {output}")
log.debug(f"repr = {repr(output)}")
return output
def disable_paging(self, command="terminal length 0", delay_factor=1):
    """Send the paging-disable command after entering enable mode.

    :param command: Command written to the channel to turn paging off
    :param delay_factor: Passed through self.select_delay_factor(); the result
        scales the short pause taken before the buffer is cleared
    :return: Output returned by self.read_until_prompt(), run through
        self.strip_ansi_escape_codes() when self.ansi_escape_codes is truthy
    """
    # Must be in enable mode to disable paging.
    self.enable()

    factor = self.select_delay_factor(delay_factor)
    time.sleep(factor * 0.1)
    self.clear_buffer()

    paging_cmd = self.normalize_cmd(command)
    log.debug("In disable_paging")
    log.debug("Command: {0}".format(paging_cmd))
    self.write_channel(paging_cmd)

    response = self.read_until_prompt()
    # Strip ANSI escape codes only when the connection is flagged for it.
    response = (
        self.strip_ansi_escape_codes(response)
        if self.ansi_escape_codes
        else response
    )
    log.debug("{0}".format(response))
    log.debug("Exiting disable_paging")
    return response