Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def unpatch():
    """
    Remove the wrapper from ``AioBaseClient._make_api_call``.

    Does nothing unless the ``_datadog_patch`` flag on
    ``aiobotocore.client`` is currently truthy; otherwise the flag is
    reset to ``False`` before the method is unwrapped.
    """
    client_module = aiobotocore.client
    if not getattr(client_module, '_datadog_patch', False):
        return

    client_module._datadog_patch = False
    unwrap(client_module.AioBaseClient, '_make_api_call')
def patch():
    """
    Instrument the aiobotocore client so that calls to AWS services
    generate subsegments.

    The presence of the ``_xray_enabled`` attribute on
    ``aiobotocore.client`` marks the wrappers as already installed; in
    that case this function returns without doing anything.
    """
    if not hasattr(aiobotocore.client, '_xray_enabled'):
        # Set the marker first so a later call skips the wrapping below.
        aiobotocore.client._xray_enabled = True

        install = wrapt.wrap_function_wrapper
        install('aiobotocore.client', 'AioBaseClient._make_api_call', _xray_traced_aiobotocore)
        install('aiobotocore.endpoint', 'AioEndpoint.prepare_request', inject_header)
def patch():
    """
    Wrap ``AioBaseClient._make_api_call`` with ``_wrapped_api_call`` and
    attach a ``Pin`` (service ``aws``, app ``aws``) to ``AioBaseClient``.

    The ``_datadog_patch`` flag on ``aiobotocore.client`` guards against
    wrapping twice: when it is already truthy, nothing happens.
    """
    client_module = aiobotocore.client
    if not getattr(client_module, '_datadog_patch', False):
        client_module._datadog_patch = True

        wrapt.wrap_function_wrapper(
            'aiobotocore.client',
            'AioBaseClient._make_api_call',
            _wrapped_api_call,
        )
        pin = Pin(service='aws', app='aws')
        pin.onto(client_module.AioBaseClient)
def patch():
    """
    Instrument the aiobotocore client so that calls to AWS services
    generate subsegments.

    Returns immediately when ``aiobotocore.client`` already carries the
    ``_xray_enabled`` attribute; otherwise sets it and installs the two
    wrappers below.
    """
    client_module = aiobotocore.client
    if hasattr(client_module, '_xray_enabled'):
        return
    client_module._xray_enabled = True

    # Wrapper around the client's API-call method.
    wrapt.wrap_function_wrapper('aiobotocore.client', 'AioBaseClient._make_api_call', _xray_traced_aiobotocore)
    # Wrapper around the endpoint's request preparation.
    wrapt.wrap_function_wrapper('aiobotocore.endpoint', 'AioEndpoint.prepare_request', inject_header)