Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Prepare the fixtures used by the tests in this case.

    Creates a user, an ``Auth`` wrapper and a project owned by that user,
    saves a session keyed on the user's id, signs the session id into a
    cookie, configures the addon and derives the JWE key from the
    WaterButler secret and salt settings.
    """
    super(TestAddonAuth, self).setUp()

    creator = AuthUserFactory()
    self.user = creator
    self.auth_obj = Auth(user=creator)
    self.node = ProjectFactory(creator=creator)

    # The cookie is the session id signed with the application secret.
    user_session = Session(data={'auth_user_id': creator._id})
    self.session = user_session
    user_session.save()
    cookie_signer = itsdangerous.Signer(settings.SECRET_KEY)
    self.cookie = cookie_signer.sign(user_session._id)

    self.configure_addon()

    jwe_secret = settings.WATERBUTLER_JWE_SECRET.encode('utf-8')
    jwe_salt = settings.WATERBUTLER_JWE_SALT.encode('utf-8')
    self.JWE_KEY = jwe.kdf(jwe_secret, jwe_salt)
def test_user_get_cookie(self):
    """Unsigning the user's cookie must yield the id of the saved session."""
    account = UserFactory()
    super_secret_key = 'children need maps'

    session_data = {
        'auth_user_id': account._id,
        'auth_user_username': account.username,
        'auth_user_fullname': account.fullname,
    }
    stored_session = Session(data=session_data)
    stored_session.save()

    # Unsign with the same key that is handed to get_or_create_cookie.
    cookie = account.get_or_create_cookie(super_secret_key)
    unsigned = itsdangerous.Signer(super_secret_key).unsign(cookie)
    assert unsigned == stored_session._id
def test_iter_unsigners(self, serializer, serializer_factory):
    """The first unsigner uses SHA-1; every following one uses SHA-256.

    Fallback signers are supplied in three shapes: a kwargs dict, a
    ``(class, kwargs)`` tuple and a bare signer subclass.
    """

    class Signer256(serializer.signer):
        default_digest_method = hashlib.sha256

    fallbacks = [
        {"digest_method": hashlib.sha256},
        (Signer, {"digest_method": hashlib.sha256}),
        Signer256,
    ]
    serializer = serializer_factory(
        secret_key="secret_key",
        fallback_signers=fallbacks,
    )

    candidates = serializer.iter_unsigners()
    primary = next(candidates)
    assert primary.digest_method == hashlib.sha1

    # Whatever remains in the iterator are the fallbacks.
    for fallback in candidates:
        assert fallback.digest_method == hashlib.sha256
# NOTE(review): this is the interior of a URL-building routine whose `def`
# line is not visible here; `kwargs`, `local_path`, `external` and `scheme`
# come from that enclosing scope.
#
# Normalise the optional 'transform' option into one underscore-joined string.
transform = kwargs.get('transform')
if transform:
# A string transform is split on any of: comma, semicolon, colon,
# underscore or space.
if isinstance(transform, str):
transform = re.split(r'[,;:_ ]', transform)
# We replace delimiters with underscores, and percent with p, since
# these won't need escaping.
kwargs['transform'] = '_'.join(str(x).replace('%', 'p') for x in transform)
# Sign the query.
# Only public options take part: None values and keys starting with an
# underscore are dropped, and keys are mapped through LONG_TO_SHORT when an
# entry exists for them.
public_kwargs = (
(LONG_TO_SHORT.get(k, k), v)
for k, v in list(kwargs.items())
if v is not None and not k.startswith('_')
)
# The pairs are sorted before encoding so the signed string has a stable order.
query = urlencode(sorted(public_kwargs), True)
signer = Signer(current_app.secret_key)
# The signature covers "<local_path>?<query>".
sig = signer.get_signature('%s?%s' % (local_path, query))
# The signature is appended to the query as the `s` parameter.
url = '%s/%s?%s&s=%s' % (
current_app.config['IMAGES_URL'],
urlquote(local_path),
query,
sig,
)
# For external URLs, prefix the scheme (the explicit `scheme` if truthy,
# otherwise the current request's), the request host and the script root.
if external:
url = '%s://%s%s/%s' % (
scheme or request.scheme,
request.host,
request.script_root,
url.lstrip('/')
)
def handle_request(self, path):
    """Validate the signature and remote URL scheme of an incoming request.

    Calls ``abort(404)`` when:

    * the ``s`` (signature) query argument is missing or empty,
    * the supplied signature differs from the one computed over ``path``
      and the remaining, sorted query arguments, or
    * a ``url`` argument is present whose scheme is not in
      ``ALLOWED_SCHEMES``.

    :param path: the request path that was signed together with the query
    """
    # Verify the signature.
    query = dict(iteritems(request.args))

    # Check for presence *before* stringifying: str(None) is the truthy
    # string 'None', which would make this guard unreachable for requests
    # that carry no signature at all.
    raw_sig = query.pop('s', None)
    if not raw_sig:
        abort(404)
    old_sig = str(raw_sig)

    signer = Signer(current_app.secret_key)
    signed_payload = '%s?%s' % (path, urlencode(sorted(iteritems(query)), True))
    new_sig = signer.get_signature(signed_payload)
    if not constant_time_compare(str(old_sig), str(new_sig)):
        # NOTE(review): this writes the expected signature to the log;
        # confirm that is acceptable for the deployment's log audience.
        log.warning("Signature mismatch: url's {} != expected {}".format(old_sig, new_sig))
        abort(404)

    # Expand kwargs.
    query = {SHORT_TO_LONG.get(k, k): v for k, v in iteritems(query)}

    remote_url = query.get('url')
    if remote_url:
        # This is redundant for newly built URLs, but not for those which
        # have already been generated and cached.
        parsed = urlparse(remote_url)
        if parsed.scheme not in ALLOWED_SCHEMES:
            abort(404)
def unsign(data, secret=None):
    """Return ``data`` with its signature verified and stripped.

    :param data: the signed value to verify
    :param secret: signing key; when ``None`` the current application's
        ``SECRET_KEY`` config value is used
    :return: whatever ``Signer.unsign`` returns for ``data``
    """
    key = current_app.config["SECRET_KEY"] if secret is None else secret
    return Signer(key).unsign(data)
def socketio(remaining):
    """Hand the current request over to the socket.io namespaces.

    The underlying request object is given a connection, the configured
    data root and a signer before being passed to ``socketio_manage``.

    :param remaining: not used by this function
    :return: an empty ``Response``
    """
    real_request = request._get_current_object()

    # add redis connection
    real_request._conn = get_connection()
    real_request._data_root = current_app.config.get('DATA_ROOT')
    real_request._signer = itsdangerous.Signer(current_app.secret_key)

    namespaces = {
        '/status': StatusNamespace,
        '/build': BuildNamespace,
        '/listen': CommandNamespace
    }
    socketio_manage(request.environ, namespaces, request=real_request)
    return Response()
def get_session_from_cookie(cookie_val):
    """
    Look up the `Session` that a signed cookie value refers to.

    :param cookie_val: the signed cookie value
    :return: the matching `Session`, or `None` when the signature is bad
        or no session with the unsigned id exists
    """
    cookie_signer = itsdangerous.Signer(settings.SECRET_KEY)
    try:
        session_id = cookie_signer.unsign(cookie_val)
    except itsdangerous.BadSignature:
        return None

    try:
        found = Session.objects.get(_id=session_id)
    except Session.DoesNotExist:
        found = None
    return found