Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
scope=["inner", "outer"],
extra=["local", "external"],
level=3)
uec = atr.to_urlencoded()
assert query_string_compare(uec,
"scope=inner+outer&level=3&expires_in=3600&token_type=example&extra=local&extra=external&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA&access_token=2YotnFZFEjr1zCsicMWpAA&example_parameter=example_value")
del atr["extra"]
ouec = atr.to_urlencoded()
assert query_string_compare(ouec,
"access_token=2YotnFZFEjr1zCsicMWpAA&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA&level=3&example_parameter=example_value&token_type=example&expires_in=3600&scope=inner+outer")
assert len(uec) == (len(ouec) + len("extra=local") +
len("extra=external") + 2)
atr2 = AccessTokenResponse().deserialize(uec, "urlencoded")
assert _eq(atr2.keys(), ['access_token', 'expires_in', 'token_type',
'scope', 'refresh_token', 'level',
'example_parameter', 'extra'])
atr3 = AccessTokenResponse().deserialize(ouec, "urlencoded")
assert _eq(atr3.keys(), ['access_token', 'expires_in', 'token_type',
'scope', 'refresh_token', 'level',
'example_parameter'])
def test_get_access_token_refresh_from_state(self):
self.client.grant["foo"] = Grant()
_get = time_util.utc_time_sans_frac() + 60
self.client.grant["foo"].grant_expiration_time = _get
self.client.grant["foo"].code = "access_code"
resp = AccessTokenResponse(
refresh_token="refresh_with_me", access_token="access"
)
self.client.grant["foo"].tokens.append(Token(resp))
# Uses refresh_token from previous response
atr = self.client.construct_RefreshAccessTokenRequest(state="foo")
assert isinstance(atr, RefreshAccessTokenRequest)
assert atr["grant_type"] == "refresh_token"
assert atr["refresh_token"] == "refresh_with_me"
def test_json_serialize(self):
at = AccessTokenResponse(
access_token="SlAV32hkKG", token_type="Bearer", expires_in=3600
)
atj = at.serialize(method="json")
atj_obj = json.loads(atj)
expected_atj_obj = {
"token_type": "Bearer",
"access_token": "SlAV32hkKG",
"expires_in": 3600,
}
assert atj_obj == expected_atj_obj
_sdb.do_sub(sid, "client_salt")
# Construct Access token request
areq = AccessTokenRequest(
code=access_grant,
client_id="client1",
redirect_uri="http://example.com/authz",
client_secret="hemlighet",
grant_type="authorization_code",
state="state",
)
txt = areq.to_urlencoded()
resp = self.provider.token_endpoint(request=txt)
atr = AccessTokenResponse().deserialize(resp.message, "json")
assert atr["token_type"] == "Bearer"
def token_response(**kwargs):
_areq = kwargs["areq"]
_scode = kwargs["scode"]
_sdb = kwargs["sdb"]
_dic = _sdb.upgrade_to_token(_scode, issue_refresh=False)
aresp = AccessTokenResponse(**by_schema(AccessTokenResponse, **_dic))
try:
aresp["state"] = _areq["state"]
except KeyError:
pass
add_non_standard(_areq, aresp)
return aresp
def token_response(**kwargs):
_areq = kwargs["areq"]
_scode = kwargs["scode"]
_sdb = kwargs["sdb"]
_dic = _sdb.upgrade_to_token(_scode, issue_refresh=False)
aresp = AccessTokenResponse(**by_schema(AccessTokenResponse, **_dic))
try:
aresp["state"] = _areq["state"]
except KeyError:
pass
add_non_standard(_areq, aresp)
return aresp
def token_request(self, device_code=""):
req = TokenRequest(
grant_type="urn:ietf:params:oauth:grant-type:device_code",
device_code=device_code,
client_id=self.host.client_id,
)
http_response = self.host.http_request(
self.host.provider_info["token_endpoint"], "POST", req.to_urlencoded()
)
response = self.host.parse_request_response(
AccessTokenResponse, http_response, "json"
)
return response
c_param.update({"id_token": SINGLE_OPTIONAL_STRING})
def verify(self, **kwargs):
super().verify(**kwargs)
if "id_token" in self:
# replace the JWT with the verified IdToken instance
self["id_token"] = verify_id_token(self, **kwargs)
return True
class UserInfoRequest(Message):
c_param = {"access_token": SINGLE_OPTIONAL_STRING}
class AuthorizationResponse(message.AuthorizationResponse, message.AccessTokenResponse):
c_param = message.AuthorizationResponse.c_param.copy()
c_param.update(message.AccessTokenResponse.c_param)
c_param.update(
{
"code": SINGLE_OPTIONAL_STRING,
"access_token": SINGLE_OPTIONAL_STRING,
"token_type": SINGLE_OPTIONAL_STRING,
"id_token": SINGLE_OPTIONAL_IDTOKEN,
}
)
def verify(self, **kwargs):
super().verify(**kwargs)
if "aud" in self:
if "client_id" in kwargs:
def __init__(self, client_id, client_secret, **kwargs):
Social.__init__(self, client_id, client_secret, **kwargs)
self.access_token_response = AccessTokenResponse
try:
self._scope = ",".join(self.extra["scope"])
except KeyError:
self._scope = ""
self.token_response_body_type = "urlencoded"