Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_current_user_oauth_token(self):
"""Get the current user identified by OAuth access token
Separate from API token because OAuth access tokens
can only be used for identifying users,
not using the API.
"""
token = self.get_auth_token()
if token is None:
return None
orm_token = orm.OAuthAccessToken.find(self.db, token)
if orm_token is None:
return None
orm_token.last_activity = \
orm_token.user.last_activity = datetime.utcnow()
self.db.commit()
return self._user_from_orm(orm_token.user)
kwargs['domain'] = self.domain
user = self.get_current_user_cookie()
session_id = self.get_session_cookie()
if session_id:
# clear session id
self.clear_cookie(SESSION_COOKIE_NAME, **kwargs)
if user:
# user is logged in, clear any tokens associated with the current session
# don't clear session tokens if not logged in,
# because that could be a malicious logout request!
count = 0
for access_token in (
self.db.query(orm.OAuthAccessToken)
.filter(orm.OAuthAccessToken.user_id==user.id)
.filter(orm.OAuthAccessToken.session_id==session_id)
):
self.db.delete(access_token)
count += 1
if count:
self.log.debug("Deleted %s access tokens for %s", count, user.name)
self.db.commit()
# clear hub cookie
self.clear_cookie(self.hub.cookie_name, path=self.hub.base_url, **kwargs)
# clear services cookie
self.clear_cookie('jupyterhub-services', path=url_path_join(self.base_url, 'services'), **kwargs)
kwargs['domain'] = self.domain
user = self.get_current_user_cookie()
session_id = self.get_session_cookie()
if session_id:
# clear session id
self.clear_cookie(SESSION_COOKIE_NAME, **kwargs)
if user:
# user is logged in, clear any tokens associated with the current session
# don't clear session tokens if not logged in,
# because that could be a malicious logout request!
count = 0
for access_token in (
self.db.query(orm.OAuthAccessToken)
.filter(orm.OAuthAccessToken.user_id==user.id)
.filter(orm.OAuthAccessToken.session_id==session_id)
):
self.db.delete(access_token)
count += 1
if count:
self.log.debug("Deleted %s access tokens for %s", count, user.name)
self.db.commit()
# clear hub cookie
self.clear_cookie(self.hub.cookie_name, path=self.hub.base_url, **kwargs)
# clear services cookie
self.clear_cookie('jupyterhub-services', path=url_path_join(self.base_url, 'services'), **kwargs)
def delete(self, name, token_id):
"""Delete a token"""
user = self.find_user(name)
if not user:
raise web.HTTPError(404, "No such user: %s" % name)
token = self.find_token_by_id(user, token_id)
# deleting an oauth token deletes *all* oauth tokens for that client
if isinstance(token, orm.OAuthAccessToken):
client_id = token.client_id
tokens = [
token for token in user.oauth_tokens if token.client_id == client_id
]
else:
tokens = [token]
for token in tokens:
self.db.delete(token)
self.db.commit()
self.set_header('Content-Type', 'text/plain')
self.set_status(204)
raise ValueError("Only 'identify' scope is supported")
# redact sensitive keys in log
for key in ('access_token', 'refresh_token', 'state'):
if key in token:
value = token[key]
if isinstance(value, str):
log_token[key] = 'REDACTED'
app_log.debug("Saving bearer token %s", log_token)
if request.user is None:
raise ValueError("No user for access token: %s" % request.user)
client = (
self.db.query(orm.OAuthClient)
.filter_by(identifier=request.client.client_id)
.first()
)
orm_access_token = orm.OAuthAccessToken(
client=client,
grant_type=orm.GrantType.authorization_code,
expires_at=datetime.utcnow().timestamp() + token['expires_in'],
refresh_token=token['refresh_token'],
# TODO: save scopes,
# scopes=scopes,
token=token['access_token'],
session_id=request.session_id,
user=request.user,
)
self.db.add(orm_access_token)
self.db.commit()
return client.redirect_uri
def find_token_by_id(self, user, token_id):
"""Find a token object by token-id key
Raises 404 if not found for any reason
(e.g. wrong owner, invalid key format, etc.)
"""
not_found = "No such token %s for user %s" % (token_id, user.name)
prefix, id = token_id[0], token_id[1:]
if prefix == 'a':
Token = orm.APIToken
elif prefix == 'o':
Token = orm.OAuthAccessToken
else:
raise web.HTTPError(404, not_found)
try:
id = int(id)
except ValueError:
raise web.HTTPError(404, not_found)
orm_token = self.db.query(Token).filter(Token.id == id).first()
if orm_token is None or orm_token.user is not user.orm_user:
raise web.HTTPError(404, "Token not found %s", orm_token)
return orm_token
class OAuthClient(Base):
__tablename__ = 'oauth_clients'
id = Column(Integer, primary_key=True, autoincrement=True)
identifier = Column(Unicode(255), unique=True)
description = Column(Unicode(1023))
secret = Column(Unicode(255))
redirect_uri = Column(Unicode(1023))
@property
def client_id(self):
return self.identifier
access_tokens = relationship(
OAuthAccessToken,
backref='client',
cascade='all, delete-orphan',
)
codes = relationship(
OAuthCode,
backref='client',
cascade='all, delete-orphan',
)
# General database utilities
class DatabaseSchemaMismatch(Exception):
"""Exception raised when the database schema version does not match
the current version of JupyterHub.
"""
class OAuthClient(Base):
__tablename__ = 'oauth_clients'
id = Column(Integer, primary_key=True, autoincrement=True)
identifier = Column(Unicode(255), unique=True)
description = Column(Unicode(1023))
secret = Column(Unicode(255))
redirect_uri = Column(Unicode(1023))
@property
def client_id(self):
return self.identifier
access_tokens = relationship(
OAuthAccessToken, backref='client', cascade='all, delete-orphan'
)
codes = relationship(OAuthCode, backref='client', cascade='all, delete-orphan')
# General database utilities
class DatabaseSchemaMismatch(Exception):
"""Exception raised when the database schema version does not match
the current version of JupyterHub.
"""
def register_foreign_keys(engine):
"""register PRAGMA foreign_keys=on on connection"""