Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_settings_changed():
settings = RedisSettings(port=123)
assert settings.port == 123
assert (
''
) == str(settings)
async def test_redis_log():
redis = await create_pool(RedisSettings())
await redis.flushall()
await redis.set(b'a', b'1')
await redis.set(b'b', b'2')
log_msgs = []
def _log(s):
log_msgs.append(s)
await log_redis_info(redis, _log)
assert len(log_msgs) == 1
assert re.search(r'redis_version=\d\.', log_msgs[0]), log_msgs
assert log_msgs[0].endswith(' db_keys=2')
async def test_redis_sentinel_failure():
settings = RedisSettings()
settings.host = [('localhost', 6379), ('localhost', 6379)]
settings.sentinel = True
try:
pool = await create_pool(settings)
await pool.ping('ping')
except Exception as e:
assert 'unknown command `SENTINEL`' in str(e)
async def test_redis_timeout(mocker):
mocker.spy(arq.utils.asyncio, 'sleep')
with pytest.raises(OSError):
await create_pool(RedisSettings(port=0, conn_retry_delay=0))
assert arq.utils.asyncio.sleep.call_count == 5
async def test_redis_success_log(caplog):
caplog.set_level(logging.INFO)
settings = RedisSettings()
pool = await create_pool(settings)
assert 'redis connection successful' not in [r.message for r in caplog.records]
pool.close()
await pool.wait_closed()
pool = await create_pool(settings, retry=1)
assert 'redis connection successful' in [r.message for r in caplog.records]
pool.close()
await pool.wait_closed()
self.on_shutdown = on_shutdown
self.sem = asyncio.BoundedSemaphore(max_jobs)
self.job_timeout_s = to_seconds(job_timeout)
self.keep_result_s = to_seconds(keep_result)
self.poll_delay_s = to_seconds(poll_delay)
self.queue_read_limit = queue_read_limit or max_jobs
self._queue_read_offset = 0
self.max_tries = max_tries
self.health_check_interval = to_seconds(health_check_interval)
if health_check_key is None:
self.health_check_key = self.queue_name + health_check_key_suffix
else:
self.health_check_key = health_check_key
self.pool = redis_pool
if self.pool is None:
self.redis_settings = redis_settings or RedisSettings()
else:
self.redis_settings = None
self.tasks = []
self.main_task = None
self.loop = asyncio.get_event_loop()
self.ctx = ctx or {}
max_timeout = max(f.timeout_s or self.job_timeout_s for f in self.functions.values())
self.in_progress_timeout_s = max_timeout + 10
self.jobs_complete = 0
self.jobs_retried = 0
self.jobs_failed = 0
self._last_health_check = 0
self._last_health_check_log = None
self._add_signal_handler(signal.SIGINT, self.handle_sig)
self._add_signal_handler(signal.SIGTERM, self.handle_sig)
self.on_stop = None