Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rsa_should_parse_pem_public_key(self):
    """Feed a PEM public key file to ``prepare_key``.

    There is no explicit assertion: the test succeeds when
    ``prepare_key`` returns without raising.
    """
    rsa = RSAAlgorithm(RSAAlgorithm.SHA256)

    # Read the whole key file first, then hand the text to the algorithm.
    with open(key_path("testkey2_rsa.pub.pem"), "r") as handle:
        pem_text = handle.read()
    rsa.prepare_key(pem_text)
def test_rsa_verify_should_return_true_if_signature_valid(self):
    """Verify a known-good RS256 signature over ``"Hello World!"``.

    The signature below is stored base64-encoded and decoded before use.
    The public key is loaded from the ``testkey_rsa.pub`` fixture via
    ``prepare_key``.
    """
    algo = RSAAlgorithm(RSAAlgorithm.SHA256)

    jwt_message = force_bytes("Hello World!")

    jwt_sig = base64.b64decode(
        force_bytes(
            "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp"
            "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl"
            "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix"
            "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX"
            "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA"
            "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=="
        )
    )

    with open(key_path("testkey_rsa.pub"), "r") as keyfile:
        jwt_pub_key = algo.prepare_key(keyfile.read())

    # Without this call the test only prepared its inputs and could never
    # fail on a verification regression.
    result = algo.verify(jwt_message, jwt_pub_key, jwt_sig)
    assert result
def test_rsa_sign_should_generate_correct_signature_value(self):
    """Sign ``"Hello World!"`` and compare against a stored signature.

    The expected signature is stored base64-encoded and decoded before
    the comparison. The private key is loaded from the ``testkey_rsa``
    fixture via ``prepare_key``.
    """
    algo = RSAAlgorithm(RSAAlgorithm.SHA256)

    jwt_message = force_bytes("Hello World!")

    expected_sig = base64.b64decode(
        force_bytes(
            "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp"
            "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl"
            "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix"
            "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX"
            "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA"
            "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=="
        )
    )

    with open(key_path("testkey_rsa"), "r") as keyfile:
        jwt_key = algo.prepare_key(keyfile.read())

    # Without the sign call and comparison, expected_sig and jwt_key were
    # never used and the test asserted nothing.
    sig = algo.sign(jwt_message, jwt_key)
    assert sig == expected_sig
def test_rsa_should_accept_unicode_key(self):
    """Pass the private key to ``prepare_key`` after ``force_unicode``.

    There is no explicit assertion: the test succeeds when
    ``prepare_key`` returns without raising.
    """
    rsa = RSAAlgorithm(RSAAlgorithm.SHA256)

    with open(key_path("testkey_rsa"), "r") as handle:
        raw_key = handle.read()
        unicode_key = force_unicode(raw_key)
        rsa.prepare_key(unicode_key)
import json
import logging
import os
import ssl
from Crypto.Util import asn1
from google.appengine.api import urlfetch
from google.appengine.api import urlfetch_errors
import jwt
from jwt.contrib.algorithms.pycrypto import RSAAlgorithm
import jwt.exceptions
# For App Engine, pyjwt needs to use PyCrypto instead of Cryptography.
# This runs at import time and registers the PyCrypto-backed RSAAlgorithm
# (imported above from jwt.contrib.algorithms.pycrypto) under the name 'RS256'.
jwt.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256))
# [START fetch_certificates]
# This URL contains a list of active certificates used to sign Firebase
# auth tokens.
# The value is written as two adjacent string literals, which Python joins
# into a single URL string.
FIREBASE_CERTIFICATES_URL = (
'https://www.googleapis.com/robot/v1/metadata/x509/'
'securetoken@system.gserviceaccount.com')
# [START get_firebase_certificates]
def get_firebase_certificates():
    """Fetches the current Firebase certificates.

    Note: in a production application, you should cache this for at least
    an hour.

    NOTE(review): only the signature and this docstring are visible here;
    the statements that perform the fetch and produce the return value are
    not present, so the return type is not documented. Presumably the
    certificates come from FIREBASE_CERTIFICATES_URL -- confirm against
    the full function.
    """
