Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _get_body(self, response):
if response.status != 200:
raise googlemaps.exceptions.HTTPError(response.status)
body = await response.json()
api_status = body['status']
if api_status == 'OK' or api_status == 'ZERO_RESULTS':
return body
if api_status == 'OVER_QUERY_LIMIT':
raise googlemaps.exceptions._OverQueryLimit(
api_status, body.get('error_message'))
raise googlemaps.exceptions.ApiError(api_status,
body.get('error_message'))
def _get_body(self, response):
if response.status_code != 200:
raise googlemaps.exceptions.HTTPError(response.status_code)
body = response.json()
api_status = body["status"]
if api_status == "OK" or api_status == "ZERO_RESULTS":
return body
if api_status == "OVER_QUERY_LIMIT":
raise googlemaps.exceptions._OverQueryLimit(
api_status, body.get("error_message"))
raise googlemaps.exceptions.ApiError(api_status,
body.get("error_message"))
# queries_per_second) is under a second ago - if so, sleep for
# the difference.
if self.sent_times and len(self.sent_times) == self.queries_per_second:
elapsed_since_earliest = time.time() - self.sent_times[0]
if elapsed_since_earliest < 1:
time.sleep(1 - elapsed_since_earliest)
try:
if extract_body:
result = extract_body(response)
else:
result = self._get_body(response)
self.sent_times.append(time.time())
return result
except googlemaps.exceptions._RetriableRequest as e:
if isinstance(e, googlemaps.exceptions._OverQueryLimit) and not self.retry_over_query_limit:
raise
# Retry request.
return self._request(url, params, first_request_time,
retry_counter + 1, base_url, accepts_clientid,
extract_body, requests_kwargs, post_json)