Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_or_update_backend_set_client_error(
    lb_client, check_and_create_resource_patch, get_existing_resource_patch
):
    """Check that a ClientError from resource creation surfaces its message.

    ``check_and_create_resource_patch`` is configured to raise a
    ``ClientError`` wrapping "Work Request Failed" and
    ``get_existing_resource_patch`` to return ``None``. The call to
    ``create_or_update_backend_set`` must then raise an exception whose first
    argument contains that message.
    """
    module = get_module(dict())
    error_message = "Work Request Failed"
    check_and_create_resource_patch.side_effect = ClientError(
        Exception(error_message)
    )
    get_existing_resource_patch.return_value = None
    try:
        oci_load_balancer_backend_set.create_or_update_backend_set(lb_client, module)
    except Exception as ex:
        assert error_message in ex.args[0]
    else:
        # Without this branch the test would pass silently when nothing is
        # raised, because the only assertion lives in the except clause.
        # NOTE(review): presumably the failure is raised by the fake module's
        # fail_json - confirm against get_module.
        raise AssertionError(
            "create_or_update_backend_set did not raise for a ClientError"
        )
def get_group_ids_from_group_names(identity_client, group_names, module):
    """Resolve group names to group ids for the module's compartment.

    Every group returned by ``identity_client.list_groups`` for
    ``module.params["compartment_id"]`` is fetched through
    ``oci_utils.list_all_resources`` and indexed by name once, so each
    requested name costs a single dictionary lookup instead of a scan over
    all groups.

    :param identity_client: Client whose ``list_groups`` method is handed to
        ``oci_utils.list_all_resources``.
    :param group_names: Iterable of group names to resolve. ``None`` is
        treated as an empty iterable.
    :param module: Object exposing ``params``; only ``compartment_id`` is read.
    :return: List of group ids, in the same order as ``group_names``.
    :raises ClientError: If a requested name matches none of the listed groups.
    """
    compartment_id = module.params.get("compartment_id")
    all_existing_groups = oci_utils.list_all_resources(
        identity_client.list_groups, compartment_id=compartment_id
    )
    # If several listed groups share a name, the last one listed wins.
    group_id_by_name = {group.name: group.id for group in all_existing_groups}

    group_ids = []
    for group_name in group_names or ():
        if group_name not in group_id_by_name:
            # str() keeps the error a ClientError even for a non-string name.
            raise ClientError("Group " + str(group_name) + " does not exists")
        group_ids.append(group_id_by_name[group_name])
    return group_ids
),
SUFFIX_MATCH=HashedPathMatchType(
match_type=PathMatchType.MATCH_TYPE_SUFFIX_MATCH
),
)
result_path_routes = list()
HashedPathRoute = oci_utils.generate_subclass(PathRoute)
for path_route_entry in path_routes_list:
path_route = HashedPathRoute()
backend_set_name = path_route_entry.get("backend_set_name", None)
path = path_route_entry.get("path", None)
path_match_type = path_match_type.get(
path_route_entry.get("path_match_type", None).get("match_type", None)
)
if backend_set_name is None or path is None or path_match_type is None:
raise ClientError(
Exception(
"backend_set_name, path and path_match_type are mandatory attributes for"
" back_ends and can not be empty."
)
)
path_route.backend_set_name = backend_set_name
path_route.path = path
path_route.path_match_type = path_match_type
result_path_routes.append(path_route)
return result_path_routes
result = oci_utils.check_and_create_resource(
resource_type="snapshot",
create_fn=create_snapshot,
kwargs_create={
"file_storage_client": file_storage_client,
"module": module,
},
list_fn=file_storage_client.list_snapshots,
kwargs_list={"file_system_id": module.params.get("file_system_id")},
module=module,
model=CreateSnapshotDetails(),
)
except ServiceError as ex:
get_logger().error("Unable to create/update Snapshot due to: %s", ex.message)
module.fail_json(msg=ex.message)
except ClientError as ex:
get_logger().error("Unable to create/update Snapshot due to: %s", str(ex))
module.fail_json(msg=str(ex))
return result
def get_export_options(export_options):
    """Convert a list of export-option mappings into client option objects.

    Each entry is turned into an instance of the class produced by
    ``oci_utils.generate_subclass(ClientOptions)``.

    :param export_options: Iterable of mappings supporting ``.get``, or
        ``None``.
    :return: ``None`` when ``export_options`` is ``None``; otherwise a list
        with one populated options object per entry, in input order.
    :raises ClientError: If an entry has no ``source`` (missing or ``None``).
    """
    if export_options is None:
        return None

    # Optional attributes and the value used when the key is absent.
    # NOTE: a key that is present with value None is passed through as None;
    # the fallback only applies when the key is missing from the entry.
    optional_attributes = (
        ("require_privileged_source_port", True),
        ("access", "READ_ONLY"),
        ("identity_squash", "ROOT"),
        ("anonymous_uid", 65534),
        ("anonymous_gid", 65534),
    )

    hashed_options_class = oci_utils.generate_subclass(ClientOptions)
    converted = []
    for entry in export_options:
        options = hashed_options_class()
        source = entry.get("source")
        if source is None:
            raise ClientError(
                "Export Options attribute source must contain a valid value"
            )
        options.source = source
        for attribute, fallback in optional_attributes:
            setattr(options, attribute, entry.get(attribute, fallback))
        converted.append(options)
    return converted
