Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# app_log.info("Response %s", resp_json)
access_token = resp_json['access_token']
id_token = resp_json['id_token']
decoded = jwt.decode(id_token, verify=False)
userdict = {"name": decoded[self.username_claim]}
userdict["auth_state"] = auth_state = {}
auth_state['access_token'] = access_token
# results in a decoded JWT for the user data
auth_state['user'] = decoded
return userdict
class LocalAzureAdOAuthenticator(LocalAuthenticator, AzureAdOAuthenticator):
"""A version that mixes in local system user creation"""
pass
validate_cert=False,
headers=headers)
resp = await http_client.fetch(req)
resp_json = json.loads(resp.body.decode('utf8', 'replace'))
return {
'name': resp_json['metadata']['name'],
'auth_state': {
'access_token': access_token,
'openshift_user': resp_json,
}
}
class LocalOpenShiftOAuthenticator(LocalAuthenticator, OpenShiftOAuthenticator):
"""A version that mixes in local system user creation"""
pass
odr_base_url = 'http://delta.odr.io'
# _OAUTH_ACCESS_TOKEN_URL = os.environ.get('OAUTH2_TOKEN_URL', '')
_OAUTH_ACCESS_TOKEN_URL = odr_base_url + "/oauth/v2/token"
# _OAUTH_AUTHORIZE_URL = os.environ.get('OAUTH2_AUTHORIZE_URL', '')
_OAUTH_AUTHORIZE_URL = odr_base_url + "/oauth/v2/auth"
class ODRLoginHandler(OAuthLoginHandler, ODREnvMixin):
pass
class ODROAuthenticator(OAuthenticator):
pass
class LocalODROAuthenticator(LocalAuthenticator, ODROAuthenticator):
"""Uses local system user creation so that ODR is able to define the user list"""
login_service = "ODR OAuth"
login_handler = ODRLoginHandler
# Currently defined in the jupyterhub_config.py file
manager_token = Unicode(
os.environ.get('OAUTH2_MANAGER_TOKEN', ''),
config=True,
help="TODO"
)
manager_port = Unicode(
os.environ.get('OAUTH2_MANAGER_PORT', ''),
config=True,
help="TODO"
"OAuth user contains no key %s: %s", self.username_key, resp_json
)
return
return {
'name': name,
'auth_state': {
'access_token': access_token,
'refresh_token': refresh_token,
'oauth_user': resp_json,
'scope': scope,
},
}
class LocalGenericOAuthenticator(LocalAuthenticator, GenericOAuthenticator):
"""A version that mixes in local system user creation"""
pass
method="GET",
headers=headers
)
resp = await http_client.fetch(req)
resp_json = json.loads(resp.body.decode('utf8', 'replace'))
return {
'name': resp_json["email"],
'auth_state': {
'access_token': access_token,
'auth0_user': resp_json,
}
}
class LocalAuth0OAuthenticator(LocalAuthenticator, Auth0OAuthenticator):
"""A version that mixes in local system user creation"""
pass
raise web.HTTPError(500, 'Authentication Failed: Token Not Acquired')
state = json.loads(response.body.decode('utf8', 'replace'))
access_token = state['access_token']
info_request = self.get_user_info_request(access_token)
response = await http_client.fetch(info_request)
user = json.loads(response.body.decode('utf8', 'replace'))
# TODO: preserve state in auth_state when JupyterHub supports encrypted auth_state
return {
'name': user['email'],
'auth_state': {
'access_token': access_token,
'okpy_user': user,
}
}
class LocalOkpyOAuthenticator(LocalAuthenticator, OkpyOAuthenticator):
"""A version that mixes in local system user creation"""
pass
'name' holds the username and 'auth_state' holds all data requested from shibboleth.
"""
user_data = {
'name': data['jh_name'],
'auth_state': data
}
return user_data
def get_handlers(self, app):
return [
(r'/login', ShibbolethLoginHandler),
(r'/logout', ShibbolethLogoutHandler),
]
class ShibbolethLocalAuthenticator(ShibbolethAuthenticator, LocalAuthenticator):
def add_system_user(self, user):
super(ShibbolethLocalAuthenticator, self).add_system_user(user)
name = user.name
notebooks_folder = '/home/{}/notebooks'.format(name)
add_system_user(name, notebooks_folder)
header_name = Unicode(
default_value='REMOTE_USER',
config=True,
help="""HTTP header to inspect for the authenticated username.""")
def get_handlers(self, app):
return [
(r'/login', RemoteUserLoginHandler),
]
@gen.coroutine
def authenticate(self, *args):
raise NotImplementedError()
class RemoteUserLocalAuthenticator(LocalAuthenticator):
"""
Accept the authenticated user name from the REMOTE_USER HTTP header.
Derived from LocalAuthenticator for use of features such as adding
local accounts through the admin interface.
"""
header_name = Unicode(
default_value='REMOTE_USER',
config=True,
help="""HTTP header to inspect for the authenticated username.""")
def get_handlers(self, app):
return [
(r'/login', RemoteUserLoginHandler),
]
@gen.coroutine
username = gotten_name
userdict = {"name": username}
# Now we set up auth_state
userdict["auth_state"] = auth_state = {}
# Save the token response and full CILogon reply in auth state
# These can be used for user provisioning
# in the Lab/Notebook environment.
auth_state['token_response'] = token_response
# store the whole user model in auth_state.cilogon_user
# keep access_token as well, in case anyone was relying on it
auth_state['access_token'] = access_token
auth_state['cilogon_user'] = resp_json
return userdict
class LocalCILogonOAuthenticator(LocalAuthenticator, CILogonOAuthenticator):
"""A version that mixes in local system user creation"""
pass
)
resp = None
try:
resp = yield http_client.fetch(req)
except HTTPError as e:
# This is likely a 400 Bad Request error due to an invalid username or password
return
if resp is not None and resp.code == 200:
return username
else:
return
class LocalGenePatternAuthenticator(LocalAuthenticator, GenePatternAuthenticator):
"""A version that mixes in local system user creation"""
pass