Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(
self, status_code, body=b'',
headers={'Content-Type': 'text/plain'}, cookies={}
):
super().__init__(status_code, headers=headers, cookies=cookies)
self.body = body
async def _send_body(self, send):
await send({
'type': 'http.response.body',
'body': self.body,
'more_body': False
})
class HTTP(HTTPResponse):
def __init__(
self, status_code, body=u'',
headers={'Content-Type': 'text/plain'}, cookies={}
):
super().__init__(status_code, headers=headers, cookies=cookies)
self.body = body
@property
def encoded_body(self):
return self.body.encode('utf-8')
async def _send_body(self, send):
await send({
'type': 'http.response.body',
'body': self.encoded_body,
'more_body': False
async def dynamic_handler(self, scope, receive, send):
try:
http = await self.app._router_http.dispatch()
except HTTPResponse as http_exception:
http = http_exception
#: render error with handlers if in app
error_handler = self.app.error_handlers.get(http.status_code)
if error_handler:
http = HTTP(
http.status_code, await error_handler(), response.headers)
#: always set cookies
http.set_cookies(response.cookies)
return http
async def _send_headers(self, send):
await send({
'type': 'http.response.start',
'status': self.status_code,
'headers': self.headers
})
async def _send_body(self, send):
await send({'type': 'http.response.body'})
async def send(self, scope, send):
await self._send_headers(send)
await self._send_body(send)
class HTTPBytes(HTTPResponse):
def __init__(
self, status_code, body=b'',
headers={'Content-Type': 'text/plain'}, cookies={}
):
super().__init__(status_code, headers=headers, cookies=cookies)
self.body = body
async def _send_body(self, send):
await send({
'type': 'http.response.body',
'body': self.body,
'more_body': False
})
class HTTP(HTTPResponse):
super().__init__(status_code, headers=headers, cookies=cookies)
self.body = body
@property
def encoded_body(self):
return self.body.encode('utf-8')
async def _send_body(self, send):
await send({
'type': 'http.response.body',
'body': self.encoded_body,
'more_body': False
})
class HTTPRedirect(HTTPResponse):
def __init__(self, status_code, location):
location = location.replace('\r', '%0D').replace('\n', '%0A')
super().__init__(status_code, headers={'Location': location})
# async def send(self, scope, send):
# await self._send_headers(send)
# await send({'type': 'http.response.body'})
class HTTPFile(HTTPResponse):
def __init__(self, file_path, headers={}, cookies={}, chunk_size=4096):
super().__init__(200, headers=headers, cookies=cookies)
self.file_path = file_path
self.chunk_size = chunk_size
def _get_stat_headers(self, stat_data):
await HTTP(404).send(scope, send)
async def _send_body(self, send):
async with aiofiles.open(self.file_path, mode='rb') as f:
more_body = True
while more_body:
chunk = await f.read(self.chunk_size)
more_body = len(chunk) == self.chunk_size
await send({
'type': 'http.response.body',
'body': chunk,
'more_body': more_body,
})
class HTTPIO(HTTPResponse):
def __init__(self, io_stream, headers={}, cookies={}, chunk_size=4096):
super().__init__(200, headers=headers, cookies=cookies)
self.io_stream = io_stream
self.chunk_size = chunk_size
def _get_io_headers(self):
content_length = str(self.io_stream.getbuffer().nbytes)
return {
'Content-Length': content_length
}
async def send(self, scope, send):
self._headers.update(self._get_io_headers())
await self._send_headers(send)
await self._send_body(send)
'body': self.encoded_body,
'more_body': False
})
class HTTPRedirect(HTTPResponse):
def __init__(self, status_code, location):
location = location.replace('\r', '%0D').replace('\n', '%0A')
super().__init__(status_code, headers={'Location': location})
# async def send(self, scope, send):
# await self._send_headers(send)
# await send({'type': 'http.response.body'})
class HTTPFile(HTTPResponse):
def __init__(self, file_path, headers={}, cookies={}, chunk_size=4096):
super().__init__(200, headers=headers, cookies=cookies)
self.file_path = file_path
self.chunk_size = chunk_size
def _get_stat_headers(self, stat_data):
content_length = str(stat_data.st_size)
last_modified = formatdate(stat_data.st_mtime, usegmt=True)
etag_base = str(stat_data.st_mtime) + '_' + str(stat_data.st_size)
etag = md5(etag_base.encode('utf-8')).hexdigest()
return {
'Content-Type': contenttype(self.file_path),
'Content-Length': content_length,
'Last-Modified': last_modified,
'Etag': etag
}
if not self.template:
self.template = f.__name__ + self.app.template_default_extension
if self.template_folder:
self.template = os.path.join(self.template_folder, self.template)
pipeline_obj = Pipeline(self.pipeline)
wrapped_f = pipeline_obj(f)
self.pipeline_flow_open = pipeline_obj._flow_open()
self.pipeline_flow_close = pipeline_obj._flow_close()
self.f = wrapped_f
output_type = pipeline_obj._output_type() or self.output_type
self.response_builders = {
method.upper(): self.router._outputs[output_type](self)
for method in self.methods
}
if 'head' in self.response_builders:
self.response_builders['head'].http_cls = HTTPResponse
for idx, path in enumerate(self.paths):
self.router.add_route(HTTPRoute(self, path, idx))
return f