Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_020_get_all_disks(self):
logged_in_org = self.client.get_org()
org = Org(self.client, resource=logged_in_org)
vdc_resource = org.get_vdc(self.config['vcd']['vdc'])
vdc = VDC(self.client, resource=vdc_resource)
disks = vdc.get_disks()
assert len(disks) > 0
assert disks[0].get('name') == self.config['vcd']['idisk_name']
def test_010_get_storage_profiles(self):
logged_in_org = self.client.get_org()
org = Org(self.client, resource=logged_in_org)
vdc_resource = org.get_vdc(self.config['vcd']['vdc'])
vdc = VDC(self.client, resource=vdc_resource)
profiles = vdc.get_storage_profiles()
assert len(profiles) > 0
def test_0060_vdc_metadata(self):
"""Test the methods related to metadata manipulation in vdc.py.
This test passes if all the metadata operations are successful.
"""
vapp_author_client = None
sys_admin_client = None
try:
logger = Environment.get_default_logger()
vapp_author_client = Environment.get_client_in_default_org(
CommonRoles.VAPP_AUTHOR)
vdc_vapp_author_view = VDC(client=vapp_author_client,
href=get_non_admin_href(
TestOrgVDC._new_vdc_href))
sys_admin_client = Environment.get_sys_admin_client()
vdc_sys_admin_view = VDC(client=sys_admin_client,
href=TestOrgVDC._new_vdc_href)
# try to add new metadata as vapp author
try:
logger.debug(f'Adding metadata [key={TestOrgVDC._metadata_key}'
', value={TestOrgVDC._metadata_value}]) as vApp '
'author')
vdc_vapp_author_view.set_metadata(
key=TestOrgVDC._metadata_key,
value=TestOrgVDC._metadata_value)
self.assertFail('vApp author shouldn\'t have been able to '
'add new metadta entry.')
except OperationNotSupportedException as e:
pass
# add new metadata as sys admin
logger.debug(f'Adding metadata [key={TestOrgVDC._metadata_key},'
'value={TestOrgVDC._metadata_value}]) as Sys admin.')
def test_10_remove_last_vdc_access(self):
logged_in_org = self.client.get_org()
org = Org(self.client, resource=logged_in_org)
vdc_resource = org.get_vdc(self.config['vcd']['vdc'])
vdc = VDC(self.client, resource=vdc_resource)
vdc.share_with_org_members()
control_access = vdc.remove_access_settings(
access_settings_list=[
{'name': self.config['vcd']['access_user1'], 'type': 'user'}
])
self.assertFalse(hasattr(control_access, 'AccessSettings'))
api_version='27.0',
verify_ssl_certs=False,
log_file='pyvcloud.log',
log_requests=True,
log_headers=True,
log_bodies=True)
client.set_credentials(BasicLoginCredentials(user, org, password))
task_monitor = client.get_task_monitor()
print("Fetching Org...")
org_resource = client.get_org()
org = Org(client, resource=org_resource)
print("Fetching VDC...")
vdc_resource = org.get_vdc(vdc)
vdc = VDC(client, resource=vdc_resource)
print("Fetching vApp...")
vapp_resource = vdc.get_vapp(vapp)
vapp = VApp(client, resource=vapp_resource)
print("Fetching VM...")
vm_resource = vapp.get_vm(vm)
vm = VM(client, resource=vm_resource)
print("Creating Snapshot...")
snaphot_resource = vm.snapshot_create(memory=False, quiesce=False)
print("Waiting for Snapshot finish...")
task_monitor.wait_for_success(snaphot_resource)
print("Revert Back To Current Snapshot...")
vm.reload()
def _create_nodes_async(self, *args,
cluster_name, cluster_vdc_href, vapp_href,
cluster_id, template_name, template_revision,
num_workers, network_name, num_cpu, mb_memory,
storage_profile_name, ssh_key, enable_nfs,
rollback):
try:
org = vcd_utils.get_org(self.context.client)
vdc = VDC(self.context.client, href=cluster_vdc_href)
vapp = vcd_vapp.VApp(self.context.client, href=vapp_href)
template = get_template(name=template_name,
revision=template_revision)
server_config = utils.get_server_runtime_config()
catalog_name = server_config['broker']['catalog']
node_type = NodeType.WORKER
if enable_nfs:
node_type = NodeType.NFS
msg = f"Creating {num_workers} node(s) from template " \
f"'{template_name}' (revision {template_revision}) and " \
f"adding to cluster '{cluster_name}' ({cluster_id})"
LOGGER.debug(msg)
self._update_task(vcd_client.TaskStatus.RUNNING, message=msg)
def get_gateway(ctx, name):
"""Get the sdk's gateway resource.
It will restore sessions if expired. It will read the client and vdc
from context and make get_gateway call to VDC for gateway object.
"""
restore_session(ctx, vdc_required=True)
client = ctx.obj['client']
vdc_href = ctx.obj['profiles'].get('vdc_href')
vdc = VDC(client, href=vdc_href)
gateway = vdc.get_gateway(name)
gateway_resource = Gateway(client, href=gateway.get('href'))
return gateway_resource
def capture_vapp(self):
vapp_name = self.params.get('vapp_name')
vdc_name = self.params.get('vdc_name')
catalog_name = self.params.get('catalog_name')
item_name = self.params.get('item_name')
desc = self.params.get('description')
customize_on_instantiate = self.params.get('customize_on_instantiate')
overwrite = self.params.get('overwrite')
response = dict()
response['changed'] = False
v = self.org.get_vdc(vdc_name)
vdc = VDC(self.client, href=v.get('href'))
vapp = vdc.get_vapp(vapp_name)
catalog = self.org.get_catalog(catalog_name)
self.org.capture_vapp(
catalog_resource=catalog, vapp_href=vapp.get('href'),
catalog_item_name=item_name, description=desc,
customize_on_instantiate=customize_on_instantiate,
overwrite=overwrite)
self.ova_check_resolved()
response['msg'] = "Catalog Item {} has been captured".format(item_name)
response['changed'] = True
return response
def process_request_provision_pv(self, cluster_name, cluster_id, vapp, moid, json_request):
vdc_resource = self.client_sysadmin.get_linked_resource(vapp.resource, RelationType.UP, EntityType.VDC.value)
vdc = VDC(self.client_sysadmin, resource=vdc_resource)
disk_name_orig = json_request['PVC']['metadata']['name']
disk_name = '%s-%s' % (cluster_name, disk_name_orig)
disk_size = json_request['PVC']['spec']['resources']['requests']['storage']
d = None
try:
d = vdc.get_disk(disk_name)
except Exception as e:
if str(e).startswith('Found multiple'):
raise Exception('disk already exists')
if d is not None:
raise Exception('disk already exists')
# if disk doesn't exist:
# create it
# if disk is already attached: