Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_su_catches_authentication_failure(buffer_connection, command_output_and_expected_result_auth):
from moler.exceptions import CommandFailure
command_output, expected_result = command_output_and_expected_result_auth
buffer_connection.remote_inject_response([command_output])
su_cmd = Su(connection=buffer_connection.moler_connection, prompt=r"xyz@debian:", expected_prompt=r"root@debian")
with pytest.raises(CommandFailure):
su_cmd()
def test_run_script_raise_exception(buffer_connection, command_output_and_expected_result):
command_output, expected_result = command_output_and_expected_result
buffer_connection.remote_inject_response([command_output])
cmd = RunScript(connection=buffer_connection.moler_connection, script_command="./myScript.sh")
with pytest.raises(CommandFailure):
cmd()
def test_socat_raise_connection_refused(buffer_connection, command_output_and_expected_result_on_connection_refused):
socat_cmd = Socat(connection=buffer_connection.moler_connection, input_options='STDIO',
output_options='tcp:localhost:3334', options='-d')
command_output, expected_result = command_output_and_expected_result_on_connection_refused
buffer_connection.remote_inject_response([command_output])
assert 'socat -d STDIO tcp:localhost:3334' == socat_cmd.command_string
with pytest.raises(CommandFailure):
socat_cmd()
def _check_command_failure(self, line):
"""
Checks if line has info about command failure.
:param line: Line from device.
:return: None
:raise ParsingDone: if regex matches.
"""
if self._regex_helper.search_compiled(self._re_command_fail, line):
self.set_exception(CommandFailure(self, "Found error regex in line '{}'".format(line)))
raise ParsingDone
# instead of setting it
raise CommandFailure(
self,
"Neither 'cmd_class_name' nor 'cmd_object' nor 'sudo_params' was provided to Sudo constructor."
"Please specific parameter.")
if self.cmd_object and self.cmd_class_name:
# _validate_start is called before running command on connection, so we raise exception instead
# of setting it
raise CommandFailure(
self,
"Both 'cmd_object' and 'cmd_class_name' parameters provided. Please specify only one."
)
if self.cmd_object and self.cmd_object.done():
# _validate_start is called before running command on connection, so we raise exception
# instead of setting it
raise CommandFailure(
self,
"Not allowed to run again the embedded command (embedded command is done): {}.".format(
self.cmd_object))
if not self.cmd_object:
self._finish_on_final_prompt = True
self._validated_embedded_parameters = True
def _parse_errors(self, line):
if self._regex_helper.search(Mv._reg_fail, line):
self.set_exception(CommandFailure(self, "ERROR: {}".format(self._regex_helper.group(1))))
raise ParsingDone
def _host_key_verification(self, line):
"""
Checks regex host key verification.
:param line: Line from device.
:return: Nothing but raises ParsingDone if regex matches.
"""
if self._regex_helper.search_compiled(Ssh._re_host_key_verification_failed, line):
if self._hosts_file:
self._handle_failed_host_key_verification()
else:
self.set_exception(CommandFailure(self, "command failed in line '{}'".format(line)))
raise ParsingDone()
def _parse_error(self, line):
if self._regex_helper.search_compiled(Cut._re_parse_error, line):
self.set_exception(CommandFailure(self, "ERROR: {}".format(self._regex_helper.group("ERROR"))))
raise ParsingDone
def _parse_error(self, line):
if self._regex_helper.search(Chmod._reg_fail, line):
self.set_exception(CommandFailure(self, "ERROR: {} or {}".format(self._regex_helper.group("ERROR"),
self._regex_helper.group("ERROR1"))))
raise ParsingDone
def _parse_too_simple_password(self, line):
"""
Parse too simple password error.
:param line: Line from device.
:return: None but raises ParsingDone if all required commands are sent.
"""
if self._regex_helper.search_compiled(Passwd._re_too_simple_password, line):
self.set_exception(CommandFailure(self, "New password is too simple."))
self._cancel_cmd = True