Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
size and description. Revert the changes back once the test is over.
This test passes if the updated disk's name, size and description
matches the expected values.
"""
vdc = Environment.get_test_vdc(TestDisk._client)
result = vdc.update_disk(
name=self._idisk1_name,
new_name=self._idisk1_new_name,
new_size=self._idisk1_new_size,
new_description=self._idisk1_new_description)
task = TestDisk._client.get_task_monitor().wait_for_success(
task=result)
self.assertEqual(task.get('status'), TaskStatus.SUCCESS.value)
disk = vdc.get_disk(name=self._idisk1_new_name)
self.assertIsNotNone(disk)
self.assertEqual(disk.Description, self._idisk1_new_description)
def test_0098_teardown(self):
    """Tear down by force-deleting the test vApp via vdc.delete_vapp().

    Fetches the test vDC, requests a forced delete of the vApp named
    ``TestVappDhcp._vapp_name`` and waits on the task that is returned.

    This test passes if the finished task reports the SUCCESS status.
    """
    client = TestVappDhcp._client
    test_vdc = Environment.get_test_vdc(client)
    delete_task = test_vdc.delete_vapp(
        name=TestVappDhcp._vapp_name, force=True)
    finished_task = client.get_task_monitor().wait_for_success(delete_task)
    self.assertEqual(finished_task.get('status'), TaskStatus.SUCCESS.value)
def test_02_cluster_create(self):
    """Create the configured cluster and check that its task succeeds.

    First asserts that the org the client is logged into has the name
    given in ``config['vcd']['org']``. Then calls
    ``Cluster.create_cluster`` with the configured vdc, network and
    cluster name, resolves the ``task_href`` from the result into a task
    resource and waits on it through the client's task monitor.

    This test passes if the awaited task's status is SUCCESS.
    """
    org_resource = self.client.get_org()
    vcd_config = self.config['vcd']
    assert vcd_config['org'] == org_resource.get('name')

    create_result = Cluster(self.client).create_cluster(
        vcd_config['vdc'],
        vcd_config['network'],
        vcd_config['cluster_name'])

    pending_task = self.client.get_resource(create_result['task_href'])
    finished_task = self.client.get_task_monitor().wait_for_status(
        pending_task)
    assert finished_task.get('status') == TaskStatus.SUCCESS.value
def test_04_cluster_delete(self):
    """Delete the configured cluster and check that its task succeeds.

    First asserts that the org the client is logged into has the name
    given in ``config['vcd']['org']``. Then calls
    ``Cluster.delete_cluster`` with the configured cluster name,
    resolves the ``task_href`` from the result into a task resource and
    waits on it through the client's task monitor.

    This test passes if the awaited task's status is SUCCESS.
    """
    org_resource = self.client.get_org()
    vcd_config = self.config['vcd']
    assert vcd_config['org'] == org_resource.get('name')

    delete_result = Cluster(self.client).delete_cluster(
        vcd_config['cluster_name'])

    pending_task = self.client.get_resource(delete_result['task_href'])
    finished_task = self.client.get_task_monitor().wait_for_status(
        pending_task)
    assert finished_task.get('status') == TaskStatus.SUCCESS.value
def test_0073_move_to(self):
    """Test moving a VM from one vApp to another with VM.move_to().

    Looks the target VM up in the vApp at ``TestVM._empty_vapp_href``,
    stores its href on ``TestVM._target_vm_href``, and then asks that VM
    to move from ``TestVM._empty_vapp_name`` to
    ``TestVM._test_vapp_name``.

    This test passes if the move task finishes with the SUCCESS status.
    """
    # Read the names up front, in the same order as before.
    destination_vapp = TestVM._test_vapp_name
    origin_vapp = TestVM._empty_vapp_name
    moved_vm_name = TestVM._target_vm_name

    holder_vapp = VApp(TestVM._client, href=TestVM._empty_vapp_href)
    # The href is kept on the class, not just in a local.
    TestVM._target_vm_href = holder_vapp.get_vm(
        TestVM._target_vm_name).get('href')

    move_task = VM(TestVM._client, href=TestVM._target_vm_href).move_to(
        source_vapp_name=origin_vapp,
        target_vapp_name=destination_vapp,
        target_vm_name=moved_vm_name)

    monitor = TestVM._client.get_task_monitor()
    finished_task = monitor.wait_for_success(move_task)
    self.assertEqual(finished_task.get('status'), TaskStatus.SUCCESS.value)
LOGGER.info('PV %s(%s), umount, VM: %s, moid: %s' % (disk_name, disk_id, vm_name, target_moid))
assert result[0] == 0
LOGGER.info('PV %s(%s), detaching from: %s, moid: %s' % (disk_name, disk_id, vm_name, target_moid))
task = target_vapp.detach_disk_from_vm(
disk_href=disk_resource.get('href'),
disk_type=disk_resource.get('type'),
disk_name=disk_resource.get('name'),
vm_name=vm_name)
task_resource = self.client_sysadmin.get_task_monitor().wait_for_status(
task=task,
timeout=60,
poll_frequency=2,
fail_on_status=None,
expected_target_statuses=[
TaskStatus.SUCCESS,
TaskStatus.ABORTED,
TaskStatus.ERROR,
TaskStatus.CANCELED])
task_status = task_resource.get('status').lower()
LOGGER.info('PV %s(%s), detach disk status: %s' % (disk_name, disk_id, task_status))
assert task_status == TaskStatus.SUCCESS.value
target_vapp = VApp(self.client_sysadmin, href=target_node['href'])
target_moid = target_vapp.get_vm_moid(target_node['name'])
LOGGER.info('PV %s(%s), attaching to: %s, moid: %s' % (disk_name, disk_id, target_node['name'], target_moid))
task = target_vapp.attach_disk_to_vm(
disk_href=disk_href,
vm_name=target_node['name'])
task_resource = self.client_sysadmin.get_task_monitor().wait_for_status(
task=task,
timeout=60,
poll_frequency=2,
fail_on_status=None,
expected_target_statuses=[
TaskStatus.SUCCESS,
TaskStatus.ABORTED,
TaskStatus.ERROR,
TaskStatus.CANCELED])
task_status = task_resource.get('status').lower()
LOGGER.info('PV %s(%s), attach disk status: %s' % (disk_name, disk_id, task_status))
assert task_status == TaskStatus.SUCCESS.value
target_vapp.reload()
vm_name = target_node['name']
vm_resource = target_vapp.get_vm(vm_name)
unit_number = get_unit_number(vm_resource, disk_resource)
vs = VSphere(
self.config['vcs']['host'],
self.config['vcs']['username'],
self.config['vcs']['password'],
port=int(self.config['vcs']['port']))
vs.connect()
vm = vs.get_vm_by_moid(target_moid)
max_retries = 25
if upgrade_docker:
msg = f"Upgrading Docker-CE ({c_docker} -> {t_docker}) " \
f"in nodes {all_node_names}"
self._update_task(vcd_client.TaskStatus.RUNNING, message=msg)
filepath = ltm.get_script_filepath(template_name,
template_revision,
ScriptFile.DOCKER_UPGRADE)
script = utils.read_data_file(filepath, logger=LOGGER)
run_script_in_nodes(self.context.sysadmin_client, vapp_href,
all_node_names, script)
if upgrade_cni:
msg = "Applying CNI " \
f"({curr_entity.entity.status.cni} " \
f"-> {t_cni}) in master node {master_node_names}"
self._update_task(vcd_client.TaskStatus.RUNNING, message=msg)
filepath = ltm.get_script_filepath(template_name,
template_revision,
ScriptFile.MASTER_CNI_APPLY)
script = utils.read_data_file(filepath, logger=LOGGER)
run_script_in_nodes(self.context.sysadmin_client, vapp_href,
master_node_names, script)
# uncordon all nodes (sometimes redundant)
msg = f"Uncordoning all nodes {all_node_names}"
self._update_task(vcd_client.TaskStatus.RUNNING, message=msg)
_uncordon_nodes(self.context.sysadmin_client, vapp_href,
all_node_names, cluster_name=cluster_name)
# update cluster metadata
msg = f"Updating metadata for cluster '{cluster_name}'"
self._update_task(vcd_client.TaskStatus.RUNNING, message=msg)
def wait_for_status(self,
task,
timeout=_DEFAULT_TIMEOUT_SEC,
poll_frequency=_DEFAULT_POLL_SEC,
fail_on_statuses=[
TaskStatus.ABORTED, TaskStatus.CANCELED,
TaskStatus.ERROR
],
expected_target_statuses=[TaskStatus.SUCCESS],
callback=None):
"""Waits for task to reach expected status.
:param Task task: Task returned by post or put calls.
:param float timeout: Time (in seconds, floating point, fractional)
to wait for task to finish.
:param float poll_frequency: time (in seconds, as above) with which
task will be polled.
:param list fail_on_statuses: method will raise an exception if any
of the TaskStatus in this list is reached. If this parameter is
None then either task will achieve expected target status or throw
TimeOutException.
:param list expected_target_statuses: list of expected target
def _delete_cluster_async(self, cluster_id):
    """Delete a cluster's vApp and its entity, reporting via the task.

    Steps, in order:
      1. Fetch the entity for ``cluster_id`` from ``self.entity_svc`` and
         read the cluster name, org name and ovdc name from it.
      2. Resolve the vdc href and mark the task RUNNING.
      3. Delete the vApp named after the cluster in that vdc.
      4. Mark the task SUCCESS, then delete the entity.

    Any exception raised along the way is caught: the entity operation
    is marked as failed for DELETE, the error is logged with traceback,
    and the task is updated to ERROR with the exception text.
    ``self.context.end()`` is always called at the end.

    :param cluster_id: id passed to ``entity_svc.get_entity`` and
        ``entity_svc.delete_entity``.
    """
    try:
        entity: def_models.DefEntity = self.entity_svc.get_entity(
            cluster_id)
        cluster_name = entity.name
        metadata = entity.entity.metadata
        vdc_href = self._get_vdc_href(metadata.org_name,
                                      metadata.ovdc_name)

        self._update_task(vcd_client.TaskStatus.RUNNING,
                          message=f"Deleting cluster '{cluster_name}'")
        _delete_vapp(self.context.client, vdc_href, cluster_name)

        # The task is reported as SUCCESS before the entity is removed;
        # this ordering is kept exactly as it was.
        self._update_task(vcd_client.TaskStatus.SUCCESS,
                          message=f"Deleted cluster '{cluster_name}'")
        self.entity_svc.delete_entity(cluster_id)
    except Exception as err:
        self._fail_operation_and_resolve_entity(
            cluster_id, DefEntityOperation.DELETE)
        LOGGER.error(f"Unexpected error while deleting cluster: {err}",
                     exc_info=True)
        self._update_task(vcd_client.TaskStatus.ERROR,
                          error_message=str(err))
    finally:
        self.context.end()
# all parameters following '*args' are required and keyword-only