Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
with tracer.new_trace() as span:
span.name('client:signals')
url = 'https://httpbin.org/get'
# do not propagate headers
ctx = {'span_context': span.context, 'propagate_headers': False}
resp = await session.get(url, trace_request_ctx=ctx)
await resp.text()
assert resp.status == 200
assert az.make_context(resp.request_info.headers) is None
# by default headers added
ctx = {'span_context': span.context}
resp = await session.get(url, trace_request_ctx=ctx)
await resp.text()
assert resp.status == 200
context = az.make_context(resp.request_info.headers)
assert context.trace_id == span.context.trace_id
await session.close()
assert len(fake_transport.records) == 3
record1 = fake_transport.records[0].asdict()
record2 = fake_transport.records[1].asdict()
record3 = fake_transport.records[2].asdict()
assert record2['parentId'] == record3['id']
assert record1['parentId'] == record3['id']
assert record3['name'] == 'client:signals'
async def test_client_signals(tracer, fake_transport):
    """Exercise the aiohttp trace config built by ``az.make_trace_config``.

    Two GET requests are issued inside a root span named ``client:signals``:
    the first with ``propagate_headers`` set to ``False``, the second with
    the default request context. Afterwards the three records collected by
    ``fake_transport`` are checked for the expected parent/child links.

    NOTE(review): the requests go to the live https://httpbin.org/get
    endpoint, so this test needs network access.
    """
    trace_config = az.make_trace_config(tracer)
    url = 'https://httpbin.org/get'

    # ``async with`` guarantees the session is closed even when one of the
    # assertions (or a request) below raises.
    async with aiohttp.ClientSession(trace_configs=[trace_config]) as session:
        with tracer.new_trace() as span:
            span.name('client:signals')

            # do not propagate headers
            ctx = {'span_context': span.context, 'propagate_headers': False}
            resp = await session.get(url, trace_request_ctx=ctx)
            await resp.text()
            assert resp.status == 200
            assert az.make_context(resp.request_info.headers) is None

            # by default headers added
            ctx = {'span_context': span.context}
            resp = await session.get(url, trace_request_ctx=ctx)
            await resp.text()
            assert resp.status == 200
            context = az.make_context(resp.request_info.headers)
            assert context.trace_id == span.context.trace_id

    assert len(fake_transport.records) == 3

    # The assertions below treat the last record as the enclosing
    # 'client:signals' span and the first two as its children.
    record1, record2, record3 = (
        record.asdict() for record in fake_transport.records
    )
    assert record2['parentId'] == record3['id']
    assert record1['parentId'] == record3['id']
    assert record3['name'] == 'client:signals'
async def consume_message(message, tracer):
    """Simulate consuming ``message`` and trace it as two spans.

    A ``consumer event`` span is opened as a child of the trace context
    extracted from the message's ``headers`` entry; a ``process event`` span
    is then opened as a child of that consumer span and covers a short
    simulated processing delay.
    """
    delay = 0.1  # seconds of simulated work for each sleep below

    await asyncio.sleep(delay)
    parent_context = az.make_context(message.get('headers'))

    with tracer.new_child(parent_context) as consumer_span:
        consumer_span.name('consumer event')
        consumer_span.remote_endpoint('broker', ipv4='127.0.0.1', port=9011)
        consumer_span.kind(az.CONSUMER)

    # The worker span starts after the consumer span block has exited and
    # uses the consumer span's context as its parent.
    with tracer.new_child(consumer_span.context) as worker_span:
        worker_span.name('process event')
        await asyncio.sleep(delay)