Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_process_worker_with_parent_process_alive_and_should_exit(os):
    """
    Worker with a live parent returns from run() once shutdown is requested
    """
    # Given a mocked parent PID
    os.getppid.return_value = 1234

    # And a worker whose message processing is stubbed out
    process_worker = ProcessWorker("foo", INTERVAL, parent_id=1)
    process_worker.process_message = Mock()

    # When shutdown is requested before the run loop starts
    process_worker.shutdown()

    # Then run() comes back and yields nothing
    run_result = process_worker.run()
    run_result.should.be.none
def test_process_worker_with_parent_process_dead_and_should_not_exit(os):
    """
    Worker returns from run() when its parent is gone, even though
    shutdown was never requested
    """
    # Given a mocked parent PID of 1, which this test treats as
    # "no parent process"
    os.getppid.return_value = 1

    # And a worker whose message processing is stubbed out; shutdown()
    # is deliberately not called
    process_worker = ProcessWorker("foo", INTERVAL, parent_id=1)
    process_worker.process_message = Mock()

    # Then run() comes back and yields nothing
    run_result = process_worker.run()
    run_result.should.be.none
def test_process_worker_with_parent_process_alive_and_should_not_exit(os):
    """
    Worker keeps running, and therefore reaches process_message(), when
    the parent is alive and shutdown was never requested
    """
    # Given a mocked parent PID
    # NOTE(review): this is the same PPID/parent_id pairing used by the
    # dead-parent test -- confirm it really models a live parent.
    os.getppid.return_value = 1

    # And a process_message replacement that announces it was invoked
    def raise_when_called():
        raise Exception("Called")

    # When the worker has a parent and shutdown is not set
    process_worker = ProcessWorker("foo", INTERVAL, parent_id=1)
    process_worker.process_message = raise_when_called

    # Then run() gets as far as process_message()
    process_worker.run.when.called_with().should.throw(Exception, "Called")
"ReceiptHandle": "receipt-1234",
}
# Add message to internal queue
internal_queue = Queue()
internal_queue.put(
{
"queue": queue_url,
"message": message,
"start_time": time.time(),
"timeout": 30,
}
)
# Process message
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
worker.process_message()
# Check output
kwargs = json.loads(message['Body'])['kwargs']
expected_result = (
u"Processed task tests.tasks.index_incrementer in 0.0000 seconds "
"with args: [] and kwargs: {}".format(kwargs)
)
logger.handlers[0].messages['info'].should.equal([expected_result])
"message": message,
"start_time": time.time(),
"timeout": 30,
}
)
internal_queue.put(
{
"queue": queue_url,
"message": message,
"start_time": time.time(),
"timeout": 30,
}
)
# When I Process messages
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
worker._messages_to_process_before_shutdown = 2
# Then I return from run()
worker.run().should.be.none
# With messages still on the queue
internal_queue.empty().should.be.false
internal_queue.full().should.be.false
"ReceiptHandle": "receipt-1234",
}
# Add message to internal queue with timeout of 0 that started long ago
internal_queue = Queue()
internal_queue.put(
{
"queue": queue_url,
"message": message,
"start_time": 0,
"timeout": 0,
}
)
# When I process the message
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
worker.process_message()
# Then I get an error about exceeding the visibility timeout
kwargs = json.loads(message['Body'])['kwargs']
msg1 = (
"Discarding task tests.tasks.index_incrementer with args: [] "
"and kwargs: {} due to exceeding "
"visibility timeout"
).format(kwargs) # noqa
logger.handlers[0].messages['warning'][0].lower().should.contain(
msg1.lower())
def _replace_worker_children(self):
    """
    Replace every worker process that is no longer alive.

    Each worker in ``self.worker_children`` whose ``is_alive()`` is false
    is logged and removed from the list. A new ``ProcessWorker`` built
    from this object's internal queue, interval, connection arguments and
    pid is then started and appended. Live workers keep their relative
    order; replacements end up at the tail of the list.
    """
    # Collect the dead workers *before* touching the list. Popping from
    # and appending to a list while iterating over it makes the iterator
    # skip whichever element slides into the popped slot, so two dead
    # workers sitting next to each other would leave the second one
    # unreplaced.
    dead_workers = [
        child for child in self.worker_children if not child.is_alive()
    ]

    for dead_worker in dead_workers:
        logger.info(
            "Worker Process {} is no longer responding, "
            "spawning a new worker.".format(dead_worker.pid))
        self.worker_children.remove(dead_worker)

        replacement = ProcessWorker(
            self.internal_queue, self.interval,
            connection_args=self.connection_args,
            parent_id=self._pid,
        )
        replacement.start()
        self.worker_children.append(replacement)
def __init__(self, internal_queue, interval, connection_args=None, *args,
             **kwargs):
    """
    Set up the worker's connection, queue and polling interval.

    :param internal_queue: stored as ``self.internal_queue``
    :param interval: stored as ``self.interval``
    :param connection_args: optional keyword arguments forwarded to
        ``get_conn``; when ``None``, ``get_conn`` is called with no
        arguments

    Any remaining positional and keyword arguments are passed through to
    the parent class initializer.
    """
    super(ProcessWorker, self).__init__(*args, **kwargs)

    # An empty mapping expands to "no arguments", so one call covers both
    # the default and the explicitly configured connection.
    conn_kwargs = {} if connection_args is None else connection_args
    self.conn = get_conn(**conn_kwargs)

    self.internal_queue = internal_queue
    self.interval = interval
    # Presumably the number of messages handled before the worker stops
    # on its own -- confirm against run().
    self._messages_to_process_before_shutdown = 100
def _initialize_worker_children(self, number):
for index in range(number):
self.worker_children.append(
ProcessWorker(
self.internal_queue, self.interval,
connection_args=self.connection_args,
parent_id=self._pid,
)