Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def unpatch():
    """Revert the rq instrumentation when it is currently applied.

    Returns immediately unless ``rq._datadog_patch`` is truthy. Otherwise it
    calls ``Pin.remove_from`` on ``rq`` and on its Job, Queue and Worker
    classes, hands each instrumented method to ``_uw``, and only then resets
    the ``_datadog_patch`` flag to ``False``.
    """
    import rq

    is_patched = getattr(rq, '_datadog_patch', False)
    if not is_patched:
        return

    Pin.remove_from(rq)

    # rq.job.Job
    job_cls = rq.job.Job
    Pin.remove_from(job_cls)
    for method_name in ('perform', 'fetch'):
        _uw(job_cls, method_name)

    # rq.queue.Queue
    queue_cls = rq.queue.Queue
    Pin.remove_from(queue_cls)
    for method_name in ('enqueue_job', 'fetch_job'):
        _uw(queue_cls, method_name)

    # rq.worker.Worker
    worker_cls = rq.worker.Worker
    Pin.remove_from(worker_cls)
    _uw(worker_cls, 'perform_job')

    # The flag is cleared last, after every wrapper has been processed.
    rq._datadog_patch = False
def unpatch():
    """Revert the boto instrumentation when it is currently applied.

    Does nothing unless ``boto.connection._datadog_patch`` is truthy.
    Otherwise the flag is reset to ``False`` and ``make_request`` is passed
    to ``unwrap`` for both ``AWSQueryConnection`` and ``AWSAuthConnection``.
    """
    connection_module = boto.connection
    if not getattr(connection_module, '_datadog_patch', False):
        return

    connection_module._datadog_patch = False
    unwrap(connection_module.AWSQueryConnection, 'make_request')
    unwrap(connection_module.AWSAuthConnection, 'make_request')
def unpatch():
    """Revert the kombu instrumentation when it is currently applied.

    Does nothing unless ``kombu._datadog_patch`` is truthy. Otherwise the
    flag is reset to ``False`` and ``Producer._publish`` and
    ``Consumer.receive`` are passed to ``unwrap``.
    """
    if not getattr(kombu, '_datadog_patch', False):
        return

    kombu._datadog_patch = False
    unwrap(kombu.Producer, '_publish')
    unwrap(kombu.Consumer, 'receive')
def unpatch():
    """Turn off context propagation between threads.

    Acts only when the ``__datadog_patch`` attribute on ``futures`` is
    truthy: the flag is reset to ``False`` and
    ``ThreadPoolExecutor.submit`` is passed to ``_u``.
    """
    if getattr(futures, '__datadog_patch', False):
        setattr(futures, '__datadog_patch', False)
        _u(futures.ThreadPoolExecutor, 'submit')
def unpatch():
    """
    Strip the tracing functions from a Tornado web application.

    Returns immediately unless the ``__datadog_patch`` attribute on
    ``tornado`` is truthy; otherwise the flag is cleared first and the
    wrapped Tornado entry points are passed to ``_u`` one by one.
    """
    patched = getattr(tornado, '__datadog_patch', False)
    if not patched:
        return
    setattr(tornado, '__datadog_patch', False)

    # Tornado request handler hooks
    handler_cls = tornado.web.RequestHandler
    for hook_name in ('_execute', 'on_finish', 'log_exception'):
        _u(handler_cls, hook_name)

    # Remaining Tornado entry points
    _u(tornado.web.Application, '__init__')
    _u(tornado.concurrent, 'run_on_executor')
    _u(tornado.template.Template, 'generate')

    # `futures` is handled through the compat layer
    compat.unwrap_futures()
def unpatch():
    """Revert the redis instrumentation when it is currently applied.

    Does nothing unless ``redis._datadog_patch`` is truthy. Otherwise the
    flag is reset and the set of methods passed to ``unwrap`` is chosen by
    the ``require_package`` checks for ``redis<3.0.0`` and ``redis>=3.0.0``.
    """
    if not getattr(redis, '_datadog_patch', False):
        return
    redis._datadog_patch = False

    # redis < 3.0.0: StrictRedis / Redis / BasePipeline
    with require_package('redis<3.0.0') as is_legacy:
        if is_legacy:
            strict_cls = redis.StrictRedis
            for method_name in ('execute_command', 'pipeline'):
                unwrap(strict_cls, method_name)
            unwrap(redis.Redis, 'pipeline')
            legacy_pipeline_cls = redis.client.BasePipeline
            for method_name in ('execute', 'immediate_execute_command'):
                unwrap(legacy_pipeline_cls, method_name)

    # redis >= 3.0.0: Redis / Pipeline
    with require_package('redis>=3.0.0') as is_modern:
        if is_modern:
            redis_cls = redis.Redis
            for method_name in ('execute_command', 'pipeline'):
                unwrap(redis_cls, method_name)
            pipeline_cls = redis.client.Pipeline
            for method_name in ('execute', 'immediate_execute_command'):
                unwrap(pipeline_cls, method_name)
