Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def registration_endpoint(self, data):
    """
    Parse a client registration request and assemble the new client's record.

    The request is decoded as JSON first; if that raises ``DecodeError`` it is
    parsed a second time with the parser's default format.

    :param data: The raw registration request
    :return: None. NOTE(review): the visible body stops once the record has
        been assembled; it neither stores nor returns it -- confirm against
        the complete method.
    """
    try:
        request = self.parse_registration_request(data, "json")
    except DecodeError:
        request = self.parse_registration_request(data)

    client_secret = rndstr()
    expires = utc_time_sans_frac() + self.registration_expires_in
    kwargs = {}  # type: Dict[str, str]

    if "client_id" not in request:
        # No client_id supplied: generate an identifier and an access token.
        client_id = rndstr(10)
        registration_access_token = rndstr(20)

        client_record = request.to_dict()
        # kwargs is filled before the generated values are merged in, so it
        # only carries the fields that came with the request itself.
        kwargs.update(client_record)

        client_record["client_secret"] = client_secret
        # A second to_dict() call: "info" holds the request fields alone.
        client_record["info"] = request.to_dict()
        client_record["expires"] = expires
        client_record["registration_access_token"] = registration_access_token
        client_record["registration_client_uri"] = "register_endpoint"
def registration_endpoint(self, data):
    """
    Parse a client registration request and assemble the new client's record.

    The request is decoded as JSON first; if that raises ``ValueError`` it is
    parsed a second time with the parser's default format.

    :param data: The raw registration request
    :return: None. NOTE(review): the visible body stops once the record has
        been assembled; it neither stores nor returns it -- confirm against
        the complete method.
    """
    try:
        parsed = self.parse_registration_request(data, "json")
    except ValueError:
        parsed = self.parse_registration_request(data)

    client_secret = rndstr()
    expires = utc_time_sans_frac() + self.registration_expires_in
    kwargs = {}  # type: Dict[str, str]

    if "client_id" not in parsed:
        # The request carries no client_id, so new values are generated.
        client_id = rndstr(10)
        registration_access_token = rndstr(20)

        client_info = parsed.to_dict()
        # Copy the request fields into kwargs before the generated values
        # below are added to client_info.
        kwargs.update(client_info)

        generated = {
            "client_secret": client_secret,
            "info": parsed.to_dict(),
            "expires": expires,
            "registration_access_token": registration_access_token,
            "registration_client_uri": "register_endpoint",
        }
        client_info.update(generated)
