Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _fake_provider_merge(isbn):
"""Fake provider 'merge' service."""
try: # pragma: no cover
named_tasks = (('openl', _oed), ('thingl', _ted), ('wiki', _wiki))
results = vias.parallel(named_tasks, isbn)
odata = results.get('openl', set())
tdata = results.get('thingl', set())
wdata = results.get('wiki', set())
return list(odata | tdata | wdata)
except Exception: # pragma: no cover
LOGGER.error("Some error on editions 'merge' service for %s!", isbn)
return [isbn]
def query(isbn, processor=None):
"""Query function for the 'merge provider' (waterfall model)."""
if not processor:
processor = config.options.get('VIAS_MERGE', processor).lower()
if not processor: # pragma: no cover
processor = 'parallel'
named_tasks = (('wcat', qwcat), ('goob', qgoob), ('openl', qopen))
if processor == 'parallel':
results = vias.parallel(named_tasks, isbn)
elif processor == 'serial':
results = vias.serial(named_tasks, isbn)
elif processor == 'multi':
results = vias.multi(named_tasks, isbn)
rw = results.get('wcat')
rg = results.get('goob') or results.get('openl')
if not rw and not rg: # pragma: no cover
return None
md = Metadata(rw) if rw else None
if md and rg:
# Overwrite with Authors, Publisher and Year from Google
md.merge(rg, overwrite=('Authors', 'Publisher', 'Year'))
def query(isbn,
qserv1,
qserv2,
overwrite=('Authors', 'Publisher', 'Language'),
processor=None):
"""Query function for the 'merge provider' (waterfall model)."""
if not processor:
processor = config.options.get('VIAS_MERGE', processor).lower()
if not processor: # pragma: no cover
processor = 'serial'
named_tasks = (('serv1', qserv1), ('serv2', qserv2))
if processor == 'parallel':
results = vias.parallel(named_tasks, isbn)
elif processor == 'serial':
results = vias.serial(named_tasks, isbn)
elif processor == 'multi':
results = vias.multi(named_tasks, isbn)
r1 = results.get('serv1')
r2 = results.get('serv2')
if not r1 and not r2:
return {}
md = Metadata(r1) if r1 else {}
if md and r2:
# Try to complete Authors, Publisher and Language from serv2
md.merge(r2, overwrite=overwrite)