Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_token_response(self):
    """Drive an authorization-code exchange and return the decoded token body.

    The helper first posts an authorization request for client ``client``
    on behalf of the user named ``foo``, asserts that the server answers
    with a 302 redirect, and pulls the ``code`` out of the redirect's
    query string. It then posts that code to the token endpoint using
    HTTP basic credentials and asserts a 200 response.

    :return: the token response content parsed from JSON.
    """
    server = self.create_server()

    # Step 1: ask for an authorization code.
    authorize_request = self.factory.post(
        '/authorize',
        data={'response_type': 'code', 'client_id': 'client'},
    )
    resource_owner = User.objects.get(username='foo')
    authorize_resp = server.create_authorization_response(
        authorize_request, grant_user=resource_owner
    )
    self.assertEqual(authorize_resp.status_code, 302)

    # The code travels back in the query part of the Location header.
    redirect_query = urlparse.urlparse(authorize_resp['Location']).query
    redirect_params = dict(url_decode(redirect_query))
    code = redirect_params['code']

    # Step 2: trade the code for a token.
    token_request = self.factory.post(
        '/oauth/token',
        data={'grant_type': 'authorization_code', 'code': code},
        HTTP_AUTHORIZATION=self.create_basic_auth('client', 'secret'),
    )
    token_resp = server.create_token_response(token_request)
    self.assertEqual(token_resp.status_code, 200)
    return json.loads(token_resp.content)
self.prepare_data()
rv = self.client.post('/oauth/authorize', data={
'client_id': 'hybrid-client',
'response_type': 'code id_token token',
'response_mode': 'query',
'state': 'bar',
'nonce': 'abc',
'scope': 'openid profile',
'redirect_uri': 'https://a.b',
'user_id': '1',
})
self.assertIn('code=', rv.location)
self.assertIn('id_token=', rv.location)
self.assertIn('access_token=', rv.location)
params = dict(url_decode(urlparse.urlparse(rv.location).query))
self.assertEqual(params['state'], 'bar')
def test_code_access_token(self):
    """Exercise ``response_type=code token`` and then redeem the code.

    The authorization redirect must carry ``code`` and ``access_token``
    but no ``id_token``, with parameters delivered in the URL fragment.
    Exchanging the code at the token endpoint must then yield both an
    ``access_token`` and an ``id_token``.
    """
    self.prepare_data()

    authorize_form = {
        'client_id': 'hybrid-client',
        'response_type': 'code token',
        'state': 'bar',
        'nonce': 'abc',
        'scope': 'openid profile',
        'redirect_uri': 'https://a.b',
        'user_id': '1',
    }
    rv = self.client.post('/oauth/authorize', data=authorize_form)

    for expected in ('code=', 'access_token='):
        self.assertIn(expected, rv.location)
    self.assertNotIn('id_token=', rv.location)

    # Parameters are read from the fragment, not the query string.
    fragment = urlparse.urlparse(rv.location).fragment
    fragment_params = dict(url_decode(fragment))
    self.assertEqual(fragment_params['state'], 'bar')

    token_form = {
        'grant_type': 'authorization_code',
        'redirect_uri': 'https://a.b',
        'code': fragment_params['code'],
    }
    basic_auth = self.create_basic_header('hybrid-client', 'hybrid-secret')
    rv = self.client.post('/oauth/token', data=token_form, headers=basic_auth)

    token_payload = json.loads(rv.data)
    for key in ('access_token', 'id_token'):
        self.assertIn(key, token_payload)
def test_authorize_access_token(self):
    """Exercise ``response_type=id_token token`` for the implicit client.

    The redirect location must contain an access token, an ID token and
    the original ``state``; the ID token taken from the URL fragment is
    then handed to ``self.validate_claims`` together with all fragment
    parameters.
    """
    self.prepare_data()

    authorize_form = {
        'response_type': 'id_token token',
        'client_id': 'implicit-client',
        'scope': 'openid profile',
        'state': 'bar',
        'nonce': 'abc',
        'redirect_uri': 'https://a.b/c',
        'user_id': '1',
    }
    rv = self.client.post('/oauth/authorize', data=authorize_form)

    for expected in ('access_token=', 'id_token=', 'state=bar'):
        self.assertIn(expected, rv.location)

    fragment = urlparse.urlparse(rv.location).fragment
    fragment_params = dict(url_decode(fragment))
    self.validate_claims(fragment_params['id_token'], fragment_params)
