Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
message = json.dumps({
'task': 'tests.tasks.index_incrementer',
'args': [],
'kwargs': {
'message': 'Test message',
},
})
conn.send_message(QueueUrl=queue_url, MessageBody=message)
internal_queue = Queue()
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
worker.read_message()
packed_message = internal_queue.get(timeout=1)
found_message_body = decode_message(packed_message['message'])
found_message_body.should.equal({
'task': 'tests.tasks.index_incrementer',
'args': [],
'kwargs': {
'message': 'Test message',
},
queue_url = conn.create_queue(QueueName="tester")['QueueUrl']
message = (
'{"body": "KGRwMApTJ3Rhc2snCnAxClMndGVzdHMudGFza3MuaW5kZXhfa'
'W5jcmVtZW50ZXInCnAyCnNTJ2Fy\\nZ3MnCnAzCihscDQKc1Mna3dhcmdzJw'
'pwNQooZHA2ClMnbWVzc2FnZScKcDcKUydUZXN0IG1lc3Nh\\nZ2UyJwpwOAp'
'zcy4=\\n", "some stuff": "asdfasf"}'
)
conn.send_message(QueueUrl=queue_url, MessageBody=message)
internal_queue = Queue()
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
worker.read_message()
packed_message = internal_queue.get(timeout=1)
found_message_body = decode_message(packed_message['message'])
found_message_body.should.equal({
'task': 'tests.tasks.index_incrementer',
'args': [],
'kwargs': {
'message': 'Test message2',
},
def process_message(self):
try:
packed_message = self.internal_queue.get(timeout=0.5)
except Empty:
# Return False if we did not attempt to process any messages
return False
message = packed_message['message']
queue_url = packed_message['queue']
fetch_time = packed_message['start_time']
timeout = packed_message['timeout']
message_body = decode_message(message)
full_task_path = message_body['task']
args = message_body['args']
kwargs = message_body['kwargs']
task_name = full_task_path.split(".")[-1]
task_path = ".".join(full_task_path.split(".")[:-1])
task_module = importlib.import_module(task_path)
task = getattr(task_module, task_name)
current_time = time.time()
if int(current_time - fetch_time) >= timeout:
logger.warning(
"Discarding task {} with args: {} and kwargs: {} due to "
"exceeding visibility timeout".format( # noqa
def read_message(self):
messages = self.sqs_queue.get_messages(MESSAGE_DOWNLOAD_BATCH_SIZE, wait_time_seconds=LONG_POLLING_INTERVAL)
logger.info("Successfully got {} messages from SQS queue {}".format(len(messages), self.sqs_queue.name)) # noqa
start = time.time()
for message in messages:
end = time.time()
if int(end - start) >= self.visibility_timeout:
# Don't add any more messages since they have re-appeared in the sqs queue
# Instead just reset and get fresh messages from the sqs queue
msg = "Clearing Local messages since we exceeded their visibility_timeout"
logger.warning(msg)
break
message_body = decode_message(message)
try:
packed_message = {
"is_pyqs_task": True if 'task' in message_body else False,
"queue": self.sqs_queue.id,
"message": message,
"message_body": message_body,
"start_time": start,
"timeout": self.visibility_timeout,
}
self.internal_queue.put(packed_message, True, self.visibility_timeout)
except Full:
msg = "Timed out trying to add the following message to the internal queue after {} seconds: {}".format(self.visibility_timeout, message_body) # noqa
logger.warning(msg)
continue
else:
logger.debug("Message successfully added to internal queue from SQS queue {} with body: {}".format(self.sqs_queue.name, message_body)) # noqa
start = time.time()
for message in messages:
end = time.time()
if int(end - start) >= self.visibility_timeout:
# Don't add any more messages since they have
# re-appeared in the sqs queue Instead just reset and get
# fresh messages from the sqs queue
msg = (
"Clearing Local messages since we exceeded "
"their visibility_timeout"
)
logger.warning(msg)
break
message_body = decode_message(message)
try:
packed_message = {
"queue": self.queue_url,
"message": message,
"start_time": start,
"timeout": self.visibility_timeout,
}
self.internal_queue.put(
packed_message, True, self.visibility_timeout)
except Full:
msg = (
"Timed out trying to add the following message "
"to the internal queue after {} seconds: {}"
).format(self.visibility_timeout, message_body) # noqa
logger.warning(msg)
continue