Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_invalid_response_headers(self, sock):
    """Check that a bad Upgrade or Connection response header aborts the handshake.

    For each of the two headers, a canned ``101 Switching Protocols``
    response is built with that header set to an invalid value and fed
    to the client through a mocked socket. ``connect`` must raise
    ``HandshakeError`` and the socket must be shut down and closed
    exactly once.

    ``sock`` is presumably the patched ``socket`` module used by the
    client (injected by a decorator outside this block) -- verify
    against the surrounding test class.
    """
    bad_headers = (
        (b'upgrade', b'boom'),
        (b'connection', b'bim'),
    )
    for header_name, bad_value in bad_headers:
        # The mocks must be configured before the client is created.
        fake_socket = MagicMock()
        sock.socket.return_value = fake_socket
        address_info = (socket.AF_INET, socket.SOCK_STREAM, 0, "",
                        ("127.0.0.1", 80, 0, 0))
        sock.getaddrinfo.return_value = [address_info]

        client = WebSocketBaseClient(url="ws://127.0.0.1/?token=value")

        # Start from a well-formed handshake response ...
        accept_token = b64encode(sha1(client.key + WS_KEY).digest())
        headers = {
            b"connection": b"Upgrade",
            b"Sec-Websocket-Version": b"13",
            b"Content-Type": b"text/plain;charset=utf-8",
            b"Sec-Websocket-Accept": accept_token,
            b"upgrade": b"websocket",
            b"Date": b"Sun, 26 Jul 2015 12:32:55 GMT",
            b"Server": b"ws4py/test"
        }
        # ... then corrupt only the header under test.
        headers[header_name] = bad_value

        response_lines = [b"HTTP/1.1 101 Switching Protocols"]
        for name, value in headers.items():
            response_lines.append(name + b" : " + value)
        response_lines.append(b'\r\n')
        fake_socket.recv.return_value = b"\r\n".join(response_lines)

        with self.assertRaises(HandshakeError):
            client.connect()

        fake_socket.shutdown.assert_called_once_with(socket.SHUT_RDWR)
        fake_socket.close.assert_called_once_with()

        # Clear recorded calls on the patched module before the next case.
        sock.reset_mock()
