Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _parse_prompts(self, line):
for prompt_regex in sorted(self.compiled_prompts_regex.keys(), key=attrgetter('pattern')):
if self._regex_helper.search_compiled(prompt_regex, line):
current_ret = {
'line': line,
'prompt_regex': prompt_regex.pattern,
'state': self.compiled_prompts_regex[prompt_regex],
'time': datetime.datetime.now()
}
self.event_occurred(event_data=current_ret)
raise ParsingDone()
def _command_failure(self, line):
if self._regex_helper.search_compiled(Mount._re_error, line):
self.set_exception(CommandFailure(self, "ERROR: {}".format(self._regex_helper.group("ERROR"))))
raise ParsingDone
Parse line containing overwriting warning.
:param line: Line from device.
:return: None but raises ParsingDone if regex matches.
"""
if self._regex_helper.search_compiled(self._re_overwrite, line):
compressed_file_name = self._regex_helper.group("COMPRESSED_FILE_NAME")
if compressed_file_name not in self.answered_files:
if self.overwrite:
self.connection.sendline('y')
else:
self.connection.sendline('n')
self.set_exception(
CommandFailure(
self, "ERROR: {} already exists".format(compressed_file_name)))
self.answered_files.add(compressed_file_name)
raise ParsingDone
"""
Parses the output of the command.
:param line: Line to process, can be only part of line. New line chars are removed from line.
:param is_full_line: True if line had new line chars, False otherwise
:return: None
"""
if is_full_line:
self._line_nr += 1
try:
if self._line_nr > 1:
self._re_fail = None
else:
self._parse_error(line)
self._parse_line(line)
except ParsingDone:
pass
return super(Cat, self).on_new_line(line, is_full_line)
def on_new_line(self, line, is_full_line):
try:
self._parse_prompts(line)
except ParsingDone:
pass
def _push_yes_if_needed(self, line):
"""
Checks if line from device has information about waiting for sent yes/no.
:param line: Line from device.
:return: Nothing but raises ParsingDone if regex matches.
"""
if (not self._sent_continue_connecting) and self._regex_helper.search_compiled(Ssh._re_yes_no, line):
self.connection.sendline('yes')
self._sent_continue_connecting = True
raise ParsingDone()
def _parse_line_with_force_option(self, line):
if self.options:
if self.options.find('-f') or self.options.find('--force'):
if self._regex_helper.search_compiled(Userdel._re_command_error_user_used, line):
self.current_ret['RESULT'].append("User {} currently used by process {} was deleted".format(
self._regex_helper.group("USER"), self._regex_helper.group("PROCESS")))
raise ParsingDone
def _parse_overwrite(self, line):
if re.search(Sshkeygen._re_overwrite, line) and not self._overwrite_sent:
if self.overwrite:
self.connection.sendline("y")
else:
self.connection.sendline("n")
self._overwrite_sent = True
raise ParsingDone
def on_new_line(self, line, is_full_line):
if is_full_line:
try:
self._command_failure(line)
self._remove_features(line)
self._parse_info_msg(line)
self._parse_info_dict_define(line)
self._parse_info_dict_undefined(line)
self._parse_output(line)
except ParsingDone:
pass
return super(Socat, self).on_new_line(line, is_full_line)
def _parse_since(self, line):
"""
Parses date and time from line since when system has started.
:param line: Line from device
:return: Nothing but raises ParsingDone if regex matches.
"""
if self._regex_helper.search_compiled(Uptime._re_date_time, line):
self.current_ret["date"] = self._regex_helper.group("DATE")
self.current_ret["time"] = self._regex_helper.group("TIME")
raise ParsingDone()