Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
coro = asyncio.create_subprocess_exec(*self.PROGRAM_BLOCKED)
task = self.loop.create_task(coro)
self.loop.call_soon(task.cancel)
try:
await task
except asyncio.CancelledError:
pass
# Give the process handler some time to close itself
await asyncio.sleep(0.3)
gc.collect()
# ignore the log:
# "Exception during subprocess creation, kill the subprocess"
with tb.disable_logger():
self.loop.run_until_complete(cancel_make_transport())
*self.PROGRAM_BLOCKED)
task = self.loop.create_task(coro)
self.loop.call_soon(task.cancel)
try:
await task
except asyncio.CancelledError:
pass
# Give the process handler some time to close itself
await asyncio.sleep(0.3)
gc.collect()
# ignore the log:
# "Exception during subprocess creation, kill the subprocess"
with tb.disable_logger():
self.loop.run_until_complete(cancel_make_transport())
tb.run_briefly(self.loop)