Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Make a RefreshTokenAuthorizer given the tokens stored on disk
Returns:
(RefreshTokenAuthorizer): Tool to generate authorization credentials
"""
if not check_logged_in():
safeprint("No authorization credentials present. You must log in")
do_login_flow()
# Get the authorization client
auth_client = internal_auth_client()
# Get the tokens needed by the service
rf_token = lookup_option(FUNCX_RT_OPTNAME)
at_token = lookup_option(FUNCX_AT_OPTNAME)
at_expires = int(lookup_option(FUNCX_AT_EXPIRES_OPTNAME))
authorizer = RefreshTokenAuthorizer(rf_token, auth_client, access_token=at_token,
expires_at=at_expires)
return authorizer
def _revoke_current_tokens(native_client):
    """
    Revoke whichever stored tokens are currently present.

    The options named by FUNCX_RT_OPTNAME and FUNCX_AT_OPTNAME are looked up
    in that order; any lookup that yields a falsy value is skipped.

    Args:
        native_client (NativeAppAuthClient): Client whose
            ``oauth2_revoke_token`` is called for every token found
    """
    # Generator keeps the lookups lazy, so each token is fetched right
    # before it is (possibly) revoked.
    stored_tokens = (
        lookup_option(opt_name)
        for opt_name in (FUNCX_RT_OPTNAME, FUNCX_AT_OPTNAME)
    )
    # filter(None, ...) drops missing/empty tokens.
    for present_token in filter(None, stored_tokens):
        native_client.oauth2_revoke_token(present_token)
Make a RefreshTokenAuthorizer given the tokens stored on disk
Returns:
(RefreshTokenAuthorizer): Tool to generate authorization credentials
"""
if not check_logged_in():
safeprint("No authorization credentials present. You must log in")
do_login_flow()
# Get the authorization client
auth_client = internal_auth_client()
# Get the tokens needed by the service
rf_token = lookup_option(FUNCX_RT_OPTNAME)
at_token = lookup_option(FUNCX_AT_OPTNAME)
at_expires = int(lookup_option(FUNCX_AT_EXPIRES_OPTNAME))
authorizer = RefreshTokenAuthorizer(rf_token, auth_client, access_token=at_token,
expires_at=at_expires)
return authorizer
Returns:
(RefreshTokenAuthorizer): Tool to generate authorization credentials
"""
if not check_logged_in():
safeprint("No authorization credentials present. You must log in")
do_login_flow()
# Get the authorization client
auth_client = internal_auth_client()
# Get the tokens needed by the service
rf_token = lookup_option(FUNCX_RT_OPTNAME)
at_token = lookup_option(FUNCX_AT_OPTNAME)
at_expires = int(lookup_option(FUNCX_AT_EXPIRES_OPTNAME))
authorizer = RefreshTokenAuthorizer(rf_token, auth_client, access_token=at_token,
expires_at=at_expires)
return authorizer
def logout():
    """
    Revoke the stored Globus tokens and remove them from the configuration.

    The options named by FUNCX_RT_OPTNAME and FUNCX_AT_OPTNAME are processed
    in order. A token that cannot be found only produces a warning. If
    revocation fails with a network error, a message is printed and the
    logout stops immediately, leaving the not-yet-revoked tokens in place so
    they can be revoked on a later attempt.
    """
    client = internal_auth_client()

    for opt_name in (FUNCX_RT_OPTNAME, FUNCX_AT_OPTNAME):
        stored_token = lookup_option(opt_name)

        if stored_token:
            try:
                client.oauth2_revoke_token(stored_token)
            except globus_sdk.NetworkError:
                # Without network access the token stays valid on the
                # server side, so keep it on disk and give up for now.
                failure_msg = ('Failed to reach Globus to revoke tokens. '
                               'Because we cannot revoke these tokens, cancelling '
                               'logout')
                safeprint(failure_msg)
                return
            else:
                # Revocation succeeded; the local copy can now be dropped.
                remove_option(opt_name)
        else:
            # Nothing stored under this name: warn and move on.
            warning_template = ('Warning: Found no token named "{}"! '
                                'Recommend rescinding consent')
            safeprint(warning_template.format(opt_name))