Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def f(sjwt, iat_skew=timedelta()):
    """
    Verify a token using node-jsjws.

    Builds a ``fixtures.verify(...)`` call string from the current UTC
    timestamp, the JSON-encoded token, the allowed skew in seconds, the
    JSON-encoded key and the JSON-encoded algorithm, hands that string to
    ``spawn`` and returns the result converted to a tuple.

    :param sjwt: The token to verify.
    :param iat_skew: Allowed skew; only its ``total_seconds()`` is used.
    :returns: ``tuple`` built from whatever ``spawn`` returns.
    """
    # NOTE(review): `key` and `alg` are not parameters here; presumably they
    # are supplied by the enclosing scope -- confirm against the outer function.
    current_time = timegm(datetime.utcnow().utctimetuple())
    encoded_token = json_encode(sjwt)
    skew_seconds = iat_skew.total_seconds()

    # Symmetric keys are passed as the decoded 'k' member of the exported
    # key; any other key is passed as PEM.
    if key.is_symmetric:
        key_material = base64url_decode(json_decode(key.export())['k'])
    else:
        key_material = key.export_to_pem()
    encoded_key = json_encode(key_material)
    encoded_alg = json_encode(alg)

    script = "fixtures.verify({now}, {sjwt}, {iat_skew}, {key}, {alg})".format(
        now=current_time,
        sjwt=encoded_token,
        iat_skew=skew_seconds,
        key=encoded_key,
        alg=encoded_alg)
    return tuple(spawn(script, True))
return f
def f(claims, alg, lifetime=None, expires=None, not_before=None):
    """
    Generate a token using node-jsjws.

    Builds a ``fixtures.generate(...)`` call string and returns whatever
    ``spawn`` returns for it.

    :param claims: Claims to place in the token (JSON-encoded as given).
    :param alg: Algorithm name, sent as the ``alg`` member of the header.
    :param lifetime: If truthy, the expiry is "now" plus this value.
    :param expires: Expiry used when ``lifetime`` is falsy.
    :param not_before: Start of validity; "now" is used when falsy.
    """
    # NOTE(review): `key` is not a parameter here; presumably it is supplied
    # by the enclosing scope -- confirm against the outer function.
    now = datetime.utcnow()
    issued_at = timegm(now.utctimetuple())
    encoded_header = json_encode({'alg': alg})
    encoded_claims = json_encode(claims)

    # A falsy lifetime (None or a zero timedelta) falls back to `expires`.
    if lifetime:
        expiry = now + lifetime
    else:
        expiry = expires
    expires_at = timegm(expiry.utctimetuple())

    start = not_before if not_before else now
    valid_from = timegm(start.utctimetuple())

    # Symmetric keys are passed as the decoded 'k' member of the exported
    # key; any other key is exported to PEM with the arguments (True, None).
    if key.is_symmetric:
        key_material = base64url_decode(json_decode(key.export())['k'])
    else:
        key_material = key.export_to_pem(True, None)
    encoded_key = json_encode(key_material)

    script = "fixtures.generate({now}, {header}, {claims}, {expires}, {not_before}, {key})".format(
        now=issued_at,
        header=encoded_header,
        claims=encoded_claims,
        expires=expires_at,
        not_before=valid_from,
        key=encoded_key)
    return spawn(script, False)
