Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_parent_process_death(os):
"""
Test worker processes recognize parent process death
"""
os.getppid.return_value = 123
worker = BaseWorker(parent_id=1)
worker.parent_is_alive().should.be.false
def test_parent_process_alive(os):
"""
Test worker processes recognize when parent process is alive
"""
os.getppid.return_value = 1234
worker = BaseWorker(parent_id=1234)
worker.parent_is_alive().should.be.true
packed_message, True, self.visibility_timeout)
except Full:
msg = (
"Timed out trying to add the following message "
"to the internal queue after {} seconds: {}"
).format(self.visibility_timeout, message_body) # noqa
logger.warning(msg)
continue
else:
logger.debug(
"Message successfully added to internal queue "
"from SQS queue {} with body: {}".format(
self.queue_url, message_body)) # noqa
class ProcessWorker(BaseWorker):
def __init__(self, internal_queue, interval, connection_args=None, *args,
**kwargs):
super(ProcessWorker, self).__init__(*args, **kwargs)
if connection_args is None:
self.conn = get_conn()
else:
self.conn = get_conn(**connection_args)
self.internal_queue = internal_queue
self.interval = interval
self._messages_to_process_before_shutdown = 100
def run(self):
# Set the child process to not receive any keyboard interrupts
signal.signal(signal.SIGINT, signal.SIG_IGN)
def __init__(self, *args, **kwargs):
self.parent_id = kwargs.pop('parent_id')
super(BaseWorker, self).__init__(*args, **kwargs)
self.should_exit = Event()
def shutdown(self):
logger.info(
"Received shutdown signal, shutting down PID {}!".format(
os.getpid()))
self.should_exit.set()
def parent_is_alive(self):
if os.getppid() != self.parent_id:
logger.info(
"Parent process has gone away, exiting process {}!".format(
os.getpid()))
return False
return True
class ReadWorker(BaseWorker):
def __init__(self, queue_url, internal_queue, batchsize,
connection_args=None, *args, **kwargs):
super(ReadWorker, self).__init__(*args, **kwargs)
if connection_args is None:
connection_args = {}
self.connection_args = connection_args
self.conn = get_conn(**self.connection_args)
self.queue_url = queue_url
sqs_queue = self.conn.get_queue_attributes(
QueueUrl=queue_url, AttributeNames=['All'])['Attributes']
self.visibility_timeout = int(sqs_queue['VisibilityTimeout'])
self.internal_queue = internal_queue
self.batchsize = batchsize