Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_received_bad_host_header(self):
    """Feed the parser a request with a bad Host header line.

    The parser is expected to consume the whole payload, mark itself
    completed, and record a ``BadRequest`` error.
    """
    from waitress.utilities import BadRequest

    # The header line starts with a space before "Host".
    request = (
        b"HTTP/1.0 GET /foobar\r\n"
        b" Host: foo\r\n"
        b"\r\n"
    )
    consumed = self.parser.received(request)

    # 36 is the full length of the payload above.
    self.assertEqual(consumed, 36)
    self.assertTrue(self.parser.completed)
    self.assertEqual(self.parser.error.__class__, BadRequest)
from waitress.utilities import BadRequest
data = (
b"GET /foobar HTTP/1.1\r\n"
b"Transfer-Encoding: chunked\r\n"
b"X-Foo: 1\r\n"
b"\r\n"
b"garbage\r\n"
)
# header
result = self.parser.received(data)
# body
result = self.parser.received(data[result:])
self.assertEqual(result, 9)
self.assertTrue(self.parser.completed)
self.assertTrue(isinstance(self.parser.error, BadRequest))
def test_received_chunk_not_properly_terminated(self):
    """Feed a 4-byte chunk that is not followed directly by CR LF.

    The receiver is expected to consume all of the input, store the
    four chunk bytes in the buffer, stay not-completed, and record a
    ``BadRequest`` error.
    """
    from waitress.utilities import BadRequest

    # Size line "4", then "Wiki" followed by "badchunk" where the
    # terminating CR LF should be.
    payload = b"4\r\nWikibadchunk\r\n"
    sink = DummyBuffer()
    receiver = self._makeOne(sink)

    consumed = receiver.received(payload)

    self.assertEqual(consumed, len(payload))
    self.assertEqual(receiver.completed, False)
    self.assertEqual(sink.data[0], b"Wiki")
    self.assertEqual(receiver.error.__class__, BadRequest)
self.validate_chunk_end = True
elif self.validate_chunk_end:
s = self.chunk_end + s
pos = s.find(b"\r\n")
if pos < 0 and len(s) < 2:
self.chunk_end = s
s = b""
else:
self.chunk_end = b""
if pos == 0:
# Chop off the terminating CR LF from the chunk
s = s[2:]
else:
self.error = BadRequest("Chunk not properly terminated")
self.all_chunks_received = True
# Always exit this loop
self.validate_chunk_end = False
elif not self.all_chunks_received:
# Receive a control line.
s = self.control_line + s
pos = s.find(b"\r\n")
if pos < 0:
# Control line not finished.
self.control_line = s
s = b""
else:
# Control line finished.
line = s[:pos]
header_plus = s[:index]
# Remove preceding blank lines. This is suggested by
# https://tools.ietf.org/html/rfc7230#section-3.5 to support
# clients sending an extra CR LF after another request when
# using HTTP pipelining
header_plus = header_plus.lstrip()
if not header_plus:
self.empty = True
self.completed = True
else:
try:
self.parse_header(header_plus)
except ParsingError as e:
self.error = BadRequest(e.args[0])
self.completed = True
except TransferEncodingNotImplemented as e:
self.error = ServerNotImplemented(e.args[0])
self.completed = True
else:
if self.body_rcv is None:
# no content-length header and not a t-e: chunked
# request
self.completed = True
if self.content_length > 0:
max_body = self.adj.max_request_body_size
# we won't accept this request if the content-length
# is too large
if self.content_length >= max_body:
line = s[:pos]
s = s[pos + 2 :]
self.control_line = b""
line = line.strip()
if line:
# Begin a new chunk.
semi = line.find(b";")
if semi >= 0:
# discard extension info.
line = line[:semi]
try:
sz = int(line.strip(), 16) # hexadecimal
except ValueError: # garbage in input
self.error = BadRequest("garbage in chunked encoding input")
sz = 0
if sz > 0:
# Start a new chunk.
self.chunk_remainder = sz
else:
# Finished chunks.
self.all_chunks_received = True
# else expect a control line.
else:
# Receive the trailer.
trailer = self.trailer + s
if trailer.startswith(b"\r\n"):
# No trailer.
self.completed = True
s = self.header_plus + data
index = find_double_newline(s)
if index >= 0:
# Header finished.
header_plus = s[:index]
consumed = len(data) - (len(s) - index)
# Remove preceding blank lines.
header_plus = header_plus.lstrip()
if not header_plus:
self.empty = True
self.completed = True
else:
try:
self.parse_header(header_plus)
except ParsingError as e:
self.error = BadRequest(e.args[0])
self.completed = True
else:
if self.body_rcv is None:
# no content-length header and not a t-e: chunked
# request
self.completed = True
if self.content_length > 0:
max_body = self.adj.max_request_body_size
# we won't accept this request if the content-length
# is too large
if self.content_length >= max_body:
self.error = RequestEntityTooLarge(
'exceeds max_body of %s' % max_body)
self.completed = True
self.headers_finished = True
return consumed
line = s[:pos]
s = s[pos + 2 :]
self.control_line = b""
line = line.strip()
if line:
# Begin a new chunk.
semi = line.find(b";")
if semi >= 0:
# discard extension info.
line = line[:semi]
try:
sz = int(line.strip(), 16) # hexadecimal
except ValueError: # garbage in input
self.error = BadRequest("garbage in chunked encoding input")
sz = 0
if sz > 0:
# Start a new chunk.
self.chunk_remainder = sz
else:
# Finished chunks.
self.all_chunks_received = True
# else expect a control line.
else:
# Receive the trailer.
trailer = self.trailer + s
if trailer.startswith(b"\r\n"):
# No trailer.
self.completed = True
self.validate_chunk_end = True
elif self.validate_chunk_end:
s = self.chunk_end + s
pos = s.find(b"\r\n")
if pos < 0 and len(s) < 2:
self.chunk_end = s
s = b""
else:
self.chunk_end = b""
if pos == 0:
# Chop off the terminating CR LF from the chunk
s = s[2:]
else:
self.error = BadRequest("Chunk not properly terminated")
self.all_chunks_received = True
# Always exit this loop
self.validate_chunk_end = False
elif not self.all_chunks_received:
# Receive a control line.
s = self.control_line + s
pos = s.find(b"\r\n")
if pos < 0:
# Control line not finished.
self.control_line = s
s = b""
else:
# Control line finished.
line = s[:pos]