Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Build a key jar and register one symmetric key twice with usage ["sig"]:
# once under the empty string and once under the issuer URL used below.
kj = KeyJar()
kj.add_symmetric("", "dYMmrcQksKaPkhdgRNYk3zzh5l7ewdDJ", ["sig"])
kj.add_symmetric(
"https://sso.qa.7pass.ctf.prosiebensat1.com",
"dYMmrcQksKaPkhdgRNYk3zzh5l7ewdDJ",
["sig"],
)
# JWT packer bound to that key jar: HS256 signing, the same issuer URL, and
# lifetime=3600 (presumably seconds -- confirm against the JWT helper).
packer = JWT(
kj,
sign_alg="HS256",
iss="https://sso.qa.7pass.ctf.prosiebensat1.com",
lifetime=3600,
)
# NOTE(review): `idt` and `code` are defined outside this excerpt. The claims
# from idt.to_dict() are packed into `_jws`, which is then carried as the
# id_token of the authorization response.
_jws = packer.pack(**idt.to_dict())
msg = AuthorizationResponse(code=code, id_token=_jws)
# Verify the ID token carried by `msg` with check_hash=True, using the key jar
# above and the expected issuer / client_id. The return value is discarded.
verify_id_token(
msg,
check_hash=True,
keyjar=kj,
iss="https://sso.qa.7pass.ctf.prosiebensat1.com",
client_id="554295ce3770612820620000",
)
def test_do_user_info_request(self):
    """Run a user-info request for a stored grant and check the claims returned.

    A grant holding an authorization code and one token is stored on the
    client under the key "state0"; the user-info request is then issued for
    that state and the response type and content are asserted.
    """
    # Grant seeded with the authorization code from a code/state response.
    authz_resp = AuthorizationResponse(code="code", state="state")
    stored_grant = Grant(10)  # expired grant
    stored_grant.add_code(authz_resp)

    # Attach a token built from an access-token response to the grant.
    token_resp = AccessTokenResponse(refresh_token="refresh_with_me",
                                     access_token="access")
    stored_grant.tokens.append(Token(token_resp))
    self.client.grant["state0"] = stored_grant

    userinfo = self.client.do_user_info_request(state="state0")

    assert isinstance(userinfo, OpenIDSchema)
    expected_claims = ['name', 'email', 'verified', 'nickname', 'sub']
    assert _eq(userinfo.keys(), expected_claims)
    assert userinfo["name"] == "Melody Gardot"
