Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
supports_check_mode=False,
required_one_of=[["compartment_id", "volume_group_id"]],
)
if not HAS_OCI_PY_SDK:
module.fail_json(msg="oci python sdk required for this module.")
block_storage_client = oci_utils.create_service_client(module, BlockstorageClient)
volume_group_id = module.params["volume_group_id"]
try:
if volume_group_id is not None:
result = [
to_dict(
oci_utils.call_with_backoff(
block_storage_client.get_volume_group,
volume_group_id=volume_group_id,
).data
)
]
else:
compartment_id = module.params["compartment_id"]
optional_list_method_params = [
"display_name",
"lifecycle_state",
"availability_domain",
]
optional_kwargs = {
param: module.params[param]
for param in optional_list_method_params
def list_api_keys(identity_client, user_id, api_key_id, module):
    """Return the API keys of a user, optionally narrowed to a single key.

    All keys for ``user_id`` are fetched through
    ``identity_client.list_api_keys`` (wrapped in
    ``oci_utils.call_with_backoff``).

    :param identity_client: client object exposing ``list_api_keys``.
    :param user_id: identifier of the user whose keys are listed.
    :param api_key_id: when truthy, only the first key whose ``key_id``
        equals this value is returned.
    :param module: object providing ``fail_json`` for error reporting.
    :return: ``to_dict`` of all keys when ``api_key_id`` is falsy;
        ``to_dict`` of a one-element list holding the matching key when
        one is found; an empty dict when ``api_key_id`` matches nothing.
    """
    try:
        all_keys = oci_utils.call_with_backoff(
            identity_client.list_api_keys, user_id=user_id
        ).data

        # No filter requested: hand back every key.
        if not api_key_id:
            return to_dict(all_keys)

        # Filter requested: stop at the first key with a matching id.
        for candidate in all_keys:
            if candidate.key_id == api_key_id:
                return to_dict([candidate])

        # Note the empty *dict* (not list) for "no match".
        return {}
    except ServiceError as ex:
        module.fail_json(msg=ex.message)
def get_group(identity_client, group_id, module):
    """Fetch a single group and return it wrapped in a list.

    The group is retrieved with ``identity_client.get_group`` (through
    ``oci_utils.call_with_backoff``) and then handed, together with the
    ``compartment_id`` module parameter and the result list, to
    ``append_group_with_associated_users``.

    :param identity_client: client object exposing ``get_group``.
    :param group_id: identifier of the group to fetch.
    :param module: object whose ``params`` mapping may hold
        ``compartment_id``.
    :return: the list passed to ``append_group_with_associated_users``
        (presumably populated by that helper -- it is defined elsewhere).
    """
    compartment = module.params.get("compartment_id")
    collected = []

    fetched_group = oci_utils.call_with_backoff(
        identity_client.get_group, group_id=group_id
    ).data

    append_group_with_associated_users(
        identity_client, fetched_group, compartment, collected
    )
    return collected
if not HAS_OCI_PY_SDK:
module.fail_json(msg="oci python sdk required for this module.")
virtual_network_client = oci_utils.create_service_client(
module, VirtualNetworkClient
)
service_gateway_id = module.params["service_gateway_id"]
compartment_id = module.params["compartment_id"]
result = []
try:
if service_gateway_id is not None:
result = [
to_dict(
oci_utils.call_with_backoff(
virtual_network_client.get_service_gateway,
service_gateway_id=service_gateway_id,
).data
)
]
else:
optional_list_method_params = ["display_name", "lifecycle_state", "vcn_id"]
optional_kwargs = {
param: module.params[param]
for param in optional_list_method_params
if module.params.get(param) is not None
}
result = to_dict(
oci_utils.list_all_resources(
virtual_network_client.list_service_gateways,
compartment_id=compartment_id,
def delete_object(object_storage_client, module):
    """Delete an object if ``head_object`` reports that it exists.

    The target is identified by the ``namespace_name``, ``bucket_name``
    and ``object_name`` module parameters.

    :param object_storage_client: client object exposing ``delete_object``.
    :param module: object providing ``params`` and ``fail_json``.
    :return: dict with key ``changed`` (``True`` only when the delete call
        completed) and, in that case, key ``object`` holding a dict copy
        of the headers returned by ``head_object``.
    """
    params = module.params
    namespace_name = params["namespace_name"]
    bucket_name = params["bucket_name"]
    object_name = params["object_name"]

    outcome = {}
    was_deleted = False

    # head_object returning None is treated as "nothing to delete".
    existing = head_object(object_storage_client, module)
    if existing is not None:
        try:
            oci_utils.call_with_backoff(
                object_storage_client.delete_object,
                namespace_name=namespace_name,
                bucket_name=bucket_name,
                object_name=object_name,
            )
            outcome["object"] = dict(existing.headers)
            was_deleted = True
        except ServiceError as ex:
            module.fail_json(msg=ex.message)

    outcome["changed"] = was_deleted
    return outcome
result = dict()
result["changed"] = False
try:
volume_attachment = oci_utils.call_with_backoff(
compute_client.get_volume_attachment,
volume_attachment_id=volume_attachment_id,
).data
if volume_attachment.lifecycle_state in ["DETACHING", "DETACHED"]:
result["changed"] = False
result["volume_attachment"] = to_dict(volume_attachment)
else:
oci_utils.call_with_backoff(
compute_client.detach_volume, volume_attachment_id=volume_attachment_id
)
response = oci_utils.call_with_backoff(
compute_client.get_volume_attachment,
volume_attachment_id=volume_attachment_id,
)
result["volume_attachment"] = to_dict(
oci.wait_until(
compute_client, response, "lifecycle_state", "DETACHED"
).data
)
result["changed"] = True
except MaximumWaitTimeExceeded as ex:
module.fail_json(msg=str(ex))
except ServiceError as ex:
module.fail_json(msg=ex.message)
return result
def list_dhcp_options(virtual_network_client, module):
    """List DHCP options by compartment and VCN, or fetch one by id.

    When both ``compartment_id`` and ``vcn_id`` are set, all matching
    DHCP options are listed (filtered by the ``display_name`` parameter).
    Otherwise, when ``dhcp_id`` is set, that single DHCP options resource
    is fetched. If neither combination is supplied the module fails with
    an explicit message instead of crashing on an unbound local variable.

    :param virtual_network_client: client object exposing
        ``list_dhcp_options`` and ``get_dhcp_options``.
    :param module: object providing ``params`` and ``fail_json``.
    :return: dict with key ``dhcp_options_list`` holding ``to_dict`` of
        the collected DHCP options.
    """
    result = dict()
    compartment_id = module.params.get("compartment_id")
    vcn_id = module.params.get("vcn_id")
    dhcp_id = module.params.get("dhcp_id")

    # Bound up front so the final to_dict() call can never hit an
    # unassigned name, whichever branch (or failure path) is taken.
    existing_dhcp_options = []
    try:
        if compartment_id and vcn_id:
            existing_dhcp_options = oci_utils.list_all_resources(
                virtual_network_client.list_dhcp_options,
                compartment_id=compartment_id,
                vcn_id=vcn_id,
                display_name=module.params["display_name"],
            )
        elif dhcp_id:
            response = oci_utils.call_with_backoff(
                virtual_network_client.get_dhcp_options, dhcp_id=dhcp_id
            )
            existing_dhcp_options = [response.data]
        else:
            # Previously this case fell through and raised
            # UnboundLocalError; report it as a regular module failure.
            module.fail_json(
                msg="Specify either both compartment_id and vcn_id, or "
                "dhcp_id, to list DHCP options."
            )
    except ServiceError as ex:
        module.fail_json(msg=ex.message)

    result["dhcp_options_list"] = to_dict(existing_dhcp_options)
    return result
elif autonomous_data_warehouse_id:
get_logger().debug(
"Listing all Autonomous Data Warehouse Backups for Autonomous Data Warehouse Id%s",
compartment_id,
)
autonomous_data_warehouse_backups = oci_utils.list_all_resources(
db_client.list_autonomous_data_warehouse_backups,
autonomous_data_warehouse_id=autonomous_data_warehouse_id,
display_name=module.params.get("display_name"),
)
elif autonomous_data_warehouse_backup_id:
get_logger().debug(
"Listing Autonomous Data Warehouse %s",
autonomous_data_warehouse_backup_id,
)
response = oci_utils.call_with_backoff(
db_client.get_autonomous_data_warehouse_backup,
autonomous_data_warehouse_backup_id=autonomous_data_warehouse_backup_id,
)
autonomous_data_warehouse_backups = [response.data]
except ServiceError as ex:
get_logger().error(
"Unable to list Autonomous Data Warehouse due to %s", ex.message
)
module.fail_json(msg=ex.message)
result["autonomous_data_warehouse_backups"] = to_dict(
autonomous_data_warehouse_backups
)
return result
def get_tenancy_details(identity_client, module):
    """Fetch the tenancy named by the ``tenancy_id`` module parameter.

    :param identity_client: client object exposing ``get_tenancy``.
    :param module: object providing ``params`` and ``fail_json``.
    :return: ``to_dict`` of the ``data`` attribute of the
        ``get_tenancy`` response.
    """
    tenancy_id = module.params["tenancy_id"]
    try:
        response = oci_utils.call_with_backoff(
            identity_client.get_tenancy, tenancy_id=tenancy_id
        )
        tenancy = response.data
    except ServiceError as ex:
        # Service errors are surfaced through the module's failure hook.
        module.fail_json(msg=ex.message)
    return to_dict(tenancy)
mutually_exclusive=[["compartment_id", "volume_id"]],
required_one_of=[["compartment_id", "volume_id"]],
)
if not HAS_OCI_PY_SDK:
module.fail_json(msg="oci python sdk required for this module.")
block_storage_client = oci_utils.create_service_client(module, BlockstorageClient)
volume_id = module.params["volume_id"]
try:
if volume_id is not None:
result = [
to_dict(
oci_utils.call_with_backoff(
block_storage_client.get_volume, volume_id=volume_id
).data
)
]
else:
compartment_id = module.params["compartment_id"]
availability_domain = module.params["availability_domain"]
if availability_domain is not None:
result = to_dict(
oci_utils.list_all_resources(
block_storage_client.list_volumes,
compartment_id=compartment_id,
availability_domain=availability_domain,
display_name=module.params["display_name"],
)