Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create(self):
    """Run the parent ``create``, record the call, then always fail.

    :raises errors.ProviderBaseError: unconditionally, after the parent
        implementation and ``create_mock`` have both been invoked.
    """
    super().create()
    # Record that creation was attempted before the failure is raised.
    create_mock("create bad")
    failure = errors.ProviderBaseError()
    raise failure
def test_shell_fails(self):
    # multipass can fail for several reasons and prints its own error
    # right above this exception's message.
    shell_cmd = ["multipass", "shell", self.instance_name]
    failure = subprocess.CalledProcessError(1, shell_cmd)
    self.check_call_mock.side_effect = failure

    # The failing check_call must surface as ProviderShellError.
    self.assertRaises(
        errors.ProviderShellError,
        self.multipass_command.shell,
        instance_name=self.instance_name,
    )

    # Exactly one check_call with the expected command, and no check_output.
    self.check_call_mock.assert_called_once_with(shell_cmd, stdin=None)
    self.check_output_mock.assert_not_called()
def test_launch_fails(self):
    # multipass can fail for several reasons and prints its own error
    # right above this exception's message; it looks something like:
    #
    #   Creating a build environment named 'nonfilterable-mayola'
    #   failed to launch: failed to obtain exit status for remote process
    #   An error occurred when trying to launch the instance with 'multipass'.  # noqa: E501
    image = "18.04"
    launch_cmd = ["multipass", "launch", image, "--name", self.instance_name]
    failure = subprocess.CalledProcessError(1, launch_cmd)
    self.check_call_mock.side_effect = failure

    # The failing check_call must surface as ProviderLaunchError.
    self.assertRaises(
        errors.ProviderLaunchError,
        self.multipass_command.launch,
        instance_name=self.instance_name,
        image=image,
    )

    # Exactly one check_call with the expected command, and no check_output.
    self.check_call_mock.assert_called_once_with(
        launch_cmd, stdin=subprocess.DEVNULL
    )
    self.check_output_mock.assert_not_called()
def test_launch_fails(self):
    # Make the mocked process report 2 from poll().
    self.popen_mock().poll.return_value = 2

    launch_kwargs = dict(
        hda="test-vm.qcow2",
        qcow2_drives=["cloudlocal.qcow2"],
        project_9p_dev="/snapcraft-project",
        ram="1",
    )

    # With that poll() result, launch is expected to raise.
    self.assertRaises(
        errors.ProviderLaunchError, self.qemu_driver.launch, **launch_kwargs
    )
def test_delete_fails(self):
    # multipass can fail for several reasons and prints its own error
    # right above this exception's message.
    delete_cmd = ["multipass", "delete", self.instance_name, "--purge"]
    failure = subprocess.CalledProcessError(1, delete_cmd)
    self.check_call_mock.side_effect = failure

    # The failing check_call must surface as ProviderDeleteError.
    self.assertRaises(
        errors.ProviderDeleteError,
        self.multipass_command.delete,
        instance_name=self.instance_name,
    )

    # Exactly one check_call with the expected command, and no check_output.
    self.check_call_mock.assert_called_once_with(
        delete_cmd, stdin=subprocess.DEVNULL
    )
    self.check_output_mock.assert_not_called()
