# Aleph upload operation (excerpt). `get_collection_id` is a helper defined in
# the same module; `settings` holds memorious configuration read from
# MEMORIOUS_* environment variables.
from alephclient.api import AlephAPI

from memorious import settings


def aleph_emit(context, data):
    if not settings.ALEPH_HOST:
        context.log.warning("No $MEMORIOUS_ALEPH_HOST, skipping upload...")
        return
    if not settings.ALEPH_API_KEY:
        context.log.warning("No $MEMORIOUS_ALEPH_API_KEY, skipping upload...")
        return
    session_id = 'memorious:%s' % context.crawler.name
    api = AlephAPI(settings.ALEPH_HOST, settings.ALEPH_API_KEY,
                   session_id=session_id)
    collection_id = get_collection_id(context, api)
    if collection_id is None:
        context.log.warning("Cannot get aleph collection.")
        return
    content_hash = data.get('content_hash')
    source_url = data.get('source_url', data.get('url'))
    foreign_id = data.get('foreign_id', data.get('request_id', source_url))
    if context.skip_incremental(collection_id, foreign_id, content_hash):
        context.log.info("Skip aleph upload: %s", foreign_id)
        return
    meta = {
        'crawler': context.crawler.name,
        'foreign_id': foreign_id,
        'source_url': source_url,
        # further metadata fields and the Aleph upload call follow in the
        # full module; the excerpt ends here.
    }
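
# For illustration only: a minimal sketch of the de-duplication that
# `context.skip_incremental` is assumed to perform above -- remember a key built
# from the collection, foreign_id and content hash, and skip the upload when
# that key has already been seen. This is a simplified stand-in, not memorious'
# actual implementation.
_seen_keys = set()


def skip_incremental_demo(collection_id, foreign_id, content_hash):
    key = "%s:%s:%s" % (collection_id, foreign_id, content_hash)
    if key in _seen_keys:
        return True   # already handled in a previous run
    _seen_keys.add(key)
    return False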

# Build a limiter keyed on a named resource. `conn` is the module-level Redis
# connection and `RateLimit` the limiter class used throughout memorious; both
# are defined elsewhere in the package.
def get_rate_limit(resource, limit=100, interval=60, unit=1):
    return RateLimit(conn, resource, limit=limit, interval=interval, unit=unit)
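
# A minimal, in-memory sketch of the RateLimit interface assumed above (the
# real class is backed by Redis via `conn`): allow `limit` calls per window of
# `interval * unit` seconds, with comply() blocking until a slot is free. The
# semantics here are illustrative assumptions, not memorious' implementation.
import time
from collections import deque


class InMemoryRateLimit(object):
    def __init__(self, conn, resource, limit=100, interval=60, unit=1):
        self.resource = resource        # label only; `conn` is ignored in this sketch
        self.limit = limit              # max calls per window
        self.window = interval * unit   # window length in seconds
        self._calls = deque()           # timestamps of recent calls

    def comply(self):
        """Block until another call is allowed, then record it."""
        while True:
            now = time.time()
            # drop timestamps that have fallen out of the window
            while self._calls and now - self._calls[0] > self.window:
                self._calls.popleft()
            if len(self._calls) < self.limit:
                self._calls.append(now)
                return
            time.sleep(0.1)


# Example: at most 10 calls per minute for a given host.
limiter = InMemoryRateLimit(None, "example.org", limit=10, interval=60, unit=1)
limiter.comply()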

# Parse stage: re-open the fetched response, extract data from the HTML, and
# emit results that match the configured 'store' rules.
def parse(context, data):
    with context.http.rehash(data) as result:
        if result.html is not None:
            parse_html(context, data, result)
            # Get extra metadata from the DOM
            parse_for_metadata(context, data, result.html)
        rules = context.params.get('store') or {'match_all': {}}
        if Rule.get_rule(rules).apply(result):
            context.emit(rule='store', data=data)
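
# Illustrative sketch of how a rule spec like {'match_all': {}} might be
# evaluated by `Rule.get_rule(rules).apply(result)`. The real Rule class in
# memorious supports more operators; the rule names below and the
# `content_type` attribute are assumptions made for the example.
from collections import namedtuple

StubResult = namedtuple('StubResult', ['content_type'])


class DemoRule(object):
    def __init__(self, spec):
        self.spec = spec

    def apply(self, result):
        for name, value in self.spec.items():
            if name == 'match_all':
                continue                               # always matches
            elif name == 'mime_type':
                if result.content_type != value:
                    return False
            elif name == 'and':
                if not all(DemoRule(part).apply(result) for part in value):
                    return False
            else:
                raise ValueError('Unknown rule: %s' % name)
        return True


assert DemoRule({'match_all': {}}).apply(StubResult('text/html'))
assert not DemoRule({'mime_type': 'application/pdf'}).apply(StubResult('text/html'))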

# Inner wrapper of a decorator that records each stage run as an Operation row:
# mark the operation pending, run the wrapped stage function `func`, then record
# success or failure (plus an error Event) before committing.
def func_wrapper(context, data, *a, **kw):
    op = Operation()
    op.crawler = context.crawler.name
    op.name = context.stage.name
    op.run_id = context.run_id
    op.status = Operation.STATUS_PENDING
    session.add(op)
    session.commit()
    context.operation_id = op.id
    try:
        context.log.info('Running: %s', op.name)
        res = func(context, data, *a, **kw)
        op.status = Operation.STATUS_SUCCESS
        return res
    except Exception as exc:
        # this should clear results and tags created by this op
        session.rollback()
        Event.save(op.id, Event.LEVEL_ERROR, exc=exc)
        context.log.exception(exc)
    finally:
        if op.status == Operation.STATUS_PENDING:
            op.status = Operation.STATUS_FAILED
        op.ended_at = datetime.utcnow()
        session.add(op)
        session.commit()
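
# A sketch of the surrounding decorator that func_wrapper is assumed to come
# from: it closes over the wrapped stage function `func` (and the database
# session) and returns func_wrapper in its place. The name `log_operation` and
# the exact structure are assumptions for illustration.
from functools import wraps


def log_operation(func):
    @wraps(func)
    def func_wrapper(context, data, *a, **kw):
        ...  # body as shown above: create an Operation row, run func, record the outcome
    return func_wrapper


# Usage: a stage function would then be declared as
# @log_operation
# def parse(context, data):
#     ...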

def boot(self):
    self.scheduler = get_rate_limit('scheduler',
                                    unit=60,
                                    interval=settings.SCHEDULER_INTERVAL,
                                    limit=1)

def _rate_limit(self, url):
    # Throttle outgoing requests per host: one rate-limit bucket per netloc.
    resource = urlparse(url).netloc or url
    limit = self.context.get('http_rate_limit', settings.HTTP_RATE_LIMIT)
    rate_limit = get_rate_limit(resource, limit=limit)
    rate_limit.comply()
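
# For illustration: how per-host throttling of the kind _rate_limit implements
# is assumed to wrap outgoing requests -- keep one limiter per netloc and call
# comply() before every fetch. Reuses the InMemoryRateLimit sketch above;
# `fetch_demo` and the requests usage are illustrative, not memorious' HTTP layer.
import requests
from urllib.parse import urlparse

_limiters = {}


def fetch_demo(session, url, limit=60):
    host = urlparse(url).netloc or url
    limiter = _limiters.setdefault(
        host, InMemoryRateLimit(None, host, limit=limit, interval=60, unit=1))
    limiter.comply()                  # block until this host has a free slot
    return session.get(url)


# response = fetch_demo(requests.Session(), "https://example.org/docs/1.html")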

def _upsert(context, params, data):
    """Insert or update data and add/update appropriate timestamps"""
    table = params.get("table")
    table = datastore.get_table(table, primary_id=False)
    unique_keys = ensure_list(params.get("unique"))
    data["__last_seen"] = datetime.datetime.utcnow()
    if len(unique_keys):
        updated = table.update(data, unique_keys, return_count=True)
        if updated:
            return
    data["__first_seen"] = data["__last_seen"]
    rate_limit = get_rate_limit("db", limit=settings.DB_RATE_LIMIT)
    rate_limit.comply()
    table.insert(data)
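
# For illustration: the table API that _upsert relies on matches the `dataset`
# library (update(row, keys, return_count=True) / insert(row)), which
# datastore.get_table is assumed to wrap. A standalone sketch of the same
# first_seen/last_seen upsert pattern against an in-memory SQLite database;
# the table and column names are made up for the example.
import datetime

import dataset

db = dataset.connect("sqlite:///:memory:")
companies = db["companies"]


def upsert_demo(row, unique=("reg_no",)):
    row["__last_seen"] = datetime.datetime.utcnow()
    if companies.update(row, list(unique), return_count=True):
        return                                   # existing row refreshed
    row["__first_seen"] = row["__last_seen"]     # first time we see this record
    companies.insert(row)


upsert_demo({"reg_no": "12345", "name": "ACME Ltd."})
upsert_demo({"reg_no": "12345", "name": "ACME Limited"})  # updates, keeps __first_seen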

# CLI entry point. The decorators that define the debug/cache/incremental
# options are not part of this excerpt.
def cli(debug, cache, incremental):
    """Crawler framework for documents and structured scrapers."""
    settings.HTTP_CACHE = cache
    settings.INCREMENTAL = incremental
    settings.DEBUG = debug
    if settings.DEBUG:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    init_memorious()
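
# A sketch of how the options feeding cli() could be declared with click, which
# this entry point appears to be written for. The exact decorators, flag names
# and defaults below are assumptions for illustration, not memorious' actual CLI.
import click


@click.group()
@click.option("--debug", is_flag=True, default=False, help="Enable debug logging.")
@click.option("--cache/--no-cache", default=True, help="Use the HTTP response cache.")
@click.option("--incremental/--no-incremental", default=True,
              help="Skip documents seen in previous runs.")
def cli(debug, cache, incremental):
    """Crawler framework for documents and structured scrapers."""
    ...  # body as shown above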