Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# aws gzip compression.
# https://github.com/boto/botocore/issues/1255
url = request.url
headers = request.headers
data = request.body
headers['Accept-Encoding'] = 'identity'
headers_ = MultiDict(
(z[0], _text(z[1], encoding='utf-8')) for z in headers.items())
# botocore does this during the request so we do this here as well
# TODO: this should be part of the ClientSession, perhaps make wrapper
proxy = self.proxies.get(urlparse(url.lower()).scheme)
if isinstance(data, io.IOBase):
data = _IOBaseWrapper(data)
url = URL(url, encoded=True)
resp = await self.http_session.request(
request.method, url=url, headers=headers_, data=data, proxy=proxy)
# If we're not streaming, read the content so we can retry any timeout
# errors, see:
# https://github.com/boto/botocore/blob/develop/botocore/vendored/requests/sessions.py#L604
if not request.stream_output:
await resp.read()
return resp