Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def validate_service_error(result, error_message=None, debug=False):
try:
assert result.status != 200
if debug:
assert isinstance(result.exception, oci.exceptions.ServiceError)
if error_message:
assert error_message in str(result.exception)
else:
if error_message:
assert error_message in result.message
except AssertionError:
# TODO better inspection for failing tests
print(str(result))
raise
#
# OCI_RESOURCE_PRINCIPAL_VERSION="2.2"
# OCI_RESOURCE_PRINCIPAL_RPST
# OCI_RESOURCE_PRINCIPAL_PRIVATE_PEM
# OCI_RESOURCE_PRINCIPAL_PRIVATE_PEM_PASSPHRASE
# OCI_RESOURCE_PRINCIPAL_REGION
#
# OCI_RESOURCE_PRINCIPAL_VERSION="1.1"
# OCI_RESOURCE_PRINCIPAL_RPT_ENDPOINT
# OCI_RESOURCE_PRINCIPAL_RPST_ENDPOINT
signer = oci.auth.signers.resource_principals_signer.get_resource_principals_signer()
kwargs['signer'] = signer
try:
config.validate_config(client_config, **kwargs)
except exceptions.InvalidConfig as bad_config:
table = render_config_errors(bad_config)
template = "ERROR: The config file at {config_file} is invalid:\n\n{errors}"
sys.exit(template.format(
config_file=ctx.obj['config_file'],
errors=table
))
return ConfigAndSigner(config=client_config, signer=signer, uses_instance_principals_auth=instance_principal_auth)
Unique Integration Instance identifier.
:param list[str] wait_for_states:
An array of states to wait on. These should be valid values for :py:attr:`~oci.integration.models.WorkRequest.status`
:param dict operation_kwargs:
A dictionary of keyword arguments to pass to :py:func:`~oci.integration.IntegrationInstanceClient.delete_integration_instance`
:param dict waiter_kwargs:
A dictionary of keyword arguments to pass to the :py:func:`oci.wait_until` function. For example, you could pass ``max_interval_seconds`` or ``max_interval_seconds``
as dictionary keys to modify how long the waiter function will wait between retries and the maximum amount of time it will wait
"""
operation_result = None
try:
operation_result = self.client.delete_integration_instance(integration_instance_id, **operation_kwargs)
except oci.exceptions.ServiceError as e:
if e.status == 404:
return WAIT_RESOURCE_NOT_FOUND
else:
raise e
if not wait_for_states:
return operation_result
lowered_wait_for_states = [w.lower() for w in wait_for_states]
wait_for_resource_id = operation_result.headers['opc-work-request-id']
try:
waiter_result = oci.wait_until(
self.client,
self.client.get_work_request(wait_for_resource_id),
evaluate_response=lambda r: getattr(r.data, 'status') and getattr(r.data, 'status').lower() in lowered_wait_for_states,
def main():
print("Adding IAM policy for WAF Access...")
create_soc_group()
waf_policy = CreatePolicyDetails()
waf_policy.compartment_id = compartment_id
waf_policy.name = 'waas_iam_policy'
waf_policy.description = "Allows SOC team to appropriately access WAF, audit and metric services"
waf_policy.statements = statements
try:
identity.create_policy(waf_policy)
except oci.exceptions.ServiceError as e:
if e.status == 409:
print("Policy exists, do nothing")
return
else:
print(e)
return
print("Added the policy successfully")
def identity_read_compartments(identity, tenancy):
compartments = []
print("Loading Compartments...")
try:
# read all compartments to variable
all_compartments = []
try:
all_compartments = oci.pagination.list_call_get_all_results(
identity.list_compartments,
tenancy.id,
compartment_id_in_subtree=True
).data
except oci.exceptions.ServiceError:
raise
###################################################
# Build Compartments - return nested compartment list
###################################################
def build_compartments_nested(identity_client, cid, path):
try:
compartment_list = [item for item in all_compartments if str(item.compartment_id) == str(cid)]
if path != "":
path = path + " / "
for c in compartment_list:
if c.lifecycle_state == oci.identity.models.Compartment.LIFECYCLE_STATE_ACTIVE:
cvalue = {'id': str(c.id), 'name': str(c.name), 'path': path + str(c.name)}
lowered_wait_for_states = [w.lower() for w in wait_for_states]
wait_for_resource_id = operation_result.data.id
try:
waiter_result = oci.wait_until(
self.client,
self.client.get_transfer_job(wait_for_resource_id),
evaluate_response=lambda r: getattr(r.data, 'lifecycle_state') and getattr(r.data, 'lifecycle_state').lower() in lowered_wait_for_states,
**waiter_kwargs
)
result_to_return = waiter_result
return result_to_return
except Exception as e:
raise oci.exceptions.CompositeOperationError(partial_results=[operation_result], cause=e)
Unique Digital Assistant instance identifier.
:param list[str] wait_for_states:
An array of states to wait on. These should be valid values for :py:attr:`~oci.oda.models.WorkRequest.status`
:param dict operation_kwargs:
A dictionary of keyword arguments to pass to :py:func:`~oci.oda.OdaClient.delete_oda_instance`
:param dict waiter_kwargs:
A dictionary of keyword arguments to pass to the :py:func:`oci.wait_until` function. For example, you could pass ``max_interval_seconds`` or ``max_interval_seconds``
as dictionary keys to modify how long the waiter function will wait between retries and the maximum amount of time it will wait
"""
operation_result = None
try:
operation_result = self.client.delete_oda_instance(oda_instance_id, **operation_kwargs)
except oci.exceptions.ServiceError as e:
if e.status == 404:
return WAIT_RESOURCE_NOT_FOUND
else:
raise e
if not wait_for_states:
return operation_result
lowered_wait_for_states = [w.lower() for w in wait_for_states]
wait_for_resource_id = operation_result.headers['opc-work-request-id']
try:
waiter_result = oci.wait_until(
self.client,
self.client.get_work_request(wait_for_resource_id),
evaluate_response=lambda r: getattr(r.data, 'status') and getattr(r.data, 'status').lower() in lowered_wait_for_states,
:param list[str] wait_for_states:
An array of states to wait on. These should be valid values for :py:attr:`~oci.database.models.AutonomousDatabase.lifecycle_state`
:param dict operation_kwargs:
A dictionary of keyword arguments to pass to :py:func:`~oci.database.DatabaseClient.delete_autonomous_database`
:param dict waiter_kwargs:
A dictionary of keyword arguments to pass to the :py:func:`oci.wait_until` function. For example, you could pass ``max_interval_seconds`` or ``max_interval_seconds``
as dictionary keys to modify how long the waiter function will wait between retries and the maximum amount of time it will wait
"""
initial_get_result = self.client.get_autonomous_database(autonomous_database_id)
operation_result = None
try:
operation_result = self.client.delete_autonomous_database(autonomous_database_id, **operation_kwargs)
except oci.exceptions.ServiceError as e:
if e.status == 404:
return WAIT_RESOURCE_NOT_FOUND
else:
raise e
if not wait_for_states:
return operation_result
lowered_wait_for_states = [w.lower() for w in wait_for_states]
try:
waiter_result = oci.wait_until(
self.client,
initial_get_result,
evaluate_response=lambda r: getattr(r.data, 'lifecycle_state') and getattr(r.data, 'lifecycle_state').lower() in lowered_wait_for_states,
succeed_on_not_found=True,
if not private_ip_to_public_ip:
load_reserved_public_ips_for_all_compartments(virtual_network_client, identity_client, config)
if private_ip.id in private_ip_to_public_ip: # This is (F1)
print('Found public IP mapping for private IP in list. Public IP details: {}'.format(private_ip_to_public_ip[private_ip.id]))
try:
public_ip = virtual_network_client.get_public_ip_by_private_ip_id(
oci.core.models.GetPublicIpByPrivateIpIdDetails(
private_ip_id=private_ip.id
)
).data
if public_ip.ip_address not in instance_info['public_ips']:
instance_info['public_ips'].append(public_ip.ip_address)
except oci.exceptions.ServiceError as e:
if e.status == 404:
print('No public IP mapping found for private IP: {} ({})'.format(private_ip.id, private_ip.ip_address))
else:
print('Unexpected error when retrieving public IPs: {}'.format(str(e)))
return instance_info