(
"ProviderMountError (error message)",
dict(
exception=errors.ProviderMountError,
kwargs=dict(provider_name="multipass", error_message="failed to mount"),
expected_message=(
"An error occurred with the instance when trying to mount with "
"'multipass': failed to mount.\n"
"Ensure that 'multipass' is setup correctly and try again."
),
),
),
(
"ProviderMountError (exit code and error message)",
dict(
exception=errors.ProviderMountError,
kwargs=dict(
provider_name="multipass",
exit_code=1,
error_message="failed to mount",
),
expected_message=(
"An error occurred with the instance when trying to mount with "
"'multipass': returned exit code 1: failed to mount.\n"
"Ensure that 'multipass' is setup correctly and try again."
),
),
),
(
"ProviderFileCopyError (exit code)",
dict(
exception=errors.ProviderFileCopyError,
) from decode_error
# NOTE(review): the enclosing definition's header and the construction of
# json_data are outside this view; json_data is presumably the decoded
# provider output -- confirm against the full method.
#
# Look up this instance's entry. A KeyError from either the "info" key or
# the instance name is re-raised as ProviderInfoDataKeyError, with the whole
# json_data attached as `data`.
try:
instance_info = json_data["info"][instance_name]
except KeyError as missing_key:
raise errors.ProviderInfoDataKeyError(
provider_name="multipass", missing_key=str(missing_key), data=json_data
) from missing_key
# Build the result from the "state", "image_release" and "mounts" fields.
# A missing field is re-raised the same way, but with only this instance's
# entry attached as `data`.
try:
return cls(
name=instance_name,
state=instance_info["state"],
image_release=instance_info["image_release"],
mounts=instance_info["mounts"],
)
except KeyError as missing_key:
raise errors.ProviderInfoDataKeyError(
provider_name="multipass",
missing_key=str(missing_key),
data=instance_info,
) from missing_key
def __init__(self, *, ssh_username: str, ssh_key_file: str) -> None:
    """Set up a QemuDriver around the qemu command found on this host.

    :param str ssh_username: user name stored on the driver as-is.
    :param str ssh_key_file: key file path stored on the driver as-is.
    :raises errors.ProviderCommandNotFound:
        if the qemu command returned by ``_get_qemu_command`` cannot be
        located with ``shutil.which``.
    """
    qemu_cmd = _get_qemu_command()
    if not shutil.which(qemu_cmd):
        raise errors.ProviderCommandNotFound(command=qemu_cmd)
    self.provider_cmd = qemu_cmd

    self._ssh_username = ssh_username
    self._ssh_key_file = ssh_key_file

    # TODO detect collisions and make dynamic (applies to both ports)
    self._telnet_port = 64444
    self._ssh_port = 5555

    # No process is recorded at construction time.
    self._qemu_proc = None  # type: subprocess.Popen

    # Configure the client fully before publishing it on the instance.
    # AutoAddPolicy accepts host keys that are not yet known.
    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    self._ssh_client = ssh_client
"""Destroy the instance, trying to stop it first."""
# NOTE(review): the `def` line of this method is outside this view and the
# original indentation is lost; the nesting described below is the most
# plausible reading -- confirm against the full file.
#
# Fetch (and cache on self._instance_info) the current instance info; on
# ProviderInfoError return without doing anything further.
try:
instance_info = self._instance_info = self._get_instance_info()
except errors.ProviderInfoError:
return
# An instance that already reports itself as stopped needs no further work.
if instance_info.is_stopped():
return
# _get_stop_time() is defined elsewhere; a value greater than zero is
# forwarded to stop() as `time`.
stop_time = _get_stop_time()
if stop_time > 0:
# Try the timed stop first; if it raises ProviderStopError, fall back to a
# stop without `time`.
try:
self._multipass_cmd.stop(
instance_name=self.instance_name, time=stop_time
)
except errors.ProviderStopError:
self._multipass_cmd.stop(instance_name=self.instance_name)
# NOTE(review): presumably pairs with `if stop_time > 0` (plain stop when no
# stop time is set) rather than with the `try` above -- confirm.
else:
self._multipass_cmd.stop(instance_name=self.instance_name)
# NOTE(review): presumably runs after either stop path; nesting cannot be
# recovered from this view.
if self._is_ephemeral:
self.clean_project()
def __init__(self, *, root_dir: str) -> None:
    """Load the RSA private key stored as ``id_rsa`` inside root_dir.

    :param str root_dir: the directory expected to contain the ``id_rsa``
                         private key file.
    :raises errors.SSHKeyFileNotFoundError:
        raised when ``id_rsa`` does not exist under root_dir. The original
        author's note recommends handling this by using ``new_key``.
    """
    key_path = os.path.join(root_dir, "id_rsa")

    # Report a missing key file explicitly before trying to parse it.
    if not os.path.exists(key_path):
        raise errors.SSHKeyFileNotFoundError(private_key_file_path=key_path)

    self._key = RSAKey.from_private_key_file(key_path)
    self.private_key_file_path = key_path