Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""An unexpected HTTP error occurred."""
def __init__(self, status_code):
    """Remember the HTTP status code that triggered this error.

    :param status_code: Status code of the failing response; it is
        interpolated with ``%d`` when the error is rendered as text.
    """
    self.status_code = status_code
def __str__(self):
    """Return the error rendered as ``"HTTP Error: <status_code>"``."""
    template = "HTTP Error: %d"
    return template % self.status_code
class Timeout(Exception):
    """The request timed out."""
class _RetriableRequest(Exception):
    """Signifies that the request can be retried."""
class _OverQueryLimit(ApiError, _RetriableRequest):
    """Signifies that the client exceeded its query rate limit.

    Normally we treat this as a retriable condition, but we allow the
    calling code to specify that these requests should not be retried.
    Inheriting from both ``ApiError`` and ``_RetriableRequest`` lets a
    handler for either type catch this exception.
    """
# Client-side rate limiting: pause before sending when the recorded send
# times show the per-second budget is already used up.
# Check if the time of the nth previous query (where n is
# queries_per_second) is under a second ago - if so, sleep for
# the difference.
# NOTE(review): treating sent_times[0] as the nth previous query presumes
# sent_times never grows beyond queries_per_second entries (e.g. a bounded
# deque) -- confirm where sent_times is created.
if self.sent_times and len(self.sent_times) == self.queries_per_second:
elapsed_since_earliest = time.time() - self.sent_times[0]
if elapsed_since_earliest < 1:
# Sleep only for the remainder of the one-second window.
time.sleep(1 - elapsed_since_earliest)
# Decode the response, record the send time on success, and retry when the
# decoder signals a retriable failure.
try:
# A caller-supplied extract_body takes precedence over the default
# self._get_body decoder.
if extract_body:
result = extract_body(response)
else:
result = self._get_body(response)
# Only reached when decoding did not raise: the request is recorded in
# sent_times with the current time.
self.sent_times.append(time.time())
return result
except googlemaps.exceptions._RetriableRequest as e:
# An over-query-limit failure is re-raised instead of retried when
# retry_over_query_limit is falsy.
if isinstance(e, googlemaps.exceptions._OverQueryLimit) and not self.retry_over_query_limit:
raise
# Retry request.
# Calls self._request again with retry_counter incremented by one; every
# other value is passed through unchanged.
return self._request(url, params, first_request_time,
retry_counter + 1, base_url, accepts_clientid,
extract_body, requests_kwargs, post_json)
def _geolocation_extract(response):
    """
    Mimics the exception handling logic in ``client._get_body``, but
    for geolocation which uses a different response format.

    :param response: HTTP response object exposing ``json()`` and
        ``status_code``.
    :return: The decoded JSON body when the status code is 200 or 404.
    :raises exceptions._RetriableRequest: when the status code is 403.
    :raises exceptions.ApiError: for any other status code, carrying the
        status code and, when the body provides one, the first error
        reason (otherwise ``None``).
    """
    body = response.json()
    status_code = response.status_code

    if status_code in (200, 404):
        return body
    if status_code == 403:
        raise exceptions._RetriableRequest()

    # The reason is optional detail. An error payload without the expected
    # nesting (missing key, empty ``errors`` list, non-dict node) must still
    # surface as ApiError rather than leak a KeyError/IndexError/TypeError.
    try:
        error = body["error"]["errors"][0]["reason"]
    except (KeyError, IndexError, TypeError):
        error = None
    raise exceptions.ApiError(status_code, error)
# Decode the JSON body of resp and translate error payloads and non-200
# statuses into googlemaps exceptions; on success the decoded body is
# returned.
try:
j = resp.json()
except:
# NOTE(review): a bare except also intercepts KeyboardInterrupt and
# SystemExit; narrowing it to the JSON decoding error looks safer --
# confirm which exception resp.json() raises here.
# resp.json() raised: a non-200 status is reported as HTTPError, while a
# 200 with an undecodable body is reported as an UNKNOWN_ERROR ApiError.
if resp.status_code != 200:
raise googlemaps.exceptions.HTTPError(resp.status_code)
raise googlemaps.exceptions.ApiError("UNKNOWN_ERROR",
"Received a malformed response.")
# An "error" key in the body takes precedence over the HTTP status check
# further down.
if "error" in j:
error = j["error"]
status = error["status"]
# RESOURCE_EXHAUSTED is raised as a retriable request instead of ApiError.
if status == "RESOURCE_EXHAUSTED":
raise googlemaps.exceptions._RetriableRequest()
# The message is attached to the ApiError only when the payload has one.
if "message" in error:
raise googlemaps.exceptions.ApiError(status, error["message"])
else:
raise googlemaps.exceptions.ApiError(status)
# No error payload, but any non-200 status is still reported as HTTPError.
if resp.status_code != 200:
raise googlemaps.exceptions.HTTPError(resp.status_code)
return j