Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def azure_authorize_url_for(tentant):
    """Return the Azure AD OAuth2 authorization endpoint for a tenant.

    ``tentant`` (spelling kept as-is, since callers may pass it by keyword)
    is the tenant identifier placed in the URL path.
    """
    url_template = 'https://login.microsoftonline.com/{0}/oauth2/authorize'
    return url_template.format(tentant)
class AzureAdMixin(OAuth2Mixin):
    """OAuth2 mixin holding the Azure AD endpoint URLs.

    The tenant id is read from ``AAD_TENANT_ID`` when this class body is
    executed, so the URLs are fixed at that point.
    """

    # Falls back to an empty string when the variable is not set.
    tenant_id = os.environ.get('AAD_TENANT_ID', '')

    _OAUTH_ACCESS_TOKEN_URL = azure_token_url_for(tenant_id)
    _OAUTH_AUTHORIZE_URL = azure_authorize_url_for(tenant_id)
class AzureAdLoginHandler(OAuthLoginHandler, AzureAdMixin):
    """Login handler that combines OAuthLoginHandler with the Azure AD mixin.

    Adds no attributes or methods of its own.
    """
class AzureAdOAuthenticator(OAuthenticator):
    # Configurable label; its default comes from the LOGIN_SERVICE environment
    # variable, or 'Azure AD' when that variable is unset.
    login_service = Unicode(
        os.environ.get('LOGIN_SERVICE', 'Azure AD'),
        help="""Azure AD domain name string, e.g. My College""",
        config=True,
    )

    login_handler = AzureAdLoginHandler

    # Both traits are configurable.
    tenant_id = Unicode(config=True)
    username_claim = Unicode(config=True)

    @default('tenant_id')
    def _tenant_id_default(self):
        """Default the tenant id to ``AAD_TENANT_ID``, or '' when it is unset."""
        env_tenant = os.environ.get('AAD_TENANT_ID', '')
        return env_tenant
@default('username_claim')
return {"Accept": "application/json",
"User-Agent": "JupyterHub",
"Authorization": "Bearer {}".format(access_token)
}
class BitbucketMixin(OAuth2Mixin):
    """OAuth2 mixin holding the Bitbucket authorize and access-token URLs."""

    _OAUTH_AUTHORIZE_URL = (
        "https://bitbucket.org/site/oauth2/authorize"
    )
    _OAUTH_ACCESS_TOKEN_URL = (
        "https://bitbucket.org/site/oauth2/access_token"
    )
class BitbucketLoginHandler(OAuthLoginHandler, BitbucketMixin):
    """Login handler that combines OAuthLoginHandler with the Bitbucket mixin.

    Adds no attributes or methods of its own.
    """
class BitbucketOAuthenticator(OAuthenticator):
    login_service = "Bitbucket"
    login_handler = BitbucketLoginHandler

    # Presumably the environment-variable names the base class reads the
    # client credentials from -- verify against OAuthenticator.
    client_id_env = 'BITBUCKET_CLIENT_ID'
    client_secret_env = 'BITBUCKET_CLIENT_SECRET'

    team_whitelist = Set(
        help="Automatically whitelist members of selected teams",
        config=True,
    )
    # Second class attribute bound to the very same trait object.
    bitbucket_team_whitelist = team_whitelist