return f
- Its claims must contain a property **iat** which represents a date in the past (taking into account :obj:`iat_skew`).
- Its claims must contain a property **nbf** which represents a date in the past.
- Its claims must contain a property **exp** which represents a date in the future.
:raises: If the token failed to verify.
"""
if allowed_algs is None:
allowed_algs = []
if not isinstance(allowed_algs, list):
# jwcrypto only supports list of allowed algorithms
raise _JWTError('allowed_algs must be a list')
header, claims, _ = jwt.split('.')
parsed_header = json_decode(base64url_decode(header))
alg = parsed_header.get('alg')
if alg is None:
raise _JWTError('alg header not present')
if alg not in allowed_algs:
raise _JWTError('algorithm not allowed: ' + alg)
if not ignore_not_implemented:
for k in parsed_header:
if k not in JWSHeaderRegistry:
raise _JWTError('unknown header: ' + k)
if not JWSHeaderRegistry[k].supported:
raise _JWTError('header not implemented: ' + k)
if pub_key:
token = JWS()
def process_jwt(jwt):
    """
    Decode the header and claims of a JSON Web Token without verifying it.

    Use this ahead of :func:`verify_jwt` when the header or claims are needed
    before verification, e.g. when the claims name the issuer and that is how
    the right public key gets looked up. The signature segment is ignored.

    :param jwt: The JSON Web Token to process.
    :type jwt: str or unicode

    :rtype: tuple
    :returns: ``(header, claims)``

    :raises ValueError: If the token does not consist of exactly three dot-separated segments.
    """
    segments = jwt.split('.')
    # Exactly three segments are required; the third (signature) is unused.
    encoded_header, encoded_claims, _ = segments
    return (json_decode(base64url_decode(encoded_header)),
            json_decode(base64url_decode(encoded_claims)))
def process_jwt(jwt):
    """
    Decode the header and claims of a JSON Web Token without verifying it.

    Use this ahead of :func:`verify_jwt` when the header or claims are needed
    before verification, e.g. when the claims name the issuer and that is how
    the right public key gets looked up. The signature segment is ignored.

    :param jwt: The JSON Web Token to process.
    :type jwt: str or unicode

    :rtype: tuple
    :returns: ``(header, claims)``

    :raises ValueError: If the token does not consist of exactly three dot-separated segments.
    """
    segments = jwt.split('.')
    # Exactly three segments are required; the third (signature) is unused.
    encoded_header, encoded_claims, _ = segments
    return (json_decode(base64url_decode(encoded_header)),
            json_decode(base64url_decode(encoded_claims)))
def extract_kid_from_header(token):
    """
    Return the ``kid`` value from the header segment of a dot-separated token.

    The header is the text before the first ``.``. It is base64url-decoded,
    parsed as JSON, and its ``kid`` member is returned.

    :param token: The serialized token.
    :type token: str
    :returns: The value stored under ``kid`` in the parsed header.
    :raises InvalidTokenException: ``"Missing Headers"`` if the header segment
        is empty; ``"Invalid Header"`` if decoding or JSON parsing raises
        ``ValueError`` or the parsed header is not a JSON object;
        ``"Missing kid"`` if the header has no ``kid`` member.
    """
    header = token.split('.')[0]

    # Compare by value: `is ""` relies on interpreter string interning and
    # triggers a SyntaxWarning on Python 3.8+.
    if header == "":
        raise InvalidTokenException("Missing Headers")

    try:
        protected_header_data = json.loads(base64url_decode(header).decode())
    except ValueError:
        raise InvalidTokenException("Invalid Header")

    # Only a JSON object can carry a `kid` member; arrays, strings and
    # numbers are not usable headers.
    if not isinstance(protected_header_data, dict):
        raise InvalidTokenException("Invalid Header")

    # Look the key up in the parsed object. Testing the raw JSON text would
    # also match "kid" appearing inside some other value and then fail with
    # an uncaught KeyError.
    if 'kid' in protected_header_data:
        return protected_header_data['kid']

    raise InvalidTokenException("Missing kid")
self._eq_session = data_access.get_by_key(EQSession, self.eq_session_id)
if self._eq_session:
self.user_id = self._eq_session.user_id
if self._eq_session.session_data:
encrypted_session_data = self._eq_session.session_data
session_data = StorageEncryption(self.user_id, self.user_ik, self.pepper) \
.decrypt_data(encrypted_session_data)
session_data = session_data.decode()
# for backwards compatibility
# session data used to be base64 encoded before encryption
try:
session_data = base64url_decode(session_data).decode()
except ValueError:
pass
self.session_data = json.loads(session_data, object_hook=lambda d: SessionData(**d))
logger.debug('found matching eq_session for eq_session_id in database',
session_id=self._eq_session.eq_session_id,
user_id=self._eq_session.user_id)
else:
logger.debug('eq_session_id not found in database', eq_session_id=self.eq_session_id)
return self._eq_session
def _get_base64_encoded_data(self, data):
"""
Legacy data was stored in a dict, base64-encoded, and not compressed:
{ 'data': '
if not ignore_not_implemented:
for k in parsed_header:
if k not in JWSHeaderRegistry:
raise _JWTError('unknown header: ' + k)
if not JWSHeaderRegistry[k].supported:
raise _JWTError('header not implemented: ' + k)
if pub_key:
token = JWS()
token.allowed_algs = allowed_algs
token.deserialize(jwt, pub_key)
elif 'none' not in allowed_algs:
raise _JWTError('no key but none alg not allowed')
parsed_claims = json_decode(base64url_decode(claims))
utcnow = datetime.utcnow()
now = timegm(utcnow.utctimetuple())
typ = parsed_header.get('typ')
if typ is None:
if not checks_optional:
raise _JWTError('typ header not present')
elif typ != 'JWT':
raise _JWTError('typ header is not JWT')
iat = parsed_claims.get('iat')
if iat is None:
if not checks_optional:
raise _JWTError('iat claim not present')
elif iat > timegm((utcnow + iat_skew).utctimetuple()):