Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_uvloop_policy(self):
try:
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.new_event_loop()
try:
self.assertIsInstance(loop, uvloop.Loop)
finally:
loop.close()
finally:
asyncio.set_event_loop_policy(None)
import socket
from contextlib import ExitStack
from tempfile import mktemp
import aiohttp.web
import pytest
import http.client
import aiomisc
from aiomisc.service import TCPServer, UDPServer, TLSServer
from aiomisc.service.aiohttp import AIOHTTPService
try:
import uvloop
uvloop_loop_type = uvloop.Loop
except ImportError:
uvloop_loop_type = None
pytestmark = pytest.mark.catch_loop_exceptions
@pytest.fixture()
def unix_socket_udp():
socket_path = mktemp(dir='/tmp', suffix='.sock')
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.setblocking(False)
# Behaviour like in the bind_socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
async def start_server():
nonlocal CNT
CNT = 0
addrs = ('127.0.0.1', 'localhost')
if not isinstance(self.loop, uvloop.Loop):
# Hack to let tests run on Python 3.5.0
# (asyncio doesn't support multiple hosts in 3.5.0)
addrs = '127.0.0.1'
srv = await asyncio.start_server(
handle_client,
addrs, 0,
family=socket.AF_INET)
srv_socks = srv.sockets
self.assertTrue(srv_socks)
if self.has_start_serving():
self.assertTrue(srv.is_serving())
addr = srv_socks[0].getsockname()
help='0=pickle 1=cloudpickle 2=dill')
parser.add_argument(
'--data_pickle', '-d', dest='data_pickle', default=0, type=int,
help='0=pickle 1=cloudpickle 2=dill')
args = parser.parse_args()
if not args.port and not args.unix_path:
print('distex installed OK')
return
if args.loop == LoopType.default:
loop = util.get_loop()
elif args.loop == LoopType.asyncio:
loop = asyncio.get_event_loop()
elif args.loop == LoopType.uvloop:
import uvloop
loop = uvloop.Loop()
elif args.loop == LoopType.proactor:
loop = asyncio.ProactorEventLoop()
elif args.loop == LoopType.quamash:
import quamash
import PyQt5.Qt as qt
qapp = qt.QApplication([]) # noqa
loop = quamash.QEventLoop()
asyncio.set_event_loop(loop)
processor = Processor( # noqa
args.host, args.port, args.unix_path,
args.func_pickle, args.data_pickle)
loop.run_forever()
def is_uvloop_event_loop():
try:
# noinspection PyPackageRequirements
from uvloop import Loop
except ImportError:
return False
return isinstance(self._loop, Loop)
def get_loop():
"""
Get optimal event loop for the platform.
"""
loop = None
if sys.platform == 'win32':
loop = asyncio.ProactorEventLoop()
else:
with suppress(ImportError):
import uvloop
loop = uvloop.Loop()
return loop or asyncio.get_event_loop()
if platform.type == 'win': # pragma nocover
EVENT_LOOPS['select'] = asyncio.SelectorEventLoop
EVENT_LOOPS['proactor'] = asyncio.ProactorEventLoop
else:
for selector in ('Epoll', 'Kqueue', 'Poll', 'Select'):
name = '%sSelector' % selector
selector_class = getattr(asyncio.selectors, name, None)
if selector_class:
EVENT_LOOPS[selector.lower()] = make_loop_factory(selector_class)
try: # add uvloop if available
import uvloop
EVENT_LOOPS['uv'] = uvloop.Loop
# Future = uvloop.Future
except Exception: # pragma nocover
pass
if os.environ.get('BUILDING-PULSAR-DOCS') == 'yes': # pragma nocover
default_loop = (
'uvloop if available, epoll on linux, '
'kqueue on mac, select on windows'
)
elif EVENT_LOOPS:
default_loop = tuple(EVENT_LOOPS)[0]
else:
default_loop = None