async def test_crawl_extractor(nursery):
    # Create test fixtures.
    job_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
    db = Mock()
    db.delete_frontier_item = AsyncMock()
    db.insert_frontier_items = AsyncMock()
    to_extractor, extractor_recv = trio.open_memory_channel(0)
    extractor_send, from_extractor = trio.open_memory_channel(0)
    created_at = datetime(2018, 12, 31, 13, 47, 0)
    policy_doc = {
        'id': 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
        'name': 'Test',
        'created_at': created_at,
        'updated_at': created_at,
        'authentication': {
            'enabled': False,
        },
        'limits': {
            'max_cost': 10,
            'max_duration': 3600,
            'max_items': 10_000,
        },
        'mime_type_rules': [

def outgoing_message_channels():
    return trio.open_memory_channel(0)


def vote_channels():
    return trio.open_memory_channel(0)
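
Both helpers return an unbuffered (send, receive) pair. As a minimal, self-contained sketch of how such a zero-capacity channel behaves (the producer and consumer names are illustrative, not taken from the snippets above), each send completes only once a receiver is waiting:

import trio

async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)

    async def producer():
        async with send_channel:
            # With capacity 0, this send finishes only when a receiver is ready.
            await send_channel.send('vote')

    async def consumer():
        async with receive_channel:
            assert await receive_channel.receive() == 'vote'

    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer)
        nursery.start_soon(consumer)

trio.run(main)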

try:
    old_urls = pickle.loads(job_doc['old_urls'])
except KeyError:
    # If old URLs are not in the job_doc, then this is a new job and
    # we should initialize old_urls to the seed URLs.
    old_urls = set()
    for seed in job_doc['seeds']:
        url_can = policy.url_normalization.normalize(seed)
        hash_ = hashlib.blake2b(url_can.encode('ascii'), digest_size=16)
        old_urls.add(hash_.digest())

# Set up channels.
frontier_send = self._rate_limiter.get_request_channel()
rate_limiter_reset = self._rate_limiter.get_reset_channel()
downloader_recv = self._rate_limiter.add_job(job_id)
downloader_send, storage_recv = trio.open_memory_channel(0)
storage_send, extractor_recv = trio.open_memory_channel(0)
extractor_send, pipeline_end = trio.open_memory_channel(0)

# Set up crawling components.
stats_dict = {
    'id': job_id,
    'run_state': job_doc['run_state'],
    'name': job_doc['name'],
    'seeds': job_doc['seeds'],
    'tags': job_doc['tags'],
    'started_at': job_doc['started_at'],
    'completed_at': job_doc['completed_at'],
    'item_count': job_doc['item_count'],
    'http_success_count': job_doc['http_success_count'],
    'http_error_count': job_doc['http_error_count'],
    'exception_count': job_doc['exception_count'],
    'http_status_counts': job_doc['http_status_counts'],
}

def add_job(self, job_id):
    '''
    Add a job to the rate limiter. Returns the receive end of a channel
    that requests for this job will be sent to.

    :param str job_id: A job ID.
    '''
    job_send, job_recv = trio.open_memory_channel(0)
    self._job_channels[job_id] = job_send
    return job_recv
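
A minimal sketch of how the returned receive channel might be consumed; the rate_limiter instance, job_id value, and fetch coroutine below are assumptions for illustration, not part of the snippet above:

downloader_recv = rate_limiter.add_job(job_id)

async def downloader():
    # Read requests that the rate limiter sends into the job's send channel.
    async with downloader_recv:
        async for request in downloader_recv:
            await fetch(request)  # hypothetical download coroutine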

if (remote_node_id, request_id) in self.response_handler_send_channels:
    raise ValueError(
        f"Response handler for node id {encode_hex(remote_node_id)} and request id "
        f"{request_id} has already been added"
    )

self.logger.debug(
    "Adding response handler for peer %s and request id %d",
    encode_hex(remote_node_id),
    request_id,
)
response_channels: Tuple[
    SendChannel[IncomingMessage],
    ReceiveChannel[IncomingMessage],
] = trio.open_memory_channel(0)
self.response_handler_send_channels[(remote_node_id, request_id)] = response_channels[0]

def remove() -> None:
    try:
        self.response_handler_send_channels.pop((remote_node_id, request_id))
    except KeyError:
        raise ValueError(
            f"Response handler for node id {encode_hex(remote_node_id)} and request id "
            f"{request_id} has already been removed"
        )
    else:
        self.logger.debug(
            "Removing response handler for peer %s and request id %d",
            encode_hex(remote_node_id),
            request_id,
        )

def __init__(self, app: ASGIFramework, config: Config) -> None:
    self.app = app
    self.config = config
    self.startup = trio.Event()
    self.shutdown = trio.Event()
    self.app_send_channel, self.app_receive_channel = trio.open_memory_channel(
        config.max_app_queue_size
    )
    self.supported = True
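
Unlike the zero-capacity channels in the earlier snippets, this pair is buffered with config.max_app_queue_size, so sends only block once the queue is full. A minimal sketch of that behaviour, using an arbitrary capacity of 2 for illustration:

import trio

async def demo():
    send_channel, receive_channel = trio.open_memory_channel(2)
    # Both sends complete without blocking because they fit in the buffer.
    await send_channel.send({'type': 'lifespan.startup'})
    await send_channel.send({'type': 'lifespan.shutdown'})
    # A third send would block here until the receiver drains an item.
    print(await receive_channel.receive())
    print(await receive_channel.receive())

trio.run(demo)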

async def recycle_or_close(self) -> None:
    if self.connection.our_state is h11.DONE:
        # Close both ends of the old channel pair, then open a fresh pair
        # for the next request cycle on this connection.
        await self.app_send_channel.aclose()
        await self.app_receive_channel.aclose()
        self.connection.start_next_cycle()
        self.app_send_channel, self.app_receive_channel = trio.open_memory_channel(10)
        self.response = None
        self.scope = None
        self.state = ASGIHTTPState.REQUEST
    else:
        # The connection cannot be reused for another request cycle.
        raise MustCloseError()

def __init__(self, app: ASGIFramework, config: Config, asend: Callable) -> None:
    self.app = app
    self.config = config
    self.response: Optional[dict] = None
    self.scope: Optional[dict] = None
    self.state = ASGIWebsocketState.CONNECTED
    self.connection: Optional[wsproto.connection.Connection] = None
    self.asend = asend  # type: ignore
    self.app_send_channel, self.app_receive_channel = trio.open_memory_channel(10)