import json
import logging
import jwt
import umapi_client
import helper
import user_sync.config
import user_sync.error
import user_sync.helper
import user_sync.identity_type
from user_sync.version import __version__ as APP_VERSION
try:
    # Optional: register the PyCrypto-backed RSAAlgorithm as pyjwt's 'RS256'
    # handler when the contrib module can be imported.
    from jwt.contrib.algorithms.pycrypto import RSAAlgorithm
    jwt.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256))
except Exception:
    # Best-effort: on any ordinary failure (for example the import not
    # resolving) nothing is registered here and the module keeps loading.
    # `Exception` is used instead of a bare `except` so that
    # KeyboardInterrupt and SystemExit are no longer swallowed.
    pass
class DashboardConnector(object):
    def __init__(self, name, caller_options):
        """Read the dashboard connector settings supplied by the caller.

        :type name: str
        :type caller_options: dict

        NOTE(review): ``caller_options`` is documented as a dict, but its
        ``content`` attribute is what gets wrapped below -- confirm the type.
        """
        config_label = '"%s dashboard options"' % name
        config = user_sync.config.DictConfig(config_label, caller_options.content)

        # Defaults: a logger named after this connector, test mode off.
        defaults = user_sync.config.OptionsBuilder(config)
        logger_name = 'dashboard.' + name
        defaults.set_string_value('logger_name', logger_name)
        defaults.set_bool_value('test_mode', False)
        options = defaults.get_options()

        # The second argument to get_dict_config is passed as True, as before.
        server_config = config.get_dict_config('server', True)
import json
import logging
import jwt
import umapi_client.auth
import helper
import user_sync.config
import user_sync.helper
import user_sync.identity_type
from user_sync.error import AssertionException
from user_sync.version import __version__ as APP_VERSION
try:
    # Optional: register the PyCrypto-backed RSAAlgorithm as pyjwt's 'RS256'
    # handler when the contrib module can be imported.
    from jwt.contrib.algorithms.pycrypto import RSAAlgorithm
    jwt.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256))
except Exception:
    # Best-effort: on any ordinary failure (for example the import not
    # resolving) nothing is registered here and the module keeps loading.
    # `Exception` is used instead of a bare `except` so that
    # KeyboardInterrupt and SystemExit are no longer swallowed.
    pass
class UmapiConnector(object):
    def __init__(self, name, caller_options):
        """Read the UMAPI connector settings supplied by the caller.

        The connector's ``name`` attribute is the given name prefixed with
        ``'umapi'``; the same string is used as the logger name.

        :type name: str
        :type caller_options: dict
        """
        connector_name = 'umapi' + name
        self.name = connector_name

        config = user_sync.config.DictConfig(connector_name + ' configuration', caller_options)
        defaults = user_sync.config.OptionsBuilder(config)
        defaults.set_string_value('logger_name', connector_name)
        # Both switches default to off, registered in the original order.
        for flag in ('test_mode', 'bypass_authentication_mode'):
            defaults.set_bool_value(flag, False)
        options = defaults.get_options()
import helper
import json
import jwt
import logging
import umapi_client
import user_sync.identity_type
try:
    # Optional: register the PyCrypto-backed RSAAlgorithm as pyjwt's 'RS256'
    # handler when the contrib module can be imported.
    from jwt.contrib.algorithms.pycrypto import RSAAlgorithm
    jwt.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256))
except Exception:
    # Best-effort: on any ordinary failure (for example the import not
    # resolving) nothing is registered here and the module keeps loading.
    # `Exception` is used instead of a bare `except` so that
    # KeyboardInterrupt and SystemExit are no longer swallowed.
    pass
class DashboardConnector(object):
    def __init__(self, name, caller_options):
        """Read the dashboard connector settings supplied by the caller.

        :type name: str
        :type caller_options: dict
        """
        config_label = '"%s dashboard options"' % name
        config = user_sync.config.DictConfig(config_label, caller_options)

        # Defaults: a logger named after this connector, test mode off.
        defaults = user_sync.config.OptionsBuilder(config)
        logger_name = 'dashboard.' + name
        defaults.set_string_value('logger_name', logger_name)
        defaults.set_bool_value('test_mode', False)
        options = defaults.get_options()

        # The second argument to get_dict_config is passed as True, as before.
        server_config = config.get_dict_config('server', True)