Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_connector_timeout(event_loop):
session = AioSession(loop=event_loop)
config = AioConfig(max_pool_connections=1, connect_timeout=1,
retries={'max_attempts': 0})
async with AIOServer() as server, \
session.create_client('s3', config=config,
endpoint_url=server.endpoint_url,
aws_secret_access_key='xxx',
aws_access_key_id='xxx') as s3_client:
async def get_and_wait():
await s3_client.get_object(Bucket='foo', Key='bar')
await asyncio.sleep(100)
task1 = asyncio.Task(get_and_wait(), loop=event_loop)
task2 = asyncio.Task(get_and_wait(), loop=event_loop)
try:
def get_session(*, env_vars=None, loop=None):
"""
Return a new session object.
"""
loop = loop or asyncio.get_event_loop()
return AioSession(env_vars, loop=loop)