How to use the pebble.ThreadPool function in Pebble

To help you get started, we’ve selected a few Pebble examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github noxdafox / pebble / test / test_thread_pool.py View on Github external
def test_thread_pool_error_callback(self):
        """Thread Pool errors are forwarded to callback."""
        with ThreadPool(max_workers=1) as pool:
            future = pool.schedule(error_function)
            future.add_done_callback(self.callback)
        self.event.wait()
        self.assertTrue(isinstance(self.exception, Exception))
github noxdafox / pebble / test / test_thread_pool.py View on Github external
def test_thread_pool_callback(self):
        """Thread Pool results are forwarded to the callback."""
        with ThreadPool(max_workers=1) as pool:
            future = pool.schedule(
                function, args=[1], kwargs={'keyword_argument': 1})
            future.add_done_callback(self.callback)

        self.event.wait()
        self.assertEqual(self.results, 2)
github noxdafox / pebble / test / test_thread_pool.py View on Github external
def test_thread_pool_multiple_futures(self):
        """Thread Pool multiple futures."""
        futures = []
        with ThreadPool(max_workers=1) as pool:
            for _ in range(5):
                futures.append(pool.schedule(function, args=[1]))
        self.assertEqual(sum([t.result() for t in futures]), 5)
github noxdafox / pebble / tests / test_thread.py View on Github external
def test_thread_pool_schedule_id(self):
        """ThreadPool task ID is forwarded to it."""
        with ThreadPool() as tp:
            task = tp.schedule(jp, args=(1, ), identifier='foo')
        self.assertEqual(task.id, 'foo')
github noxdafox / pebble / tests / test_thread.py View on Github external
def test_thread_pool_close(self):
        """ThreadPool is closed consuming all tasks."""
        tp = ThreadPool()
        for i in range(0, 10):
            tp.schedule(jp, args=(1, ))
        tp.close()
        tp.join()
        self.assertTrue(tp._queue.qsize() <= 1)
github noxdafox / pebble / test / test_thread_pool.py View on Github external
def test_thread_pool_different_thread(self):
        """Thread Pool multiple futures are handled by different threades."""
        futures = []
        with ThreadPool(max_workers=2) as pool:
            for _ in range(0, 5):
                futures.append(pool.schedule(tid_function))
        self.assertEqual(len(set([t.result() for t in futures])), 2)
github noxdafox / pebble / test / test_thread_pool.py View on Github external
def test_thread_pool_stop_stopped_function(self):
        """Thread Pool is stopped in function."""
        with ThreadPool(max_workers=1) as pool:
            def function():
                pool.stop()

            pool.schedule(function)

        self.assertFalse(pool.active)
github noxdafox / pebble / test / test_thread_pool.py View on Github external
def test_thread_pool_map(self):
        """Thread Pool map simple."""
        elements = [1, 2, 3]

        with ThreadPool(max_workers=1) as pool:
            future = pool.map(function, elements)
            generator = future.result()
            self.assertEqual(list(generator), elements)
github noxdafox / pebble / test / test_thread_pool.py View on Github external
def test_thread_pool_map_zero_chunk(self):
        """Thread Pool map chunksize 0."""
        with ThreadPool(max_workers=1) as pool:
            with self.assertRaises(ValueError):
                pool.map(function, [], chunksize=0)
github noxdafox / pebble / tests / test_thread.py View on Github external
def test_thread_pool_schedule_uuid(self):
        """ThreadPool task UUID is assigned if None."""
        with ThreadPool() as tp:
            task = tp.schedule(jp, args=(1, ))
        self.assertEqual(task.id.version, 4)