Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_readline_new_line_after_size():
body = Body(io.BytesIO(b"abc\ndef"))
assert body.readline(2) == b"ab"
assert body.readline() == b"c\n"
def test_readline_no_new_line():
body = Body(io.BytesIO(b"abcdef"))
assert body.readline() == b"abcdef"
body = Body(io.BytesIO(b"abcdef"))
assert body.readline(2) == b"ab"
assert body.readline(2) == b"cd"
assert body.readline(2) == b"ef"
def assert_readline(payload, size, expected):
body = Body(io.BytesIO(payload))
assert body.readline(size) == expected
def set_body_reader(self):
super(Request, self).set_body_reader()
if isinstance(self.body.reader, EOFReader):
self.body = Body(LengthReader(self.unreader, 0))
def set_body_reader(self):
chunked = False
content_length = None
for (name, value) in self.headers:
if name == "CONTENT-LENGTH":
content_length = value
elif name == "TRANSFER-ENCODING":
chunked = value.lower() == "chunked"
elif name == "SEC-WEBSOCKET-KEY1":
content_length = 8
if chunked:
self.body = Body(ChunkedReader(self, self.unreader))
elif content_length is not None:
try:
content_length = int(content_length)
except ValueError:
raise InvalidHeader("CONTENT-LENGTH", req=self)
if content_length < 0:
raise InvalidHeader("CONTENT-LENGTH", req=self)
self.body = Body(LengthReader(self.unreader, content_length))
else:
self.body = Body(EOFReader(self.unreader))
def set_body_reader(self):
chunked = False
content_length = None
for (name, value) in self.headers:
if name == "CONTENT-LENGTH":
content_length = value
elif name == "TRANSFER-ENCODING":
chunked = value.lower() == "chunked"
elif name == "SEC-WEBSOCKET-KEY1":
content_length = 8
if chunked:
self.body = Body(ChunkedReader(self, self.unreader))
elif content_length is not None:
try:
content_length = int(content_length)
except ValueError:
raise InvalidHeader("CONTENT-LENGTH", req=self)
if content_length < 0:
raise InvalidHeader("CONTENT-LENGTH", req=self)
self.body = Body(LengthReader(self.unreader, content_length))
else:
self.body = Body(EOFReader(self.unreader))
chunked = value.lower() == "chunked"
elif name == "SEC-WEBSOCKET-KEY1":
content_length = 8
if chunked:
self.body = Body(ChunkedReader(self, self.unreader))
elif content_length is not None:
try:
content_length = int(content_length)
except ValueError:
raise InvalidHeader("CONTENT-LENGTH", req=self)
if content_length < 0:
raise InvalidHeader("CONTENT-LENGTH", req=self)
self.body = Body(LengthReader(self.unreader, content_length))
else:
self.body = Body(EOFReader(self.unreader))
def set_body_reader(self):
chunked = False
content_length = None
for (name, value) in self.headers:
if name == "CONTENT-LENGTH":
content_length = value
elif name == "TRANSFER-ENCODING":
chunked = value.lower() == "chunked"
elif name == "SEC-WEBSOCKET-KEY1":
content_length = 8
if chunked:
self.body = Body(ChunkedReader(self, self.unreader))
elif content_length is not None:
try:
content_length = int(content_length)
except ValueError:
raise InvalidHeader("CONTENT-LENGTH", req=self)
if content_length < 0:
raise InvalidHeader("CONTENT-LENGTH", req=self)
self.body = Body(LengthReader(self.unreader, content_length))
else:
self.body = Body(EOFReader(self.unreader))
content_length = 8
if chunked:
self.body = Body(ChunkedReader(self, self.unreader))
elif content_length is not None:
try:
content_length = int(content_length)
except ValueError:
raise InvalidHeader("CONTENT-LENGTH", req=self)
if content_length < 0:
raise InvalidHeader("CONTENT-LENGTH", req=self)
self.body = Body(LengthReader(self.unreader, content_length))
else:
self.body = Body(EOFReader(self.unreader))
chunked = value.lower() == "chunked"
elif name == "SEC-WEBSOCKET-KEY1":
content_length = 8
if chunked:
self.body = Body(ChunkedReader(self, self.unreader))
elif content_length is not None:
try:
content_length = int(content_length)
except ValueError:
raise InvalidHeader("CONTENT-LENGTH", req=self)
if content_length < 0:
raise InvalidHeader("CONTENT-LENGTH", req=self)
self.body = Body(LengthReader(self.unreader, content_length))
else:
self.body = Body(EOFReader(self.unreader))