exts = []
ws_extensions = []
extensions = headers.get('Sec-WebSocket-Extensions')
if extensions:
for ext in extensions.split(','):
ext = ext.strip()
if ext in exts:
ws_extensions.append(ext)
response = [req_protocol + b' 101 Switching Protocols']
response.append(b'Upgrade: websocket')
response.append(b'Content-Type: text/plain')
response.append(b'Content-Length: 0')
response.append(b'Connection: Upgrade')
response.append(b'Sec-WebSocket-Version:' + bytes(str(version), 'utf-8'))
response.append(b'Sec-WebSocket-Accept:' + base64.b64encode(sha1(key.encode('utf-8') + WS_KEY).digest()))
if ws_protocols:
response.append(b'Sec-WebSocket-Protocol:' + b', '.join(ws_protocols))
if ws_extensions:
response.append(b'Sec-WebSocket-Extensions:' + b','.join(ws_extensions))
response.append(b'')
response.append(b'')
self.writer.write(CRLF.join(response))
yield from self.handle_websocket()
if subprotocols:
for s in subprotocols.split(','):
s = s.strip()
if s in protocols:
ws_protocols.append(s)
ws_extensions = []
exts = self.extensions or []
extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS')
if extensions:
for ext in extensions.split(','):
ext = ext.strip()
if ext in exts:
ws_extensions.append(ext)
accept_value = base64.b64encode(sha1(key.encode('utf-8') + WS_KEY).digest())
if py3k: accept_value = accept_value.decode('utf-8')
upgrade_headers = [
('Upgrade', 'websocket'),
('Connection', 'Upgrade'),
('Sec-WebSocket-Version', '%s' % version),
('Sec-WebSocket-Accept', accept_value),
]
if ws_protocols:
upgrade_headers.append(('Sec-WebSocket-Protocol', ', '.join(ws_protocols)))
if ws_extensions:
upgrade_headers.append(('Sec-WebSocket-Extensions', ','.join(ws_extensions)))
start_response("101 Switching Protocols", upgrade_headers)
self.make_websocket(environ['ws4py.socket'],
ws_protocols,
# Collect supported extensions
exts = self.extensions or []
ws_extensions = []
extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS')
if extensions:
for ext in extensions.split(','):
ext = ext.strip()
if ext in exts:
ws_extensions.append(ext)
# Build and start the HTTP response
headers = [
('Upgrade', 'websocket'),
('Connection', 'Upgrade'),
('Sec-WebSocket-Version', environ['websocket.version']),
('Sec-WebSocket-Accept', base64.b64encode(sha1(key + WS_KEY).digest())),
]
if ws_protocols:
headers.append(('Sec-WebSocket-Protocol', ', '.join(ws_protocols)))
if ws_extensions:
headers.append(('Sec-WebSocket-Extensions', ','.join(ws_extensions)))
start_response("101 Web Socket Hybi Handshake", headers)
if 'upgrade.socket' in environ:
upgrade_socket = environ['upgrade.socket']
else:
upgrade_socket = environ['wsgi.input']._sock
return self.app(self.websocket_class(upgrade_socket,
ws_protocols,
ws_extensions,
for ext in extensions.split(','):
ext = ext.strip()
if ext in exts:
ws_extensions.append(ext)
self.ws.protocols = ws_protocols
self.ws.extensions = ws_extensions
self.ws.headers = headers
response = [req_protocol + b' 101 Switching Protocols']
response.append(b'Upgrade: websocket')
response.append(b'Content-Type: text/plain')
response.append(b'Content-Length: 0')
response.append(b'Connection: Upgrade')
response.append(b'Sec-WebSocket-Version:' + bytes(str(version), 'utf-8'))
response.append(b'Sec-WebSocket-Accept:' + base64.b64encode(sha1(key.encode('utf-8') + WS_KEY).digest()))
if ws_protocols:
response.append(b'Sec-WebSocket-Protocol:' + b', '.join(ws_protocols))
if ws_extensions:
response.append(b'Sec-WebSocket-Extensions:' + b','.join(ws_extensions))
response.append(b'')
response.append(b'')
self.writer.write(CRLF.join(response))
yield from self.handle_websocket()
headers = headers.strip()
for header_line in headers.split(b'\r\n'):
header, value = header_line.split(b':', 1)
header = header.strip().lower()
value = value.strip().lower()
if header == b'upgrade' and value != b'websocket':
raise HandshakeError("Invalid Upgrade header: %s" % value)
elif header == b'connection' and value != b'upgrade':
raise HandshakeError("Invalid Connection header: %s" % value)
elif header == b'sec-websocket-accept':
match = b64encode(sha1(self.key + WS_KEY).digest())
if value != match.lower():
raise HandshakeError("Invalid challenge response: %s" % value)
elif header == b'sec-websocket-protocol':
protocols = ','.join(value)
elif header == b'sec-websocket-extensions':
extensions = ','.join(value)
return protocols, extensions
headers = headers.strip()
for header_line in headers.split(b'\r\n'):
header, value = header_line.split(b':', 1)
header = header.strip().lower()
value = value.strip().lower()
if header == 'upgrade' and value != 'websocket':
raise HandshakeError("Invalid Upgrade header: %s" % value)
elif header == 'connection' and value != 'upgrade':
raise HandshakeError("Invalid Connection header: %s" % value)
elif header == 'sec-websocket-accept':
match = b64encode(sha1(self.key.encode('utf-8') + WS_KEY).digest())
if value != match.lower():
raise HandshakeError("Invalid challenge response: %s" % value)
elif header == 'sec-websocket-protocol':
protocols = ','.join(value)
elif header == 'sec-websocket-extensions':
extensions = ','.join(value)
return protocols, extensions
if subprotocols:
for s in subprotocols.split(','):
s = s.strip()
if s in protocols:
ws_protocols.append(s)
ws_extensions = []
exts = self.extensions or []
extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS')
if extensions:
for ext in extensions.split(','):
ext = ext.strip()
if ext in exts:
ws_extensions.append(ext)
accept_value = base64.b64encode(sha1(key.encode('utf-8') + WS_KEY).digest())
if py3k: accept_value = accept_value.decode('utf-8')
upgrade_headers = [
('Upgrade', 'websocket'),
('Connection', 'Upgrade'),
('Sec-WebSocket-Version', '%s' % version),
('Sec-WebSocket-Accept', accept_value),
]
if ws_protocols:
upgrade_headers.append(('Sec-WebSocket-Protocol', ', '.join(ws_protocols)))
if ws_extensions:
upgrade_headers.append(('Sec-WebSocket-Extensions', ','.join(ws_extensions)))
start_response("101 Switching Protocols", upgrade_headers)
self.make_websocket(environ['ws4py.socket'],
ws_protocols,
headers = headers.strip()
for header_line in headers.split(b'\r\n'):
header, value = header_line.split(b':', 1)
header = header.strip().lower()
value = value.strip().lower()
if header == b'upgrade' and value != b'websocket':
raise HandshakeError("Invalid Upgrade header: %s" % value)
elif header == b'connection' and value != b'upgrade':
raise HandshakeError("Invalid Connection header: %s" % value)
elif header == b'sec-websocket-accept':
match = b64encode(sha1(self.key + WS_KEY).digest())
if value != match.lower():
raise HandshakeError("Invalid challenge response: %s" % value)
elif header == b'sec-websocket-protocol':
protocols.extend([x.strip() for x in value.split(b',')])
elif header == b'sec-websocket-extensions':
extensions.extend([x.strip() for x in value.split(b',')])
return protocols, extensions
location.append('localhost')
if include_port:
location.append(":%d" % request.local.port)
location.append(request.path_info)
if request.query_string != "":
location.append("?%s" % request.query_string)
ws_location = ''.join(location)
response = cherrypy.serving.response
response.stream = True
response.status = '101 Switching Protocols'
response.headers['Content-Type'] = 'text/plain'
response.headers['Upgrade'] = 'websocket'
response.headers['Connection'] = 'Upgrade'
response.headers['Sec-WebSocket-Version'] = str(version)
response.headers['Sec-WebSocket-Accept'] = base64.b64encode(sha1(key.encode('utf-8') + WS_KEY).digest())
if ws_protocols:
response.headers['Sec-WebSocket-Protocol'] = ', '.join(ws_protocols)
if ws_extensions:
response.headers['Sec-WebSocket-Extensions'] = ','.join(ws_extensions)
addr = (request.remote.ip, request.remote.port)
rfile = request.rfile.rfile
if isinstance(rfile, KnownLengthRFile):
rfile = rfile.rfile
ws_conn = get_connection(rfile)
request.ws_handler = handler_cls(ws_conn, ws_protocols, ws_extensions,
request.wsgi_environ.copy(),
heartbeat_freq=heartbeat_freq)