Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def normalize_and_validate(headers, _parsed=False):
new_headers = []
saw_content_length = False
saw_transfer_encoding = False
for name, value in headers:
# For headers coming out of the parser, we can safely skip some steps,
# because it always returns bytes and has already run these regexes
# over the data:
if not _parsed:
name = bytesify(name)
value = bytesify(value)
validate(_field_name_re, name, "Illegal header name {!r}", name)
validate(_field_value_re, value, "Illegal header value {!r}", value)
name = name.lower()
if name == b"content-length":
if saw_content_length:
raise LocalProtocolError("multiple Content-Length headers")
validate(_content_length_re, value, "bad Content-Length")
saw_content_length = True
if name == b"transfer-encoding":
# "A server that receives a request message with a transfer coding
# it does not understand SHOULD respond with 501 (Not
# Implemented)."
# https://tools.ietf.org/html/rfc7230#section-3.3.1
if saw_transfer_encoding:
raise LocalProtocolError(
"multiple Transfer-Encoding headers", error_status_hint=501
)
# "All transfer-coding names are case-insensitive"
saw_content_length = False
saw_transfer_encoding = False
for name, value in headers:
# For headers coming out of the parser, we can safely skip some steps,
# because it always returns bytes and has already run these regexes
# over the data:
if not _parsed:
name = bytesify(name)
value = bytesify(value)
validate(_field_name_re, name, "Illegal header name {!r}", name)
validate(_field_value_re, value, "Illegal header value {!r}", value)
name = name.lower()
if name == b"content-length":
if saw_content_length:
raise LocalProtocolError("multiple Content-Length headers")
validate(_content_length_re, value, "bad Content-Length")
saw_content_length = True
if name == b"transfer-encoding":
# "A server that receives a request message with a transfer coding
# it does not understand SHOULD respond with 501 (Not
# Implemented)."
# https://tools.ietf.org/html/rfc7230#section-3.3.1
if saw_transfer_encoding:
raise LocalProtocolError(
"multiple Transfer-Encoding headers", error_status_hint=501
)
# "All transfer-coding names are case-insensitive"
# -- https://tools.ietf.org/html/rfc7230#section-4
value = value.lower()
if value != b"chunked":
raise LocalProtocolError(
"Only Transfer-Encoding: chunked is supported",
def _validate(self):
    """Check the Host header count and the request target of this event.

    Raises LocalProtocolError when ``self.http_version`` is ``b"1.1"`` and
    no ``b"host"`` header is present, or when more than one ``b"host"``
    header is present (any version). Afterwards ``self.target`` is run
    through ``validate`` against ``request_target_re``.
    """
    # "A server MUST respond with a 400 (Bad Request) status code to any
    # HTTP/1.1 request message that lacks a Host header field and to any
    # request message that contains more than one Host header field or a
    # Host header field with an invalid field-value."
    # -- https://tools.ietf.org/html/rfc7230#section-5.4
    #
    # The comparison is against the lowercase literal b"host"; presumably
    # header names have already been lowercased by the time we get here --
    # verify against the code that builds self.headers.
    hosts_seen = sum(1 for name, _ in self.headers if name == b"host")

    if self.http_version == b"1.1" and hosts_seen == 0:
        raise LocalProtocolError("Missing mandatory Host: header")
    if hosts_seen > 1:
        raise LocalProtocolError("Found multiple Host: headers")

    validate(request_target_re, self.target, "Illegal target characters")
def _decode_header_lines(lines):
for line in _obsolete_line_fold(lines):
# _obsolete_line_fold yields either bytearray or bytes objects. On
# Python 3, validate() takes either and returns matches as bytes. But
# on Python 2, validate can return matches as bytearrays, so we have
# to explicitly cast back.
matches = validate(header_field_re, bytes(line))
yield (matches["field_name"], matches["field_value"])
def maybe_read_from_SEND_RESPONSE_server(buf):
    """Try to build a response event from the lines available in ``buf``.

    Returns None when ``buf.maybe_extract_lines()`` returns None. Raises
    LocalProtocolError when it returns an empty sequence. Otherwise the
    first line is matched against ``status_line_re`` and the remaining
    lines are decoded as headers; the result is an InformationalResponse
    for status codes below 200 and a Response for everything else.
    """
    raw_lines = buf.maybe_extract_lines()
    if raw_lines is None:
        return None
    if not raw_lines:
        raise LocalProtocolError("no response line received")

    fields = validate(status_line_re, raw_lines[0])

    # Tolerate missing reason phrases
    if fields["reason"] is None:
        fields["reason"] = b""

    # The matched status code is stored back as an int so the event
    # constructor receives a number rather than the raw match.
    code = int(fields["status_code"])
    fields["status_code"] = code

    if code < 200:
        event_type = InformationalResponse
    else:
        event_type = Response

    parsed_headers = list(_decode_header_lines(raw_lines[1:]))
    return event_type(headers=parsed_headers, _parsed=True, **fields)