Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def make_app(host, port):
    """Build the aiohttp application with job scheduling and Zipkin tracing.

    The application exposes a single POST ``/consume`` route served by
    ``handler``, has the aiojobs scheduler installed with default options,
    and is wired to an aiozipkin tracer created with ``sample_rate=1.0``.

    Args:
        host: Address passed as ``ipv4`` to the tracing endpoint.
        port: Port passed to the tracing endpoint.

    Returns:
        The configured ``web.Application`` instance.
    """
    application = web.Application()
    application.router.add_post('/consume', handler)
    aiojobs.aiohttp.setup(application)

    # The tracer reports under the 'backend_broker' service name to a
    # collector URL on the local machine.
    local_endpoint = az.create_endpoint('backend_broker', ipv4=host, port=port)
    span_tracer = await az.create(
        'http://127.0.0.1:9411/api/v2/spans',
        local_endpoint,
        sample_rate=1.0,
    )
    az.setup(application, span_tracer)

    return application
# NOTE(review): this is the interior of an application-factory function whose
# header is not part of this excerpt; `app`, `subapp_pkgs` and `pidx` are bound
# outside the visible lines.
# Lifecycle contexts registered on the application below, in this order.
cleanup_contexts = [
manager_status_ctx,
redis_ctx,
database_ctx,
event_dispatcher_ctx,
hook_plugin_ctx,
monitoring_ctx,
agent_registry_ctx,
sched_dispatcher_ctx,
background_task_ctx,
]
# A context may optionally expose a `shutdown` attribute; when it does, that
# attribute is additionally registered as an on_shutdown callback.
for cleanup_ctx in cleanup_contexts:
if shutdown_cb := getattr(cleanup_ctx, 'shutdown', None):
app.on_shutdown.append(shutdown_cb)
app.cleanup_ctx.extend(cleanup_contexts)
# Scheduler and CORS options are read from the app mapping; they must already
# be present under 'scheduler_opts' and 'cors_opts' (not set in this excerpt).
aiojobs.aiohttp.setup(app, **app['scheduler_opts'])
cors = aiohttp_cors.setup(app, defaults=app['cors_opts'])
# should be done in create_app() in other modules.
# The same `hello` handler is registered for both the empty path and '/'.
cors.add(app.router.add_route('GET', r'', hello))
cors.add(app.router.add_route('GET', r'/', hello))
# Treat a missing sub-application list as empty.
if subapp_pkgs is None:
subapp_pkgs = []
# Import each package relative to 'ai.backend.gateway' and hand its
# `create_app` attribute to init_subapp together with the root app.
for pkg_name in subapp_pkgs:
# NOTE(review): presumably `pidx` is a worker/process index so that only the
# first one logs — confirm against the enclosing function.
if pidx == 0:
# pkg_name[1:] drops the first character of the name — presumably the
# leading '.' of a relative module name; TODO confirm.
log.info('Loading module: {0}', pkg_name[1:])
subapp_mod = importlib.import_module(pkg_name, 'ai.backend.gateway')
init_subapp(pkg_name, app, getattr(subapp_mod, 'create_app'))
return app
# NOTE(review): this is the interior of an application-factory function whose
# header is not part of this excerpt; `config` is bound outside the visible
# lines, and the excerpt ends inside the on_startup list below.
# Middlewares that are installed unconditionally.
middlewares = [
virtool.http.csp.middleware,
virtool.http.errors.middleware,
virtool.http.proxy.middleware,
virtool.http.query.middleware
]
# Whether the application should run in setup mode, as decided from `config`.
do_setup = virtool.config.should_do_setup(config)
if not do_setup:
# Don't use authentication in setup mode.
middlewares.append(virtool.http.auth.middleware)
app = web.Application(middlewares=middlewares)
aiojobs.aiohttp.setup(app)
app.on_response_prepare.append(virtool.http.csp.on_prepare)
# Startup callbacks, listed in the order they are appended.
app.on_startup.extend([
# Unlike the other entries, temp_init_thing is *called* here, so the value
# it returns is what gets registered as the startup callback.
temp_init_thing(do_setup, config),
init_events,
init_version,
init_client_path,
init_setup,
init_http_client,
init_routes,
init_executors,
init_dispatcher,
init_db,
init_settings,
init_sentry,
async def watcher_server(loop, pidx, args):
# NOTE(review): the excerpt ends before this function does; `loop`, `pidx` and
# `shutdown_enabled` are not used in the visible portion.
global shutdown_enabled
app = web.Application()
# The first element of `args` is stored as the application configuration.
app['config'] = args[0]
aiojobs.aiohttp.setup(app, close_timeout=10)
# Credentials are built only when an etcd user is configured; otherwise
# None is passed to the client.
etcd_credentials = None
if app['config']['etcd']['user']:
etcd_credentials = {
'user': app['config']['etcd']['user'],
'password': app['config']['etcd']['password'],
}
# Only the GLOBAL scope is mapped, and it uses an empty key prefix.
scope_prefix_map = {
ConfigScopes.GLOBAL: '',
}
etcd = AsyncEtcd(app['config']['etcd']['addr'],
app['config']['etcd']['namespace'],
scope_prefix_map=scope_prefix_map,
credentials=etcd_credentials)
# The etcd client is stored on the app under the 'config_server' key.
app['config_server'] = etcd