def unpatch():
    """
    Take the tracing functions back out of a Tornado web application.

    Acts only when the ``__datadog_patch`` attribute on ``tornado`` is
    truthy: the flag is cleared, each wrapped Tornado attribute is passed to
    ``_u``, and ``compat.unwrap_futures()`` is called last.
    """
    if getattr(tornado, '__datadog_patch', False):
        setattr(tornado, '__datadog_patch', False)

        # Tornado itself
        request_handler = tornado.web.RequestHandler
        _u(request_handler, '_execute')
        _u(request_handler, 'on_finish')
        _u(request_handler, 'log_exception')
        _u(tornado.web.Application, '__init__')
        _u(tornado.concurrent, 'run_on_executor')
        _u(tornado.template.Template, 'generate')

        # `futures`
        compat.unwrap_futures()
def unpatch():
    """Revert the rq instrumentation when it is currently applied.

    This body had lost its ``def`` line and was fused onto the preceding
    Tornado ``unpatch``; the header is restored so the early ``return`` below
    is scoped to the rq logic only.

    Returns immediately unless ``rq._datadog_patch`` is truthy. Otherwise it
    calls ``Pin.remove_from`` on ``rq`` and on its Job, Queue and Worker
    classes, hands each instrumented method to ``_uw``, and resets the
    ``_datadog_patch`` flag to ``False`` as the final step.
    """
    import rq

    if not getattr(rq, '_datadog_patch', False):
        return

    Pin.remove_from(rq)

    # Unpatch rq.job.Job
    Pin.remove_from(rq.job.Job)
    _uw(rq.job.Job, 'perform')
    _uw(rq.job.Job, 'fetch')

    # Unpatch rq.queue.Queue
    Pin.remove_from(rq.queue.Queue)
    _uw(rq.queue.Queue, 'enqueue_job')
    _uw(rq.queue.Queue, 'fetch_job')

    # Unpatch rq.worker.Worker
    Pin.remove_from(rq.worker.Worker)
    _uw(rq.worker.Worker, 'perform_job')

    setattr(rq, '_datadog_patch', False)
def unpatch():
    """Revert the algoliasearch instrumentation when it is currently applied.

    Returns immediately when ``algoliasearch_version`` equals ``(0, 0)`` or
    when the ``DD_PATCH_ATTR`` flag on ``algoliasearch`` is not truthy.
    Otherwise the flag is reset and the ``search`` method of the class
    matching the version range (1.x or 2.x) is passed to ``_u``; any other
    version only has its flag reset.
    """
    if algoliasearch_version == (0, 0):
        return
    if not getattr(algoliasearch, DD_PATCH_ATTR, False):
        return

    setattr(algoliasearch, DD_PATCH_ATTR, False)

    if (1, 0) <= algoliasearch_version < (2, 0):
        _u(algoliasearch.index.Index, 'search')
    elif (2, 0) <= algoliasearch_version < (3, 0):
        # Imported only on this branch, i.e. only for 2.x versions.
        from algoliasearch import search_index
        _u(search_index.SearchIndex, 'search')
def unpatch():
    """Remove the molten instrumentation.

    Does nothing unless ``molten._datadog_patch`` is truthy. Otherwise the
    flag is reset, the pin returned by ``Pin.get_from(molten)`` (if any) is
    removed from ``molten``, and the wrapped ``BaseApp``, ``App`` and
    ``Router`` methods are passed to ``_u``.
    """
    if not getattr(molten, '_datadog_patch', False):
        return
    molten._datadog_patch = False

    # Drop the pin when one is attached.
    attached_pin = Pin.get_from(molten)
    if attached_pin:
        attached_pin.remove_from(molten)

    _u(molten.BaseApp, '__init__')
    _u(molten.App, '__call__')
    _u(molten.Router, 'add_route')