Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
raise ex
headers = data.pop(0)
types = [self._field_type(header, year) for header in headers]
results = [{header : (cast(item) if item is not None else None)
for header, cast, item
in zip(headers, types, d)}
for d in data]
return results
elif resp.status_code == 204:
return []
else:
raise CensusException(resp.text)
def chunks(l, n):
    """Generate consecutive slices of ``l`` holding at most ``n`` items each.

    ``l`` must support ``len()`` and slicing. Every slice has ``n`` items
    except possibly the last, which holds whatever remains.
    """
    slice_starts = range(0, len(l), n)
    for begin in slice_starts:
        end = begin + n
        yield l[begin:end]
def merge(dicts):
    """Combine an iterable of dicts into a single new dict.

    Items are applied in iteration order, so when the same key appears in
    more than one dict the value from the later dict wins.
    """
    combined = {}
    for mapping in dicts:
        combined.update(mapping.items())
    return combined
class CensusException(Exception):
    """Error type used by this module for failed Census API requests.

    It is raised with the response text when a request comes back with an
    unhandled status code, and it is the base class of
    ``UnsupportedYearException``.
    """
class UnsupportedYearException(CensusException):
    """Specialised ``CensusException``.

    Going by its name, it signals a request for a year that is not
    supported -- NOTE(review): the raising sites are not visible here;
    confirm against the callers.
    """
class Client(object):
    """Holds the API key, HTTP session and retry budget for Census requests.

    The class attributes below are ``%``-style URL templates for the
    ``api.census.gov`` data endpoints.
    """

    # Two placeholders -- presumably year and dataset; verify against callers.
    endpoint_url = 'https://api.census.gov/data/%s/%s'
    definitions_url = 'https://api.census.gov/data/%s/%s/variables.json'
    # Third placeholder names a single variable's JSON document.
    definition_url = 'https://api.census.gov/data/%s/%s/variables/%s.json'
    groups_url = 'https://api.census.gov/data/%s/%s/groups.json'

    def __init__(self, key, year=None, session=None, retries=3):
        """Store the connection settings on the instance.

        key -- API key, kept on the private ``_key`` attribute.
        year -- when truthy, stored as ``default_year``; otherwise the
            attribute is not set on the instance at all.
        session -- session object to use; a falsy value makes the client
            create one with ``new_session()``.
        retries -- stored as ``retries`` for the retry logic to read.
        """
        self._key = key
        self.retries = retries
        self.session = session if session else new_session()
        # A falsy year leaves any class-level ``default_year`` untouched.
        if year:
            self.default_year = year
def wrapper(self, *args, **kwargs):
    """Call ``func``, retrying when the API reports its generic query error.

    ``func`` comes from the enclosing scope (not shown in this block). It is
    invoked at most ``self.retries`` times and always at least once. Only a
    ``CensusException`` whose text contains the marker message below is
    swallowed and retried; any other ``CensusException`` is re-raised at
    once. The last attempt runs outside the ``try`` so that whatever it
    raises reaches the caller.
    """
    transient_marker = "There was an error while running your query. We've logged the error and we'll correct it ASAP. Sorry for the inconvenience."
    # Reserve one attempt for the unguarded call after the loop.
    guarded_attempts = max(self.retries - 1, 0)
    for _ in range(guarded_attempts):
        try:
            return func(self, *args, **kwargs)
        except CensusException as error:
            if transient_marker not in str(error):
                raise
    return func(self, *args, **kwargs)