Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_or_update_bucket_update_bucket_service_error(
object_storage_client, get_existing_bucket_patch
):
get_existing_bucket_patch.return_value = oci.object_storage.models.Bucket()
error_message = "Namespace does not exist."
object_storage_client.update_bucket.side_effect = ServiceError(
400, "NoNameSpaceExists", dict(), error_message
)
object_storage_client.get_bucket.return_value = Bucket()
module = get_module()
try:
oci_bucket.create_or_update_bucket(object_storage_client, module)
except Exception as ex:
assert error_message in ex.args[0]
def test_list_users_failure_service_error(identity_client, list_all_resources_patch):
error_message = "Internal Server Error"
list_all_resources_patch.side_effect = ServiceError(
499, "InternalServerError", dict(), error_message
)
try:
oci_user_facts.list_users(identity_client, get_module(dict()))
except Exception as ex:
assert error_message in ex.args[0]
def test_list_db_systems_service_error(db_client):
error_message = "Internal Server Error"
module = get_module(dict({"db_node_id": "ocid1.dbnode.aaaa"}))
db_client.get_db_system.side_effect = ServiceError(
499, "InternalServerError", dict(), error_message
)
try:
oci_db_node_facts.list_db_nodes(db_client, module)
except Exception as ex:
assert error_message in ex.args[0]
def test_list_db_systems_service_error(db_client):
error_message = "Internal Server Error"
module = get_module(dict({"db_system_id": "ocid1.dbsystem.aaaa"}))
db_client.get_db_system.side_effect = ServiceError(
499, "InternalServerError", dict(), error_message
)
try:
oci_db_system_facts.list_db_systems(db_client, module)
except Exception as ex:
assert error_message in ex.args[0]
def validate_service_error(error, status, code, message):
assert isinstance(error, oci.exceptions.ServiceError)
assert error.status == status
assert error.code == code
assert error.message.startswith(message)
assert error.headers is not None
assert error.request_id.strip()
if error.headers.get('opc-request-id'):
assert len(error.headers.get('opc-request-id')) == 98
else:
assert len(error.request_id) == 32
# Check to string
for info in [str(status), code, "opc-request-id", message]:
assert info in str(error)
def test_list_mount_targets_service_error(file_storage_client):
error_message = "Internal Server Error"
module = get_module(dict({"mount_target_id": "ocid1.mounttarget.aaaa"}))
file_storage_client.get_mount_target.side_effect = ServiceError(
499, "InternalServerError", dict(), error_message
)
try:
oci_mount_target_facts.list_mount_targets(file_storage_client, module)
except Exception as ex:
assert error_message in ex.args[0]
skip_source_dest_check = module.params["skip_source_dest_check"]
if not oci_utils.are_attrs_equal(vnic, module, vnic.attribute_map.keys()):
debug("Need to update VNIC " + str(id))
uvd = UpdateVnicDetails()
uvd.skip_source_dest_check = skip_source_dest_check
uvd.hostname_label = hostname_label
uvd.display_name = name
oci_utils.call_with_backoff(
virtnetwork_client.update_vnic, vnic_id=id, update_vnic_details=uvd
)
result["changed"] = True
resp = oci_utils.call_with_backoff(virtnetwork_client.get_vnic, vnic_id=id)
result["vnic"] = to_dict(resp.data)
except ServiceError as ex:
module.fail_json(msg=ex.message)
module.exit_json(**result)
identity_client.list_customer_secret_keys,
user_id=user_id,
display_name=module.params["display_name"],
)
if csk_id:
return next(
(
to_dict([secret_key])
for secret_key in customer_secret_keys
if secret_key.id == csk_id
),
{},
)
return to_dict(customer_secret_keys)
except ServiceError as ex:
module.fail_json(msg=ex.message)
def _make_retrying_head_object_call(self):
try:
return self.object_storage_client.head_object(
namespace_name=self.kwargs['namespace'],
bucket_name=self.kwargs['bucket_name'],
object_name=self.kwargs['object_name'],
if_match=self.kwargs.get('if_match'),
if_none_match=self.kwargs.get('if_none_match'),
opc_client_request_id=self.kwargs.get('request_id')
)
except oci.exceptions.ServiceError as e:
if e.status == 404:
return None
else:
raise
:param list[str] wait_for_states:
An array of states to wait on. These should be valid values for :py:attr:`~oci.core.models.VolumeBackup.lifecycle_state`
:param dict operation_kwargs:
A dictionary of keyword arguments to pass to :py:func:`~oci.core.BlockstorageClient.delete_volume_backup`
:param dict waiter_kwargs:
A dictionary of keyword arguments to pass to the :py:func:`oci.wait_until` function. For example, you could pass ``max_interval_seconds`` or ``max_interval_seconds``
as dictionary keys to modify how long the waiter function will wait between retries and the maximum amount of time it will wait
"""
initial_get_result = self.client.get_volume_backup(volume_backup_id)
operation_result = None
try:
operation_result = self.client.delete_volume_backup(volume_backup_id, **operation_kwargs)
except oci.exceptions.ServiceError as e:
if e.status == 404:
return WAIT_RESOURCE_NOT_FOUND
else:
raise e
if not wait_for_states:
return operation_result
lowered_wait_for_states = [w.lower() for w in wait_for_states]
try:
waiter_result = oci.wait_until(
self.client,
initial_get_result,
evaluate_response=lambda r: getattr(r.data, 'lifecycle_state') and getattr(r.data, 'lifecycle_state').lower() in lowered_wait_for_states,
succeed_on_not_found=True,