Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_trace_nested_broken_traces():
    """Trace decorated calls around a separately started ("broken") span.

    ``f1`` is called once without arguments and once with ``broken_span``
    passed explicitly; ``f2`` is then called with ``test_span`` passed
    explicitly.  The test expects five recorded spans in total and the
    first recorded span to belong to ``test_span``'s trace.
    """
    @trace()
    def f1():
        pass

    @trace()
    def f2():
        pass

    recorder = Recorder()
    opentracing.tracer = BasicTracer(recorder=recorder)

    test_span = opentracing.tracer.start_span(operation_name='test_trace')
    with test_span:
        f1()

        # Started without any parent reference, i.e. not linked to test_span.
        broken_span = opentracing.tracer.start_span(operation_name='broken_trace')
        with broken_span:
            f1(span=broken_span)

        # Broken traces do not work with stack inspection; it is better to
        # pass the span explicitly in this case!
        f2(span=test_span)

    # Checked after the ``with`` block so that test_span itself is finished.
    # Presumably: f1, f1 (under broken_span), broken_span, f2 and test_span.
    assert len(recorder.spans) == 5
    assert recorder.spans[0].context.trace_id == test_span.context.trace_id
def test_trace_mutliple_spans():
    """Check span parenting for a traced function calling another traced one.

    ``parent`` calls ``nested`` while ``test_span_first`` is active.  Neither
    function is given a span argument, and ``nested`` asserts that no span
    was injected into its kwargs.  Three spans are expected in the recorder,
    with the explicitly started span recorded last.
    """
    @trace()
    def parent():
        nested()

    @trace()
    def nested(**kwargs):
        # With the decorator's default arguments no span should be passed in.
        assert is_span_in_kwargs(**kwargs) is False

    recorder = Recorder()
    opentracing.tracer = BasicTracer(recorder=recorder)

    test_span_first = opentracing.tracer.start_span(operation_name='test_trace_first')
    with test_span_first:
        parent()

    # Checked after the ``with`` block so that the outer span is finished.
    assert len(recorder.spans) == 3

    # spans[0] is a child of spans[1] within the same trace ...
    assert recorder.spans[0].context.trace_id == test_span_first.context.trace_id
    assert recorder.spans[0].parent_id == recorder.spans[1].context.span_id

    # ... and spans[1] is a direct child of the explicitly started span.
    assert recorder.spans[1].context.trace_id == test_span_first.context.trace_id
    assert recorder.spans[1].parent_id == test_span_first.context.span_id

    # The last recorded span is the root span started by this test.
    assert recorder.spans[-1].parent_id is None
    assert recorder.spans[-1].operation_name == 'test_trace_first'
def test_get_new_span_with_extractor():
    """Check that ``get_new_span`` uses the span returned by ``span_extractor``.

    The extractor is a mock returning ``parent_span``.  The test expects the
    new span to be parented to it, the default span argument name to be
    returned, and the extractor to have been called with the wrapped
    function's positional and keyword arguments.
    """
    opentracing.tracer = BasicTracer()
    parent_span = opentracing.tracer.start_span()

    extractor = MagicMock()
    extractor.return_value = parent_span

    ctx = '123'

    def f(ctx, extras=True):
        pass

    span_arg_name, span = get_new_span(
        f, [ctx], {'extras': True}, span_extractor=extractor, inspect_stack=False
    )

    assert DEFAULT_SPAN_ARG_NAME == span_arg_name
    assert span.parent_id == parent_span.context.span_id
    extractor.assert_called_with(ctx, extras=True)
def test_trace_requests_no_propagators(monkeypatch):
    """Issue a session GET under a span with a tracer that has no propagators registered.

    The low-level send function and the module logger in
    ``opentracing_utils.libs._requests`` are replaced with mocks.  The test
    expects two recorded spans (the request span as a child of ``top_span``)
    and the mocked response to be returned unchanged.
    """
    resp = Response()
    resp.status_code = 200
    resp.url = URL

    send_request_mock = MagicMock()
    send_request_mock.return_value = resp
    # Installed to replace the module logger; not asserted on in this test.
    logger = MagicMock()

    monkeypatch.setattr('opentracing_utils.libs._requests.__requests_http_send', send_request_mock)
    monkeypatch.setattr('opentracing_utils.libs._requests.logger', logger)

    recorder = Recorder()
    # Note: register_required_propagators() is deliberately not called here.
    opentracing.tracer = BasicTracer(recorder=recorder)

    top_span = opentracing.tracer.start_span(operation_name='top_span')
    with top_span:
        session = requests.Session()
        session.headers.update({CUSTOM_HEADER: CUSTOM_HEADER_VALUE})
        response = session.get(URL)

    # Checked after the ``with`` block so that top_span is finished.
    assert len(recorder.spans) == 2

    assert recorder.spans[0].context.trace_id == top_span.context.trace_id
    assert recorder.spans[0].parent_id == recorder.spans[1].context.span_id
    assert recorder.spans[-1].operation_name == 'top_span'

    assert response.status_code == resp.status_code
