Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_client_signals(tracer, fake_transport):
trace_config = az.make_trace_config(tracer)
session = aiohttp.ClientSession(trace_configs=[trace_config])
with tracer.new_trace() as span:
span.name('client:signals')
url = 'https://httpbin.org/get'
# do not propagate headers
ctx = {'span_context': span.context, 'propagate_headers': False}
resp = await session.get(url, trace_request_ctx=ctx)
await resp.text()
assert resp.status == 200
assert az.make_context(resp.request_info.headers) is None
# by default headers added
ctx = {'span_context': span.context}
resp = await session.get(url, trace_request_ctx=ctx)
await resp.text()
async def make_app():
app = web.Application()
app.router.add_get('/api/v1/data', handler)
zipkin_address = 'http://127.0.0.1:9411/api/v2/spans'
endpoint = az.create_endpoint('service_b', ipv4=host, port=port)
tracer = await az.create(zipkin_address, endpoint, sample_rate=1.0)
az.setup(app, tracer)
trace_config = az.make_trace_config(tracer)
session = aiohttp.ClientSession(trace_configs=[trace_config])
app['session'] = session
async def close_session(app):
await app['session'].close()
app.on_cleanup.append(close_session)
return app
async def make_app():
app = web.Application()
app.router.add_get('/api/v1/data', handler)
app.router.add_get('/', handler)
endpoint = az.create_endpoint('service_a', ipv4=host, port=port)
tracer = await az.create(zipkin_address, endpoint, sample_rate=1.0)
trace_config = az.make_trace_config(tracer)
session = aiohttp.ClientSession(trace_configs=[trace_config])
app['session'] = session
async def close_session(app):
await app['session'].close()
app.on_cleanup.append(close_session)
az.setup(app, tracer)
TEMPLATES_ROOT = pathlib.Path(__file__).parent / 'templates'
aiohttp_jinja2.setup(
app, loader=jinja2.FileSystemLoader(str(TEMPLATES_ROOT))
)