Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def fetch_access_token(self):
    """Fetch an access token and store it on the client.

    Sends a POST to ``self.access_token_url`` signed (via the Authorization
    header) with the consumer credentials and the ``key``, ``secret`` and
    ``verifier`` entries currently held in ``self._token``. The response is
    passed to ``self.log_request``, and ``self._token`` is then replaced by
    the first ``oauth_token`` value found in the response body.

    Returns:
        The token string now stored in ``self._token``.

    Raises:
        OAuthException: if the response status code is not 200.
    """
    signing_params = {
        'client_key': self.consumer_key,
        'client_secret': self.consumer_secret,
        'resource_owner_key': self._token['key'],
        'resource_owner_secret': self._token['secret'],
        'verifier': self._token['verifier'],
        'callback_uri': self.callback_url,
        'signature_type': 'auth_header',
    }
    signer = OAuth1(**signing_params)

    reply = self._session.post(
        url=self.access_token_url,
        auth=signer,
        timeout=self.timeout,
    )
    self.log_request(reply)

    if reply.status_code != 200:
        raise OAuthException(
            "OAuth token verification failed (%s) %s" % (reply.status_code, reply.text))

    # From here on self._token is a plain string, no longer the
    # key/secret/verifier dict that was used to sign this request.
    fields = parse_qs(reply.text)
    self._token = fields['oauth_token'][0]
    return self._token
if not password:
password = self._password
if not email or not password:
raise Exception('Missing email or password')
data = {
'email': email,
'password': password,
'oauth_token': self._token['key']}
response = self._session.post(url=self.authorization_url, data=data, timeout=self.timeout)
self.log_request(response)
if response.status_code != 200:
raise OAuthException("OAuth login failed (%s) %s" % (response.status_code, response.text))
# set token verifier
self._token['verifier'] = parse_qs(response.text)['oauth_verifier'][0]
def logoff(self):
    """Delete the authenticated session and return ``self``.

    The request issued depends on ``self.api_path``:

    * ``API_PATH_V1``: DELETE on the URL resolved from ``/a/session``.
    * ``API_PATH_V2``: DELETE on the URL resolved from ``/session``.
    * ``API_PATH_SSO``: DELETE on ``self.access_token_url``, signed in the
      query string with the consumer key and ``self._token`` as the
      resource owner key; the response must have status 204.

    The response is always passed to ``self.log_request`` -- including a
    failed SSO deletion, which is logged before the exception is raised.

    Returns:
        This client instance.

    Raises:
        OAuthException: on the SSO path, if the response status is not 204.
        UnknownAPIFormatError: if ``self.api_path`` is none of the above.
    """
    api_path = self.api_path
    # Only the SSO path validates the status code; None means "no check".
    expected_status = None

    if api_path == API_PATH_V1:
        response = self._session.delete(self._resolve_url('/a/session'), timeout=self.timeout)
    elif api_path == API_PATH_V2:
        response = self._session.delete(self._resolve_url('/session'), timeout=self.timeout)
    elif api_path == API_PATH_SSO:
        oauth = OAuth1(client_key=self.consumer_key,
                       resource_owner_key=self._token,
                       callback_uri=self.callback_url,
                       signature_type='query')
        response = self._session.delete(url=self.access_token_url, auth=oauth, timeout=self.timeout)
        expected_status = 204
    else:
        raise UnknownAPIFormatError(
            'Unrecognized API path: %s' % api_path)

    # Log before validating so that a failed deletion is recorded too,
    # matching the log-then-raise order used by the token fetch helpers.
    self.log_request(response)

    if expected_status is not None and response.status_code != expected_status:
        raise OAuthException("OAuth token deletion failed (%s) %s" % (response.status_code, response.text))
    return self
def fetch_request_token(self):
    """Fetch a request token and store it on the client.

    Sends a POST to ``self.request_token_url`` signed (via the Authorization
    header) with the consumer key and secret, passes the response to
    ``self.log_request``, and stores the parsed credentials in
    ``self._token``.

    Returns:
        The dict now stored in ``self._token``, with ``'key'`` holding the
        first ``oauth_token`` value and ``'secret'`` the first
        ``oauth_token_secret`` value from the response body.

    Raises:
        OAuthException: if the response status code is not 200.
    """
    signing_params = {
        'client_key': self.consumer_key,
        'client_secret': self.consumer_secret,
        'callback_uri': self.callback_url,
        'signature_type': 'auth_header',
    }
    signer = OAuth1(**signing_params)

    reply = self._session.post(
        url=self.request_token_url,
        auth=signer,
        timeout=self.timeout,
    )
    self.log_request(reply)

    if reply.status_code != 200:
        raise OAuthException(
            "OAuth token request failed (%s) %s" % (reply.status_code, reply.text))

    fields = parse_qs(reply.text)
    token_key = fields['oauth_token'][0]
    token_secret = fields['oauth_token_secret'][0]
    self._token = {'key': token_key, 'secret': token_secret}
    return self._token