Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_openid_authorize(self):
app = Flask(__name__)
app.secret_key = '!'
oauth = OAuth(app)
client = oauth.register(
'dev',
client_id='dev',
api_base_url='https://i.b/api',
access_token_url='https://i.b/token',
authorize_url='https://i.b/authorize',
client_kwargs={'scope': 'openid profile'},
)
with app.test_request_context():
resp = client.authorize_redirect('https://b.com/bar')
self.assertEqual(resp.status_code, 302)
nonce = session['_dev_authlib_nonce_']
self.assertIsNotNone(nonce)
url = resp.headers.get('Location')
self.assertIn('nonce={}'.format(nonce), url)
def test_register_oauth2_remote_app(self):
app = Flask(__name__)
oauth = OAuth(app)
oauth.register(
'dev',
client_id='dev',
client_secret='dev',
api_base_url='https://i.b/api',
access_token_url='https://i.b/token',
refresh_token_url='https://i.b/token',
authorize_url='https://i.b/authorize',
update_token=lambda name: 'hi'
)
self.assertEqual(oauth.dev.name, 'dev')
session = oauth.dev._get_oauth_client()
self.assertIsNotNone(session.update_token)
def test_init_app_later(self):
app = Flask(__name__)
app.config.update({
'DEV_CLIENT_ID': 'dev',
'DEV_CLIENT_SECRET': 'dev',
})
oauth = OAuth()
remote = oauth.register('dev')
self.assertRaises(RuntimeError, lambda: oauth.dev.client_id)
oauth.init_app(app)
self.assertEqual(oauth.dev.client_id, 'dev')
self.assertEqual(remote.client_id, 'dev')
self.assertIsNone(oauth.cache)
self.assertIsNone(oauth.fetch_token)
self.assertIsNone(oauth.update_token)
def test_oauth1_authorize(self):
app = Flask(__name__)
app.secret_key = '!'
oauth = OAuth(app, cache=SimpleCache())
client = oauth.register(
'dev',
client_id='dev',
client_secret='dev',
request_token_url='https://i.b/reqeust-token',
api_base_url='https://i.b/api',
access_token_url='https://i.b/token',
authorize_url='https://i.b/authorize'
)
with app.test_request_context():
with mock.patch('requests.sessions.Session.send') as send:
send.return_value = mock_send_value('oauth_token=foo&oauth_verifier=baz')
resp = client.authorize_redirect('https://b.com/bar')
self.assertEqual(resp.status_code, 302)
url = resp.headers.get('Location')
def test_oauth2_authorize(self):
app = Flask(__name__)
app.secret_key = '!'
oauth = OAuth(app)
client = oauth.register(
'dev',
client_id='dev',
client_secret='dev',
api_base_url='https://i.b/api',
access_token_url='https://i.b/token',
authorize_url='https://i.b/authorize'
)
with app.test_request_context():
resp = client.authorize_redirect('https://b.com/bar')
self.assertEqual(resp.status_code, 302)
url = resp.headers.get('Location')
self.assertIn('state=', url)
state = session['_dev_authlib_state_']
self.assertIsNotNone(state)
def test_oauth2_access_token_with_post(self):
app = Flask(__name__)
app.secret_key = '!'
oauth = OAuth(app)
client = oauth.register(
'dev',
client_id='dev',
client_secret='dev',
api_base_url='https://i.b/api',
access_token_url='https://i.b/token',
authorize_url='https://i.b/authorize'
)
payload = {'code': 'a', 'state': 'b'}
with app.test_request_context(data=payload, method='POST'):
session['_dev_authlib_state_'] = 'b'
with mock.patch('requests.sessions.Session.send') as send:
send.return_value = mock_send_value(get_bearer_token())
token = client.authorize_access_token()
self.assertEqual(token['access_token'], 'a')
from ..libs.exceptions import MsgException, APIFailure, APIForbidden, APIClosed
from ..libs.helper import is_accept_json
from ..models.bgp import TBBGP
from ..models.user import TBUser, TBRole
class LoginManager(_LoginManager):
"""特定场景返回登录失效提示"""
def unauthorized(self):
if is_accept_json():
raise APIFailure('登录状态过期, 请刷新')
return super(LoginManager, self).unauthorized()
oauth = OAuth()
oauth.register('OA')
login_manager = LoginManager()
login_manager.session_protection = 'strong'
login_manager.login_view = 'web.web_login'
login_manager.login_message = '请先登录'
login_manager.login_message_category = 'info'
@login_manager.user_loader
def load_user(user_id):
"""Flask-login 获取用户信息"""
try:
user = pickle.loads(session['load_user'])
except Exception:
user = None
def create_oauth_app(service_config, name):
service_oauth = OAuth(app)
service_app = service_oauth.register(name, **service_config)
return service_app
import jwt
import json
import base64
import logging
from urllib.request import urlopen
from authlib.integrations.flask_client import OAuth
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend
from aleph import signals, settings
oauth = OAuth()
log = logging.getLogger(__name__)
def configure_oauth(app, cache):
if settings.OAUTH:
oauth.provider = oauth.register(
name=settings.OAUTH_NAME,
client_id=settings.OAUTH_KEY,
client_secret=settings.OAUTH_SECRET,
client_kwargs={'scope': settings.OAUTH_SCOPE},
request_token_url=settings.OAUTH_REQUEST_TOKEN_URL,
access_token_method=settings.OAUTH_TOKEN_METHOD,
access_token_url=settings.OAUTH_TOKEN_URL,
api_base_url=settings.OAUTH_BASE_URL,
authorize_url=settings.OAUTH_AUTHORIZE_URL
)
from data.models import User
bp = Blueprint("auth", __name__, url_prefix="/auth")
db = DEFAULT_DATABASE.db
def fetch_google_token():
return session.get('google_token')
def update_google_token(token):
session['google_token'] = token
return session['google_token']
oauth = OAuth() # pylint: disable=invalid-name
oauth.register(
name='google', # nosec
api_base_url='https://www.googleapis.com/',
server_metadata_url=
'https://accounts.google.com/.well-known/openid-configuration',
fetch_token=fetch_google_token,
update_token=update_google_token,
client_kwargs={'scope': 'openid email profile'})
@bp.route("/login", methods=["GET"])
def login():
if is_authenticated():
return redirect("/")
# Allow OAuth bypass on local dev environment.