def is_patch_already_applied(
    patch_getter_method, current_version, input_version_dict, **kwargs
):
    """Report whether the patch named in ``input_version_dict`` is applied.

    The patch is fetched by calling ``patch_getter_method`` through
    ``oci_utils.call_with_backoff`` with ``kwargs`` plus ``patch_id``, and its
    ``version`` is compared with ``current_version``.

    :param patch_getter_method: Callable passed to
        ``oci_utils.call_with_backoff``; its result must expose ``.data``.
    :param current_version: Version to compare against the patch's version.
    :param input_version_dict: Mapping read for ``patch_id``, or ``None``.
    :param kwargs: Extra keyword arguments forwarded to the getter.
    :return: ``True`` when ``input_version_dict`` is ``None`` or the fetched
        patch's version equals ``current_version``; ``False`` otherwise.
    :raises ClientError: If the getter's response carries no patch data.
    """
    if input_version_dict is None:
        return True

    patch_id = input_version_dict.get("patch_id")
    kwargs.update(patch_id=patch_id)
    existing_patch = oci_utils.call_with_backoff(patch_getter_method, **kwargs).data
    if existing_patch is None:
        # str() keeps this a ClientError even when patch_id is None or not a
        # string; plain concatenation would raise TypeError instead.
        raise ClientError(
            Exception("No Patch with id " + str(patch_id) + " is available")
        )
    return existing_patch.version == current_version
else:
result = oci_utils.check_and_create_resource(
resource_type=RESOURCE_NAME,
create_fn=create_smtp_credential,
kwargs_create={"identity_client": identity_client, "module": module},
list_fn=identity_client.list_smtp_credentials,
kwargs_list={"user_id": module.params.get("user_id")},
module=module,
model=CreateSmtpCredentialDetails(),
)
except ServiceError as ex:
get_logger().error(
"Unable to create/update SMTP credential due to: %s", ex.message
)
module.fail_json(msg=ex.message)
except ClientError as ex:
get_logger().error(
"Unable to create/update SMTP credential due to: %s", str(ex)
)
module.fail_json(msg=str(ex))
return result
def update_dhcp_options(virtual_network_client, existing_dhcp_options, module):
if existing_dhcp_options is None:
raise ClientError(
Exception(
"No Dhcp Options with id "
+ module.params.get("dhcp_id")
+ " is found for update"
)
)
result = dict(dhcp_options=to_dict(existing_dhcp_options), changed=False)
name_tag_changed = False
options_changed = False
input_options = module.params.get("options")
update_dhcp_details = UpdateDhcpDetails()
existing_options = existing_dhcp_options.options
attributes_to_compare = ["display_name", "freeform_tags", "defined_tags"]
for attribute in attributes_to_compare:
name_tag_changed = oci_utils.check_and_update_attributes(
update_dhcp_details,
def update_user(identity_client, existing_user, module):
if existing_user is None:
raise ClientError(
Exception(
"No User with id "
+ module.params.get("user_id")
+ " is found for update"
)
)
groups = module.params["user_groups"]
reset_password = module.params["create_or_reset_ui_password"]
blocked = module.params["blocked"]
ui_password = None
group_changed = modify_group_memberships(
identity_client, groups, existing_user, module
)
existing_user, state_changed = unblock_user(identity_client, blocked, existing_user)
ui_password, password_changed = reset_ui_password(
def get_user_ids_from_user_names(identity_client, user_names, module):
    """Resolve user names to user ids for the module's compartment.

    Every user returned by ``identity_client.list_users`` for
    ``module.params["compartment_id"]`` is fetched through
    ``oci_utils.list_all_resources`` and indexed once by its
    whitespace-stripped name. Requested names are stripped the same way
    before lookup, so surrounding whitespace on either side is ignored.

    :param identity_client: Client whose ``list_users`` method is handed to
        ``oci_utils.list_all_resources``.
    :param user_names: Iterable of user names to resolve. ``None`` is treated
        as an empty iterable.
    :param module: Object exposing ``params``; only ``compartment_id`` is read.
    :return: List of user ids, in the same order as ``user_names``.
    :raises ClientError: If a requested name matches none of the listed users.
    """
    users = oci_utils.list_all_resources(
        identity_client.list_users, compartment_id=module.params.get("compartment_id")
    )
    # If several listed users share a stripped name, the last one listed wins.
    user_id_by_name = {user.name.strip(): user.id for user in users}

    user_ids = []
    for user_name in user_names or ():
        lookup_name = user_name.strip()
        if lookup_name not in user_id_by_name:
            # Report the name exactly as the caller supplied it.
            raise ClientError("User " + user_name + " does not exists")
        user_ids.append(user_id_by_name[lookup_name])
    return user_ids