Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#
# Additionally, each secondary private IP address can have:
# (E) A hostname label (for DNS). This is the same as (D) but is just called
# out for completeness
# (F) An assigned RESERVED public IP address
# This handles (C)
if vnic.public_ip:
instance_info['public_ips'].append(vnic.public_ip)
if vnic.subnet_id not in subnet_info:
subnet_info[vnic.subnet_id] = get_subnet_info(virtual_network_client, vnic.subnet_id)
# list_private_ips returns both the primary and the secondary private IPs of the VNIC,
# so fetching all of them lets us account for (A) and (B)
private_ips_for_vnic = oci.pagination.list_call_get_all_results(
virtual_network_client.list_private_ips,
vnic_id=vnic.id
).data
for private_ip in private_ips_for_vnic:
instance_info['private_ips'].append(private_ip.ip_address)
subnet = subnet_info[private_ip.subnet_id] # Could also use vnic_id rather than private_ip.subnet_id
vcn = vcn_info[subnet.vcn_id]
# This accounts for (D) / (E) - the hostname labels for private IPs. Additionally, only
# try to form an FQDN if we have DNS labels for both the subnet and the VCN. See:
# https://docs.cloud.oracle.com/Content/Network/Concepts/dns.htm
if subnet.dns_label and vcn.dns_label and private_ip.hostname_label:
instance_info['dns_info'].append(
'{}.{}.{}.oraclevcn.com'.format(private_ip.hostname_label, subnet.dns_label, vcn.dns_label)
def identity_read_compartments(identity, tenancy):
compartments = []
print("Loading Compartments...")
try:
# read all compartments to variable
all_compartments = []
try:
all_compartments = oci.pagination.list_call_get_all_results(
identity.list_compartments,
tenancy.id,
compartment_id_in_subtree=True
).data
except oci.exceptions.ServiceError:
raise
###################################################
# Build Compartments - return nested compartment list
###################################################
def build_compartments_nested(identity_client, cid, path):
try:
compartment_list = [item for item in all_compartments if str(item.compartment_id) == str(cid)]
path='/files'
),
retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY
)
# Exports can be listed with optional filters, narrowing the result to a single
# file system or a single export set (mount target). Listing is paginated, so
# the oci.pagination helper is used to collect every page.
all_exports_by_file_system = oci.pagination.list_call_get_all_results(
    file_storage_client.list_exports,
    compartment_id=compartment_id,
    file_system_id=file_system.id,
)
exports_for_file_system = all_exports_by_file_system.data
print('All exports by file system:\n{}'.format(exports_for_file_system))
print('=============================\n')

all_exports_by_export_set = oci.pagination.list_call_get_all_results(
    file_storage_client.list_exports,
    compartment_id=compartment_id,
    export_set_id=mount_target.export_set_id,
)
exports_for_export_set = all_exports_by_export_set.data
print('All exports by export set:\n{}'.format(exports_for_export_set))
print('=============================\n')

# The export set attached to the mount target can be fetched directly as well.
get_export_set_response = file_storage_client.get_export_set(mount_target.export_set_id)
export_set_details = get_export_set_response.data
print('Export set on mount target:\n{}'.format(export_set_details))
print('=============================\n')
# Exports have a lifecycle state, so we can wait on it to become available. Also, if we no longer need an export
# then we can delete it.
#
# When deleting, since the resource may be gone, we set succeed_on_not_found on the waiter so that we consider
"""
Find a unique Vcn by name.
:param network_client: OCI VirtualNetworkClient client
:type network_client: oci.core.VirtualNetworkClient
:param compartment_id: The OCID of the compartment which owns the Vcn.
:type compartment_id: str
:param display_name: The display name of the Vcn.
:type display_name: str
:return: The Vcn
:rtype: core_models.Vcn
"""
result = pagination.list_call_get_all_results(
network_client.list_vcns,
compartment_id,
display_name=display_name
)
for vcn in result.data:
if display_name == vcn.display_name:
return vcn
lambda: list_call_get_all_results(self._client.list_objects, namespace, bucket_name))
return response.data
# Update an existing monitor.
# Only the properties being changed need to be supplied; the call returns the
# updated monitor.
monitor_changes = oci.healthchecks.models.UpdatePingMonitorDetails(
    targets=["example.com", "other.example.com"],
    is_enabled=True,
)
update_response = healthchecks_client.update_ping_monitor(
    monitor_id=ping_monitor.id,
    update_ping_monitor_details=monitor_changes,
)
ping_monitor = update_response.data
print('Display Name: {}, isEnabled: {}'.format(ping_monitor.display_name, ping_monitor.is_enabled))

