Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_getting_resource(event_loop):
"""Simple getting of resource."""
aioboto3.DEFAULT_SESSION = None
resource = aioboto3.resource('dynamodb', loop=event_loop, region_name='eu-central-1')
assert isinstance(resource.meta.client, AioBaseClient)
await resource.close()
async def test_getting_client(event_loop):
"""Simple getting of client."""
aioboto3.DEFAULT_SESSION = None
async with aioboto3.client('ssm', loop=event_loop, region_name='eu-central-1') as client:
assert isinstance(client, AioBaseClient)
def _create_client_class(self, service_name, service_model):
class_attributes = self._create_methods(service_model)
py_name_to_operation_name = self._create_name_mapping(service_model)
class_attributes['_PY_TO_OP_NAME'] = py_name_to_operation_name
bases = [AioBaseClient]
service_id = service_model.service_id.hyphenize()
self._event_emitter.emit(
'creating-client-class.%s' % service_id,
class_attributes=class_attributes,
base_classes=bases)
class_name = get_service_module_name(service_model)
cls = type(str(class_name), tuple(bases), class_attributes)
return cls
def patch():
if getattr(aiobotocore.client, '_datadog_patch', False):
return
setattr(aiobotocore.client, '_datadog_patch', True)
wrapt.wrap_function_wrapper('aiobotocore.client', 'AioBaseClient._make_api_call', _wrapped_api_call)
Pin(service='aws', app='aws').onto(aiobotocore.client.AioBaseClient)