Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_0030_get_non_existent_vdc(self):
    """Test the method Org.get_vdc() with a vdc name that does not exist.

    This test passes if the non-existent vdc can't be successfully
    retrieved by name, i.e. the lookup either raises
    EntityNotFoundException or returns None.
    """
    org = Environment.get_test_org(TestOrgVDC._client)
    try:
        # Only the lookup itself can raise the "not found" exception, so
        # it is the only statement kept inside the try body.
        resource = org.get_vdc(TestOrgVDC._non_existent_vdc_name)
    except EntityNotFoundException:
        # Raising is one of the two accepted "not found" outcomes.
        return
    self.assertIsNone(resource)
def get_vm(self, vm_name):
    """Look up a vm of this vApp by its name.

    :param str vm_name: name of the vm to look up.

    :return: the first entry produced by get_all_vms() whose 'name'
        attribute equals vm_name; an object containing EntityType.VM XML
        data that represents the vm.

    :rtype: lxml.objectify.ObjectifiedElement

    :raises: EntityNotFoundException: if no vm carries the given name.
    """
    # Lazy scan: stops at the first vm whose name matches.
    candidates = (entry for entry in self.get_all_vms()
                  if entry.get('name') == vm_name)
    found = next(candidates, None)
    if found is None:
        raise EntityNotFoundException('Can\'t find VM \'%s\'' % vm_name)
    return found
def get_default_storage_profile(self):
    """Find the storage profile flagged as default.

    Each profile returned by get_storage_profiles() is re-fetched through
    its admin href; the first fetched resource whose ``Default`` element
    is truthy is returned.

    :return: an object containing VdcStorageProfile XML element that
        represents the default storage profile (admin view).

    :rtype: lxml.objectify.ObjectifiedElement

    :raises: EntityNotFoundException: if no profile is flagged as default.
    """
    # Resources are fetched lazily, one per profile, so the scan stops
    # issuing requests as soon as the default profile is found.
    admin_views = (
        self.client.get_resource(get_admin_href(entry.get('href')))
        for entry in self.get_storage_profiles())
    for admin_view in admin_views:
        if admin_view.Default:
            return admin_view
    raise EntityNotFoundException('Default Storage Profile not found')
memory = params.get('memory')
cpu = params.get('cpu')
disk_size = params.get('disk_size')
vmpassword = params.get('vmpassword')
cust_script = params.get('cust_script')
vm_name = params.get('vm_name')
hostname = params.get('hostname')
ip_address = params.get('ip_address')
storage_profile = params.get('storage_profile')
network_adapter_type = params.get('network_adapter_type')
response = dict()
response['changed'] = False
try:
self.vdc.get_vapp(vapp_name)
except EntityNotFoundException:
create_vapp_task = self.vdc.instantiate_vapp(
name=vapp_name,
catalog=catalog_name,
template=template_name,
description=description,
network=network,
fence_mode=fence_mode,
ip_allocation_mode=ip_allocation_mode,
deploy=deploy,
power_on=power_on,
accept_all_eulas=accept_all_eulas,
memory=memory,
cpu=cpu,
disk_size=disk_size,
password=vmpassword,
cust_script=cust_script,
def ___validate_vapp_records(self, vapp_name, resource_type):
    """Query for records by vApp name and require exactly one match.

    :param str vapp_name: name used as the equality filter on 'name'.
    :param resource_type: resource type handed to the typed query.

    :return: a list holding the single matching record.

    :rtype: list

    :raises: EntityNotFoundException: if no record matches the name.
    :raises: MultipleRecordsException: if more than one record matches.
    """
    name_filter = ('name', vapp_name)
    query = self.client.get_typed_query(
        resource_type,
        query_result_format=QueryResultFormat.REFERENCES,
        equality_filter=name_filter)
    records = list(query.execute())
    # list() never yields None, so an emptiness check covers the
    # "nothing found" case on its own.
    if not records:
        raise EntityNotFoundException(
            'Vapp with name \'%s\' not found.' % vapp_name)
    if len(records) > 1:
        raise MultipleRecordsException(
            "Found multiple vapp named '%s'." % vapp_name)
    return records
def get_user(self, user_name):
    """Fetch the resource of a user, looked up by name.

    :param str user_name: name of the user to look up.

    :return: the resource fetched from the href of the first matching
        user record; an object containing EntityType.USER XML data
        describing the named user.

    :rtype: lxml.objectify.ObjectifiedElement

    :raises: EntityNotFoundException: if no user record matches the name.
    """
    matches = list(self.list_users(('name', user_name)))
    if not matches:
        raise EntityNotFoundException(
            'User \'%s\' does not exist.' % user_name)
    # Only the first matching record is used.
    user_href = matches[0].get('href')
    return self.client.get_resource(user_href)
result = disk
# disk-id's are unique so it is ok to break the loop
# and stop looking further.
break
elif name is not None:
for disk in disks:
if disk.get('name') == name:
if result is None:
result = disk
else:
raise MultipleRecordsException(
'Found multiple disks with name %s'
', please identify disk via disk-id.' %
disk.get('name'))
if result is None:
raise EntityNotFoundException(
'No disk found with the given name/id.')
else:
return result
"""Retrieve an isolated org vdc network in the current vdc.
:param str name: name of the org vdc network we want to retrieve.
:return: an object containing EntityType.ORG_VDC_NETWORK XML data which
represents an org vdc network.
:rtype: lxml.objectify.ObjectifiedElement
:raises: EntityNotFoundException: if org vdc network with the given
name is not found.
"""
result = self.list_orgvdc_network_resources(
name=name, type=FenceMode.ISOLATED.value)
if len(result) == 0:
raise EntityNotFoundException(
'Org vdc network with name \'%s\' not found.' % name)
return result[0]
def __execute_gateway_query_api(self, filter=None):
    """Run a typed query for edge gateways and hand back its result.

    :param filter: value forwarded unchanged as the ``qfilter`` argument
        of the typed query; None by default.

    :return: whatever the query's execute() call returns.

    :raises: EntityNotFoundException: if execute() returns None.
    """
    records = self.client.get_typed_query(
        ResourceType.EDGE_GATEWAY.value,
        query_result_format=QueryResultFormat.RECORDS,
        qfilter=filter).execute()
    if records is not None:
        return records
    raise EntityNotFoundException('No Gateway found associated')
cpm = compute_policy_manager.ComputePolicyManager(
op_ctx.sysadmin_client,
log_wire=utils.str_to_bool(config['service'].get('log_wire'))) # noqa: E501
cp_href = None
cp_id = None
if cp_name == SYSTEM_DEFAULT_COMPUTE_POLICY_NAME:
for _cp in cpm.list_compute_policies_on_vdc(ovdc_id):
if _cp['name'] == cp_name:
cp_href = _cp['href']
cp_id = _cp['id']
else:
try:
_cp = cpm.get_vdc_compute_policy(cp_name)
cp_href = _cp['href']
cp_id = _cp['id']
except vcd_e.EntityNotFoundException:
pass
if cp_href is None:
raise e.BadRequestError(f"Compute policy '{cp_name}' not found.")
if action == ComputePolicyAction.ADD:
cpm.add_compute_policy_to_vdc(ovdc_id, cp_href)
# Record telemetry data
record_user_action(CseOperation.OVDC_COMPUTE_POLICY_ADD)
return f"Added compute policy '{cp_name}' ({cp_id}) to ovdc " \
f"({ovdc_id})"
if action == ComputePolicyAction.REMOVE:
# TODO: fix remove_compute_policy by implementing a proper way
# for calling async methods without having to pass op_ctx
# outside handlers.