# Fetch the monitor's probe results; the pagination helper collects every page.
ping_monitor_results = oci.pagination.list_call_get_all_results(
    healthchecks_client.list_ping_probe_results,
    ping_monitor.id,
)
for monitor_result in ping_monitor_results.data:
    started_at = format_time(monitor_result.start_time)
    print('Result: {}, Start Time: {}, isHealthy: {}'.format(monitor_result.target, started_at, monitor_result.is_healthy))

# Move the monitor to a different compartment.
compartment_change = oci.healthchecks.models.ChangePingMonitorCompartmentDetails(
    compartment_id="NEW_COMPARTMENT_ID"
)
healthchecks_client.change_ping_monitor_compartment(
    monitor_id=ping_monitor.id,
    change_ping_monitor_compartment_details=compartment_change,
)

# A successful delete returns nothing.
healthchecks_client.delete_ping_monitor(monitor_id=ping_monitor.id)
:param network_client: OCI VirtualNetworkClient client
:type network_client: oci.core.VirtualNetworkClient
:param compartment_id: The OCID of the compartment which owns the RouteTable.
:type compartment_id: str
:param vcn_id: The OCID of the Vcn which will own the RouteTable.
:type vcn_id: str
:param display_name: The display name of the RouteTable.
:type display_name: str
:return: The RouteTable.
:rtype: core_models.RouteTable
"""
result = pagination.list_call_get_all_results(
network_client.list_route_tables,
compartment_id,
vcn_id,
display_name=display_name
)
for rt in result.data:
if display_name == rt.display_name:
return rt
)
print('Attached paravirtualized volume')
print('')

# Wait until the volume attachment reaches the ATTACHED lifecycle state.
attachment_id = paravirtualized_volume_attachment_response.data.id
oci.wait_until(
    compute_client,
    compute_client.get_volume_attachment(attachment_id),
    'lifecycle_state',
    'ATTACHED',
)

# Listing volume attachments is paginated, so the oci.pagination helper is used
# to gather every page. Keyword filters are accepted too - here the listing is
# restricted to the attachments on our instance.
attachments_response = oci.pagination.list_call_get_all_results(
    compute_client.list_volume_attachments,
    compartment_id,
    instance_id=instance.id,
)
volume_attachments = attachments_response.data

# Elements may differ in attachment_type, one per kind of volume attachment.
#
# These are all subclasses of oci.core.models.VolumeAttachment
print('Volume attachments:\n{}'.format(volume_attachments))
print('')
print('')

detach_volume(compute_client, paravirtualized_volume_attachment_response.data)
print('Detached paravirtualized volume')
print('')
def DeleteLoadBalancers(config, compartment):
AllItems = []
object = oci.load_balancer.LoadBalancerClient(config)
print("Getting all Load Balancer objects")
items = oci.pagination.list_call_get_all_results(object.list_load_balancers, compartment_id=compartment.id).data
for item in items:
if (item.lifecycle_state != "DELETED"):
AllItems.append(item)
print("- {} - {}".format(item.display_name, item.lifecycle_state))
itemsPresent = True
while itemsPresent:
count = 0
for item in AllItems:
try:
itemstatus = object.get_load_balancer(load_balancer_id=item.id).data
if itemstatus.lifecycle_state != "DELETED":
if itemstatus.lifecycle_state != "DELETING":
try:
print("Deleting: {}".format(itemstatus.display_name))
:raises MaximumWaitTimeExceededError: When maximum wait time is exceeded while invoking target_fn
"""
filter_params = None
try:
response = oci.pagination.list_call_get_all_results(target_fn, **kwargs)
except ValueError as ex:
if "unknown kwargs" in str(ex):
if "display_name" in kwargs:
if kwargs["display_name"]:
filter_params = {"display_name": kwargs["display_name"]}
del kwargs["display_name"]
elif "name" in kwargs:
if kwargs["name"]:
filter_params = {"name": kwargs["name"]}
del kwargs["name"]
response = oci.pagination.list_call_get_all_results(target_fn, **kwargs)
# If the underlying SDK Service list* method doesn't support filtering by name or display_name, filter the resources
# and return the matching list of resources
return filter_response_data(response.data, filter_params)