Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, authorizer):
    """Prepare the connection to reddit's API.

    :param authorizer: An instance of :class:`Authorizer`. The value is
        checked against ``BaseAuthorizer`` with ``isinstance``.
    :raises InvalidInvocation: if ``authorizer`` is not a
        ``BaseAuthorizer`` instance.
    """
    # Fail fast on a wrong argument type, before any state is stored.
    if not isinstance(authorizer, BaseAuthorizer):
        raise InvalidInvocation(
            "invalid Authorizer: {}".format(authorizer)
        )
    self._authorizer = authorizer
    self._rate_limiter = RateLimiter()
    # The class itself is stored, not an instance.
    self._retry_strategy_class = FiniteRetryStrategy
def refresh(self):
    """Obtain a new access token from the refresh_token.

    Delegates to ``self._request_token`` with the ``refresh_token`` grant
    type and the stored refresh token.

    :raises InvalidInvocation: if ``self.refresh_token`` is ``None``.
    """
    if self.refresh_token is None:
        raise InvalidInvocation("refresh token not provided")
    self._request_token(
        grant_type="refresh_token", refresh_token=self.refresh_token
    )
def _validate_authenticator(self):
    """Ensure ``self._authenticator`` is of the expected class.

    The expected class is read from ``self.AUTHENTICATOR_CLASS``.

    :raises InvalidInvocation: if ``self._authenticator`` is not an
        instance of ``self.AUTHENTICATOR_CLASS``.
    """
    if not isinstance(self._authenticator, self.AUTHENTICATOR_CLASS):
        raise InvalidInvocation(
            "Must use a authenticator of type {}.".format(
                self.AUTHENTICATOR_CLASS.__name__
            )
        )
session=None,
):
"""Create an instance of the Requestor class.
:param user_agent: The user-agent for your application. Please follow
reddit's user-agent guidelines:
https://github.com/reddit/reddit/wiki/API#rules
:param oauth_url: (Optional) The URL used to make OAuth requests to the
reddit site. (Default: https://oauth.reddit.com)
:param reddit_url: (Optional) The URL used when obtaining access
tokens. (Default: https://www.reddit.com)
:param session: (Optional) A session to handle requests, compatible
with requests.Session(). (Default: None)
"""
if user_agent is None or len(user_agent) < 7:
raise InvalidInvocation("user_agent is not descriptive")
self._http = session or requests.Session()
self._http.headers["User-Agent"] = "{} prawcore/{}".format(
user_agent, __version__
)
self.oauth_url = oauth_url
self.reddit_url = reddit_url
:param duration: Either ``permanent`` or ``temporary``. ``temporary``
authorizations generate access tokens that last only 1
hour. ``permanent`` authorizations additionally generate a refresh
token that can be indefinitely used to generate new hour-long
access tokens. Only ``temporary`` can be specified if ``implicit``
is set to ``True``.
:param scopes: A list of OAuth scopes to request authorization for.
:param state: A string that will be reflected in the callback to
``redirect_uri``. This value should be temporarily unique to the
client for whom the URL was generated.
:param implicit: (optional) Use the implicit grant flow (default:
False). This flow is only available for UntrustedAuthenticators.
"""
if self.redirect_uri is None:
raise InvalidInvocation("redirect URI not provided")
if implicit and not isinstance(self, UntrustedAuthenticator):
raise InvalidInvocation(
"Only UntrustedAuthentictor instances can "
"use the implicit grant flow."
)
if implicit and duration != "temporary":
raise InvalidInvocation(
"The implicit grant flow only supports "
"temporary access tokens."
)
params = {
"client_id": self.client_id,
"duration": duration,
"redirect_uri": self.redirect_uri,
"response_type": "token" if implicit else "code",
hour. ``permanent`` authorizations additionally generate a refresh
token that can be indefinitely used to generate new hour-long
access tokens. Only ``temporary`` can be specified if ``implicit``
is set to ``True``.
:param scopes: A list of OAuth scopes to request authorization for.
:param state: A string that will be reflected in the callback to
``redirect_uri``. This value should be temporarily unique to the
client for whom the URL was generated.
:param implicit: (optional) Use the implicit grant flow (default:
False). This flow is only available for UntrustedAuthenticators.
"""
if self.redirect_uri is None:
raise InvalidInvocation("redirect URI not provided")
if implicit and not isinstance(self, UntrustedAuthenticator):
raise InvalidInvocation(
"Only UntrustedAuthentictor instances can "
"use the implicit grant flow."
)
if implicit and duration != "temporary":
raise InvalidInvocation(
"The implicit grant flow only supports "
"temporary access tokens."
)
params = {
"client_id": self.client_id,
"duration": duration,
"redirect_uri": self.redirect_uri,
"response_type": "token" if implicit else "code",
"scope": " ".join(scopes),
"state": state,
def revoke(self):
    """Revoke the current Authorization.

    Passes the access token to the authenticator's ``revoke_token`` and
    then calls ``self._clear_access_token``.

    :raises InvalidInvocation: if ``self.access_token`` is ``None``.
    """
    if self.access_token is None:
        raise InvalidInvocation("no token available to revoke")
    self._authenticator.revoke_token(self.access_token, "access_token")
    # Local state is cleared only after the revoke call has returned.
    self._clear_access_token()
:param state: A string that will be reflected in the callback to
``redirect_uri``. This value should be temporarily unique to the
client for whom the URL was generated.
:param implicit: (optional) Use the implicit grant flow (default:
False). This flow is only available for UntrustedAuthenticators.
"""
if self.redirect_uri is None:
raise InvalidInvocation("redirect URI not provided")
if implicit and not isinstance(self, UntrustedAuthenticator):
raise InvalidInvocation(
"Only UntrustedAuthentictor instances can "
"use the implicit grant flow."
)
if implicit and duration != "temporary":
raise InvalidInvocation(
"The implicit grant flow only supports "
"temporary access tokens."
)
params = {
"client_id": self.client_id,
"duration": duration,
"redirect_uri": self.redirect_uri,
"response_type": "token" if implicit else "code",
"scope": " ".join(scopes),
"state": state,
}
url = self._requestor.reddit_url + const.AUTHORIZATION_PATH
request = Request("GET", url, params=params)
return request.prepare().url
def authorize(self, code):
    """Obtain and set authorization tokens based on ``code``.

    :param code: The code obtained by an out-of-band authorization request
        to Reddit.
    :raises InvalidInvocation: if the authenticator's ``redirect_uri`` is
        ``None``.
    """
    if self._authenticator.redirect_uri is None:
        raise InvalidInvocation("redirect URI not provided")
    # The authenticator's redirect URI is sent along with the code.
    self._request_token(
        code=code,
        grant_type="authorization_code",
        redirect_uri=self._authenticator.redirect_uri,
    )