Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_delayed_module_init_class():
    """Return the delayed-import class matching the running interpreter.

    Returns:
        ``Python37DelayedImport`` when ``constants.is_py37()`` is truthy,
        otherwise ``Python35plusDelayedImport``.
    """
    return (
        Python37DelayedImport
        if constants.is_py37()
        else Python35plusDelayedImport
    )
websocket_max_size=websocket_max_size,
websocket_max_queue=websocket_max_queue,
websocket_read_limit=websocket_read_limit,
websocket_write_limit=websocket_write_limit,
state=state,
debug=debug,
)
create_server_kwargs = dict(
ssl=ssl,
reuse_port=reuse_port,
sock=sock,
backlog=backlog,
)
if constants.is_py37():
create_server_kwargs.update(
start_serving=False
)
server_coroutine = loop.create_server(
server, **create_server_kwargs
)
# Instead of pulling time at the end of every request,
# pull it once per minute
loop.call_soon(partial(update_current_time, loop))
if run_async:
return server_coroutine
try:
def start():
    """Run the worker's event loop, then tear the server and loop down.

    Logs the worker's process id, then blocks serving requests. When
    ``constants.is_py37()`` is truthy the loop is driven by
    ``http_server.serve_forever()``; otherwise ``loop.run_forever()`` is
    used. Whatever way serving ends (normal stop or exception), the server
    is closed, pending async generators are shut down, and the loop is
    closed.
    """
    worker_pid = os.getpid()
    try:
        logger.info("Starting worker [%s]", worker_pid)
        if not constants.is_py37():
            loop.run_forever()
        else:
            loop.run_until_complete(http_server.serve_forever())
    finally:
        # Stop accepting connections first, then wait for the close to
        # finish before finalizing async generators and the loop itself.
        http_server.close()
        server_closed = http_server.wait_closed()
        loop.run_until_complete(server_closed)
        asyncgens_done = loop.shutdown_asyncgens()
        loop.run_until_complete(asyncgens_done)
        loop.close()
def start_serving():
if constants.is_py37():
loop.run_until_complete(http_server.start_serving())