Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_plugins(): # pragma: no cover
"""Load plugins with groups: isbnlib.metadata & isbnlib.formatters."""
# get metadata plugins from entry_points
if options.get('LOAD_METADATA_PLUGINS', True):
try:
for entry in iter_entry_points(group='isbnlib.metadata'):
add_service(entry.name, entry.load())
except Exception:
LOGGER.critical('Some metadata plugins were not loaded!')
global PROVIDERS
_buf = list(services.keys())
_buf.remove('default')
PROVIDERS = tuple(sorted(_buf))
# get formatters from entry_points
if options.get('LOAD_FORMATTER_PLUGINS', True):
try:
for entry in iter_entry_points(group='isbnlib.formatters'):
add_bibformatter(entry.name, entry.load())
except Exception:
LOGGER.critical('Some formatters plugins were not loaded!')
global BIBFORMATS
_buf = list(bibformatters.keys())
_buf.remove('labels')
_buf.remove('default')
BIBFORMATS = tuple(sorted(_buf))
def response(self):
"""Check errors on response."""
try:
response = urlopen(self._request,
timeout=options.get('URLOPEN_TIMEOUT'))
LOGGER.debug('Request headers:\n%s', self._request.header_items())
except HTTPError as e: # pragma: no cover
LOGGER.critical('ISBNLibHTTPError for %s with code %s [%s]',
self._url, e.code, e.msg)
if e.code in (401, 403, 429):
raise ISBNLibHTTPError('%s Are you making many requests?' %
e.code)
if e.code in (502, 504):
raise ISBNLibHTTPError('%s Service temporarily unavailable!' %
e.code)
raise ISBNLibHTTPError('(%s) %s' % (e.code, e.msg))
except URLError as e: # pragma: no cover
LOGGER.critical('ISBNLibURLError for %s with reason %s', self._url,
e.reason)
raise ISBNLibURLError(e.reason)
except timeout: # pragma: no cover
registry.set_cache(None)
else:
CACHE_FILE = '.metacache'
cache_path = os.path.join(CONF_PATH, CACHE_FILE)
from ._shelvecache import ShelveCache
try:
registry.set_cache(ShelveCache(cache_path))
except:
# stay with the default in-memory cache
pass
# set logger
fmt = "%(asctime)s;%(levelname)s;%(message)s"
if CONF_PATH:
log_path = os.path.join(CONF_PATH, 'isbntools.log')
debug = config.options.get('DEBUG', 'False')
level = logging.DEBUG if debug == 'True' else logging.CRITICAL
log_file = 'isbntools.DEBUG.log' if debug == 'True' else 'isbntools.log'
log_path = os.path.join(CONF_PATH, log_file)
logging.basicConfig(filename=log_path, level=level, format=fmt)
else:
logging.basicConfig(level=logging.CRITICAL, format=fmt)
# setup Windows console
if WINDOWS:
from ._console import set_msconsole
set_msconsole()
# set CONF_PATH
if not CONF_PATH:
if VIRTUAL:
CONF_PATH = os.path.join(sys.prefix, 'isbntools')
else:
CONF_PATH = os.path.join(os.getenv('APPDATA'), 'isbntools') \
if WINDOWS else os.path.expanduser('~/.isbntools')
# make the folder if it doesn't exist (see issue #101)!
try: # pragma: no cover
os.mkdir(CONF_PATH)
except:
pass
# set metadata cache
if config.options.get('CACHE', 'UNDEFINED').lower() == 'no':
registry.set_cache(None)
else:
CACHE_FILE = '.metacache'
cache_path = os.path.join(CONF_PATH, CACHE_FILE)
from ._shelvecache import ShelveCache
try:
registry.set_cache(ShelveCache(cache_path))
except:
# stay with the default in-memory cache
pass
# set logger
fmt = "%(asctime)s;%(levelname)s;%(message)s"
if CONF_PATH:
log_path = os.path.join(CONF_PATH, 'isbntools.log')
debug = config.options.get('DEBUG', 'False')
def load_plugins(): # pragma: no cover
"""Load plugins with groups: isbnlib.metadata & isbnlib.formatters."""
# get metadata plugins from entry_points
if options.get('LOAD_METADATA_PLUGINS', True):
try:
for entry in iter_entry_points(group='isbnlib.metadata'):
add_service(entry.name, entry.load())
except Exception:
LOGGER.critical('Some metadata plugins were not loaded!')
global PROVIDERS
_buf = list(services.keys())
_buf.remove('default')
PROVIDERS = tuple(sorted(_buf))
# get formatters from entry_points
if options.get('LOAD_FORMATTER_PLUGINS', True):
try:
for entry in iter_entry_points(group='isbnlib.formatters'):
add_bibformatter(entry.name, entry.load())
except Exception:
LOGGER.critical('Some formatters plugins were not loaded!')
results = {}
q = Queue()
def _worker(name, task, arg, q):
try: # pragma: no cover
q.put((name, task(arg)))
except Exception: # pragma: no cover
LOGGER.debug("No result in 'multi' for %s[%s](%s)", task, name,
arg)
q.put((name, None))
for name, task in named_tasks:
p = Process(target=_worker, args=(name, task, arg, q))
p.start()
p.join(options.get('THREADS_TIMEOUT'))
q.put('STOP')
while True:
el = q.get()
if el == 'STOP':
break
results[el[0]] = el[1]
return results
from threading import Thread
results = {}
def _worker(name, task, arg):
try:
results[name] = task(arg)
except Exception: # pragma: no cover
LOGGER.debug("No result in 'parallel' for %s[%s](%s)", task, name,
arg)
results[name] = None
for name, task in named_tasks:
t = Thread(target=_worker, args=(name, task, arg))
t.start()
t.join(options.get('THREADS_TIMEOUT'))
return results