def test_authorize_token(self):
self.prepare_data()
rv = self.client.post('/oauth/authorize', data={
'response_type': 'code',
'client_id': 'code-client',
'state': 'bar',
'scope': 'openid profile',
'redirect_uri': 'https://a.b',
'user_id': '1'
})
self.assertIn('code=', rv.location)
params = dict(url_decode(urlparse.urlparse(rv.location).query))
self.assertEqual(params['state'], 'bar')
code = params['code']
headers = self.create_basic_header('code-client', 'code-secret')
rv = self.client.post('/oauth/token', data={
'grant_type': 'authorization_code',
'redirect_uri': 'https://a.b',
'code': code,
}, headers=headers)
resp = json.loads(rv.data)
self.assertIn('access_token', resp)
self.assertIn('id_token', resp)
jwt = JWT()
claims = jwt.decode(
resp['id_token'], 'secret',
def test_invalid_redirect_uri(self):
    """Check ``redirect_uri`` handling at both endpoints.

    Two things are asserted:

    1. An authorization request with ``redirect_uri=https://a.c`` is
       answered with an ``invalid_request`` error.
    2. An authorization request with ``redirect_uri=https://a.b`` yields
       a code, but redeeming that code at the token endpoint *without*
       sending ``redirect_uri`` is answered with ``invalid_request``.
    """
    self.prepare_data()

    # Case 1: this redirect URI is expected to be refused outright.
    refused_uri = self.authorize_url + '&redirect_uri=https%3A%2F%2Fa.c'
    rv = self.client.post(refused_uri, data={'user_id': '1'})
    error_payload = json.loads(rv.data)
    self.assertEqual(error_payload['error'], 'invalid_request')

    # Case 2: this redirect URI is expected to produce a code.
    accepted_uri = self.authorize_url + '&redirect_uri=https%3A%2F%2Fa.b'
    rv = self.client.post(accepted_uri, data={'user_id': '1'})
    self.assertIn('code=', rv.location)
    query_params = dict(url_decode(urlparse.urlparse(rv.location).query))

    # The token request deliberately leaves out redirect_uri.
    token_form = {
        'grant_type': 'authorization_code',
        'code': query_params['code'],
    }
    basic_auth = self.create_basic_header('code-client', 'code-secret')
    rv = self.client.post('/oauth/token', data=token_form, headers=basic_auth)
    error_payload = json.loads(rv.data)
    self.assertEqual(error_payload['error'], 'invalid_request')
client.
:param uri: The full redirect URL back to the client.
:param state: The state parameter from the authorization request.
For example, the authorization server redirects the user-agent by
sending the following HTTP response:
.. code-block:: http
HTTP/1.1 302 Found
Location: https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA
&state=xyz
"""
query = urlparse.urlparse(uri).query
params = dict(urlparse.parse_qsl(query))
if 'code' not in params:
raise MissingCodeError('Missing `code` in response.')
if state and params.get('state', None) != state:
raise MismatchingStateError()
return params
def validate_issuer(self):
    """REQUIRED. The authorization server's issuer identifier, which is
    a URL that uses the "https" scheme and has no query or fragment
    components.

    The value is read with ``self.get('issuer')`` and checked in three
    steps; nothing is returned when every check passes.

    :raises ValueError: if the issuer is missing or empty, does not use
        the "https" scheme, or carries a query or fragment component.
    """
    issuer_url = self.get('issuer')

    #: 1. REQUIRED
    if not issuer_url:
        raise ValueError('"issuer" is required')

    url_parts = urlparse.urlparse(issuer_url)

    #: 2. uses the "https" scheme
    uses_https = url_parts.scheme == 'https'
    if not uses_https:
        raise ValueError('"issuer" MUST use "https" scheme')

    #: 3. has no query or fragment
    for component in (url_parts.query, url_parts.fragment):
        if component:
            raise ValueError('"issuer" has no query or fragment')
**state**
REQUIRED if the "state" parameter was present in the client
authorization request. The exact value received from the
client.
Similar to the authorization code response, but with a full token provided
in the URL fragment:
.. code-block:: http
HTTP/1.1 302 Found
Location: http://example.com/cb#access_token=2YotnFZFEjr1zCsicMWpAA
&state=xyz&token_type=example&expires_in=3600
"""
fragment = urlparse.urlparse(uri).fragment
params = dict(urlparse.parse_qsl(fragment, keep_blank_values=True))
if 'access_token' not in params:
raise MissingTokenError('Missing `access_token` in response.')
if 'token_type' not in params:
raise MissingTokenTypeError()
if state and params.get('state', None) != state:
raise MismatchingStateError()
return params
is represented by the base string URI: "http://example.com/r%20v/X".
In another example, the HTTPS request::
GET /?q=1 HTTP/1.1
Host: www.example.net:8080
is represented by the base string URI: "https://www.example.net:8080/".
.. _`Section 3.4.1.2`: https://tools.ietf.org/html/rfc5849#section-3.4.1.2
The host argument overrides the netloc part of the uri argument.
"""
uri = to_unicode(uri)
scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
# The scheme, authority, and path of the request resource URI `RFC3986`
# are included by constructing an "http" or "https" URI representing
# the request resource (without the query or fragment) as follows:
#
# .. _`RFC3986`: https://tools.ietf.org/html/rfc3986
if not scheme or not netloc:
raise ValueError('uri must include a scheme and netloc')
# Per `RFC 2616 section 5.1.2`_:
#
# Note that the absolute path cannot be empty; if none is present in
# the original URI, it MUST be given as "/" (the server root).
#
# .. _`RFC 2616 section 5.1.2`: https://tools.ietf.org/html/rfc2616#section-5.1.2