req = self.parse_authorization_request(query=query)
aevent = AuthnEvent("user", "salt", authn_info="acr")
sid = self.sdb.create_authz_session(aevent, areq=req)
self.sdb.do_sub(sid, "client_salt")
_info = self.sdb[sid]
if "code" in req["response_type"]:
if "token" in req["response_type"]:
grant = _info["code"]
if "offline_access" in _info["scope"]:
_dict = self.sdb.upgrade_to_token(grant, issue_refresh=True)
else:
_dict = self.sdb.upgrade_to_token(grant)
_dict["oauth_state"] = ("authz",)
_dict = by_schema(AuthorizationResponse(), **_dict)
resp = AuthorizationResponse(
**_dict
) # type: Union[AuthorizationResponse, AccessTokenResponse]
else:
_state = req["state"]
resp = AuthorizationResponse(state=_state, code=_info["code"])
else: # "implicit" in req.response_type:
grant = _info["code"]
params = AccessTokenResponse.c_param.keys()
_dict = dict(
[
(k, v)
for k, v in self.sdb.upgrade_to_token(grant).items()
if k in params
def mockParseResponse(self, response_class, info, sformat):
    """Stand-in for a response parser that always yields the canned ARESP.

    The ``response_class``, ``info`` and ``sformat`` arguments are accepted
    but not used: a fresh ``message.AuthorizationResponse`` is populated from
    the ``ARESP`` dictionary and returned.
    """
    canned = message.AuthorizationResponse()
    canned.from_dict(ARESP)
    return canned
def test_add_token_info():
    """Check that add_token_info() adds the token fields to a response.

    An authorization session is created for a "code" request, its code is
    upgraded to a token without a refresh token, and the resulting dictionary
    is merged into an AuthorizationResponse via add_token_info(). The response
    must then expose exactly the token fields plus the state and scope copied
    from the request.
    """
    server = srv_init
    AREQ = AuthorizationRequest(response_type="code", client_id=CLIENT_ID,
                                redirect_uri="http://example.com/authz",
                                scope=["openid"], state="state000")

    sid = server.sdb.create_authz_session("user_id", AREQ)
    session = server.sdb[sid]
    scode = session["code"]

    # Mirror the optional request attributes onto the response.
    aresp = AuthorizationResponse()
    if AREQ.state:
        aresp.state = AREQ.state
    if AREQ.scope:
        aresp.scope = AREQ.scope
    if AREQ.nonce:
        # Previously a no-op self-assignment (AREQ.nonce = AREQ.nonce); the
        # nonce belongs on the response, like state and scope above.
        aresp.nonce = AREQ.nonce

    _dic = server.sdb.update_to_token(scode, issue_refresh=False)
    add_token_info(aresp, _dic)

    # Call form of print: same output on Python 2, valid syntax on Python 3.
    print(aresp.keys())
    assert _eq(aresp.keys(), ['access_token', 'expires_in', 'token_type',
                              'state', 'scope'])
# Build a key jar and register one symmetric key twice with usage ["sig"]:
# once under the empty string and once under the issuer URL used below.
kj = KeyJar()
kj.add_symmetric("", "dYMmrcQksKaPkhdgRNYk3zzh5l7ewdDJ", ["sig"])
kj.add_symmetric(
"https://sso.qa.7pass.ctf.prosiebensat1.com",
"dYMmrcQksKaPkhdgRNYk3zzh5l7ewdDJ",
["sig"],
)
# JWT packer bound to that key jar: HS256 signing, the same issuer URL, and
# lifetime=3600 (presumably seconds -- confirm against the JWT helper).
packer = JWT(
kj,
sign_alg="HS256",
iss="https://sso.qa.7pass.ctf.prosiebensat1.com",
lifetime=3600,
)
# NOTE(review): `idt` and `code` are defined outside this excerpt. The claims
# from idt.to_dict() are packed into `_jws`, which is then carried as the
# id_token of the authorization response.
_jws = packer.pack(**idt.to_dict())
msg = AuthorizationResponse(code=code, id_token=_jws)
# Verification with check_hash=True is expected to raise
# MissingRequiredAttribute for this message.
with pytest.raises(MissingRequiredAttribute):
verify_id_token(
msg,
check_hash=True,
keyjar=kj,
iss="https://sso.qa.7pass.ctf.prosiebensat1.com",
client_id="554295ce3770612820620000",
)
retrieved above is stored against this token. Later, when a client
connects with this token, it is assumed to be a valid user.
:return: A display of the authentication token to use in the client. If
OIDC is not configured, raises a NotImplementedException.
"""
if app.oidcClient is None:
raise exceptions.NotImplementedException()
response = dict(flask.request.args.iteritems(multi=True))
aresp = app.oidcClient.parse_response(
message.AuthorizationResponse,
info=response,
sformat='dict')
sessState = flask.session.get('state')
respState = aresp['state']
if (not isinstance(aresp, message.AuthorizationResponse) or
respState != sessState):
raise exceptions.NotAuthenticatedException()
args = {
"code": aresp['code'],
"redirect_uri": app.oidcClient.redirect_uris[0],
"client_id": app.oidcClient.client_id,
"client_secret": app.oidcClient.client_secret
}
atr = app.oidcClient.do_access_token_request(
scope="openid",
state=respState,
request_args=args)
if not isinstance(atr, message.AccessTokenResponse):
raise exceptions.NotAuthenticatedException()
def response_endpoint(self, context, *args):
"""
Handles the authentication response from the OP.
:type context: satosa.context.Context
:type args: Any
:rtype: satosa.response.Response
:param context: SATOSA context
:param args: None
:return:
"""
backend_state = context.state[self.name]
authn_resp = self.client.parse_response(AuthorizationResponse, info=context.request, sformat="dict")
if backend_state[STATE_KEY] != authn_resp["state"]:
msg = "Missing or invalid state in authn response for state: {}".format(backend_state)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
raise SATOSAAuthenticationError(context.state, "Missing or invalid state in authn response")
self._check_error_response(authn_resp, context)
access_token, id_token_claims = self._get_tokens(authn_resp, context)
if id_token_claims:
self._verify_nonce(id_token_claims["nonce"], context)
else:
id_token_claims = {}
userinfo = {}
if access_token:
# make userinfo request
def provider_callback(request):
provider = request.session['provider']
client = build_client(provider)
response = request.META["QUERY_STRING"]
aresp = client.parse_response(
AuthorizationResponse, info=response, sformat="urlencoded")
# Make sure this is a callback we're expecting.
received_state = aresp['state']
if ('state' not in request.session or
request.session['state'] != received_state):
return HttpResponseBadRequest("States didn't match.")
del request.session['state']
args = {
"code": aresp["code"],
"redirect_uri": client.redirect_uris[0],
"client_id": client.client_id,
"client_secret": client.client_secret
}
atresp = client.do_access_token_request(scope="openid",
def _authz_req(self):
    """Send a fixed "code" authorization request and return the parsed reply.

    The request (scopes "openid" and "profile", client "client1") is
    url-encoded and handed to the provider's authorization endpoint. The query
    component of the endpoint's ``message`` is then deserialized as an
    url-encoded AuthorizationResponse, and that deserialization result is
    returned.
    """
    areq = AuthorizationRequest(
        scope=["openid", "profile"],
        redirect_uri="http://localhost:8087/authz",
        response_type=["code"],
        client_id="client1",
    )
    endpoint_reply = self.provider.authorization_endpoint(areq.to_urlencoded())
    # The reply's message is parsed as a URL; only its query string is used.
    query_string = urlparse(endpoint_reply.message).query
    return AuthorizationResponse().deserialize(query_string, "urlencoded")