Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_manager_start_and_stop():
"""
Test managing process can start and stop child processes
"""
conn = boto3.client('sqs', region_name='us-east-1')
conn.create_queue(QueueName="email")
manager = ManagerWorker(
queue_prefixes=['email'], worker_concurrency=2, interval=1,
batchsize=10,
)
len(manager.worker_children).should.equal(2)
manager.worker_children[0].is_alive().should.equal(False)
manager.worker_children[1].is_alive().should.equal(False)
manager.start()
manager.worker_children[0].is_alive().should.equal(True)
manager.worker_children[1].is_alive().should.equal(True)
manager.stop()
def test_worker_to_large_batch_size():
"""
Test workers with too large of a batch size
"""
BATCHSIZE = 10000
CONCURRENCY = 1
QUEUE_PREFIX = "tester"
INTERVAL = 0.0
conn = boto3.client('sqs', region_name='us-east-1')
conn.create_queue(QueueName="tester")['QueueUrl']
worker = ManagerWorker(QUEUE_PREFIX, CONCURRENCY, INTERVAL, BATCHSIZE)
worker.batchsize.should.equal(MESSAGE_DOWNLOAD_BATCH_SIZE)
Test managing process handles OS signals
"""
# Setup SQS Queue
conn = boto3.client('sqs', region_name='us-east-1')
conn.create_queue(QueueName="tester")
# Mock out sys.exit
sys.exit = Mock()
# Have our inner method send our signal
def process_counts():
os.kill(os.getpid(), signal.SIGTERM)
# Setup Manager
manager = ManagerWorker(
queue_prefixes=["tester"], worker_concurrency=1, interval=1,
batchsize=10,
)
manager.process_counts = process_counts
manager._graceful_shutdown = MagicMock()
# When we start and trigger a signal
manager.start()
manager.sleep()
# Then we exit
sys.exit.assert_called_once_with(0)
import os
import signal
import time
# This sleep time is long enoug for 100 messages in queue
time.sleep(5)
try:
os.kill(pid, signal.SIGKILL)
except OSError:
# Return that we didn't need to kill the process
return True
else:
# Return that we needed to kill the process
return False
# Setup Manager
manager = ManagerWorker(
queue_prefixes=["tester"], worker_concurrency=1, interval=0.0,
batchsize=1,
)
manager.start()
# Give our processes a moment to start
time.sleep(1)
# Setup Threading watcher
try:
# Try Python 2 Style
thread = ThreadWithReturnValue2(
target=sleep_and_kill, args=(manager.reader_children[0].pid,))
thread.daemon = True
except TypeError:
# Use Python 3 Style
def test_master_spawns_worker_processes():
"""
Test managing process creates child workers
"""
# Setup SQS Queue
conn = boto3.client('sqs', region_name='us-east-1')
conn.create_queue(QueueName="tester")
# Setup Manager
manager = ManagerWorker(["tester"], 1, 1, 10)
manager.start()
# Check Workers
len(manager.reader_children).should.equal(1)
len(manager.worker_children).should.equal(1)
manager.reader_children[0].is_alive().should.be.true
manager.worker_children[0].is_alive().should.be.true
# Cleanup
manager.stop()
def test_master_replaces_reader_processes():
"""
Test managing process replaces reader children
"""
# Setup SQS Queue
conn = boto3.client('sqs', region_name='us-east-1')
conn.create_queue(QueueName="tester")
# Setup Manager
manager = ManagerWorker(
queue_prefixes=["tester"], worker_concurrency=1, interval=1,
batchsize=10,
)
manager.start()
# Get Reader PID
pid = manager.reader_children[0].pid
# Kill Reader and wait to replace
manager.reader_children[0].shutdown()
time.sleep(2)
manager.replace_workers()
# Check Replacement
manager.reader_children[0].pid.shouldnt.equal(pid)
def test_master_counts_processes():
"""
Test managing process counts child processes
"""
# Setup Logging
logger = logging.getLogger("pyqs")
del logger.handlers[:]
logger.handlers.append(MockLoggingHandler())
# Setup SQS Queue
conn = boto3.client('sqs', region_name='us-east-1')
conn.create_queue(QueueName="tester")
# Setup Manager
manager = ManagerWorker(["tester"], 2, 1, 10)
manager.start()
# Check Workers
manager.process_counts()
# Cleanup
manager.stop()
# Check messages
msg1 = "Reader Processes: 1"
logger.handlers[0].messages['debug'][-2].lower().should.contain(
msg1.lower())
msg2 = "Worker Processes: 2"
logger.handlers[0].messages['debug'][-1].lower().should.contain(
msg2.lower())
def test_manager_worker_with_queue_prefix():
"""
Test managing process can find queues by prefix
"""
conn = boto3.client('sqs', region_name='us-east-1')
conn.create_queue(QueueName="email.foobar")
conn.create_queue(QueueName="email.baz")
manager = ManagerWorker(
queue_prefixes=['email.*'], worker_concurrency=1, interval=1,
batchsize=10,
)
len(manager.reader_children).should.equal(2)
children = manager.reader_children
# Pull all the read children and sort by name to make testing easier
sorted_children = sorted(children, key=lambda child: child.queue_url)
sorted_children[0].queue_url.should.equal(
"https://queue.amazonaws.com/123456789012/email.baz")
sorted_children[1].queue_url.should.equal(
"https://queue.amazonaws.com/123456789012/email.foobar")
import os
import signal
import time
# This sleep time is long enoug for 100 messages in queue
time.sleep(5)
try:
os.kill(pid, signal.SIGKILL)
except OSError:
# Return that we didn't need to kill the process
return True
else:
# Return that we needed to kill the process
return False
# Setup Manager
manager = ManagerWorker(
queue_prefixes=["tester"], worker_concurrency=0, interval=0.0,
batchsize=1,
)
manager.start()
# Give our processes a moment to start
time.sleep(1)
# Setup Threading watcher
try:
# Try Python 2 Style
thread = ThreadWithReturnValue2(
target=sleep_and_kill, args=(manager.reader_children[0].pid,))
thread.daemon = True
except TypeError:
# Use Python 3 Style
def _main(queue_prefixes, concurrency=5, logging_level="WARN",
region=None, access_key_id=None, secret_access_key=None,
interval=1, batchsize=10, prefetch_multiplier=2):
logging.basicConfig(
format="[%(levelname)s]: %(message)s",
level=getattr(logging, logging_level),
)
logger.info("Starting PyQS version {}".format(__version__))
manager = ManagerWorker(
queue_prefixes, concurrency, interval, batchsize,
prefetch_multiplier=prefetch_multiplier, region=region,
access_key_id=access_key_id, secret_access_key=secret_access_key,
)
_add_cwd_to_path()
manager.start()
manager.sleep()