# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_read_worker_with_parent_process_dead_and_should_not_exit(os):
    """
    Test read workers exit when parent is dead and shutdown is not set
    """
    # Create the SQS queue the worker will read from
    sqs_client = boto3.client('sqs', region_name='us-east-1')
    queue_url = sqs_client.create_queue(QueueName="tester")['QueueUrl']
    # Make the reported parent PID differ from the worker's parent_id,
    # simulating a dead/replaced parent process
    os.getppid.return_value = 123
    internal_queue = Queue(1)
    worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
    worker.read_message = Mock()
    # run() should return immediately (None) instead of looping
    worker.run().should.be.none
"""
Test read workers fill internal queue with celery tasks
"""
conn = boto3.client('sqs', region_name='us-east-1')
queue_url = conn.create_queue(QueueName="tester")['QueueUrl']
message = (
'{"body": "KGRwMApTJ3Rhc2snCnAxClMndGVzdHMudGFza3MuaW5kZXhfa'
'W5jcmVtZW50ZXInCnAyCnNTJ2Fy\\nZ3MnCnAzCihscDQKc1Mna3dhcmdzJw'
'pwNQooZHA2ClMnbWVzc2FnZScKcDcKUydUZXN0IG1lc3Nh\\nZ2UyJwpwOAp'
'zcy4=\\n", "some stuff": "asdfasf"}'
)
conn.send_message(QueueUrl=queue_url, MessageBody=message)
internal_queue = Queue()
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
worker.read_message()
packed_message = internal_queue.get(timeout=1)
found_message_body = decode_message(packed_message['message'])
found_message_body.should.equal({
'task': 'tests.tasks.index_incrementer',
'args': [],
'kwargs': {
'message': 'Test message2',
},
"""
conn = boto3.client('sqs', region_name='us-east-1')
queue_url = conn.create_queue(QueueName="tester")['QueueUrl']
message = json.dumps({
'task': 'tests.tasks.index_incrementer',
'args': [],
'kwargs': {
'message': 'Test message',
},
})
conn.send_message(QueueUrl=queue_url, MessageBody=message)
internal_queue = Queue()
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
worker.read_message()
packed_message = internal_queue.get(timeout=1)
found_message_body = decode_message(packed_message['message'])
found_message_body.should.equal({
'task': 'tests.tasks.index_incrementer',
'args': [],
'kwargs': {
'message': 'Test message',
},
message = json.dumps(
{
"body": (
"KGRwMApTJ3Rhc2snCnAxClMndGVzdHMudGFza3MuaW5kZXhfaW5jcmVtZW"
"50ZXInCnAyCnNTJ2Fy\nZ3MnCnAzCihscDQKc1Mna3dhcmdzJwpwNQooZHA"
"2ClMnbWVzc2FnZScKcDcKUydUZXN0IG1lc3Nh\nZ2UyJwpwOApzcy4=\n"
),
"some stuff": "asdfasf",
}
)
for _ in range(3):
conn.send_message(QueueUrl=queue_url, MessageBody=message)
# Run Reader
internal_queue = Queue(maxsize=1)
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
worker.read_message()
# Check log messages
logger.handlers[0].messages['warning'][0].should.contain(
"Timed out trying to add the following message to the internal queue")
logger.handlers[0].messages['warning'][1].should.contain(
"Clearing Local messages since we exceeded their visibility_timeout")
def check_for_new_queues(self):
queue_urls = self.get_queue_urls_from_queue_prefixes(
self.queue_prefixes)
new_queue_urls = set(queue_urls) - set(self.queue_urls)
for new_queue_url in new_queue_urls:
logger.info("Found new queue\t{}".format(new_queue_url))
worker = ReadWorker(
new_queue_url, self.internal_queue, self.batchsize,
connection_args=self.connection_args,
parent_id=self._pid,
)
worker.start()
self.reader_children.append(worker)
def _initialize_reader_children(self):
for queue_url in self.queue_urls:
self.reader_children.append(
ReadWorker(
queue_url, self.internal_queue, self.batchsize,
connection_args=self.connection_args,
parent_id=self._pid,
)
def __init__(self, queue_url, internal_queue, batchsize,
connection_args=None, *args, **kwargs):
super(ReadWorker, self).__init__(*args, **kwargs)
if connection_args is None:
connection_args = {}
self.connection_args = connection_args
self.conn = get_conn(**self.connection_args)
self.queue_url = queue_url
sqs_queue = self.conn.get_queue_attributes(
QueueUrl=queue_url, AttributeNames=['All'])['Attributes']
self.visibility_timeout = int(sqs_queue['VisibilityTimeout'])
self.internal_queue = internal_queue
self.batchsize = batchsize
def _replace_reader_children(self):
for index, reader in enumerate(self.reader_children):
if not reader.is_alive():
logger.info(
"Reader Process {} is no longer responding, "
"spawning a new reader.".format(reader.pid))
queue_url = reader.queue_url
self.reader_children.pop(index)
worker = ReadWorker(
queue_url, self.internal_queue, self.batchsize,
connection_args=self.connection_args,
parent_id=self._pid,
)
worker.start()
self.reader_children.append(worker)