:param session_state: state value of the first authorization request
:param token_object: DB token to be included in a Grant for
the token exchange or token refresh mechanisms
:param token_type: e.g. "subject_token" for token exchange or "refresh_token"
:param kwargs: optional strings which contain expected oauth session parameters:
issuer_id/issuer, redirect_uri, redirect_to, state, nonce, code,
scope, audience,
:returns: if first_init == True: dict {'client': oidc client object, 'request': auth_url};
in all other cases the oidc client object. If anything goes wrong, an exception is raised.
"""
try:
auth_args = {"grant_types": ["authorization_code"],
"response_type": "code",
"state": kwargs.get('state', rndstr()),
"nonce": kwargs.get('nonce', rndstr())}
auth_args["scope"] = token_object.oidc_scope if token_object else kwargs.get('scope', " ")
auth_args["audience"] = token_object.audience if token_object else kwargs.get('audience', " ")
if token_object:
issuer = token_object.identity.split(", ")[1].split("=")[1]
oidc_client = OIDC_CLIENTS[issuer]
auth_args["client_id"] = oidc_client.client_id
token = ''
if not token_type:
token_type = kwargs.get('token_type', None)
if token_type == 'subject_token':
token = token_object.token
if token_type == 'refresh_token':
token = token_object.refresh_token
if token_type and token:
exit()
# oas.key_setup("static", sig={"format": "jwk", "alg": "rsa"})
else:
jwks_file_name = JWKS_FILE_NAME
f = open(jwks_file_name, "w")
for key in jwks["keys"]:
for k in key.keys():
key[k] = as_unicode(key[k])
f.write(json.dumps(jwks))
f.close()
oas.jwks_uri = "{}/{}".format(oas.baseurl, jwks_file_name)
# Initiate the SessionDB
_code = DefaultToken(rndstr(32), rndstr(32), typ='A', lifetime=600)
_token = JWTToken('T', oas.keyjar, {'code': 3600, 'token': 900},
iss=config.issuer, sign_alg='RS256')
_refresh_token = JWTToken('R', oas.keyjar, {'': 86400}, iss=config.issuer,
sign_alg='RS256')
oas.sdb = SessionDB(config.SERVICE_URL,
db={},
code_factory=_code,
token_factory=_token,
refresh_token_factory=_refresh_token)
# set some parameters
try:
oas.cookie_ttl = config.COOKIETTL
except AttributeError:
pass
def make_authentication_request(self, session):
    """
    Build an authorization request and redirect to the authorization endpoint.

    A new ``state`` and ``nonce`` are generated and stored in ``session``
    before the request is constructed with the client held in
    ``session["client"]``.

    :param session: Mapping that holds the "client" entry and receives the
        generated "state" and "nonce" values
    :raises cherrypy.HTTPRedirect: always; a 303 redirect to the login URL
    """
    state = rndstr()
    session["state"] = state
    nonce = rndstr()
    session["nonce"] = nonce

    request_args = dict(
        response_type=self.response_type,
        state=state,
        nonce=nonce,
        redirect_uri=self.redirect_uri,
    )
    # Entries in self.behaviour override the defaults set above.
    request_args.update(self.behaviour)

    client = session["client"]
    authorization_request = client.construct_AuthorizationRequest(
        request_args=request_args
    )
    raise cherrypy.HTTPRedirect(
        authorization_request.request(client.authorization_endpoint), 303
    )
Begin the OAuth2 flow.
:param baseurl: The RP's base URL
:param request: The Authorization query
:param response_type: The response type the AS should use. Default 'code'.
:return: A URL to which the user should be redirected
"""
logger.debug("- begin -")
# Store the request and the redirect uri used
self.redirect_uris = ["%s%s" % (baseurl, self.authz_page)]
self._request = request
# Put myself in the dictionary of sessions, keyed on session-id
if not self.seed:
self.seed = rndstr()
sid = stateID(request, self.seed)
self.grant[sid] = Grant(seed=self.seed)
self._backup(sid)
self.sdb["seed:%s" % self.seed] = sid
if not response_type:
response_type = self.response_type
location = self.request_info(
AuthorizationRequest,
method="GET",
scope=self.scope,
request_args={"state": sid, "response_type": response_type},
)[0]
message_factory=message_factory,
)
self.authn_broker = authn_broker
if authn_broker is None:
# default cookie function
self.cookie_func = CookieDealer(srv=self).create_cookie
else:
self.cookie_func = self.authn_broker[0][0].create_cookie
for item in self.authn_broker:
item.srv = self
self.authz = authz
self.client_authn = client_authn
self.symkey = symkey
self.seed = rndstr().encode("utf-8")
self.iv = iv or os.urandom(16)
self.cookie_name = "pyoidc"
self.cookie_domain = ""
self.cookie_path = ""
self.default_scope = default_scope
self.sso_ttl = 0
self.default_acr = default_acr
if urlmap is None:
self.urlmap = {} # type: Dict[str, List[str]]
else:
self.urlmap = urlmap
self.response_type_map = {
"code": code_response,
"token": token_response,
_rat = rndstr(32)
reg_enp = ""
for endp in self.endp:
if endp.etype == "registration":
reg_enp = urljoin(self.baseurl, endp.url)
break
self.cdb[client_id] = {
"client_id": client_id,
"client_secret": client_secret,
"registration_access_token": _rat,
"registration_client_uri": "%s?client_id=%s" % (reg_enp, client_id),
"client_secret_expires_at": self.client_secret_expiration_time(),
"client_id_issued_at": utc_time_sans_frac(),
"client_salt": rndstr(8),
}
_cinfo = self.do_client_registration(
request,
client_id,
ignore=["redirect_uris", "policy_uri", "logo_uri", "tos_uri"],
)
if isinstance(_cinfo, Response):
return _cinfo
response_cls = self.server.message_factory.get_response_type(
"registration_endpoint"
)
args = dict([(k, v) for k, v in _cinfo.items() if k in response_cls.c_param])
self.comb_uri(args)
return error_response(
"invalid_configuration_parameter", descr="%s" % err
)
request.rm_blanks()
try:
self.match_client_request(request)
except CapabilitiesMisMatch as err:
return error_response(
"invalid_request", descr="Don't support proposed %s" % err
)
# create new id and secret
client_id = rndstr(12)
while client_id in self.cdb:
client_id = rndstr(12)
client_secret = secret(self.seed, client_id)
_rat = rndstr(32)
reg_enp = ""
for endp in self.endp:
if endp.etype == "registration":
reg_enp = urljoin(self.baseurl, endp.url)
break
self.cdb[client_id] = {
"client_id": client_id,
"client_secret": client_secret,
"registration_access_token": _rat,
"registration_client_uri": "%s?client_id=%s" % (reg_enp, client_id),
"client_secret_expires_at": self.client_secret_expiration_time(),
def _distributed(self, info):
    """
    Store ``info`` under a newly generated access token and describe where
    it can be fetched.

    :param info: The user info to keep; its keys become the claim names
    :return: A UserClaimsResponse carrying the claims userinfo endpoint, the
        generated access token and the names of the stored claims
    """
    # The generated token is the key under which the info is kept for later.
    token = rndstr()
    self.info_store[token] = info

    response = UserClaimsResponse(
        endpoint=self.claims_userinfo_endpoint,
        access_token=token,
        claims_names=info.keys(),
    )
    return response