Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
except KeyError:
logging.error("Okta auth failed: "
"Could not retrieve list of MFA methods")
logging.debug("Error parsing response: {}".format(
json.dumps(primary_auth)))
sys.exit(1)
mfa_setup_statuses = [
d['status'] for d in mfa_options if 'status' in d and d['status'] != "ACTIVE"]
if len(mfa_setup_statuses) == len(mfa_options):
logging.error("MFA not configured. "
"Please enable MFA on your account and try again.")
sys.exit(2)
preset_mfa = settings.mfa_method
available_mfas = [d['factorType'] for d in mfa_options]
if preset_mfa is not None and preset_mfa in available_mfas:
mfa_index = available_mfas.index(settings.mfa_method)
else:
logging.warning(
"No MFA provided or provided MFA does not exist. [{}]".format(
settings.mfa_method))
mfa_index = helpers.select_preferred_mfa_index(mfa_options)
# time to challenge the mfa option
selected_mfa_option = mfa_options[mfa_index]
logging.debug("Selected MFA is [{}]".format(selected_mfa_option))
mfa_challenge_url = selected_mfa_option['_links']['verify']['href']
payload = helpers.prepare_payload(stateToken=primary_auth['stateToken'],
def process_ini_file(file, profile):
    """Load settings for one profile from a ConfigParser ini file.

    Every option of ``profile`` whose name matches an existing attribute
    of ``settings`` overwrites that attribute; other options are ignored.
    A file that cannot be read is silently skipped. A profile that is
    missing from the file terminates the program with exit status 2.

    :param file: filename
    :param profile: profile to read
    :return: None
    """
    parser = configparser.ConfigParser(default_section=settings.okta_profile)
    # ConfigParser.read() returns the list of files it managed to parse;
    # an empty list means there is nothing to apply.
    if not parser.read(file):
        return

    try:
        options = parser.items(profile)
    except configparser.NoSectionError:
        logging.error("Profile '{}' does not exist.".format(profile))
        sys.exit(2)
    else:
        for option, value in options:
            if not hasattr(settings, option):
                continue
            # NOTE: the value is written to the debug log verbatim.
            logging.debug(
                'Set option {}={} from ini file'.format(option, value))
            setattr(settings, option, value)
def create_directory(dir_name):
    """Create a directory on the local machine if it does not exist yet.

    Nothing happens when ``dir_name`` is already a directory. If the
    directory cannot be created, the failure is logged and the program
    terminates with exit status 1.

    :param dir_name: path of the directory to create
    :return: None
    """
    if os.path.isdir(dir_name):
        return
    try:
        os.mkdir(dir_name)
    except OSError as error:
        # Report the directory that actually failed (the argument), not an
        # unrelated configured path.
        logging.error("Cannot continue creating directory \'{}\': {}".format(
            dir_name, error.strerror))
        sys.exit(1)
def duo_mfa_challenge(duo_info, mfa_option, passcode):
    """Ask Duo to challenge the factor the user selected.

    Posts the chosen device/factor to Duo's ``/frame/prompt`` endpoint.
    This is the step at which the end user receives the phone call or
    push.

    :param duo_info: dict of parameters for Duo; ``host`` and ``sid`` are
        read here.
    :param mfa_option: the user's selected second factor; ``device`` and
        ``factor`` are read here.
    :param passcode: passcode to send with the challenge; omitted from
        the request when falsy.
    :return txid: Duo transaction ID used to track this auth attempt.
    """
    prompt_url = "https://{}/frame/prompt".format(duo_info["host"])
    # Only the text before the first " - " in the device label is sent.
    device_id = mfa_option["device"].partition(" - ")[0]

    challenge_payload = helpers.prepare_payload(
        factor=mfa_option["factor"],
        device=device_id,
        sid=duo_info["sid"],
        out_of_date=False,
        days_out_of_date=0,
        days_to_block=None,
    )
    # "async" is a reserved keyword, so it cannot be passed as a keyword
    # argument above and is added by key instead.
    challenge_payload["async"] = True
    if passcode:
        challenge_payload["passcode"] = passcode

    response = duo_api_post(prompt_url, payload=challenge_payload)
    transaction_id = parse_duo_mfa_challenge(response)
    logging.debug("Sent MFA Challenge and obtained Duo transaction ID.")
    return transaction_id
d['status'] for d in mfa_options if 'status' in d and d['status'] != "ACTIVE"]
if len(mfa_setup_statuses) == len(mfa_options):
logging.error("MFA not configured. "
"Please enable MFA on your account and try again.")
sys.exit(2)
preset_mfa = settings.mfa_method
available_mfas = [d['factorType'] for d in mfa_options]
if preset_mfa is not None and preset_mfa in available_mfas:
mfa_index = available_mfas.index(settings.mfa_method)
else:
logging.warning(
"No MFA provided or provided MFA does not exist. [{}]".format(
settings.mfa_method))
mfa_index = helpers.select_preferred_mfa_index(mfa_options)
# time to challenge the mfa option
selected_mfa_option = mfa_options[mfa_index]
logging.debug("Selected MFA is [{}]".format(selected_mfa_option))
mfa_challenge_url = selected_mfa_option['_links']['verify']['href']
payload = helpers.prepare_payload(stateToken=primary_auth['stateToken'],
factorType=selected_mfa_option['factorType'],
provider=selected_mfa_option['provider'],
profile=selected_mfa_option['profile'])
selected_factor = okta_verify_api_method(
mfa_challenge_url, payload, headers)
mfa_provider = selected_factor["_embedded"]["factor"]["provider"].lower()
logging.debug("MFA Challenge URL: [{}] headers: {}".format(
:param selected_okta_factor: Duo factor information retrieved from Okta.
:return payload: required payload for Okta callback
:return headers: required headers for Okta callback
"""
try:
duo_info = prepare_duo_info(selected_okta_factor)
except KeyError as missing_key:
logging.error(
"There was an issue parsing the Okta factor."
" Please try again. \n{}".format(missing_key))
sys.exit(1)
# Collect devices, factors, auth params for Duo
duo_info, duo_auth_response = get_duo_sid(duo_info)
factor_options = get_duo_devices(duo_auth_response)
mfa_index = helpers.select_preferred_mfa_index(
factor_options, factor_key="factor", subfactor_key="device")
mfa_option = factor_options[mfa_index]
logging.debug("Selected MFA is [{}]".format(mfa_option))
passcode = set_passcode(mfa_option)
txid = duo_mfa_challenge(duo_info, mfa_option, passcode)
verify_mfa = duo_mfa_verify(duo_info, txid)
# Make factor callback to Duo
sig_response = duo_factor_callback(duo_info, verify_mfa)
# Prepare for Okta callback
payload = helpers.prepare_payload(id=duo_info["factor_id"],
sig_response=sig_response,
stateToken=duo_info["state_token"])
def process_okta_aws_app_url():
    """Validate and normalize the configured Okta AWS app URL.

    Reads ``settings.okta_aws_app_url``. If ``validate_okta_aws_app_url``
    rejects it, an error is logged and the program exits with status 2.
    Otherwise ``settings.okta_org`` is set to ``scheme://netloc`` and
    ``settings.okta_aws_app_url`` is rewritten to the org followed by the
    URL path only.

    :return: None.
    """
    if not validate_okta_aws_app_url(settings.okta_aws_app_url):
        logging.error(
            "Okta Application URL not found, or invalid. Please check "
            "your configuration and try again."
        )
        sys.exit(2)

    parsed = urlparse(settings.okta_aws_app_url)
    org_base = '{}://{}'.format(parsed.scheme, parsed.netloc)
    app_url = '{}{}'.format(org_base, parsed.path)

    settings.okta_org = org_base
    settings.okta_aws_app_url = app_url
def process_okta_aws_app_url():
    """Check the Okta tile URL from settings and split out the Okta org.

    The URL is taken from ``settings.okta_aws_app_url``. An invalid URL
    (per ``validate_okta_aws_app_url``) is logged as an error and ends
    the program with exit status 2. On success, ``settings.okta_org``
    becomes ``scheme://netloc`` and ``settings.okta_aws_app_url`` becomes
    that org followed by the path component of the original URL.

    :return: None.
    """
    url_is_valid = validate_okta_aws_app_url(settings.okta_aws_app_url)
    if not url_is_valid:
        logging.error("Okta Application URL not found, or invalid. Please check "
                      "your configuration and try again.")
        sys.exit(2)

    pieces = urlparse(settings.okta_aws_app_url)
    org = '{}://{}'.format(pieces.scheme, pieces.netloc)
    normalized_url = '{}{}'.format(org, pieces.path)
    for attribute, value in (('okta_org', org),
                             ('okta_aws_app_url', normalized_url)):
        setattr(settings, attribute, value)
:param role_arns: IAM roles ARN list assigned for the user
:param saml_xml: Decoded saml response from Okta
:param saml_response_string: http response from saml assertion to AWS
:return: User input index selected by the user, the arn of selected role
"""
logging.debug("Select the role user wants to pick [{}]".format(role_arns))
if settings.role_arn is None:
selected_role = prompt_role_choices(
role_arns, saml_xml, saml_response_string)
elif settings.role_arn in role_arns:
selected_role = settings.role_arn
else:
logging.error(
"User provided rolename does not exist [{}]".format(settings.role_arn))
sys.exit(2)
logging.debug("Selected role: [{}]".format(selected_role))
return selected_role
def select_role_arn(role_arns, saml_xml, saml_response_string):
    """Pick the role ARN to use.

    When ``settings.role_arn`` is unset, the choice is delegated to
    ``prompt_role_choices``. When it is set and present in ``role_arns``,
    it is used as is. When it is set but not in ``role_arns``, an error
    is logged and the program exits with status 2.

    :param role_arns: IAM roles ARN list assigned for the user
    :param saml_xml: Decoded saml response from Okta
    :param saml_response_string: http response from saml assertion to AWS
    :return: the arn of the selected role
    """
    logging.debug("Select the role user wants to pick [{}]".format(role_arns))

    preset_role = settings.role_arn
    # Reject a preconfigured role the user is not actually assigned.
    if preset_role is not None and preset_role not in role_arns:
        logging.error(
            "User provided rolename does not exist [{}]".format(preset_role))
        sys.exit(2)

    if preset_role is None:
        chosen_role = prompt_role_choices(
            role_arns, saml_xml, saml_response_string)
    else:
        chosen_role = preset_role

    logging.debug("Selected role: [{}]".format(chosen_role))
    return chosen_role