Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def confirm_account(self, token):
    """Confirm this account when ``token`` was issued for this user's id.

    The token is decoded with a serializer keyed on the application's
    ``SECRET_KEY``.

    :param token: signed confirmation token
    :return: ``False`` when the token cannot be loaded (bad or expired
        signature) or when its ``confirm`` entry differs from ``self.id``;
        ``True`` after ``confirmed`` has been set and committed.
    """
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        payload = serializer.loads(token)
    except (BadSignature, SignatureExpired):
        return False

    if payload.get('confirm') == self.id:
        # Token belongs to this user: persist the confirmation flag.
        self.confirmed = True
        db.session.add(self)
        db.session.commit()
        return True
    return False
To unsign url safe data.
If expires_in is provided it will Time the signature
:param token:
:param secret_key:
:param salt: (string) a namespace key
:param kw:
:return:
"""
if len(token.split(".")) == 3:
s = URLSafeTimedSerializer2(secret_key=__CRYPT.get("secret_key"), salt=__CRYPT.get("salt"), **kw)
value, timestamp = s.loads(token, max_age=None, return_timestamp=True)
now = datetime.datetime.utcnow()
if timestamp > now:
return value
else:
raise itsdangerous.SignatureExpired(
'Signature age %s < %s ' % (timestamp, now),
payload=value,
date_signed=timestamp)
else:
s = itsdangerous.URLSafeSerializer(secret_key=__CRYPT.get("secret_key"), salt=__CRYPT.get("salt"), **kw)
return s.loads(token)
def decorated_view(*args, **kwargs):
    """Require a logged-in user or a valid ``token`` header before calling ``func``.

    When ``current_user`` is not authenticated, the ``token`` request header
    is verified with ``Users.verify_auth_token`` and the result is stored
    on ``request.api_user``. The request is aborted with:

    * 400 when the header is missing or empty,
    * 401 ``'Signature Expired'`` when the token has expired,
    * 401 ``'Token did not match'`` for any other bad signature,
    * 401 ``'Unknown error'`` for any other failure during verification.

    :return: whatever the wrapped ``func`` returns
    """
    if not current_user.is_authenticated:
        token = request.headers.get('token', None)
        if token:
            try:
                user = Users.verify_auth_token(current_app.secret_key, token)
                request.api_user = user
            except SignatureExpired:
                # SignatureExpired is a subclass of BadSignature in
                # itsdangerous, so it has to be handled first; otherwise
                # the BadSignature clause below swallows it and expired
                # tokens are reported as 'Token did not match'.
                abort(401, 'Signature Expired')
            except BadSignature:
                abort(401, 'Token did not match')
            except Exception:
                # Boundary handler: any other verification failure is
                # reported to the client as an authentication error.
                abort(401, 'Unknown error')
        else:
            abort(400, 'Missing token')
    return func(*args, **kwargs)
return decorated_view
request.META.get('REMOTE_ADDR'))
if d['remote_addr'] != remote_addr:
return None
return get_object_or_404(User, username=d['username'])
except BadSignature:
# try timed key
s2 = URLSafeTimedSerializer(settings.API_SECRET,
salt="rolf-timed-key")
try:
d = s2.loads(key, max_age=60 * 60 * 24 * 7)
return get_object_or_404(User, username=d['username'])
except BadSignature:
# invalid
return None
except SignatureExpired:
return None
return None
def validate_state(state):
"""Validate state token returned by authentication."""
state_token = None
try:
# Attempt to decode state
state_token = GENERATOR.loads(
state,
max_age=timedelta(minutes=60).total_seconds()
)
except SignatureExpired:
# Token has expired
report_event('token_expired', {
'state': state
})
abort(400)
except BadSignature:
# Token is not authorized
report_event('token_not_authorized', {
'state': state
})
abort(401)
if not state_token or state_token != PROJECT_INFO['client_id']:
# Token is not authorized
report_event('token_not_valid', {
def loads(self, raw_token):
    """
    Parse a JWT into a flaskbb.core.tokens.Token.

    A malformed, tampered-with or expired token results in a
    flaskbb.core.tokens.TokenError. Any error that is not related
    to token parsing is left to propagate unchanged.

    :str raw_token: JWT to be parsed
    :returns flaskbb.core.tokens.Token: Parsed token
    """
    try:
        payload = self._serializer.loads(raw_token)
    except SignatureExpired:
        raise tokens.TokenError.expired()
    except BadSignature:  # pragma: no branch
        raise tokens.TokenError.invalid()
    # BadSignature is expected to cover every other failure, but BadData
    # is the root exception of itsdangerous, so it is caught as a last
    # resort and translated into our own error type.
    except BadData:  # pragma: no cover
        raise tokens.TokenError.bad()

    # Only reached when the serializer accepted the token.
    user_id = payload['id']
    operation = payload['op']
    return tokens.Token(user_id=user_id, operation=operation)
def verify_auth_token(token, secret_key):
    """Look up the user referenced by a signed auth token.

    :param token: signed token to decode
    :param secret_key: key used to build the serializer
    :return: ``None`` when the token is expired or its signature is
        invalid; otherwise the result of ``User.query.get`` for the
        ``id`` stored in the token payload.
    """
    serializer = Serializer(secret_key)
    try:
        payload = serializer.loads(token)
    except (SignatureExpired, BadSignature):
        # Expired (but otherwise valid) and invalid tokens are both rejected.
        return None
    return User.query.get(payload['id'])
def get_user_by_token(token: str) -> Optional[User]:
    """Resolve an authorization token to a User.

    The token is decoded with a serializer keyed on the ``api`` /
    ``secret_key`` application configuration value.

    :param token: authorization token
    :return: ``None`` when the token is expired or has a bad signature;
        otherwise the result of ``User.query.get`` for the ``id`` stored
        in the token payload
    """
    secret_key = AppConfiguration.get('api', 'secret_key')
    token_serializer = Serializer(secret_key)
    try:
        payload = token_serializer.loads(token)
    except (SignatureExpired, BadSignature):
        # Expired (but otherwise valid) and invalid tokens are both rejected.
        return None
    return User.query.get(payload['id'])
def check_api_key(cls, token, max_age=7776000):
    """Return the user owning API key ``token``, or ``False`` if it is not valid.

    The token is decoded with ``cls._token_serializer()``; the decoded
    value is used as the user's email and must match a row whose ``key``
    equals the token itself.

    :param token: signed API key
    :param max_age: maximum accepted token age in seconds; the default is
        three months (90 days)
    :return: the matching user, or ``False`` when the token is expired,
        has a bad signature, or no user matches
    """
    serializer = cls._token_serializer()
    try:
        email = serializer.loads(token, max_age=max_age)
    except SignatureExpired:
        # Listed before BadSignature because SignatureExpired is a subclass
        # of it in itsdangerous; in the opposite order this handler can
        # never run.
        return False
    except BadSignature:
        return False
    user = cls.query.filter_by(email=email, key=token).first()
    if user is None:
        return False
    return user
def authenticate(self, auth_info, userstore):
    """Authenticate a request from the signed token carried in ``auth_info``.

    :param auth_info: object exposing the raw token as ``auth_info.token``
    :param userstore: object exposing ``get_user(user_id)``
    :return: ``None`` when the token is expired or its signature is
        invalid; otherwise the result of ``userstore.get_user`` for the
        ``user_id`` stored in the token payload
    :raises Exception: when the token is missing or empty
    """
    # Local import so this block does not depend on a module-level import.
    import logging

    logger = logging.getLogger(__name__)

    logger.debug('attempting to authenticate using TokenAuthenticator')
    token = auth_info.token
    # The raw token is deliberately not logged: it is a bearer credential.
    if not token:
        raise Exception('token is missing or empty')

    # NOTE(review): loads() is called without max_age, so SignatureExpired
    # is presumably never raised here - confirm whether tokens should expire.
    serializer = URLSafeTimedSerializer(self.secret_key)
    try:
        logger.debug('attempting to deserialize the token')
        open_token = serializer.loads(token)
    except SignatureExpired:
        logger.debug('exception SignatureExpired, returning None')
        return None  # valid token, but expired
    except BadSignature:
        logger.debug('exception BadSignature, returning None')
        return None  # invalid token

    # TODO should the identity field in the token be configurable?
    user_id = open_token['user_id']
    # Only the field actually needed is read; a diagnostic message must not
    # be able to fail authentication because of an absent optional key.
    logger.debug('token loaded successfully, user id from token is: %s', user_id)
    logger.debug('getting user from userstore: %s', userstore)
    return userstore.get_user(user_id)