Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def process_response_line(self, response_line):
"""
Ensure that we received a HTTP `101` status code in
response to our request and if not raises :exc:`HandshakeError`.
"""
protocol, code, status = response_line.split(b' ', 2)
if code != b'101':
raise HandshakeError("Invalid response status: %s %s" % (code, status))
"""
protocols = []
extensions = []
headers = headers.strip()
for header_line in headers.split(b'\r\n'):
header, value = header_line.split(b':', 1)
header = header.strip().lower()
value = value.strip().lower()
if header == b'upgrade' and value != b'websocket':
raise HandshakeError("Invalid Upgrade header: %s" % value)
elif header == b'connection' and value != b'upgrade':
raise HandshakeError("Invalid Connection header: %s" % value)
elif header == b'sec-websocket-accept':
match = b64encode(sha1(self.key + WS_KEY).digest())
if value != match.lower():
raise HandshakeError("Invalid challenge response: %s" % value)
elif header == b'sec-websocket-protocol':
protocols = ','.join(value)
elif header == b'sec-websocket-extensions':
extensions = ','.join(value)
return protocols, extensions
if key:
ws_key = base64.b64decode(key.encode('utf-8'))
if len(ws_key) != 16:
raise HandshakeError("WebSocket key's length is invalid")
version = environ.get('HTTP_SEC_WEBSOCKET_VERSION')
supported_versions = b', '.join([unicode(v).encode('utf-8') for v in WS_VERSION])
version_is_valid = False
if version:
try: version = int(version)
except: pass
else: version_is_valid = version in WS_VERSION
if not version_is_valid:
environ['websocket.version'] = unicode(version).encode('utf-8')
raise HandshakeError('Unhandled or missing WebSocket version')
ws_protocols = []
protocols = self.protocols or []
subprotocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL')
if subprotocols:
for s in subprotocols.split(','):
s = s.strip()
if s in protocols:
ws_protocols.append(s)
ws_extensions = []
exts = self.extensions or []
extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS')
if extensions:
for ext in extensions.split(','):
ext = ext.strip()
def process_response_line(self, response_line):
"""
Ensure that we received a HTTP `101` status code in
response to our request and if not raises :exc:`HandshakeError`.
"""
protocol, code, status = response_line.split(b' ', 2)
if code != b'101':
raise HandshakeError("Invalid response status: %s %s" % (code, status))
def __handshake_completed(self, data):
self.io.set_close_callback(None)
try:
response_line, _, headers = data.partition(b'\r\n')
self.process_response_line(response_line)
protocols, extensions = self.process_handshake_header(headers)
except HandshakeError:
self.close_connection()
raise
self.opened()
self.io.set_close_callback(self.__stream_closed)
self.io.read_bytes(self.reading_buffer_size, self.__fetch_more)
self._write(self.handshake_request)
response = b''
doubleCLRF = b'\r\n\r\n'
while True:
bytes = self.sock.recv(128)
if not bytes:
break
response += bytes
if doubleCLRF in response:
break
if not response:
self.close_connection()
raise HandshakeError("Invalid response")
headers, _, body = response.partition(doubleCLRF)
response_line, _, headers = headers.partition(b'\r\n')
try:
self.process_response_line(response_line)
self.protocols, self.extensions = self.process_handshake_header(headers)
except HandshakeError:
self.close_connection()
raise
self.handshake_ok()
if body:
self.process(body)
self._write(self.handshake_request)
response = b''
doubleCLRF = b'\r\n\r\n'
while True:
bytes = self.sock.recv(128)
if not bytes:
break
response += bytes
if doubleCLRF in response:
break
if not response:
self.close_connection()
raise HandshakeError("Invalid response")
headers, _, body = response.partition(doubleCLRF)
response_line, _, headers = headers.partition(b'\r\n')
try:
self.process_response_line(response_line)
self.protocols, self.extensions = self.process_handshake_header(headers)
except HandshakeError:
self.close_connection()
raise
self.handshake_ok()
if body:
self.process(body)
def __call__(self, environ, start_response):
if environ.get('REQUEST_METHOD') != 'GET':
raise HandshakeError('HTTP method must be a GET')
for key, expected_value in [('HTTP_UPGRADE', 'websocket'),
('HTTP_CONNECTION', 'upgrade')]:
actual_value = environ.get(key, '').lower()
if not actual_value:
raise HandshakeError('Header %s is not defined' % key)
if expected_value not in actual_value:
raise HandshakeError('Illegal value for header %s: %s' %
(key, actual_value))
key = environ.get('HTTP_SEC_WEBSOCKET_KEY')
if key:
ws_key = base64.b64decode(key.encode('utf-8'))
if len(ws_key) != 16:
raise HandshakeError("WebSocket key's length is invalid")
def __call__(self, environ, start_response):
if environ.get('REQUEST_METHOD') != 'GET':
raise HandshakeError('HTTP method must be a GET')
for key, expected_value in [('HTTP_UPGRADE', 'websocket'),
('HTTP_CONNECTION', 'upgrade')]:
actual_value = environ.get(key, '').lower()
if not actual_value:
raise HandshakeError('Header %s is not defined' % key)
if expected_value not in actual_value:
raise HandshakeError('Illegal value for header %s: %s' %
(key, actual_value))
key = environ.get('HTTP_SEC_WEBSOCKET_KEY')
if key:
ws_key = base64.b64decode(key.encode('utf-8'))
if len(ws_key) != 16:
raise HandshakeError("WebSocket key's length is invalid")
def includeme(config):
config.add_route('ws', 'ws')
config.add_view(websocket, route_name='ws')
config.add_view(bad_handshake, context=HandshakeError)
config.scan(__name__)