Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_undecorated_callback(self):
    """Thread Concurrent forwards an undecorated target's result to the callback."""
    options = {'keyword_argument': 1}
    task = thread.concurrent(target=undecorated, args=[1], kwargs=options,
                             callback=self.callback)
    # Block on the module-level event before reading the result
    # (presumably set by the callback -- confirm against its definition).
    event.wait()
    outcome = task.get()
    self.assertEqual(outcome, 2)
# Decorated with thread.concurrent; the body simply hands back the
# instance's ``b`` attribute.
@thread.concurrent
def instmethod(self):
    value = self.b
    return value
# Decorated helper whose body always fails with a generic Exception.
@thread.concurrent
def error_decorated():
    failure = Exception("BOOM!")
    raise failure
# Decorated helper (with a callback) whose body sleeps for one second
# and produces no value.
@thread.concurrent(callback=callback)
def long_decorated_callback():
    time.sleep(1)
    return None
# Decorated helper (with a callback) that adds its positional argument to
# its keyword argument.
# NOTE(review): the docstring text is kept verbatim because callers may
# compare ``__doc__`` against it -- confirm before rewording.
@thread.concurrent(callback=callback)
def decorated_callback(argument, keyword_argument=0):
    """A docstring."""
    total = argument + keyword_argument
    return total
# Decoration with a positional argument (5) plus ``name`` -- presumably an
# intentionally invalid use that the surrounding (not visible) try/except
# expects to fail; confirm against the enclosing test.
@thread.concurrent(5, name='foo')
def wrong():
    return None
except Exception as error:
def test_undecorated_results(self):
    """Thread Concurrent undecorated results are produced."""
    # Only a target is supplied: no args, kwargs or callback.
    task = thread.concurrent(target=undecorated_simple)
    outcome = task.get()
    self.assertEqual(outcome, 0)
@thread_worker(daemon=True)
def task_manager(task):
"""Task's lifecycle manager.
Starts a new worker, waits for the *Task* to be performed,
collects results, runs the callback and cleans up the process.
"""
queue = task._queue
function = task._function
timeout = task.timeout > 0 and task.timeout or None
process = task_worker(queue, function, task._args, task._kwargs)
try:
results = queue.get(timeout)
task._set(results)