Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_exception_logging(self):
    """Request the '/exception' endpoint under a 'test' span and check the outcome.

    Expects four queued spans, an HTTP 500 response, and no active span
    once the enclosing 'test' span has been closed.
    """
    # Pre-bind the response so that a failed request shows up as a clean
    # assertion failure below rather than a NameError on an unbound local.
    r = None
    with tracer.start_active_span('test'):
        try:
            r = self.http.request('GET', testenv["wsgi_server"] + '/exception')
        except Exception:
            # Deliberately tolerated: the assertions below report the failure.
            pass

    spans = self.recorder.queued_spans()
    self.assertEqual(4, len(spans))

    # Positions of the spans this test is interested in.
    wsgi_span = spans[1]
    urllib3_span = spans[2]
    test_span = spans[3]

    # unittest assertions (unlike bare ``assert``) are not stripped under -O.
    self.assertTrue(r)
    self.assertEqual(500, r.status)

    self.assertIsNone(tracer.active_span)
def test_prepend(self):
    """Prepend to an existing key inside a 'test' span and inspect the spans.

    Checks that the prepend succeeds, that exactly two spans are queued,
    and that both an 'sdk' span (named 'test') and a 'couchbase' span exist.
    """
    key = "test_prepend"
    self.bucket.upsert(key, "one")

    outcome = None
    with tracer.start_active_span('test'):
        outcome = self.bucket.prepend(key, "two")

    assert outcome
    self.assertTrue(outcome.success)

    recorded = self.recorder.queued_spans()
    self.assertEqual(2, len(recorded))

    sdk_span = get_first_span_by_name(recorded, 'sdk')
    assert sdk_span
    self.assertEqual(sdk_span.data["sdk"]["name"], 'test')

    couchbase_span = get_first_span_by_name(recorded, 'couchbase')
    assert couchbase_span

    # Same traceId and parent relationship
    # (the corresponding checks are not part of this excerpt)
def test_extra_span(self):
    """Log a warning inside a 'test' span and check the span queued for it.

    Expects two queued spans; the first must have ``k == 2`` and carry the
    fully formatted log message.
    """
    with tracer.start_active_span('test'):
        self.logger.warning('foo %s', 'bar')

    recorded = self.recorder.queued_spans()
    self.assertEqual(2, len(recorded))

    log_span = recorded[0]
    self.assertEqual(2, log_span.k)
    self.assertEqual('foo bar', log_span.data["log"].get('message'))
def test_basic_insert(self):
    """Run a parameterized INSERT inside a 'test' span and verify the spans.

    Expects the cursor to report 1, two queued spans, and the database
    span to share the test span's trace id and to be parented by it.
    """
    insert_sql = """INSERT INTO users(name, email) VALUES(%s, %s)"""
    row_values = ('beaker', 'beaker@muppets.com')

    affected = None
    with tracer.start_active_span('test'):
        affected = self.cursor.execute(insert_sql, row_values)

    self.assertEqual(1, affected)

    recorded = self.recorder.queued_spans()
    self.assertEqual(2, len(recorded))

    # The database span is queued first, the enclosing test span second.
    db_span, test_span = recorded

    self.assertEqual("test", test_span.data["sdk"]["name"])
    self.assertEqual(test_span.t, db_span.t)
    self.assertEqual(db_span.p, test_span.s)
def test_get_request(self):
    """GET the WSGI server root inside a 'test' span and check the response.

    Expects three queued spans, an HTTP 200 response, and an 'X-Instana-T'
    response header that parses as hexadecimal and equals the WSGI span's
    ``t`` value.
    """
    with tracer.start_active_span('test'):
        response = self.http.request('GET', testenv["wsgi_server"] + '/')

    recorded = self.recorder.queued_spans()
    self.assertEqual(3, len(recorded))

    # Queue order: WSGI span, then urllib3 span, then the outer test span.
    wsgi_span, urllib3_span, test_span = recorded

    assert response
    self.assertEqual(200, response.status)

    response_headers = response.headers
    assert 'X-Instana-T' in response_headers

    trace_header = response_headers['X-Instana-T']
    assert int(trace_header, 16)
    self.assertEqual(trace_header, wsgi_span.t)
def test_successful_aggregate_query(self):
    """Run ``count_documents`` inside a 'test' span and verify the spans.

    Expects no active span afterwards, two queued spans, a shared trace id,
    the database span parented by the test span, no error count on the
    database span, and a database span name of "mongo".
    """
    query_filter = {"type": "string"}
    with tracer.start_active_span("test"):
        self.conn.test.records.count_documents(query_filter)

    assert_is_none(tracer.active_span)

    recorded = self.recorder.queued_spans()
    self.assertEqual(len(recorded), 2)

    # The database span is queued first, the enclosing test span second.
    db_span, test_span = recorded

    self.assertEqual(test_span.t, db_span.t)
    self.assertEqual(db_span.p, test_span.s)

    assert_is_none(db_span.ec)
    self.assertEqual(db_span.n, "mongo")
