Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
url = request.url
headers = [(_encode(k), _encode(v)) for k, v in request.headers.items()]
if not request.body:
body = b""
elif isinstance(request.body, str):
body = _encode(request.body)
else:
body = request.body
if isinstance(timeout, tuple):
timeout_kwargs = {"connect_timeout": timeout[0], "read_timeout": timeout[1]}
else:
timeout_kwargs = {"connect_timeout": timeout, "read_timeout": timeout}
timeout = http3.TimeoutConfig(**timeout_kwargs)
try:
response = await self.pool.request(
method,
url,
headers=headers,
data=body,
cert=cert,
verify=verify,
timeout=timeout,
)
except socket.error as err:
raise ConnectionError(err, request=request)
except http3.ConnectTimeout as err:
raise ConnectTimeout(err, request=request)
except http3.ReadTimeout as err: