Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
wrapped(mock_self, method, url, body, headers)
expected_attributes = {
'http.url': url,
'http.method': method}
expected_name = '[httplib]request'
mock_request_func.assert_called_with(
mock_self, method, url, body, {
'traceparent': '00-123-456-01',
}
)
self.assertEqual(expected_attributes,
mock_tracer.span.attributes)
self.assertEqual(expected_name, mock_tracer.span.name)
self.assertEqual(span_module.SpanKind.CLIENT, mock_tracer.span.span_kind)
'get_opencensus_tracer',
return_value=mock_tracer)
wrapped = trace.wrap_requests(mock_func)
url = 'http://localhost:8080'
with patch:
wrapped(url)
expected_attributes = {
'http.url': url,
'http.status_code': '200'}
expected_name = '[requests]get'
self.assertEqual(span_module.SpanKind.CLIENT,
mock_tracer.current_span.span_kind)
self.assertEqual(expected_attributes,
mock_tracer.current_span.attributes)
self.assertEqual(expected_name, mock_tracer.current_span.name)
return_value=mock_tracer)
url = 'http://localhost:8080'
request_method = 'POST'
kwargs = {}
with patch:
trace.wrap_session_request(
wrapped, 'Session.request', (request_method, url), kwargs)
expected_attributes = {
'http.url': url,
'http.status_code': '200'}
expected_name = '[requests]POST'
self.assertEqual(span_module.SpanKind.CLIENT,
mock_tracer.current_span.span_kind)
self.assertEqual(expected_attributes,
mock_tracer.current_span.attributes)
self.assertEqual(kwargs['headers']['x-trace'], 'some-value')
self.assertEqual(expected_name, mock_tracer.current_span.name)
path='/',
headers={pyramid_trace_header: pyramid_trace_id},
)
middleware._before_request(request)
tracer = execution_context.get_opencensus_tracer()
self.assertIsNotNone(tracer)
span = tracer.current_span()
expected_attributes = {
'http.url': u'/',
'http.method': 'GET',
}
self.assertEqual(span.span_kind, span_module.SpanKind.SERVER)
self.assertEqual(span.attributes, expected_attributes)
self.assertEqual(span.parent_span.span_id, span_id)
span_context = tracer.span_context
self.assertEqual(span_context.trace_id, trace_id)
blacklist_hostnames = execution_context.get_opencensus_attr(
'blacklist_hostnames')
parsed_url = urlparse(url)
if parsed_url.port is None:
dest_url = parsed_url.hostname
else:
dest_url = '{}:{}'.format(parsed_url.hostname, parsed_url.port)
if utils.disable_tracing_hostname(dest_url, blacklist_hostnames):
return requests_func(url, *args, **kwargs)
path = parsed_url.path if parsed_url.path else '/'
_tracer = execution_context.get_opencensus_tracer()
_span = _tracer.start_span()
_span.name = '{}'.format(path)
_span.span_kind = span_module.SpanKind.CLIENT
# Add the requests host to attributes
_tracer.add_attribute_to_current_span(
HTTP_HOST, dest_url)
# Add the requests method to attributes
_tracer.add_attribute_to_current_span(
HTTP_METHOD, requests_func.__name__.upper())
# Add the requests path to attributes
_tracer.add_attribute_to_current_span(
HTTP_PATH, path)
# Add the requests url to attributes
_tracer.add_attribute_to_current_span(HTTP_URL, url)
the communication between SQLAlchemy and the database will also
be traced. To avoid the verbose spans, you can just trace SQLAlchemy.
See: http://docs.sqlalchemy.org/en/latest/core/events.html#sqlalchemy.
events.ConnectionEvents.before_cursor_execute
"""
# Find out the func name
if executemany:
query_func = 'executemany'
else:
query_func = 'execute'
_tracer = execution_context.get_opencensus_tracer()
_span = _tracer.start_span()
_span.name = '{}.query'.format(MODULE_NAME)
_span.span_kind = span_module.SpanKind.CLIENT
# Set query statement attribute
_tracer.add_attribute_to_current_span(
'{}.query'.format(MODULE_NAME), statement)
# Set query parameters attribute
_tracer.add_attribute_to_current_span(
'{}.query.parameters'.format(MODULE_NAME), str(parameters))
# Set query function attribute
_tracer.add_attribute_to_current_span(
'{}.cursor.method.name'.format(MODULE_NAME),
query_func)
def call(query, *args, **kwargs):
_tracer = execution_context.get_opencensus_tracer()
_span = _tracer.start_span()
_span.name = 'mysql.query'
_span.span_kind = span_module.SpanKind.CLIENT
_tracer.add_attribute_to_current_span('mysql.query', query)
_tracer.add_attribute_to_current_span(
'mysql.cursor.method.name',
query_func.__name__)
result = query_func(query, *args, **kwargs)
_tracer.end_span()
return result
return call
"""End a span. Update the span_id in SpanContext to the current span's
parent span id; Update the current span.
"""
cur_span = self.current_span()
if cur_span is None and self._spans_list:
cur_span = self._spans_list[-1]
if cur_span is None:
logging.warning('No active span, cannot do end_span.')
return
cur_span.finish()
self.span_context.span_id = cur_span.parent_span.span_id if \
cur_span.parent_span else None
if isinstance(cur_span.parent_span, trace_span.Span):
execution_context.set_current_span(cur_span.parent_span)
else:
execution_context.set_current_span(None)
with self._spans_list_condition:
if cur_span in self._spans_list:
span_datas = self.get_span_datas(cur_span)
self.exporter.export(span_datas)
self._spans_list.remove(cur_span)
return cur_span
def started(self, event):
span = self.tracer.start_span(
name='{}.{}.{}.{}'.format(
MODULE_NAME,
event.database_name,
event.command.get(event.command_name),
event.command_name,
)
)
span.span_kind = span_module.SpanKind.CLIENT
self.tracer.add_attribute_to_current_span('component', 'mongodb')
self.tracer.add_attribute_to_current_span('db.type', 'mongodb')
self.tracer.add_attribute_to_current_span(
'db.instance', event.database_name
)
self.tracer.add_attribute_to_current_span(
'db.statement', event.command.get(event.command_name)
)
for attr in COMMAND_ATTRIBUTES:
_attr = event.command.get(attr)
if _attr is not None:
self.tracer.add_attribute_to_current_span(attr, str(_attr))
self.tracer.add_attribute_to_current_span(