Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
mock_response = MockDispatch(self.token, assert_func=assert_func)
with OAuth2Client(self.client_id, dispatch=mock_response) as client:
token = client.fetch_token(url, authorization_response='https://i.b/?code=v')
self.assertEqual(token, self.token)
with OAuth2Client(
self.client_id,
token_endpoint_auth_method='none',
dispatch=mock_response
) as client:
token = client.fetch_token(url, code='v')
self.assertEqual(token, self.token)
mock_response = MockDispatch({'error': 'invalid_request'})
with OAuth2Client(self.client_id, dispatch=mock_response) as client:
self.assertRaises(OAuthError, client.fetch_token, url)
def test_add_token_to_uri(self):
def assert_func(request):
self.assertIn(self.token['access_token'], str(request.url))
mock_response = MockDispatch({'a': 'a'}, assert_func=assert_func)
with OAuth2Client(
self.client_id,
token=self.token,
token_placement='uri',
dispatch=mock_response
) as client:
resp = client.get('https://i.b')
data = resp.json()
self.assertEqual(data['a'], 'a')
def test_invalid_token_type(self):
token = {
'token_type': 'invalid',
'access_token': 'a',
'refresh_token': 'b',
'expires_in': '3600',
'expires_at': int(time.time()) + 3600,
}
with OAuth2Client(self.client_id, token=token) as client:
self.assertRaises(OAuthError, client.get, 'https://i.b')
def test_code_challenge(self):
sess = OAuth2Client(client_id=self.client_id, code_challenge_method='S256')
url = 'https://example.com/authorize'
auth_url, _ = sess.create_authorization_url(
url, code_verifier=generate_token(48))
self.assertIn('code_challenge', auth_url)
self.assertIn('code_challenge_method=S256', auth_url)
def test_token_status(self):
token = dict(access_token='a', token_type='bearer', expires_at=100)
sess = OAuth2Client('foo', token=token)
self.assertTrue(sess.token.is_expired())
)
dispatch = MockDispatch(self.token)
with OAuth2Client(
'foo', token=old_token, token_endpoint='https://i.b/token',
update_token=update_token, dispatch=dispatch
) as sess:
sess.get('https://i.b/user')
self.assertTrue(update_token.called)
old_token = dict(
access_token='a',
token_type='bearer',
expires_at=100
)
with OAuth2Client(
'foo', token=old_token, token_endpoint='https://i.b/token',
update_token=update_token, dispatch=dispatch
) as sess:
self.assertRaises(OAuthError, sess.get, 'https://i.b/user')
def test_request_without_token(self):
with OAuth2Client('a') as client:
self.assertRaises(OAuthError, client.get, 'https://i.b/token')
def test_token_auth_method_client_secret_post(self):
url = 'https://example.com/token'
def assert_func(request):
body = request.content.decode()
self.assertIn('code=v', body)
self.assertIn('client_id=', body)
self.assertIn('client_secret=bar', body)
self.assertIn('grant_type=authorization_code', body)
mock_response = MockDispatch(self.token, assert_func=assert_func)
with OAuth2Client(
self.client_id, 'bar',
token_endpoint_auth_method='client_secret_post',
dispatch=mock_response
) as client:
token = client.fetch_token(url, code='v')
self.assertEqual(token, self.token)
def test_fetch_token_post(self):
url = 'https://example.com/token'
def assert_func(request):
body = request.content.decode()
self.assertIn('code=v', body)
self.assertIn('client_id=', body)
self.assertIn('grant_type=authorization_code', body)
mock_response = MockDispatch(self.token, assert_func=assert_func)
with OAuth2Client(self.client_id, dispatch=mock_response) as client:
token = client.fetch_token(url, authorization_response='https://i.b/?code=v')
self.assertEqual(token, self.token)
with OAuth2Client(
self.client_id,
token_endpoint_auth_method='none',
dispatch=mock_response
) as client:
token = client.fetch_token(url, code='v')
self.assertEqual(token, self.token)
mock_response = MockDispatch({'error': 'invalid_request'})
with OAuth2Client(self.client_id, dispatch=mock_response) as client:
self.assertRaises(OAuthError, client.fetch_token, url)
def _update_token(token, refresh_token=None, access_token=None):
self.assertEqual(access_token, 'a')
self.assertEqual(token, self.token)
update_token = mock.Mock(side_effect=_update_token)
old_token = dict(
access_token='a',
token_type='bearer',
expires_at=100
)
dispatch = MockDispatch(self.token)
with OAuth2Client(
'foo', token=old_token,
token_endpoint='https://i.b/token',
grant_type='client_credentials',
dispatch=dispatch
) as sess:
sess.get('https://i.b/user')
self.assertFalse(update_token.called)
with OAuth2Client(
'foo', token=old_token, token_endpoint='https://i.b/token',
update_token=update_token, grant_type='client_credentials',
dispatch=dispatch
) as sess:
sess.get('https://i.b/user')
self.assertTrue(update_token.called)