Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_events(self, limit=None, start_date=None, filter_string=None):
"""Get a list of Events
:param limit: maximum number of events to return
:type limit: int or None
:param filter_string: string to filter events
:type filter_string: str or None
:rtype: list of Event
"""
params = {
'limit': limit,
'startDate': start_date,
'filter': filter_string
}
response = ApiClient.get_path(self, '/', params=params)
return Utils.deserialize(response.text, Event)
def aws_account_info(event, context):
# We need access to the entire JSON response from the Okta APIs, so we need to
# use the low-level ApiClient instead of UsersClient and AppInstanceClient
usersClient = ApiClient(os.environ['OKTA_ORG_URL'],
os.environ['OKTA_API_KEY'],
pathname='/api/v1/users')
appClient = ApiClient(os.environ['OKTA_ORG_URL'],
os.environ['OKTA_API_KEY'],
pathname='/api/v1/apps')
# Get User information
username = event['requestContext']['authorizer']['principalId']
try:
result = usersClient.get_path('/{0}'.format(username))
user = result.json()
except OktaError as e:
if e.error_code == 'E0000007':
statusCode = 404
else:
statusCode = 500
def _get_aws_account_info(okta_org_url, okta_api_key, username):
""" Call the Okta User API and process the results to return
just the information we need for gimme_aws_creds"""
# We need access to the entire JSON response from the Okta APIs, so we need to
# use the low-level ApiClient instead of UsersClient and AppInstanceClient
users_client = ApiClient(okta_org_url, okta_api_key, pathname='/api/v1/users')
# Get User information
try:
result = users_client.get_path('/{0}'.format(username))
user = result.json()
except OktaError as e:
if e.error_code == 'E0000007':
raise errors.GimmeAWSCredsError("Error: " + username + " was not found!")
else:
raise errors.GimmeAWSCredsError("Error: " + e.error_summary)
try:
# Get first page of results
result = users_client.get_path('/{0}/appLinks'.format(user['id']))
final_result = result.json()
from okta.framework.ApiClient import ApiClient
from okta.framework.Utils import Utils
from okta.models.factor.OrgAuthFactor import OrgAuthFactor
class FactorsAdminClient(ApiClient):
def __init__(self, *args, **kwargs):
kwargs['pathname'] = '/api/v1/org'
ApiClient.__init__(self, *args, **kwargs)
def get_org_factors(self, filter_string=None):
"""Get a list of OrgAuthFactors
:param filter_string: string to filter factors
:type filter_string: str or None
:rtype: list of OrgAuthFactor
"""
params = {
'filter': filter_string
}
response = ApiClient.get_path(self, '/factors', params=params)
return Utils.deserialize(response.text, OrgAuthFactor)
def _create_api_client(okta_org, path_name):
"""
Create Okta ApiClient
:param okta_org: Okta organization name
:param path_name: API Path
:return: Instance of ApiClient
"""
api_client = ApiClient(
base_url=f"https://{okta_org}.okta.com/",
pathname=path_name,
api_token=OKTA_API_KEY,
)
return api_client
def enroll_factor(self, user_id, factor_enroll_request, update_phone=None):
"""Enroll a user into a factor
:param user_id: target user id
:type user_id: str
:param factor_enroll_request: the details to enroll the user
:type factor_enroll_request: FactorEnrollRequest
:param update_phone: whether to update the user's phone during enrollment
:type update_phone: bool
:rtype: Factor
"""
params = {
'updatePhone': update_phone
}
response = ApiClient.post_path(self, '/{0}/factors'.format(user_id), factor_enroll_request, params=params)
return Utils.deserialize(response.text, Factor)
from okta.framework.ApiClient import ApiClient
from okta.framework.Utils import Utils
from okta.models.auth.AuthResult import AuthResult
class AuthClient(ApiClient):
def __init__(self, *args, **kwargs):
kwargs['pathname'] = '/api/v1/authn'
ApiClient.__init__(self, *args, **kwargs)
def authenticate(self, username, password,
relay_state=None, response_type=None, force_mfa=None, context=None):
"""Begin the authentication process with a username and password
:param username: user's username
:type username: str
:param password: user's password
:type password: str
:param relay_state: data that will persist for the lifetime of the authentication or recovery token
:type relay_state: str or None
:param response_type: the type of session to return (session_token or session_token_url usually)
:type response_type: str
def create_user(self, user, activate=None):
"""Create a user
:param user: the data to create a user
:type user: User
:param activate: whether to activate the user
:type activate: bool
:rtype: User
"""
if activate is None:
response = ApiClient.post_path(self, '/', user)
else:
params = {
'activate': activate
}
response = ApiClient.post_path(self, '/', user, params=params)
return Utils.deserialize(response.text, User)