Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_extra_event(self):
# more the one event
lt = LogoutToken(
iss="https://example.com",
aud=["https://rp.example.org"],
events={
BACK_CHANNEL_LOGOUT_EVENT: {},
"http://schemas.openid.net/event/other}": {},
},
jti=rndstr(16),
iat=utc_time_sans_frac(),
sub="https://example.com/sub",
)
with pytest.raises(ValueError):
lt.verify()
def test_do_user_info_request_with_access_token_refresh(self):
args = {"response_type": ["code"],
"scope": ["openid"]}
r = self.client.do_authorization_request(state="state0",
request_args=args)
self.client.parse_response(AuthorizationResponse, r.headers["location"],
sformat="urlencoded")
self.client.do_access_token_request(scope="openid",
state="state0")
token = self.client.get_token(state="state0", scope="openid")
token.token_expiration_time = utc_time_sans_frac() - 86400
resp = self.client.do_user_info_request(state="state0")
assert isinstance(resp, OpenIDSchema)
assert _eq(resp.keys(), ['name', 'email', 'verified', 'nickname',
'sub'])
assert resp["name"] == "Melody Gardot"
def test_with_sub(self):
# All the required claims. Note there must be a sub, a sid or both
lt = LogoutToken(
iss="https://example.com",
aud=["https://rp.example.org"],
events={BACK_CHANNEL_LOGOUT_EVENT: {}},
iat=utc_time_sans_frac(),
jti=rndstr(16),
sub="https://example.com/sub",
)
assert lt.verify()
def test_get_access_token_refresh_with_refresh_token(self):
self.client.grant["foo"] = Grant()
_get = time_util.utc_time_sans_frac() + 60
self.client.grant["foo"].grant_expiration_time = _get
self.client.grant["foo"].code = "access_code"
resp = AccessTokenResponse(
refresh_token="refresh_with_me", access_token="access"
)
token = Token(resp)
self.client.grant["foo"].tokens.append(token)
# Uses refresh_token from previous response
atr = self.client.construct_RefreshAccessTokenRequest(token=token)
assert atr["grant_type"] == "refresh_token"
assert atr["refresh_token"] == "refresh_with_me"
def test_get_access_token_refresh_from_state(self):
self.client.grant["foo"] = Grant()
_get = time_util.utc_time_sans_frac() + 60
self.client.grant["foo"].grant_expiration_time = _get
self.client.grant["foo"].code = "access_code"
resp = AccessTokenResponse(
refresh_token="refresh_with_me", access_token="access"
)
self.client.grant["foo"].tokens.append(Token(resp))
# Uses refresh_token from previous response
atr = self.client.construct_RefreshAccessTokenRequest(state="foo")
assert isinstance(atr, RefreshAccessTokenRequest)
assert atr["grant_type"] == "refresh_token"
assert atr["refresh_token"] == "refresh_with_me"
def test_with_nonce(self):
lt = LogoutToken(
iss="https://example.com",
aud=["https://rp.example.org"],
events={BACK_CHANNEL_LOGOUT_EVENT: {}},
iat=utc_time_sans_frac(),
jti=rndstr(16),
nonce=rndstr(16),
)
with pytest.raises(MessageException):
lt.verify()
def registration_endpoint(self, data):
try:
req = self.parse_registration_request(data, "json")
except DecodeError:
req = self.parse_registration_request(data)
client_secret = rndstr()
expires = utc_time_sans_frac() + self.registration_expires_in
kwargs = {} # type: Dict[str, str]
if "client_id" not in req:
client_id = rndstr(10)
registration_access_token = rndstr(20)
_client_info = req.to_dict()
kwargs.update(_client_info)
_client_info.update(
{
"client_secret": client_secret,
"info": req.to_dict(),
"expires": expires,
"registration_access_token": registration_access_token,
"registration_client_uri": "register_endpoint",
}
)
self.client[client_id] = _client_info
auth_args["audience"] = token_object.audience if token_object else kwargs.get('audience', " ")
if token_object:
issuer = token_object.identity.split(", ")[1].split("=")[1]
oidc_client = OIDC_CLIENTS[issuer]
auth_args["client_id"] = oidc_client.client_id
token = ''
if not token_type:
token_type = kwargs.get('token_type', None)
if token_type == 'subject_token':
token = token_object.token
if token_type == 'refresh_token':
token = token_object.refresh_token
if token_type and token:
oidc_client.grant[auth_args['state']] = Grant()
oidc_client.grant[auth_args['state']].grant_expiration_time = time_util.utc_time_sans_frac() + 300
resp = AccessTokenResponse()
resp[token_type] = token
oidc_client.grant[auth_args['state']].tokens.append(Token(resp))
else:
secrets, client_secret = {}, {}
try:
with open(IDPSECRETS) as client_secret_file:
secrets = json.load(client_secret_file)
except:
raise CannotAuthenticate("Rucio server is missing information from the idpsecrets.json file.")
if 'issuer_id' in kwargs:
client_secret = secrets[kwargs.get('issuer_id', ADMIN_ISSUER_ID)]
elif 'issuer' in kwargs:
client_secret = next((secrets[i] for i in secrets if 'issuer' in secrets[i] and # NOQA: W504
secrets[i]['issuer'] == kwargs.get('issuer')), None)
redirect_url = kwargs.get('redirect_uri', None)
def pack_init(self):
argv = {"iss": self.iss, "iat": utc_time_sans_frac()}
argv["exp"] = argv["iat"] + self.lifetime
return argv
client_secret = secret(self.seed, client_id)
_rat = rndstr(32)
reg_enp = ""
for endp in self.endp:
if endp.etype == "registration":
reg_enp = urljoin(self.baseurl, endp.url)
break
self.cdb[client_id] = {
"client_id": client_id,
"client_secret": client_secret,
"registration_access_token": _rat,
"registration_client_uri": "%s?client_id=%s" % (reg_enp, client_id),
"client_secret_expires_at": self.client_secret_expiration_time(),
"client_id_issued_at": utc_time_sans_frac(),
"client_salt": rndstr(8),
}
_cinfo = self.do_client_registration(
request,
client_id,
ignore=["redirect_uris", "policy_uri", "logo_uri", "tos_uri"],
)
if isinstance(_cinfo, Response):
return _cinfo
response_cls = self.server.message_factory.get_response_type(
"registration_endpoint"
)
args = dict([(k, v) for k, v in _cinfo.items() if k in response_cls.c_param])