Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def handle_api_error(self, http_status, http_body, response):
try:
error = response['error']
except (KeyError, TypeError):
raise Error("Invalid response from API: (%d) %r " % (http_status, http_body), http_status, http_body)
try:
raise Error(error.get('message', ''), http_status, http_body)
except AttributeError:
raise Error(error, http_status, http_body)
headers = {
'X-Client-User-Agent': json.dumps(ua),
'User-Agent': USER_AGENT,
'Authorization': 'Bearer %s' % my_api_key,
'Content-type': 'application/x-www-form-urlencoded'
}
if timeout > _max_timeout:
raise Error("`timeout` must not exceed %d; it is %d" % (_max_timeout, timeout))
if request_lib == 'urlfetch':
http_body, http_status = self.urlfetch_request(method, abs_url, headers, params)
elif request_lib == 'requests':
http_body, http_status = self.requests_request(method, abs_url, headers, params)
else:
raise Error("Bug discovered: invalid request_lib: %s. "
"Please report to contact@easypost.com." % request_lib)
return http_body, http_status, my_api_key
raise Error("Bug discovered: invalid request method: %s. "
"Please report to contact@easypost.com." % method)
try:
result = requests_session.request(
method,
abs_url,
headers=headers,
data=data,
timeout=timeout,
verify=True,
)
http_body = result.text
http_status = result.status_code
except Exception as e:
raise Error("Unexpected error communicating with EasyPost. If this "
"problem persists please let us know at contact@easypost.com.")
return http_body, http_status
def request_raw(self, method, url, params=None, apiKeyRequired=True):
if params is None:
params = {}
my_api_key = self._api_key or api_key
if apiKeyRequired and my_api_key is None:
raise Error(
'No API key provided. Set an API key via "easypost.api_key = \'APIKEY\'. '
'Your API keys can be found in your EasyPost dashboard, or you can email us '
'at contact@easypost.com for assistance.')
abs_url = self.api_url(url)
params = self._objects_to_ids(params)
ua = {
'client_version': VERSION,
'lang': 'python',
'publisher': 'easypost',
'request_lib': request_lib,
}
for attr, func in (('lang_version', platform.python_version),
('platform', platform.platform),
('uname', lambda: ' '.join(platform.uname()))):
def handle_api_error(self, http_status, http_body, response):
try:
error = response['error']
except (KeyError, TypeError):
raise Error("Invalid response from API: (%d) %r " % (http_status, http_body), http_status, http_body)
try:
raise Error(error.get('message', ''), http_status, http_body)
except AttributeError:
raise Error(error, http_status, http_body)
def requests_request(self, method, abs_url, headers, params):
method = method.lower()
if method == 'get' or method == 'delete':
if params:
abs_url = self.build_url(abs_url, params)
data = None
elif method == 'post' or method == 'put':
data = self.encode(params)
else:
raise Error("Bug discovered: invalid request method: %s. "
"Please report to contact@easypost.com." % method)
try:
result = requests_session.request(
method,
abs_url,
headers=headers,
data=data,
timeout=timeout,
verify=True,
)
http_body = result.text
http_status = result.status_code
except Exception as e:
raise Error("Unexpected error communicating with EasyPost. If this "
"problem persists please let us know at contact@easypost.com.")
elif method == 'get' or method == 'delete':
abs_url = self.build_url(abs_url, params)
else:
raise Error("Bug discovered: invalid request method: %s. Please report "
"to contact@easypost.com." % method)
args['url'] = abs_url
args['method'] = method
args['headers'] = headers
args['validate_certificate'] = False
args['deadline'] = timeout
try:
result = urlfetch.fetch(**args)
except:
raise Error("Unexpected error communicating with EasyPost. "
"If this problem persists, let us know at contact@easypost.com.")
return result.content, result.status_code