Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_to_seconds(input, output):
assert arq.utils.to_seconds(input) == output
:param coroutine: coroutine function to call, can be a string to import
:param name: name for function, if None, ``coroutine.__qualname__`` is used
:param keep_result: duration to keep the result for, if 0 the result is not kept
:param timeout: maximum time the job should take
:param max_tries: maximum number of tries allowed for the function, use 1 to prevent retrying
"""
if isinstance(coroutine, Function):
return coroutine
if isinstance(coroutine, str):
name = name or coroutine
coroutine = import_string(coroutine)
assert asyncio.iscoroutinefunction(coroutine), f'{coroutine} is not a coroutine function'
timeout = to_seconds(timeout)
keep_result = to_seconds(keep_result)
return Function(name or coroutine.__qualname__, coroutine, timeout, keep_result, max_tries)
job_serializer: Optional[Serializer] = None,
job_deserializer: Optional[Deserializer] = None,
):
self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
self.queue_name = queue_name
self.cron_jobs: List[CronJob] = []
if cron_jobs:
assert all(isinstance(cj, CronJob) for cj in cron_jobs), 'cron_jobs, must be instances of CronJob'
self.cron_jobs = cron_jobs
self.functions.update({cj.name: cj for cj in self.cron_jobs})
assert len(self.functions) > 0, 'at least one function or cron_job must be registered'
self.burst = burst
self.on_startup = on_startup
self.on_shutdown = on_shutdown
self.sem = asyncio.BoundedSemaphore(max_jobs)
self.job_timeout_s = to_seconds(job_timeout)
self.keep_result_s = to_seconds(keep_result)
self.poll_delay_s = to_seconds(poll_delay)
self.queue_read_limit = queue_read_limit or max_jobs
self._queue_read_offset = 0
self.max_tries = max_tries
self.health_check_interval = to_seconds(health_check_interval)
if health_check_key is None:
self.health_check_key = self.queue_name + health_check_key_suffix
else:
self.health_check_key = health_check_key
self.pool = redis_pool
if self.pool is None:
self.redis_settings = redis_settings or RedisSettings()
else:
self.redis_settings = None
self.tasks = []
:param second: second(s) to run the job on, 0 - 59
:param microsecond: microsecond(s) to run the job on,
defaults to 123456 as the world is busier at the top of a second, 0 - 1e6
:param run_at_startup: whether to run as worker starts
:param unique: whether the job should be only be executed once at each time
:param timeout: job timeout
:param keep_result: how long to keep the result for
:param max_tries: maximum number of tries for the job
"""
if isinstance(coroutine, str):
name = name or 'cron:' + coroutine
coroutine = import_string(coroutine)
assert asyncio.iscoroutinefunction(coroutine), f'{coroutine} is not a coroutine function'
timeout = to_seconds(timeout)
keep_result = to_seconds(keep_result)
return CronJob(
name or 'cron:' + coroutine.__qualname__,
coroutine,
month,
day,
weekday,
hour,
minute,
second,
microsecond,
run_at_startup,
unique,
timeout,
keep_result,
if cron_jobs:
assert all(isinstance(cj, CronJob) for cj in cron_jobs), 'cron_jobs, must be instances of CronJob'
self.cron_jobs = cron_jobs
self.functions.update({cj.name: cj for cj in self.cron_jobs})
assert len(self.functions) > 0, 'at least one function or cron_job must be registered'
self.burst = burst
self.on_startup = on_startup
self.on_shutdown = on_shutdown
self.sem = asyncio.BoundedSemaphore(max_jobs)
self.job_timeout_s = to_seconds(job_timeout)
self.keep_result_s = to_seconds(keep_result)
self.poll_delay_s = to_seconds(poll_delay)
self.queue_read_limit = queue_read_limit or max_jobs
self._queue_read_offset = 0
self.max_tries = max_tries
self.health_check_interval = to_seconds(health_check_interval)
if health_check_key is None:
self.health_check_key = self.queue_name + health_check_key_suffix
else:
self.health_check_key = health_check_key
self.pool = redis_pool
if self.pool is None:
self.redis_settings = redis_settings or RedisSettings()
else:
self.redis_settings = None
self.tasks = []
self.main_task = None
self.loop = asyncio.get_event_loop()
self.ctx = ctx or {}
max_timeout = max(f.timeout_s or self.job_timeout_s for f in self.functions.values())
self.in_progress_timeout_s = max_timeout + 10
self.jobs_complete = 0
:param coroutine: coroutine function to call, can be a string to import
:param name: name for function, if None, ``coroutine.__qualname__`` is used
:param keep_result: duration to keep the result for, if 0 the result is not kept
:param timeout: maximum time the job should take
:param max_tries: maximum number of tries allowed for the function, use 1 to prevent retrying
"""
if isinstance(coroutine, Function):
return coroutine
if isinstance(coroutine, str):
name = name or coroutine
coroutine = import_string(coroutine)
assert asyncio.iscoroutinefunction(coroutine), f'{coroutine} is not a coroutine function'
timeout = to_seconds(timeout)
keep_result = to_seconds(keep_result)
return Function(name or coroutine.__qualname__, coroutine, timeout, keep_result, max_tries)
:param microsecond: microsecond(s) to run the job on,
defaults to 123456 as the world is busier at the top of a second, 0 - 1e6
:param run_at_startup: whether to run as worker starts
:param unique: whether the job should be only be executed once at each time
:param timeout: job timeout
:param keep_result: how long to keep the result for
:param max_tries: maximum number of tries for the job
"""
if isinstance(coroutine, str):
name = name or 'cron:' + coroutine
coroutine = import_string(coroutine)
assert asyncio.iscoroutinefunction(coroutine), f'{coroutine} is not a coroutine function'
timeout = to_seconds(timeout)
keep_result = to_seconds(keep_result)
return CronJob(
name or 'cron:' + coroutine.__qualname__,
coroutine,
month,
day,
weekday,
hour,
minute,
second,
microsecond,
run_at_startup,
unique,
timeout,
keep_result,
max_tries,
):
self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
self.queue_name = queue_name
self.cron_jobs: List[CronJob] = []
if cron_jobs:
assert all(isinstance(cj, CronJob) for cj in cron_jobs), 'cron_jobs, must be instances of CronJob'
self.cron_jobs = cron_jobs
self.functions.update({cj.name: cj for cj in self.cron_jobs})
assert len(self.functions) > 0, 'at least one function or cron_job must be registered'
self.burst = burst
self.on_startup = on_startup
self.on_shutdown = on_shutdown
self.sem = asyncio.BoundedSemaphore(max_jobs)
self.job_timeout_s = to_seconds(job_timeout)
self.keep_result_s = to_seconds(keep_result)
self.poll_delay_s = to_seconds(poll_delay)
self.queue_read_limit = queue_read_limit or max_jobs
self._queue_read_offset = 0
self.max_tries = max_tries
self.health_check_interval = to_seconds(health_check_interval)
if health_check_key is None:
self.health_check_key = self.queue_name + health_check_key_suffix
else:
self.health_check_key = health_check_key
self.pool = redis_pool
if self.pool is None:
self.redis_settings = redis_settings or RedisSettings()
else:
self.redis_settings = None
self.tasks = []
self.main_task = None
self.loop = asyncio.get_event_loop()
job_deserializer: Optional[Deserializer] = None,
):
self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
self.queue_name = queue_name
self.cron_jobs: List[CronJob] = []
if cron_jobs:
assert all(isinstance(cj, CronJob) for cj in cron_jobs), 'cron_jobs, must be instances of CronJob'
self.cron_jobs = cron_jobs
self.functions.update({cj.name: cj for cj in self.cron_jobs})
assert len(self.functions) > 0, 'at least one function or cron_job must be registered'
self.burst = burst
self.on_startup = on_startup
self.on_shutdown = on_shutdown
self.sem = asyncio.BoundedSemaphore(max_jobs)
self.job_timeout_s = to_seconds(job_timeout)
self.keep_result_s = to_seconds(keep_result)
self.poll_delay_s = to_seconds(poll_delay)
self.queue_read_limit = queue_read_limit or max_jobs
self._queue_read_offset = 0
self.max_tries = max_tries
self.health_check_interval = to_seconds(health_check_interval)
if health_check_key is None:
self.health_check_key = self.queue_name + health_check_key_suffix
else:
self.health_check_key = health_check_key
self.pool = redis_pool
if self.pool is None:
self.redis_settings = redis_settings or RedisSettings()
else:
self.redis_settings = None
self.tasks = []
self.main_task = None