Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
return "%s (%s)" % (self.status, self.message)
class TransportError(Exception):
"""Something went wrong while trying to execute the request."""
def __init__(self, base_exception=None):
self.base_exception = base_exception
def __str__(self):
if self.base_exception:
return str(self.base_exception)
return "An unknown error occurred."
class HTTPError(TransportError):
"""An unexpected HTTP error occurred."""
def __init__(self, status_code):
self.status_code = status_code
def __str__(self):
return "HTTP Error: %d" % self.status_code
class Timeout(Exception):
"""The request timed out."""
pass
class _RetriableRequest(Exception):
"""Signifies that the request can be retried."""
pass
class _OverQueryLimit(ApiError, _RetriableRequest):
requests_kwargs = requests_kwargs or {}
final_requests_kwargs = dict(self.requests_kwargs, **requests_kwargs)
# Determine GET/POST.
requests_method = self.session.get
if post_json is not None:
requests_method = self.session.post
final_requests_kwargs["json"] = post_json
try:
response = requests_method(base_url + authed_url,
**final_requests_kwargs)
except requests.exceptions.Timeout:
raise googlemaps.exceptions.Timeout()
except Exception as e:
raise googlemaps.exceptions.TransportError(e)
if response.status_code in _RETRIABLE_STATUSES:
# Retry request.
return self._request(url, params, first_request_time,
retry_counter + 1, base_url, accepts_clientid,
extract_body, requests_kwargs, post_json)
# Check if the time of the nth previous query (where n is
# queries_per_second) is under a second ago - if so, sleep for
# the difference.
if self.sent_times and len(self.sent_times) == self.queries_per_second:
elapsed_since_earliest = time.time() - self.sent_times[0]
if elapsed_since_earliest < 1:
time.sleep(1 - elapsed_since_earliest)
try:
delay_seconds = 0.5 * 1.5 ** (retry_counter - 1)
# Jitter this value by 50% and pause.
time.sleep(delay_seconds * (random.random() + 0.5))
url = self._generate_auth_url(url, params)
try:
resp = requests.get(_BASE_URL + url,
headers={"User-Agent": _USER_AGENT},
timeout=self.timeout,
verify=True) # NOTE(cbro): verify SSL certs.
except requests.exceptions.Timeout:
raise googlemaps.exceptions.Timeout()
except Exception as e:
raise googlemaps.exceptions.TransportError(e)
if resp.status_code in _RETRIABLE_STATUSES:
# Retry request.
return self._get(url, params, first_request_time, retry_counter + 1)
if resp.status_code != 200:
raise googlemaps.exceptions.HTTPError(resp.status_code)
body = resp.json()
api_status = body["status"]
if api_status == "OK" or api_status == "ZERO_RESULTS":
return body
if api_status == "OVER_QUERY_LIMIT":
# Retry request.