Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_fetch_request_token_with_optional_arguments(self):
auth = OAuth1Session('foo')
auth.send = mock_text_response('oauth_token=foo')
resp = auth.fetch_request_token('https://example.com/token',
verify=False, stream=True)
self.assertEqual(resp['oauth_token'], 'foo')
for k, v in resp.items():
self.assertTrue(isinstance(k, unicode_type))
self.assertTrue(isinstance(v, unicode_type))
def test_fetch_token_invalid_response(self):
auth = OAuth1Session('foo')
auth.send = mock_text_response('not valid urlencoded response!')
self.assertRaises(
ValueError, auth.fetch_request_token, 'https://example.com/token')
for code in (400, 401, 403):
auth.send = mock_text_response('valid=response', code)
# use try/catch rather than self.assertRaises, so we can
# assert on the properties of the exception
try:
auth.fetch_request_token('https://example.com/token')
except OAuthError as err:
self.assertEqual(err.error, 'fetch_token_denied')
else: # no exception raised
self.fail("ValueError not raised")
def test_fetch_request_token(self):
auth = OAuth1Session('foo')
auth.send = mock_text_response('oauth_token=foo')
resp = auth.fetch_request_token('https://example.com/token')
self.assertEqual(resp['oauth_token'], 'foo')
for k, v in resp.items():
self.assertTrue(isinstance(k, unicode_type))
self.assertTrue(isinstance(v, unicode_type))
resp = auth.fetch_request_token('https://example.com/token', realm='A')
self.assertEqual(resp['oauth_token'], 'foo')
resp = auth.fetch_request_token('https://example.com/token', realm=['A', 'B'])
self.assertEqual(resp['oauth_token'], 'foo')
def test_signature_types(self):
def verify_signature(getter):
def fake_send(r, **kwargs):
signature = to_unicode(getter(r))
self.assertIn('oauth_signature', signature)
resp = mock.MagicMock(spec=requests.Response)
resp.cookies = []
return resp
return fake_send
header = OAuth1Session('foo')
header.send = verify_signature(lambda r: r.headers['Authorization'])
header.post('https://i.b')
query = OAuth1Session('foo', signature_type=SIGNATURE_TYPE_QUERY)
query.send = verify_signature(lambda r: r.url)
query.post('https://i.b')
body = OAuth1Session('foo', signature_type=SIGNATURE_TYPE_BODY)
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
body.send = verify_signature(lambda r: r.body)
body.post('https://i.b', headers=headers, data='')
def test_create_authorization_url(self):
auth = OAuth1Session('foo')
url = 'https://example.comm/authorize'
token = 'asluif023sf'
auth_url = auth.create_authorization_url(url, request_token=token)
self.assertEqual(auth_url, url + '?oauth_token=' + token)
redirect_uri = 'https://c.b'
auth = OAuth1Session('foo', redirect_uri=redirect_uri)
auth_url = auth.create_authorization_url(url, request_token=token)
self.assertIn(escape(redirect_uri), auth_url)
def test_fetch_access_token_with_optional_arguments(self):
auth = OAuth1Session('foo', verifier='bar')
auth.send = mock_text_response('oauth_token=foo')
resp = auth.fetch_access_token('https://example.com/token',
verify=False, stream=True)
self.assertEqual(resp['oauth_token'], 'foo')
for k, v in resp.items():
self.assertTrue(isinstance(k, unicode_type))
self.assertTrue(isinstance(v, unicode_type))
def test_redirect_uri(self):
sess = OAuth1Session('foo')
self.assertIsNone(sess.redirect_uri)
url = 'https://i.b'
sess.redirect_uri = url
self.assertEqual(sess.redirect_uri, url)
self.assertRaises(ValueError, lambda: OAuth1Session(None))
'access_token_url': None,
'issuer': None
})
oidc_settings.setdefault('logout_url', None)
oidc_settings.setdefault('jwks_url', None)
oidc_settings.setdefault('jwks', None) # used as a cache, but could also be pre-populated
client_kwargs = oidc_settings.setdefault('client_kwargs', {})
scopes = set(client_kwargs.get('scope', '').split()) | {'openid'}
client_kwargs['scope'] = ' '.join(sorted(scopes))
self.oauth_app = RemoteApp(self.name + '_flaskmultipass',
client_id=oidc_settings['client_id'],
client_secret=oidc_settings['client_secret'],
authorize_url=oidc_settings['authorize_url'],
access_token_url=oidc_settings['access_token_url'],
client_kwargs=oidc_settings['client_kwargs'],
oauth1_client_cls=OAuth1Session,
oauth2_client_cls=OAuth2Session)
self.authorized_endpoint = '_flaskmultipass_oidc_' + self.name
current_app.add_url_rule(self.settings['callback_uri'], self.authorized_endpoint,
self._authorize_callback, methods=('GET', 'POST'))
def _import_oauth_clients():
rv = {}
try:
from ..requests_client import OAuth1Session, OAuth2Session
rv['requests'] = OAuth1Session, OAuth2Session
except ImportError:
pass
return rv