Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_patch_before_import(self):
    """A ``Celery`` app created after patching carries a ``Pin``.

    The patch is applied before ``celery`` is imported, so the statement
    order below is part of what is being tested.
    """
    from ddtrace import patch
    patch(celery=True)
    import celery

    patched_app = celery.Celery()
    attached_pin = Pin.get_from(patched_app)
    assert attached_pin is not None
def test_execute_wrapped_is_called_and_returned(self):
    """``execute`` forwards its arguments to the wrapped cursor and returns its result."""
    wrapped = self.cursor
    wrapped.rowcount = 0
    wrapped.execute.return_value = '__result__'

    named_pin = Pin('pin_name', tracer=self.tracer)
    traced_cursor = FetchTracedCursor(wrapped, named_pin)
    result = traced_cursor.execute('__query__', 'arg_1', kwarg1='kwarg1')

    # The traced cursor must hand back exactly what the wrapped cursor returned...
    assert '__result__' == result
    # ...and must have called it once with the very same arguments.
    wrapped.execute.assert_called_once_with('__query__', 'arg_1', kwarg1='kwarg1')
def test_render_template_string_pin_disabled(self):
    """
    When we call a patched ``flask.render_template_string``
        When the tracer of the app's ``Pin`` is disabled
            We do not create any spans
    """
    Pin.get_from(self.app).tracer.enabled = False

    # Rendering needs both an application context and a request context.
    with self.app.app_context(), self.app.test_request_context('/'):
        rendered = flask.render_template_string('hello {{world}}', world='world')
        self.assertEqual(rendered, 'hello world')

    self.assertEqual(len(self.get_spans()), 0)
def get_spans(self):
    """Pop and return the spans held by the writer of the tracer pinned to ``self.client``."""
    return Pin.get_from(self.client).tracer.writer.pop()
def test_correct_span_names_can_be_overridden_by_pin(self):
    """Span names of a ``FetchTracedCursor`` use the ``app`` set on its ``Pin`` as prefix."""
    wrapped = self.cursor
    wrapped.rowcount = 0
    custom_pin = Pin('pin_name', app='changed', tracer=self.tracer)
    traced_cursor = FetchTracedCursor(wrapped, custom_pin)

    # The three query-style methods are expected to share one span name;
    # ``reset`` is called after each of them, before the next call is made.
    query_calls = (
        ('execute', ('arg_1',), {'kwarg1': 'kwarg1'}),
        ('executemany', ('arg_1',), {'kwarg1': 'kwarg1'}),
        ('callproc', ('arg_1', 'arg2'), {}),
    )
    for method_name, call_args, call_kwargs in query_calls:
        getattr(traced_cursor, method_name)(*call_args, **call_kwargs)
        self.assert_structure(dict(name='changed.query'))
        self.reset()

    # ``fetchone`` is expected to produce a span with its own name suffix.
    traced_cursor.fetchone('arg_1', kwarg1='kwarg1')
    self.assert_structure(dict(name='changed.query.fetchone'))
def test_sqlite(self):
    """Run a query on one in-memory SQLite connection per service and check its span.

    Every connection gets a clone of its ``Pin`` carrying a distinct service
    name and the test tracer, then a query is executed and the resulting
    span structure is asserted.
    """
    # ensure we can trace multiple services without stomping
    for service in ('db', 'another'):
        db = sqlite3.connect(':memory:')
        # Close the connection once the test is over, even if an assertion
        # below fails; the connections were previously never closed.
        self.addCleanup(db.close)

        pin = Pin.get_from(db)
        assert pin
        pin.clone(service=service, tracer=self.tracer).onto(db)

        # Ensure we can run a query and it's correctly traced
        q = 'select * from sqlite_master'
        cursor = db.execute(q)
        self.assertIsInstance(cursor, TracedSQLiteCursor)
        rows = cursor.fetchall()
        # A fresh in-memory database has no schema entries yet.
        assert not rows
        self.assert_structure(
            dict(name='sqlite.query', span_type='sql', resource=q, service=service, error=0),
        )
def test_correct_span_names(self):
    """A ``TracedCursor`` names query spans ``sql.query`` and leaves no spans for ``fetchone``."""
    wrapped = self.cursor
    wrapped.rowcount = 0
    default_pin = Pin('pin_name', tracer=self.tracer)
    traced_cursor = TracedCursor(wrapped, default_pin)

    # Each query-style method is called, checked, and followed by ``reset``
    # before the next one runs.
    query_calls = (
        ('execute', ('arg_1',), {'kwarg1': 'kwarg1'}),
        ('executemany', ('arg_1',), {'kwarg1': 'kwarg1'}),
        ('callproc', ('arg_1', 'arg2'), {}),
    )
    for method_name, call_args, call_kwargs in query_calls:
        getattr(traced_cursor, method_name)(*call_args, **call_kwargs)
        self.assert_structure(dict(name='sql.query'))
        self.reset()

    # With the plain ``TracedCursor`` a fetch must not leave any span behind.
    traced_cursor.fetchone('arg_1', kwarg1='kwarg1')
    self.assert_has_no_spans()
)
alias = getattr(conn, 'alias', 'default')
service = '{}{}{}'.format(database_prefix, alias, 'db')
vendor = getattr(conn, 'vendor', 'db')
prefix = sqlx.normalize_vendor(vendor)
tags = {
'django.db.vendor': vendor,
'django.db.alias': alias,
}
tracer.set_service_info(
service=service,
app=prefix,
app_type=AppTypes.db,
)
pin = Pin(service, tags=tags, tracer=tracer, app=prefix)
return DbApiTracedCursor(conn._datadog_original_cursor(), pin)
# not provide it as an argument
if client is None:
client = _MongoClient(*args, **kwargs)
# else client is a value for host so just pass it along
else:
client = _MongoClient(client, *args, **kwargs)
super(TracedMongoClient, self).__init__(client)
# NOTE[matt] the TracedMongoClient attempts to trace all of the network
# calls in the trace library. This is good because it measures the
# actual network time. It's bad because it uses a private API which
# could change. We'll see how this goes.
client._topology = TracedTopology(client._topology)
# Default Pin
ddtrace.Pin(service=mongox.TYPE, app=mongox.TYPE, app_type=AppTypes.db).onto(self)
def patch_app(app, pin=None):
    """Attach the Pin class to the application and connect
    our handlers to Celery signals.

    :param app: application object to patch; it is flagged with a
        ``__datadog_patch`` attribute so it is only patched once.
    :param pin: optional ``Pin`` to attach; when omitted (or falsy) a new one
        is built from ``config.celery``.
    :returns: ``app``, whether it was patched by this call or was already
        patched by an earlier one.
    """
    if getattr(app, '__datadog_patch', False):
        # Already patched: hand the app back instead of ``None`` so the
        # return value is the same on every call.
        return app
    setattr(app, '__datadog_patch', True)

    # attach the PIN object
    pin = pin or Pin(
        service=config.celery['worker_service_name'],
        app=APP,
        app_type=AppTypes.worker,
        _config=config.celery,
    )
    pin.onto(app)

    # connect to the Signal framework; ``weak=False`` asks the signal to
    # hold a strong reference to each handler
    signals.task_prerun.connect(trace_prerun, weak=False)
    signals.task_postrun.connect(trace_postrun, weak=False)
    signals.before_task_publish.connect(trace_before_publish, weak=False)
    signals.after_task_publish.connect(trace_after_publish, weak=False)
    signals.task_failure.connect(trace_failure, weak=False)
    signals.task_retry.connect(trace_retry, weak=False)
    return app