Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@task(custom_function_path="custom_function.path", queue="foobar")
def custom_path_task():
pass
@task()
def sleeper(message, extra=None):
import time
time.sleep(message)
@task(queue='email')
def send_email(subject, message):
pass
@task()
def index_incrementer(message, extra=None):
if isinstance(message, basestring):
task_results.append(message)
else:
raise ValueError(
"Need to be given basestring, was given {}".format(message))
@task(queue='delayed', delay_seconds=5)
def delayed_task():
pass
@task(queue='queue-example')
def process(message):
logging.info(
f'PyQS task process() is processing message {message}. '
'Clap your hands!'