How to use the pyqs.utils.function_to_import_path function in pyqs

To help you get started, we’ve selected a few pyqs examples, based on popular ways it is used in public projects.
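
As a rough orientation before the excerpt, here is a minimal sketch of the behaviour the name `function_to_import_path` suggests: turning a callable into a dotted `module.name` string. The helper `import_path_sketch` and the task `send_email` are hypothetical stand-ins written for this illustration, not the pyqs implementation. The handling of `override` (returning the argument unchanged) is an assumption.

```python
def import_path_sketch(function, override=False):
    """Hypothetical stand-in for illustration; not the pyqs source."""
    if override:
        # Assumption: with override set, the caller already supplies
        # the path as a string, so it is passed through untouched.
        return function
    # Build "package.module.function_name" from the callable's metadata.
    return "{}.{}".format(function.__module__, function.__name__)


def send_email(subject, message):
    """Hypothetical task used only to demonstrate the helper."""
    return subject, message


if __name__ == "__main__":
    # Prints the module name followed by ".send_email".
    print(import_path_sketch(send_email))
    # Prints the string exactly as given.
    print(import_path_sketch("custom.path.send_email", override=True))
```

A string path like this can be placed in a queue message and resolved back to a callable by a worker, which is consistent with how the excerpt stores `function_path` under the `'task'` key.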


Source: spulec/PyQS, pyqs/decorator.py (GitHub)
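from pyqs.utils import function_to_import_path


def task_delayer(func_to_delay, queue_name, delay_seconds=None,
                 override=False):
    # Resolve the task to the path string stored in each queued message
    function_path = function_to_import_path(func_to_delay, override=override)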

    if not queue_name:
        # If no queue specified, use the function_path for the queue
        queue_name = function_path

    def wrapper(*args, **kwargs):
        queue = get_or_create_queue(queue_name)

        # A per-call '_delay_seconds' keyword overrides the default and is
        # removed from kwargs so it is not forwarded to the task itself
        _delay_seconds = kwargs.pop('_delay_seconds', delay_seconds)

        logger.info("Delaying task %s: %s, %s", function_path, args, kwargs)
        message_dict = {
            'task': function_path,