Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sample_with_rate():
    """Check the per-call decisions of a seeded sampler at rate 0.3."""
    trace_id = 'bde15168450e7097008c7aab41c27ade'
    sampler = Sampler(sample_rate=0.3, seed=123)
    # The same trace id is passed on every call; the expected decision is
    # pinned per call, in order: sampled, sampled, not sampled.
    for should_sample in (True, True, False):
        decision = sampler.is_sampled(trace_id)
        if should_sample:
            assert decision
        else:
            assert not decision
def test_sample_never():
    """Check that a sampler with rate 0.0 rejects three calls in a row."""
    trace_id = 'bde15168450e7097008c7aab41c27ade'
    sampler = Sampler(sample_rate=0.0)
    for _ in range(3):
        assert not sampler.is_sampled(trace_id)
def test_sample_always():
    """Check that a sampler with rate 1.0 accepts three calls in a row."""
    trace_id = 'bde15168450e7097008c7aab41c27ade'
    sampler = Sampler(sample_rate=1.0)
    for _ in range(3):
        assert sampler.is_sampled(trace_id)
def create_custom(local_endpoint: Endpoint,
                  transport: Optional[TransportABC] = None,
                  sampler: Optional[SamplerABC] = None) -> Awaitable[Tracer]:
    """Build a Tracer from caller-supplied components.

    Args:
        local_endpoint: Passed to ``Tracer`` unchanged.
        transport: Transport to use. When ``None``, a ``StubTransport()``
            is created instead.
        sampler: Sampler to use. When ``None``, a ``Sampler`` with
            ``sample_rate=1`` is created instead.

    Returns:
        A ``_ContextManager`` wrapping the coroutine that constructs the
        ``Tracer``.
    """
    # Compare against None explicitly: a caller-supplied transport or
    # sampler that happens to be falsy (e.g. defines __len__/__bool__)
    # must not be silently swapped for the default.
    t = StubTransport() if transport is None else transport
    # Default sampler uses rate 1, i.e. sample everything.
    s = Sampler(sample_rate=1) if sampler is None else sampler

    async def build_tracer() -> Tracer:
        return Tracer(t, s, local_endpoint)

    result: Awaitable[Tracer] = _ContextManager(build_tracer())
    return result
async def build_tracer() -> Tracer:
    """Construct a Tracer backed by a new Sampler and a new Transport.

    NOTE(review): sample_rate, zipkin_address, send_interval, loop and
    local_endpoint are not defined inside this block; presumably they are
    provided by an enclosing function that is not visible here -- confirm.
    """
    # Build the sampler first, then the transport.
    rate_sampler = Sampler(sample_rate=sample_rate)
    zipkin_transport = Transport(zipkin_address,
                                 send_interval=send_interval,
                                 loop=loop)
    return Tracer(zipkin_transport, rate_sampler, local_endpoint)
result = _ContextManager(build_tracer())