def test_old_redis_client(self):
    """Issue two SETs and one GET inside a 'test' span and check the result.

    Expects four queued spans, the GET to return ``b'barX'``, and no active
    span once the 'test' span has been closed.
    """
    fetched = None
    with tracer.start_active_span('test'):
        self.client.set('foox', 'barX')
        self.client.set('fooy', 'barY')
        fetched = self.client.get('foox')

    recorded = self.recorder.queued_spans()
    self.assertEqual(4, len(recorded))

    self.assertEqual(b'barX', fetched)

    # Three client-call spans are queued first, followed by the test span.
    rs1_span, rs2_span, rs3_span, test_span = recorded

    self.assertIsNone(tracer.active_span)
def test_custom_header_capture(self):
    """Send a request carrying the configured custom headers and check spans.

    Configures ``agent.extra_headers`` with two header names, issues a GET
    to the live server with those headers set, and expects an HTTP 200
    response plus three queued spans: django, urllib3 and the 'test' span.
    """
    # Hack together a manual custom headers list. ``agent`` is shared state,
    # so put the previous value back when the test ends (pass or fail) to
    # avoid leaking this configuration into other tests.
    previous_extra_headers = getattr(agent, 'extra_headers', None)
    self.addCleanup(setattr, agent, 'extra_headers', previous_extra_headers)
    agent.extra_headers = [u'X-Capture-This', u'X-Capture-That']

    request_headers = {
        'X-Capture-This': 'this',
        'X-Capture-That': 'that',
    }

    with tracer.start_active_span('test'):
        response = self.http.request('GET', self.live_server_url + '/', headers=request_headers)

    # unittest assertions (unlike bare ``assert``) are not stripped under -O.
    self.assertTrue(response)
    self.assertEqual(200, response.status)

    spans = self.recorder.queued_spans()
    self.assertEqual(3, len(spans))

    test_span = spans[2]
    urllib3_span = spans[1]
    django_span = spans[0]

    self.assertEqual("test", test_span.data["sdk"]["name"])
    self.assertEqual("urllib3", urllib3_span.n)
    self.assertEqual("django", django_span.n)
"""Modified start response with additional headers."""
tracer.inject(self.scope.span.context, ot.Format.HTTP_HEADERS, headers)
headers.append(('Server-Timing', "intid;desc=%s" % self.scope.span.context.trace_id))
res = start_response(status, headers, exc_info)
sc = status.split(' ')[0]
if 500 <= int(sc) <= 511:
self.scope.span.mark_as_errored()
self.scope.span.set_tag(tags.HTTP_STATUS_CODE, sc)
self.scope.close()
return res
ctx = tracer.extract(ot.Format.HTTP_HEADERS, env)
self.scope = tracer.start_active_span("wsgi", child_of=ctx)
if hasattr(agent, 'extra_headers') and agent.extra_headers is not None:
for custom_header in agent.extra_headers:
# Headers are available in this format: HTTP_X_CAPTURE_THIS
wsgi_header = ('HTTP_' + custom_header.upper()).replace('-', '_')
if wsgi_header in env:
self.scope.span.set_tag("http.%s" % custom_header, env[wsgi_header])
if 'PATH_INFO' in env:
self.scope.span.set_tag('http.path', env['PATH_INFO'])
if 'QUERY_STRING' in env and len(env['QUERY_STRING']):
scrubbed_params = strip_secrets(env['QUERY_STRING'], agent.secrets_matcher, agent.secrets_list)
self.scope.span.set_tag("http.params", scrubbed_params)
if 'REQUEST_METHOD' in env:
self.scope.span.set_tag(tags.HTTP_METHOD, env['REQUEST_METHOD'])
if 'HTTP_HOST' in env:
def callproc(self, proc_name, params):
    """Call a stored procedure on the wrapped cursor, tracing it when active.

    When there is no active span, or the active span's operation name is
    "sqlalchemy", the call is forwarded to the wrapped object's ``callproc``
    untraced. Otherwise a child span named ``self._module_name`` is started
    around the call.

    :param proc_name: name of the stored procedure to call
    :param params: parameters forwarded unchanged to the wrapped ``callproc``
    :return: whatever the wrapped ``callproc`` returns
    :raises Exception: any exception from the wrapped call is logged on the
        span (when one exists) and re-raised unchanged
    """
    parent_span = tracer.active_span

    # If not tracing or we're being called from sqlalchemy, just pass through.
    # This must forward to ``callproc`` (not ``execute``): the pass-through
    # has to behave exactly like the uninstrumented call.
    if (parent_span is None) or (parent_span.operation_name == "sqlalchemy"):
        return self.__wrapped__.callproc(proc_name, params)

    with tracer.start_active_span(self._module_name, child_of=parent_span) as scope:
        try:
            self._collect_kvs(scope.span, proc_name)
            result = self.__wrapped__.callproc(proc_name, params)
        except Exception as e:
            if scope.span:
                scope.span.log_exception(e)
            raise
        else:
            return result