Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __check_response(self, resp, attempts=1):
if resp is None:
raise ValueError("A response wasn't received")
if 200 <= resp.status_code < 300:
return True
# If we made it this far, we need to handle an exception
if attempts >= self.max_attempts or resp.status_code != 429:
raise OktaError(json.loads(resp.text))
# Assume we're going to retry with exponential backoff
time.sleep(2 ** (attempts - 1))
return False
# Get a list of apps for this user and include extended info about the user
params = {
'limit': 200,
'filter': 'user.id+eq+%22' + user['id'] + '%22&expand=user%2F' + user['id']
}
try:
# Get first page of results
result = usersClient.get_path('/{0}/appLinks'.format(user['id']))
final_result = result.json()
# Loop through other pages
while 'next' in result.links:
result = appClient.get(result.links['next']['url'])
final_result = final_result + result.json()
except OktaError as e:
if e.error_code == 'E0000007':
statusCode = 404
else:
statusCode = 500
return {
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin' : '*',
'Access-Control-Allow-Credentials' : True
},
"statusCode": statusCode,
"body": e.error_summary
}
# Loop through the list of apps and filter it down to just the info we need
appList = []
:return: Array of group id
"""
app_groups = []
next_url = None
while True:
try:
if next_url:
paged_response = api_client.get(next_url)
else:
params = {
'limit': 500,
}
paged_response = api_client.get_path(f'/{app_id}/groups', params)
except OktaError as okta_error:
logger.debug(f"Got error while going through list application assigned groups {okta_error}")
break
app_groups.append(paged_response.text)
if not is_last_page(paged_response):
next_url = paged_response.links.get("next").get("url")
else:
break
return app_groups
def _get_aws_account_info(okta_org_url, okta_api_key, username):
""" Call the Okta User API and process the results to return
just the information we need for gimme_aws_creds"""
# We need access to the entire JSON response from the Okta APIs, so we need to
# use the low-level ApiClient instead of UsersClient and AppInstanceClient
users_client = ApiClient(okta_org_url, okta_api_key, pathname='/api/v1/users')
# Get User information
try:
result = users_client.get_path('/{0}'.format(username))
user = result.json()
except OktaError as e:
if e.error_code == 'E0000007':
raise errors.GimmeAWSCredsError("Error: " + username + " was not found!")
else:
raise errors.GimmeAWSCredsError("Error: " + e.error_summary)
try:
# Get first page of results
result = users_client.get_path('/{0}/appLinks'.format(user['id']))
final_result = result.json()
# Loop through other pages
while 'next' in result.links:
result = users_client.get(result.links['next']['url'])
final_result = final_result + result.json()
ui.default.info("done\n")
except OktaError as e:
organization.create_okta_organization(neo4j_session, config.okta_org_id, config.update_tag)
users.sync_okta_users(neo4j_session, config.okta_org_id, config.update_tag, config.okta_api_key, state)
groups.sync_okta_groups(neo4j_session, config.okta_org_id, config.update_tag, config.okta_api_key, state)
applications.sync_okta_applications(neo4j_session, config.okta_org_id, config.update_tag, config.okta_api_key)
factors.sync_users_factors(neo4j_session, config.okta_org_id, config.update_tag, config.okta_api_key, state)
origins.sync_trusted_origins(neo4j_session, config.okta_org_id, config.update_tag, config.okta_api_key)
awssaml.sync_okta_aws_saml(neo4j_session, config.okta_saml_role_regex, config.update_tag)
# need creds with permission
# soft fail as some won't be able to get such high priv token
# when we get the E0000006 error
# see https://developer.okta.com/docs/reference/error-codes/
try:
roles.sync_roles(neo4j_session, config.okta_org_id, config.update_tag, config.okta_api_key, state)
except OktaError as okta_error:
logger.warning(f"Unable to pull admin roles got {okta_error}")
# Getting roles requires super admin which most won't be able to get easily
if okta_error.error_code == "E0000006":
logger.warning("Unable to sync admin roles - api token needs admin rights to pull admin roles data")
_cleanup_okta_organizations(neo4j_session, common_job_parameters)
def find_group(self, group):
"""
:type group: str
:rtype UserGroup
"""
group = group.strip()
options = self.options
group_filter_format = options['group_filter_format']
try:
results = self.groups_client.get_groups(query=group_filter_format.format(group=group))
except KeyError as e:
raise AssertionException("Bad format key in group query (%s): %s" % (group_filter_format, e))
except OktaError as e:
self.logger.warning("Unable to query group")
raise AssertionException("Okta error querying for group: %s" % e)
if results is None:
self.logger.warning("No group found for: %s", group)
else:
for result in results:
if result.profile.name == group:
return result
return None
def _get_factor_for_user_id(factor_client, user_id):
"""
Get factor for user from the Okta server
:param factor_client: factor client
:param user_id: user to fetch the data from
:return: Array of user factor information
"""
try:
factor_results = factor_client.get_lifecycle_factors(user_id)
except OktaError as okta_error:
logger.debug(
f"Unable to get factor for user id {user_id} with "
f"error code {okta_error.error_code} with description {okta_error.error_summary}",
)
return []
return factor_results
:return: Array or group membership information
"""
member_list = []
next_url = None
while True:
try:
# https://developer.okta.com/docs/reference/api/groups/#list-group-members
if next_url:
paged_response = api_client.get(next_url)
else:
params = {
'limit': 1000,
}
paged_response = api_client.get_path(f'/{group_id}/users', params)
except OktaError as okta_error:
logger.debug(f"Got error while going through list group member {okta_error}")
break
member_list.append(paged_response.text)
if not is_last_page(paged_response):
next_url = paged_response.links.get("next").get("url")
else:
break
return member_list