headers = {"Accept": "application/json",
"User-Agent": "JupyterHub",
from jupyterhub.auth import LocalAuthenticator
from .oauth2 import OAuthLoginHandler, OAuthenticator
# Auth0 subdomain, read from the environment once at import time.
# os.getenv returns None when AUTH0_SUBDOMAIN is not set.
AUTH0_SUBDOMAIN = os.getenv('AUTH0_SUBDOMAIN')
class Auth0Mixin(OAuth2Mixin):
    """OAuth2 mixin whose endpoint URLs are built from ``AUTH0_SUBDOMAIN``.

    The URLs are formatted when this class body runs; an unset subdomain
    (``None``) is interpolated literally.
    """

    _OAUTH_AUTHORIZE_URL = (
        "https://%s.auth0.com/authorize" % AUTH0_SUBDOMAIN
    )
    _OAUTH_ACCESS_TOKEN_URL = (
        "https://%s.auth0.com/oauth/token" % AUTH0_SUBDOMAIN
    )
class Auth0LoginHandler(OAuthLoginHandler, Auth0Mixin):
    """Login handler that combines OAuthLoginHandler with the Auth0 mixin.

    Adds no attributes or methods of its own.
    """
class Auth0OAuthenticator(OAuthenticator):
    # Service label and the handler class used for the login step.
    login_handler = Auth0LoginHandler
    login_service = "Auth0"
async def authenticate(self, handler, data=None):
code = handler.get_argument("code")
# TODO: Configure the curl_httpclient for tornado
http_client = AsyncHTTPClient()
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code':code,
'redirect_uri': self.get_callback_url(handler)
from traitlets import Dict, Unicode, List, default, validate
from jupyterhub.auth import LocalAuthenticator
from jupyterhub.utils import url_path_join
from .oauth2 import OAuthLoginHandler, OAuthCallbackHandler, OAuthenticator
def check_user_in_groups(member_groups, allowed_groups):
    """Return True if the user belongs to at least one allowed group.

    Args:
        member_groups: container of the groups the user is a member of;
            only membership tests (``in``) are performed on it.
        allowed_groups: iterable of groups that grant access.

    Returns:
        True when any entry of ``allowed_groups`` is found in
        ``member_groups``; False otherwise, including when
        ``allowed_groups`` is empty.
    """
    # any() already yields the boolean; no if/else wrapper is needed.
    return any(group in member_groups for group in allowed_groups)
class GoogleOAuthenticator(OAuthenticator, GoogleOAuth2Mixin):
    google_api_url = Unicode("https://www.googleapis.com", config=True)

    @default('google_api_url')
    def _google_api_url(self):
        """Return ``GOOGLE_API_URL`` from the environment.

        Falls back to https://www.googleapis.com when the variable is unset
        or empty.
        """
        return os.getenv('GOOGLE_API_URL') or 'https://www.googleapis.com'

    @default('scope')
    def _scope_default(self):
        """Return the default scopes: 'openid' and 'email'."""
        default_scopes = ['openid', 'email']
        return default_scopes
from traitlets import List, Set, Unicode, default
from .common import next_page_from_links
from .oauth2 import OAuthLoginHandler, OAuthenticator
def _api_headers(access_token):
return {
"Accept": "application/json",
"User-Agent": "JupyterHub",
"Authorization": "token {}".format(access_token),
}
class GitHubOAuthenticator(OAuthenticator):
    # Scope configuration is described in github_scopes.md.
    # Scopes are set through config, for example:
    # c.GitHubOAuthenticator.scope = ['read:org']

    login_service = "GitHub"

    # Configurable base URL; the static default is the public github.com.
    github_url = Unicode("https://github.com", config=True)
@default("github_url")
def _github_url_default(self):
github_url = os.environ.get("GITHUB_URL")
if not github_url:
# fallback on older GITHUB_HOST config,
# treated the same as GITHUB_URL
host = os.environ.get("GITHUB_HOST")
return {"Accept": "application/json",
"User-Agent": "JupyterHub",
"Authorization": "Bearer {}".format(access_token)
}
class GitLabMixin(OAuth2Mixin):
    """OAuth2 mixin whose endpoint URLs are built from ``GITLAB_URL``."""

    _OAUTH_AUTHORIZE_URL = (
        "%s/oauth/authorize" % GITLAB_URL
    )
    # NOTE(review): GitLab's OAuth documentation lists the token endpoint as
    # /oauth/token -- confirm that /oauth/access_token is intended here.
    _OAUTH_ACCESS_TOKEN_URL = (
        "%s/oauth/access_token" % GITLAB_URL
    )
class GitLabLoginHandler(OAuthLoginHandler, GitLabMixin):
    """Login handler that combines OAuthLoginHandler with the GitLab mixin.

    Adds no attributes or methods of its own.
    """
class GitLabOAuthenticator(OAuthenticator):
    # Scope configuration is described in gitlab_scopes.md.
    # Scopes are set through config, for example:
    # c.GitLabOAuthenticator.scope = ['read_user']

    login_service = "GitLab"
    login_handler = GitLabLoginHandler

    # Presumably the environment-variable names the base class reads the
    # client credentials from -- verify against OAuthenticator.
    client_id_env = 'GITLAB_CLIENT_ID'
    client_secret_env = 'GITLAB_CLIENT_SECRET'

    gitlab_group_whitelist = Set(
        help="Automatically whitelist members of selected groups",
        config=True,
    )
gitlab_project_id_whitelist = Set(
config=True,
from tornado.auth import OAuth2Mixin
from tornado import web
from tornado.httputil import url_concat
from tornado.httpclient import HTTPRequest, AsyncHTTPClient
from jupyterhub.auth import LocalAuthenticator
from traitlets import Unicode, Dict, Bool, Union, default, observe
from .traitlets import Callable
from .oauth2 import OAuthLoginHandler, OAuthenticator
class GenericOAuthenticator(OAuthenticator):
    # Configurable label for the login service.
    login_service = Unicode("OAuth 2.0", config=True)

    # Configurable dict trait; its help text describes it as extra
    # parameters for the first POST request.
    extra_params = Dict(
        help="Extra parameters for first POST request"
    ).tag(config=True)
username_key = Union(
[Unicode(os.environ.get('OAUTH2_USERNAME_KEY', 'username')), Callable()],
config=True,
help="""
Userdata username key from returned json for USERDATA_URL.
Can be a string key name or a callable that accepts the returned
json (as a dict) and returns the username. The callable is useful
e.g. for extracting the username from a nested object in the
response.
""",
async def clear_tokens(self, user):
    """Revoke and clear user tokens from the database.

    Loads the user's auth state, revokes the service tokens stored under
    ``'tokens'`` through the authenticator, logs the affected service
    names, then saves the state back with ``'tokens'`` emptied.

    Does nothing when the user has no (or an empty) auth state.

    Args:
        user: object providing awaitable ``get_auth_state()`` /
            ``save_auth_state(state)`` and a ``name`` attribute.
    """
    state = await user.get_auth_state()
    if not state:
        return

    # Read the tokens once, tolerating a missing or None entry. Previously a
    # state without 'tokens' passed None to the revoker and then raised
    # KeyError while building the log message.
    tokens = state.get('tokens') or {}

    await self.authenticator.revoke_service_tokens(tokens)
    self.log.info(
        'Logout: Revoked tokens for user "{}" services: {}'.format(
            user.name, ','.join(tokens)
        )
    )
    state['tokens'] = {}
    await user.save_auth_state(state)
class GlobusOAuthenticator(OAuthenticator):
    """Authenticator for Globus.

    Handles authorization and passes transfer tokens on to the spawner.
    """

    login_service = 'Globus'
    logout_handler = GlobusLogoutHandler

    @default("userdata_url")
    def _userdata_url_default(self):
        """Return the default userinfo endpoint on auth.globus.org."""
        endpoint = "https://auth.globus.org/v2/oauth2/userinfo"
        return endpoint

    @default("authorize_url")
    def _authorize_url_default(self):
        """Return the default authorization endpoint on auth.globus.org."""
        endpoint = "https://auth.globus.org/v2/oauth2/authorize"
        return endpoint
@default("revocation_url")
def _revocation_url_default(self):
class CILogonLoginHandler(OAuthLoginHandler, CILogonMixin):
    """Login handler for CILogon; see http://www.cilogon.org/oidc."""

    def authorize_redirect(self, *args, **kwargs):
        """Add the configured idp and skin to the redirect's extra params.

        An ``extra_params`` dict is created in ``kwargs`` if absent. Each of
        ``selected_idp`` and ``skin`` is set only when the corresponding
        authenticator attribute is truthy. The call is then forwarded to
        the parent implementation.
        """
        authenticator = self.authenticator
        redirect_params = kwargs.setdefault('extra_params', {})
        optional_params = (
            ("selected_idp", authenticator.idp),
            ("skin", authenticator.skin),
        )
        for key, value in optional_params:
            if value:
                redirect_params[key] = value
        return super().authorize_redirect(*args, **kwargs)
class CILogonOAuthenticator(OAuthenticator):
    login_service = "CILogon"
    login_handler = CILogonLoginHandler

    # Presumably the environment-variable names the base class reads the
    # client credentials from -- verify against OAuthenticator.
    client_id_env = 'CILOGON_CLIENT_ID'
    client_secret_env = 'CILOGON_CLIENT_SECRET'

    # The help text's continuation lines stay at column 0 so the string is
    # unchanged.
    scope = List(
        Unicode(),
        default_value=['openid', 'email', 'org.cilogon.userinfo'],
        config=True,
        help="""The OAuth scopes to request.
See cilogon_scope.md for details.
At least 'openid' is required.
""",
    )
@validate('scope')
# Base URL for the OpenShift REST API: the OPENSHIFT_REST_API_URL environment
# variable when it is set to a non-empty value, otherwise OPENSHIFT_URL.
OPENSHIFT_REST_API_URL = os.environ.get('OPENSHIFT_REST_API_URL') or OPENSHIFT_URL
class OpenShiftMixin(OAuth2Mixin):
    """OAuth2 mixin whose URLs are built from ``OPENSHIFT_AUTH_API_URL``."""

    _OAUTH_AUTHORIZE_URL = (
        "%s/oauth/authorize" % OPENSHIFT_AUTH_API_URL
    )
    _OAUTH_ACCESS_TOKEN_URL = (
        "%s/oauth/token" % OPENSHIFT_AUTH_API_URL
    )
class OpenShiftLoginHandler(OAuthLoginHandler, OpenShiftMixin):
    """Login handler for OpenShift that takes its scope from the authenticator."""

    # Exposing the authenticator's scope here allows the
    # `Service Accounts as OAuth Clients` scenario:
    # https://docs.openshift.org/latest/architecture/additional_concepts/authentication.html#service-accounts-as-oauth-clients
    @property
    def scope(self):
        """Return the scope configured on the authenticator."""
        authenticator = self.authenticator
        return authenticator.scope
class OpenShiftOAuthenticator(OAuthenticator):
    login_service = "OpenShift"
    login_handler = OpenShiftLoginHandler

    # Scope list; the login handler's `scope` property returns this value.
    scope = ['user:info']

    # REST path whose trailing `~` presumably refers to the current user --
    # confirm against the OpenShift user API.
    users_rest_api_path = '/apis/user.openshift.io/v1/users/~'
async def authenticate(self, handler, data=None):
code = handler.get_argument("code")
# TODO: Configure the curl_httpclient for tornado
http_client = AsyncHTTPClient()
# Exchange the OAuth code for a OpenShift Access Token
#