def test_span_corrupted_invalid_traceid_value():
    """Extracting a carrier with a non-hex trace id must raise a corruption error.

    Expects ``SpanContextCorruptedException`` with a message that names the
    offending header and value.
    """
    tracer = BasicTracer()
    tracer.register_required_propagators()

    # Given a carrier with invalid 'ot-tracer-traceid' value
    headers = {
        'ot-tracer-traceid': 'nothex',
        'ot-tracer-sampled': 'false',
        'ot-tracer-spanid': '1c3b00da',
    }

    # When .extract is called
    with pytest.raises(SpanContextCorruptedException) as exc:
        tracer.extract(Format.TEXT_MAP, headers)

    # Then it should raise SpanContextCorruptedException
    assert str(exc.value) == (
        "ot-tracer-traceid got an invalid hexadecimal value 'nothex'"
    )
def recorder():
    """Yield a fresh ``Recorder`` installed on a new global ``BasicTracer``.

    The recorder's ``spans`` list is reset to empty before the tracer is
    created.  NOTE(review): this generator is presumably registered as a
    pytest fixture; the decorator is not visible in this part of the file.
    """
    # Named differently from the function to avoid shadowing it locally.
    span_recorder = Recorder()
    span_recorder.spans = []
    opentracing.tracer = BasicTracer(recorder=span_recorder)

    yield span_recorder

    # Drop the local reference once the consumer is done with the recorder.
    del span_recorder
def test_trace_requests_no_error_tag(monkeypatch):
    """A 400 response must not tag the request span as an error when disabled.

    ``trace_requests(set_error_tag=False)`` is applied, the low-level send
    function is replaced with a mock returning a 400 response, and a GET is
    issued under ``top_span``.  The request span is expected to carry the
    HTTP status code and URL tags, but no error tag.
    """
    resp = Response()
    resp.status_code = 400
    resp.url = URL

    trace_requests(set_error_tag=False)

    monkeypatch.setattr('opentracing_utils.libs._requests.__requests_http_send', assert_send_request_mock(resp))

    recorder = Recorder()
    t = BasicTracer(recorder=recorder)
    t.register_required_propagators()
    opentracing.tracer = t

    top_span = opentracing.tracer.start_span(operation_name='top_span')
    with top_span:
        response = requests.get(URL, headers={CUSTOM_HEADER: CUSTOM_HEADER_VALUE})

    # Checked after the ``with`` block so that top_span is finished.
    assert len(recorder.spans) == 2

    assert recorder.spans[0].context.trace_id == top_span.context.trace_id
    assert recorder.spans[0].parent_id == recorder.spans[-1].context.span_id

    assert response.status_code == resp.status_code
    assert recorder.spans[0].tags[tags.HTTP_STATUS_CODE] == resp.status_code
    assert recorder.spans[0].tags[tags.HTTP_URL] == URL

    # The behavior this test is named after: with set_error_tag=False the
    # 4xx response must not mark the span as failed.
    # NOTE(review): assumes the library uses the conventional OpenTracing
    # 'error' tag key -- confirm against opentracing_utils.libs._requests.
    assert 'error' not in recorder.spans[0].tags
if pass_span:
current_span = extract_span_from_kwargs(**kwargs)
assert current_span.operation_name == 'parent'
nested()
@trace(pass_span=pass_span)
def nested(**kwargs):
assert is_span_in_kwargs(**kwargs) is pass_span
if pass_span:
current_span = extract_span_from_kwargs(**kwargs)
assert current_span.operation_name == 'nested'
recorder = Recorder()
opentracing.tracer = BasicTracer(recorder=recorder)
test_span = opentracing.tracer.start_span(operation_name='test_trace')
with test_span:
parent()
assert len(recorder.spans) == 3
assert recorder.spans[0].context.trace_id == test_span.context.trace_id
assert recorder.spans[0].parent_id == recorder.spans[1].context.span_id
assert recorder.spans[1].context.trace_id == test_span.context.trace_id
assert recorder.spans[1].parent_id == test_span.context.span_id
assert recorder.spans[-1].parent_id is None
resp.url = URL
send_request_mock = MagicMock()
send_request_mock.return_value = resp
extract_span_mock = MagicMock()
extract_span_mock.return_value = None, None
monkeypatch.setattr('opentracing_utils.libs._requests.__requests_http_send', send_request_mock)
monkeypatch.setattr('opentracing_utils.libs._requests.get_span_from_kwargs', extract_span_mock)
logger = MagicMock()
monkeypatch.setattr('opentracing_utils.libs._requests.logger', logger)
recorder = Recorder()
t = BasicTracer(recorder=recorder)
t.register_required_propagators()
opentracing.tracer = t
session = requests.Session()
session.headers.update({CUSTOM_HEADER: CUSTOM_HEADER_VALUE})
response = session.get(URL)
assert response.status_code == resp.status_code
logger.warn.assert_called_once()
def test_span_log_kv():
    """A single ``log_kv`` call is recorded as one log entry with both pairs.

    After the span is finished, the in-memory recorder is expected to hold
    exactly one span whose only log entry contains the two key/value pairs
    that were logged.
    """
    recorder = InMemoryRecorder()
    tracer = BasicTracer(recorder=recorder)

    span = tracer.start_span('x')
    span.log_kv({
        'foo': 'bar',
        'baz': 42,
    })
    span.finish()

    finished_spans = recorder.get_spans()
    assert len(finished_spans) == 1

    # One log_kv call -> one log record holding both key/value pairs.
    logs = finished_spans[0].logs
    assert len(logs) == 1
    assert len(logs[0].key_values) == 2
    assert logs[0].key_values['foo'] == 'bar'
    assert logs[0].key_values['baz'] == 42