Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_decorating_test(client):
    """Check that one mocked GET is routed and counted exactly once.

    NOTE(review): no mock context is opened in the body; presumably a
    respx mock decorator sits above this function, outside this view.
    """
    url = "https://foo.bar/"

    # Nothing may have been recorded before the first request is sent.
    assert respx.stats.call_count == 0

    route = respx.get(url, status_code=202)
    reply = await client.get(url)

    assert route.called is True
    assert reply.status_code == 202
    assert respx.stats.call_count == 1
async def test(backend):
    """Register aliased routes and check call bookkeeping around requests.

    ``backend`` is accepted but not used in the visible body.
    """
    url = "https://foo.bar/1/"

    # Two routes for a different host, registered ahead of the aliased ones.
    respx.get(re.compile("https://some.thing"))
    respx.delete("https://some.thing")

    get_route = respx.get(url, status_code=202, alias="get_foobar")
    delete_route = respx.delete(url, status_code=200, alias="del_foobar")

    # Before any traffic: per-route and global counters agree and are zero.
    assert get_route.called is False
    assert get_route.call_count == len(get_route.calls)
    assert get_route.call_count == 0
    assert respx.stats.call_count == len(respx.calls)
    assert respx.stats.call_count == 0

    async with httpx.AsyncClient() as http:
        get_reply = await http.get(url)
        delete_reply = await http.delete(url)

    assert get_route.called is True
def test():
    """Mock one GET route and hit it with the synchronous ``httpx.get``."""
    url = "https://foo.bar/"

    # The global counter is expected to start from zero.
    assert respx.stats.call_count == 0

    route = respx.get(url, status_code=202)
    reply = httpx.get(url)

    assert route.called is True
    assert reply.status_code == 202
    assert respx.stats.call_count == 1
async def test_proxies():
    """Check mocked responses are served to proxy-configured clients.

    The same GET is exercised twice: with a synchronous client inside
    ``with respx.mock`` and with an asynchronous client inside
    ``async with respx.mock``.
    """
    url = "https://foo.bar/"
    proxy = "https://1.1.1.1:1"

    # Synchronous client under the synchronous mock context.
    with respx.mock:
        respx.get(url, content={"foo": "bar"})
        with httpx.Client(proxies={"https": proxy}) as sync_client:
            sync_reply = sync_client.get(url)
        assert sync_reply.json() == {"foo": "bar"}

    # Asynchronous client under the asynchronous mock context.
    async with respx.mock:
        respx.get(url, content={"foo": "bar"})
        async with httpx.AsyncClient(proxies={"https": proxy}) as async_client:
            async_reply = await async_client.get(url)
        assert async_reply.json() == {"foo": "bar"}
async def test_http_methods(client):
    """Mock every HTTP verb on one URL and check GET and POST statuses.

    GET and POST are each requested through the module-level ``httpx``
    helpers and through the ``client`` argument.
    """
    async with respx.mock:
        url = "https://foo.bar/"

        get_route = respx.get(url, status_code=404)
        # One route per remaining verb, each with its own status code.
        for register, status in (
            (respx.post, 201),
            (respx.put, 202),
            (respx.patch, 500),
            (respx.delete, 204),
            (respx.head, 405),
            (respx.options, 501),
        ):
            register(url, status_code=status)

        # Each verb is sent synchronously first, then through ``client``.
        for send_sync, send_async, expected in (
            (httpx.get, client.get, 404),
            (httpx.post, client.post, 201),
        ):
            reply = send_sync(url)
            assert reply.status_code == expected
            reply = await send_async(url)
            assert reply.status_code == expected
async def test_nested_global_contextmanager(client):
    """Check routes and stats when ``respx.mock`` is entered twice, nested.

    A GET route is registered in the outer context and a POST route in the
    inner one; both are then requested from inside the inner context.
    """
    url = "https://foo/bar/"

    with respx.mock:
        outer_route = respx.get(url, status_code=202)

        with respx.mock:
            inner_route = respx.post(url, status_code=201)

            # The route registered in the outer context is hit first.
            reply = await client.get(url)
            assert outer_route.called is True
            assert reply.status_code == 202
            assert respx.stats.call_count == 1

            # Then the route registered in the inner context.
            reply = await client.post(url)
            assert inner_route.called is True
            assert reply.status_code == 201
            assert respx.stats.call_count == 2
