Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ip_scope.append(E.Dns1(primary_dns_ip))
if secondary_dns_ip is not None:
ip_scope.append(E.Dns2(secondary_dns_ip))
if dns_suffix is not None:
ip_scope.append(E.DnsSuffix(dns_suffix))
e_ip_ranges = E.IpRanges()
for ip_range in ip_ranges:
e_ip_range = E.IpRange()
ip_range_token = ip_range.split('-')
e_ip_range.append(E.StartAddress(ip_range_token[0]))
e_ip_range.append(E.EndAddress(ip_range_token[1]))
e_ip_ranges.append(e_ip_range)
ip_scope.append(e_ip_ranges)
ip_scopes.append(ip_scope)
config.append(ip_scopes)
config.append(E.FenceMode(FenceMode.ISOLATED.value))
vmw_external_network.append(config)
vim_port_group_refs = E_VMEXT.VimPortGroupRefs()
for pg_moref in pg_morefs:
vim_object_ref = E_VMEXT.VimObjectRef()
vim_object_ref.append(E_VMEXT.VimServerRef(href=vc_href))
vim_object_ref.append(E_VMEXT.MoRef(pg_moref[0]))
vim_object_ref.append(E_VMEXT.VimObjectType(pg_moref[1]))
vim_port_group_refs.append(vim_object_ref)
vmw_external_network.append(vim_port_group_refs)
return self.client.post_linked_resource(
self.extension.get_resource(),
rel=RelationType.ADD,
media_type=EntityType.EXTERNAL_NETWORK.value,
contents=vmw_external_network)
"""
end_points = self.end_point.split('-')
local_ip = end_points[0]
peer_ip = end_points[1]
ipsec_vpn = self.resource
vpn_sites = ipsec_vpn.sites
for site in vpn_sites.site:
if site.localIp == local_ip and site.peerIp == peer_ip:
if is_enabled is not None:
site.enabled = E.enabled(is_enabled)
if name is not None:
site.name = E.name(name)
if description is not None:
site.description = E.description(description)
if local_id is not None:
site.localId = E.localId(local_id)
if local_ip_address is not None:
site.localIp = E.localIp(local_ip_address)
if peer_id is not None:
site.peerId = E.peerId(peer_id)
if peer_ip_address is not None:
site.peerIp = E.peerIp(peer_ip_address)
if encryption_protocol is not None:
site.encryptionAlgorithm = \
E.encryptionAlgorithm(encryption_protocol)
if mtu is not None:
site.mtu = E.mtu(mtu)
if enable_pfs is not None:
site.enablePfs = E.enablePfs(enable_pfs)
if local_subnet is not None:
local_subnets = E.localSubnets()
if ',' in local_subnet:
vm_instantiation_param = E.InstantiationParams()
if 'network' in spec:
primary_index = int(source_vm_resource.NetworkConnectionSection.
PrimaryNetworkConnectionIndex.text)
if 'ip_allocation_mode' in spec:
ip_allocation_mode = spec['ip_allocation_mode']
else:
ip_allocation_mode = 'DHCP'
vm_instantiation_param.append(
E.NetworkConnectionSection(
E_OVF.Info(),
E.NetworkConnection(
E.NetworkConnectionIndex(primary_index),
E.IsConnected(True),
E.IpAddressAllocationMode(ip_allocation_mode.upper()),
network=spec['network'])))
needs_customization = 'disk_size' in spec or 'password' in spec or \
'cust_script' in spec or 'hostname' in spec
if needs_customization:
guest_customization_param = E.GuestCustomizationSection(
E_OVF.Info(),
E.Enabled(True),
)
if 'password' in spec:
guest_customization_param.append(E.AdminPasswordEnabled(True))
guest_customization_param.append(E.AdminPasswordAuto(False))
guest_customization_param.append(
E.AdminPassword(spec['password']))
else:
if 'password_auto' in spec:
- description: (str): (optional) description of the vm.
- os_type: (str): (required) operating system type of vm.
- virtual_cpu: (int): (optional) no of virtual cpu.
- core_per_socket: (int):(optional) core per socket in cpu.
- cpu_resource_mhz: (int):(optional) cpu resource frequency in Mhz
- memory: (int):(optional) memory in Mb.
- media_href: (str): (optional) boot image of os href.
- media_id: (str): (optional) c boot image of os id.
- media_name: (str): (optional) boot image of os name.
:return: an object containing CreateItem XML element.
:rtype: lxml.objectify.ObjectifiedElement
"""
create_item = E.CreateItem()
create_item.set('name', specs['vm_name'])
desc = E.Description()
if 'description' in specs:
desc = E.Description(specs['description'])
create_item.append(desc)
guest_customization_param = E.GuestCustomizationSection()
guest_customization_param.append(
E_OVF.Info('Specifies Guest OS Customization Settings'))
guest_customization_param.append(E.ComputerName(specs['comp_name']))
create_item.append(guest_customization_param)
vm_spec_section = E.VmSpecSection()
vm_spec_section.set('Modified', 'true')
vm_spec_section.append(E_OVF.Info('Virtual Machine specification'))
vm_spec_section.append(E.OsType(specs['os_type']))
if 'virtual_cpu' in specs:
vm_spec_section.append(E.NumCpus(specs['virtual_cpu']))
else:
vm_spec_section.append(E.NumCpus(1))
self.client.get_org_by_name(
access_setting['name']).get('href'))
subject_type = EntityType.ADMIN_ORG.value
else:
raise InvalidParameterException("Invalid subject type")
subject_name = access_setting['name']
# Make 'ReadOnly' the default access_level if it is not specified.
if 'access_level' in access_setting:
access_level = access_setting['access_level']
else:
access_level = 'ReadOnly'
access_setting_params = E.AccessSetting(
E.Subject(
name=subject_name, href=subject_href, type=subject_type),
E.AccessLevel(access_level))
access_settings_params.append(access_setting_params)
return access_settings_params
if 'password' in spec:
guest_customization_param.append(E.AdminPasswordEnabled(True))
guest_customization_param.append(E.AdminPasswordAuto(False))
guest_customization_param.append(
E.AdminPassword(spec['password']))
else:
if 'password_auto' in spec:
guest_customization_param.append(
E.AdminPasswordEnabled(True))
guest_customization_param.append(E.AdminPasswordAuto(True))
else:
guest_customization_param.append(
E.AdminPasswordEnabled(False))
if 'password_reset' in spec:
guest_customization_param.append(
E.ResetPasswordRequired(spec['password_reset']))
if 'cust_script' in spec:
guest_customization_param.append(
E.CustomizationScript(spec['cust_script']))
if 'hostname' in spec:
guest_customization_param.append(
E.ComputerName(spec['hostname']))
vm_instantiation_param.append(guest_customization_param)
vm_general_params.append(E.NeedsCustomization(needs_customization))
sourced_item.append(vm_general_params)
sourced_item.append(vm_instantiation_param)
vdc_compute_policy_element, compute_policy_element = \
get_compute_policy_tags(float(self.client.get_api_version()),
spec.get('sizing_policy_href'))
if vdc_compute_policy_element is not None:
:param str name: New name of org vdc network. It is mandatory.
:param str description: New description of org vdc network
:param bool is_shared: True if user want to share else False.
:return: object containing EntityType.TASK XML data
representing the asynchronous task.
:rtype: lxml.objectify.ObjectifiedElement
"""
if name is None:
raise InvalidParameterException("Name can't be None or empty")
vdc_network = self.get_resource()
vdc_network.set('name', name)
if description is not None:
vdc_network.Description = E.Description(description)
if is_shared is not None:
vdc_network.IsShared = E.IsShared(is_shared)
return self.client.put_linked_resource(
self.resource, RelationType.EDIT, EntityType.ORG_VDC_NETWORK.value,
vdc_network)
:rtype: str
:raises MissingRecordException: if an extension with the given name and
namespace couldn't be found.
:raise MultipleRecordsException: if more than one service with the
given name and namespace are found.
"""
record = self._get_extension_record(name=name,
namespace=namespace,
format=QueryResultFormat.RECORDS)
params = E_VMEXT.Service({'name': name})
description = description or record.get('description')
if description is not None:
params.append(E.Description(description))
params.append(E_VMEXT.Namespace(record.get('namespace')))
params.append(E_VMEXT.Enabled(record.get('enabled')))
params.append(E_VMEXT.RoutingKey(
routing_key if routing_key else record.get('routingKey')))
params.append(E_VMEXT.Exchange(
exchange if exchange else record.get('exchange')))
self.client.put_resource(record.get('href'), params, None)
return record.get('href')
E.NumCoresPerSocket(specs['core_per_socket']))
else:
vm_spec_section.append(E.NumCoresPerSocket(1))
cpu_resource_mhz = E.CpuResourceMhz()
if 'cpu_resource_mhz' in specs:
cpu_resource_mhz.append(E.Configured(specs['cpu_resource_mhz']))
else:
cpu_resource_mhz.append(E.Configured(0))
vm_spec_section.append(cpu_resource_mhz)
memory_resource_mb = E.MemoryResourceMb()
if 'memory' in specs:
memory_resource_mb.append(E.Configured(specs['memory']))
else:
memory_resource_mb.append(E.Configured(512))
vm_spec_section.append(memory_resource_mb)
vm_spec_section.append(E.DiskSection())
vm_spec_section.append(E.HardwareVersion('vmx-14'))
vm_spec_section.append(E.VirtualCpuType('VM64'))
create_item.append(vm_spec_section)
if 'media_href' in specs and 'media_id' in specs and 'media_name' in \
specs:
media = E.Media()
media.set('href', specs['media_href'])
media.set('id', specs['media_id'])
media.set('name', specs['media_name'])
create_item.append(media)
return create_item
:return: an object containing EntityType.TASK XML data which represents
the asynchronous task that is updating the vApp network.
:rtype: lxml.objectify.ObjectifiedElement
:raises: InvalidParameterException: Enable firewall service failed as
given network's connection is not routed
"""
self._get_resource()
fence_mode = self.resource.Configuration.FenceMode
if fence_mode != 'natRouted':
raise InvalidParameterException(
"Enable firewall service failed as given network's connection "
"is not routed")
firewall_service = self.resource.Configuration.Features.FirewallService
firewall_rule = E.FirewallRule()
firewall_rule.append(E.IsEnabled(is_enabled))
firewall_rule.append(E.Description(name))
firewall_rule.append(E.Policy(policy))
protocol = E.Protocols()
for proto in protocols:
if proto == 'Any':
protocol.append(E.Any(True))
elif proto == 'Icmp':
protocol.append(E.Icmp(True))
elif proto == 'Tcp':
protocol.append(E.Tcp(True))
elif proto == 'Udp':
protocol.append(E.Udp(True))
firewall_rule.append(protocol)
firewall_rule.append(E.DestinationPortRange(destination_port_range))
firewall_rule.append(E.DestinationIp(destination_ip))
firewall_rule.append(E.SourcePortRange(source_port_range))
firewall_rule.append(E.SourceIp(source_ip))