How to use the blacksheep.client.ClientSession function in blacksheep

To help you get started, we’ve selected a few blacksheep examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github RobertoPrevato / BlackSheep / tests / client / test_middlewares.py View on Github external
async def test_single_middleware():
    fake_pools = FakePools([Response(200, None, TextContent('Hello, World!'))])

    steps = []

    async def middleware_one(request, next_handler):
        steps.append(1)
        response = await next_handler(request)
        steps.append(2)
        return response

    async with ClientSession(base_url=b'http://localhost:8080',
                             pools=fake_pools,
                             middlewares=[middleware_one]
                             ) as client:
        response = await client.get(b'/')

        assert steps == [1, 2]
        assert response.status == 200
        text = await response.text()
        assert text == 'Hello, World!'
github RobertoPrevato / BlackSheep / tests / client / test_cookiejar.py View on Github external
fake_pools = FakePools([Response(200, set_cookies, TextContent('Hello, World!')),
                            Response(200, None, TextContent('Hello!'))])
    check_cookie = False

    async def middleware_for_assertions(request, next_handler):
        if check_cookie:
            if not expected_cookies:
                assert not request.cookies

            for expected_cookie in expected_cookies:
                cookie = request.cookies.get(expected_cookie)
                assert cookie is not None, f'{cookie.name.decode()} cookie must be configured for following requests'

        return await next_handler(request)

    async with ClientSession(pools=fake_pools,
                             middlewares=[middleware_for_assertions],
                             ) as client:
        await client.get(first_request_url)
        check_cookie = True
        await client.get(second_request_url)
github RobertoPrevato / BlackSheep / tests / client / test_timeouts.py View on Github external
async def test_connection_timeout():
    fake_pools = FakePools([])
    fake_pools.pool.sleep_for = 5  # wait for 5 seconds before returning a connection; to test timeout handling

    async with ClientSession(base_url=b'http://localhost:8080',
                             pools=fake_pools,
                             connection_timeout=0.002  # 2ms - not realistic, but ok for this test
                             ) as client:
        with pytest.raises(ConnectionTimeout):
            await client.get(b'/')
github RobertoPrevato / BlackSheep / tests / client / test_headers.py View on Github external
async def test_request_headers():
    fake_pools = FakePools([Response(200, [], TextContent('Hello, World!'))])

    async def middleware_for_assertions(request, next_handler):
        assert b'Hello' in request.headers
        assert request.headers.get_single(b'Hello') == b'World'

        return await next_handler(request)

    async with ClientSession(base_url=b'http://localhost:8080',
                             pools=fake_pools,
                             middlewares=[middleware_for_assertions]
                             ) as client:
        await client.get(b'/', headers=[(b'Hello', b'World')])
        await client.post(b'/', headers=[(b'Hello', b'World')])
        await client.put(b'/', headers=[(b'Hello', b'World')])
        await client.delete(b'/', headers=[(b'Hello', b'World')])
github RobertoPrevato / BlackSheep / tests / client / test_redirects.py View on Github external
async def test_not_follow_redirect(responses, expected_location, pools_factory):

    async with ClientSession(base_url=b'http://localhost:8080',
                             pools=pools_factory(responses),
                             follow_redirects=False) as client:
        response = await client.get(b'/')

        assert response.status == 302

        location = response.headers[b'location']
        assert location
        assert location[0] == expected_location
github RobertoPrevato / BlackSheep / tests / client / test_redirects.py View on Github external
async def test_non_url_redirect(responses, expected_status, expected_location, pools_factory):

    async with ClientSession(base_url=b'http://localhost:8080', pools=pools_factory(responses)) as client:
        response = await client.get(b'/')

        assert response is not None
        assert response.status == expected_status

        location_header = response.headers.get_single(b'Location')
        assert location_header == expected_location
github RobertoPrevato / BlackSheep / tests / client / test_query.py View on Github external
async def test_query_params_concatenation(request_url, params, expected_query):
    fake_pools = FakePools([Response(200, None, TextContent('Hello, World!'))])

    async def middleware_for_assertions(request, next_handler):
        assert expected_query == request.url.query
        return await next_handler(request)

    async with ClientSession(base_url=b'http://localhost:8080',
                             pools=fake_pools,
                             middlewares=[middleware_for_assertions]) as client:
        await client.get(request_url, params=params)
github RobertoPrevato / BlackSheep / tests / client / test_cookiejar.py View on Github external
async def test_cookies_jar_single_cookie():
    fake_pools = FakePools([Response(200,
                                     [(b'Set-Cookie', write_response_cookie(Cookie(b'X-Foo', b'Foo')))])
                           .with_content(TextContent('Hello, World!')),
                            Response(200, None, TextContent('Hello!'))])
    check_cookie = False

    async def middleware_for_assertions(request, next_handler):
        if check_cookie:
            cookie = request.cookies.get('X-Foo')
            assert cookie is not None, 'X-Foo cookie must be configured for following requests'

        return await next_handler(request)

    async with ClientSession(base_url=b'https://bezkitu.org',
                             pools=fake_pools,
                             middlewares=[middleware_for_assertions]) as client:
        await client.get(b'/')  # the first request doesn't have any cookie because the response will set;
        check_cookie = True
        await client.get(b'/')
github RobertoPrevato / BlackSheep / tests / client / test_headers.py View on Github external
async def test_request_headers_override_default_header():
    fake_pools = FakePools([Response(200, [], TextContent('Hello, World!'))])

    async def middleware_for_assertions(request, next_handler):
        assert b'hello' in request.headers
        assert request.headers.get_single(b'hello') == b'Kitty'

        assert b'Foo' in request.headers
        assert request.headers.get_single(b'Foo') == b'Power'

        return await next_handler(request)

    async with ClientSession(base_url=b'http://localhost:8080',
                             pools=fake_pools,
                             middlewares=[middleware_for_assertions],
                             default_headers=[(b'Hello', b'World'),
                                              (b'Foo', b'Power')]
                             ) as client:
        await client.get(b'/', headers=[(b'Hello', b'Kitty')])
github RobertoPrevato / BlackSheep / tests / client / test_middlewares.py View on Github external
steps.append(2)
        return response

    async def middleware_two(request, next_handler):
        steps.append(3)
        response = await next_handler(request)
        steps.append(4)
        return response

    async def middleware_three(request, next_handler):
        steps.append(5)
        response = await next_handler(request)
        steps.append(6)
        return response

    async with ClientSession(base_url=b'http://localhost:8080',
                             pools=fake_pools) as client:
        client.add_middlewares([middleware_one])
        client.add_middlewares([middleware_two])
        client.add_middlewares([middleware_three])

        assert middleware_one in client._middlewares
        assert middleware_two in client._middlewares
        assert middleware_three in client._middlewares

        client._build_middlewares_chain()

        response = await client.get(b'/')

        assert steps == [1, 3, 5, 6, 4, 2]
        assert response.status == 200
        text = await response.text()