Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
) -> None:
self.context = get_context()
self.scheduler = scheduler or RoundRobin()
self.process_count = max(1, processes or os.cpu_count() or 2)
self.queue_count = max(1, queuecount or 1)
if self.queue_count > self.process_count:
raise ValueError("queue count must be <= process count")
self.initializer = initializer
self.initargs = initargs
self.maxtasksperchild = max(0, maxtasksperchild)
self.childconcurrency = max(1, childconcurrency)
self.processes: Dict[Process, QueueID] = {}
self.queues: Dict[QueueID, Tuple[Queue, Queue]] = {}
self.running = True
self.last_id = 0
self._results: Dict[TaskID, Tuple[Any, Optional[TracebackStr]]] = {}
self.init()
self._loop = asyncio.ensure_future(self.loop())
"""Send SIGTERM to child process."""
return self.aio_process.terminate()
# multiprocessing.Process methods added in 3.7
if sys.version_info >= (3, 7):
def kill(self) -> None:
"""Send SIGKILL to child process."""
return self.aio_process.kill()
def close(self) -> None:
"""Clean up child process once finished."""
return self.aio_process.close()
class Worker(Process):
"""Execute a coroutine on a separate process and return the result."""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, process_target=Worker.run_async, **kwargs)
self.unit.namespace.result = None
@staticmethod
def run_async(unit: Unit) -> R:
"""Initialize the child process and event loop, then execute the coroutine."""
try:
result: R = Process.run_async(unit)
unit.namespace.result = result
return result
except BaseException as e:
unit.namespace.result = e
import os
import queue
import traceback
from typing import Any, Awaitable, Callable, Dict, Optional, Sequence, Tuple
from .core import Process, get_context
from .scheduler import RoundRobin, Scheduler
from .types import PoolTask, ProxyException, Queue, QueueID, R, T, TaskID, TracebackStr
MAX_TASKS_PER_CHILD = 0 # number of tasks to execute before recycling a child process
CHILD_CONCURRENCY = 16 # number of tasks to execute simultaneously per child process
log = logging.getLogger(__name__)
class PoolWorker(Process):
"""Individual worker process for the async pool."""
def __init__(
self,
tx: Queue,
rx: Queue,
ttl: int = MAX_TASKS_PER_CHILD,
concurrency: int = CHILD_CONCURRENCY,
*,
initializer: Optional[Callable] = None,
initargs: Sequence[Any] = (),
) -> None:
super().__init__(target=self.run, initializer=initializer, initargs=initargs)
self.concurrency = max(1, concurrency)
self.ttl = max(0, ttl)
self.tx = tx