Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_unpatch_before_import(self):
patch()
unpatch()
import vertica_python
assert not isinstance(vertica_python.Connection.cursor, wrapt.ObjectProxy)
assert not isinstance(
vertica_python.vertica.cursor.Cursor.execute, wrapt.ObjectProxy
)
def assert_is_not_wrapped(self, obj):
self.assertFalse(isinstance(obj, wrapt.ObjectProxy), '{} is wrapped'.format(obj))
def prepare(self, *args, **kwargs):
func = self.__wrapped__.prepare
if not args:
return func(*args, **kwargs)
conn = args[0]
# prepare performs a c-level check of the object type so
# we must be sure to pass in the actual db connection
if isinstance(conn, wrapt.ObjectProxy):
conn = conn.__wrapped__
return func(conn, *args[1:], **kwargs)
def unpatch():
if isinstance(mysql.connector.connect, wrapt.ObjectProxy):
mysql.connector.connect = mysql.connector.connect.__wrapped__
if hasattr(mysql.connector, 'Connect'):
mysql.connector.Connect = mysql.connector.connect
@wrapt.patch_function_wrapper(patch_mod, patch_class_routine)
def wrapper(wrapped, instance, args, kwargs):
# TODO?: remove Pin dependence
pin = Pin.get_from(instance)
if patch_routine in pin._config['routines']:
conf = pin._config['routines'][patch_routine]
else:
conf = _find_routine_config(config, instance, patch_routine)
enabled = conf.get('trace_enabled', True)
span = None
try:
# shortcut if not enabled
if not enabled:
def patch():
"""Patch the instrumented methods
"""
if getattr(django, '_datadog_patch', False):
return
setattr(django, '_datadog_patch', True)
_w = wrapt.wrap_function_wrapper
_w('django', 'setup', traced_setup)
def patch():
wrapt.wrap_function_wrapper('pymysql', 'connect', _connect)
def patch():
if getattr(boto.connection, '_datadog_patch', False):
return
setattr(boto.connection, '_datadog_patch', True)
# AWSQueryConnection and AWSAuthConnection are two different classes called by
# different services for connection.
# For exemple EC2 uses AWSQueryConnection and S3 uses AWSAuthConnection
wrapt.wrap_function_wrapper(
'boto.connection', 'AWSQueryConnection.make_request', patched_query_request
)
wrapt.wrap_function_wrapper(
'boto.connection', 'AWSAuthConnection.make_request', patched_auth_request
)
Pin(service='aws', app='aws', app_type='web').onto(
boto.connection.AWSQueryConnection
)
Pin(service='aws', app='aws', app_type='web').onto(
boto.connection.AWSAuthConnection
)
return trace_wrapped(resource, func, *args, **kwargs)
# TODO[tahir]: the signature of a wrapped resolve method causes DIError to
# be thrown since paramter types cannot be determined
class WrapperRenderer(wrapt.ObjectProxy):
""" Tracing of renderers """
def render(self, *args, **kwargs):
func = self.__wrapped__.render
cname = func_name(self.__wrapped__)
resource = '{}.{}'.format(cname, func.__name__)
return trace_wrapped(resource, func, *args, **kwargs)
class WrapperMiddleware(wrapt.ObjectProxy):
""" Tracing of callable functional-middleware """
def __call__(self, *args, **kwargs):
func = self.__wrapped__.__call__
resource = func_name(self.__wrapped__)
return trace_wrapped(resource, func, *args, **kwargs)
class WrapperRouter(wrapt.ObjectProxy):
""" Tracing of router on the way back from a matched route """
def match(self, *args, **kwargs):
# catch matched route and wrap tracer around its handler and set root span resource
func = self.__wrapped__.match
route_and_params = func(*args, **kwargs)
pin = Pin.get_from(molten)
if not pin or not pin.enabled():
# 3p
from ddtrace.vendor import wrapt
# project
import ddtrace
from ddtrace.ext import mongo as mongox
from ddtrace.contrib.pymongo.client import TracedMongoClient
# TODO(Benjamin): we should instrument register_connection instead, because more generic
# We should also extract the "alias" attribute and set it as a meta
class WrappedConnect(wrapt.ObjectProxy):
""" WrappedConnect wraps mongoengines 'connect' function to ensure
that all returned connections are wrapped for tracing.
"""
def __init__(self, connect):
super(WrappedConnect, self).__init__(connect)
ddtrace.Pin(service=mongox.SERVICE, tracer=ddtrace.tracer).onto(self)
def __call__(self, *args, **kwargs):
client = self.__wrapped__(*args, **kwargs)
pin = ddtrace.Pin.get_from(self)
if pin:
# mongoengine uses pymongo internally, so we can just piggyback on the
# existing pymongo integration and make sure that the connections it
# uses internally are traced.
client = TracedMongoClient(client)