async def test_proxies():
    """Check that proxy-configured httpx clients still get mocked replies.

    Runs one GET with ``httpx.Client`` under ``with respx.mock`` and one
    with ``httpx.AsyncClient`` under ``async with respx.mock``.
    """
    target = "https://foo.bar/"
    proxy_url = "https://1.1.1.1:1"

    with respx.mock:
        respx.get(target, content={"foo": "bar"})
        with httpx.Client(proxies={"https": proxy_url}) as blocking_client:
            blocking_reply = blocking_client.get(target)
        assert blocking_reply.json() == {"foo": "bar"}

    async with respx.mock:
        respx.get(target, content={"foo": "bar"})
        async with httpx.AsyncClient(proxies={"https": proxy_url}) as awaitable_client:
            awaited_reply = await awaitable_client.get(target)
        assert awaited_reply.json() == {"foo": "bar"}
async def test_parallel_requests(client):
    """Check that concurrently issued requests each get their own body.

    The URL pattern captures the last path segment as the named group
    ``page``. The ``content`` callback declares a ``page`` parameter and
    returns it as the response body; presumably respx passes the named
    group to the callback by keyword (confirm against the respx docs).

    NOTE(review): no mock context is opened in the body; presumably a
    respx mock decorator sits above this function, outside this view.
    """

    async def content(request, page):
        # "one" sleeps longer than "two", so when both run concurrently
        # the first request issued is expected to finish last.
        await asyncio.sleep(0.2 if page == "one" else 0.1)
        return page

    # The group must carry a name: a bare ``(?P\w+)`` is rejected by
    # ``re.compile`` and leaves nothing to bind ``page`` to.
    url_pattern = re.compile(r"https://foo/(?P<page>\w+)/$")
    respx.get(url_pattern, content=content)

    responses = await asyncio.gather(
        client.get("https://foo/one/"), client.get("https://foo/two/")
    )
    response_one, response_two = responses

    # gather returns results in argument order, not completion order.
    assert response_one.text == "one"
    assert response_two.text == "two"
    assert respx.stats.call_count == 2
async def test_asgi():
    """Check a mocked GET made through a client created with ``app=``.

    The route is registered with a status code, an extra header and a
    callable that returns the JSON payload; all three are asserted on
    the response.
    """
    async with respx.mock, httpx.AsyncClient(app="fake-asgi") as client:
        url = "https://foo.bar/"

        def content(request):
            return {"status": "ok"}

        headers = {"X-Foo": "bar"}
        route = respx.get(url, status_code=202, headers=headers, content=content)
        reply = await client.get(url)

        assert route.called is True
        assert reply.status_code == 202
        # A JSON content type is expected alongside the custom header.
        expected_headers = httpx.Headers(
            {"Content-Type": "application/json", **headers}
        )
        assert reply.headers == expected_headers
        assert reply.json() == {"status": "ok"}
async def test_global_contextmanager(client):
    """Check ``respx.mock`` as both a sync and an async context manager.

    Each context starts with a zero call count, records exactly one call,
    and the count is asserted to be zero again after both have exited.
    """
    url = "https://foo/bar/"

    # Synchronous form of the context manager.
    with respx.mock:
        assert respx.stats.call_count == 0
        route = respx.get(url, status_code=202)
        reply = await client.get(url)
        assert route.called is True
        assert reply.status_code == 202
        assert respx.stats.call_count == 1

    # Asynchronous form; the counter is asserted to start from zero again.
    async with respx.mock:
        assert respx.stats.call_count == 0
        route = respx.get(url, status_code=202)
        reply = await client.get(url)
        assert route.called is True
        assert reply.status_code == 202
        assert respx.stats.call_count == 1

    assert respx.stats.call_count == 0