Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def websocket_handshake(self, request, subprotocols: set=None):
"""Websocket handshake, handled by `websockets`
"""
headers = []
def get_header(k):
return request.headers.get(k.upper(), '')
def set_header(k, v):
headers.append((k, v))
try:
key = handshake.check_request(get_header)
handshake.build_response(set_header, key)
except InvalidHandshake:
raise RuntimeError('Invalid websocket request')
subprotocol = None
ws_protocol = get_header('Sec-Websocket-Protocol')
if subprotocols and ws_protocol:
# select a subprotocol
client_subprotocols = tuple(
(p.strip() for p in ws_protocol.split(',')))
for p in client_subprotocols:
if p in subprotocols:
subprotocol = p
set_header('Sec-Websocket-Protocol', subprotocol)
break
async def websocket_handshake(self, request, subprotocols=None):
headers = {}
try:
key = handshake.check_request(request.headers)
handshake.build_response(headers, key)
except InvalidHandshake:
msg = "Invalid websocket request received."
if self.debug:
msg += "\n" + traceback.format_exc()
self.logger.error(msg)
self.on_response(msg)
raise RuntimeError(msg)
subprotocol = None
if subprotocols and "Sec-Websocket-Protocol" in request.headers:
# select a subprotocol
client_subprotocols = [
p.strip()
for p in request.headers["Sec-Websocket-Protocol"].split(",")
]
async def websocket_handshake(self, request, subprotocols=None):
# let the websockets package do the handshake with the client
headers = {}
try:
key = handshake.check_request(request.headers)
handshake.build_response(headers, key)
except InvalidHandshake:
raise InvalidUsage("Invalid websocket request")
subprotocol = None
if subprotocols and "Sec-Websocket-Protocol" in request.headers:
# select a subprotocol
client_subprotocols = [
p.strip()
for p in request.headers["Sec-Websocket-Protocol"].split(",")
]
for p in client_subprotocols:
if p in subprotocols:
subprotocol = p
headers["Sec-Websocket-Protocol"] = subprotocol
break
def websocket_upgrade(http):
request_headers = dict([
(key.decode('latin-1'), value.decode('latin-1'))
for key, value in http.headers
])
response_headers = {}
try:
key = websockets.handshake.check_request(request_headers)
websockets.handshake.build_response(response_headers, key)
except websockets.InvalidHandshake:
rv = b"HTTP/1.1 403 Forbidden\r\n\r\n"
http.transport.write(rv)
http.transport.close()
return
# Retrieve any subprotocols to be negotiated with the consumer later
subprotocols = [
subprotocol.strip() for subprotocol in
request_headers.get("sec-websocket-protocol", "").split(",")
]
http.scope.update({"type": "websocket", "subprotocols": subprotocols})
asgi_instance = http.app(http.scope)
request = WebSocketRequest(http, response_headers)
http.loop.create_task(asgi_instance(request.receive, request.send))
request.put_message({"type": "websocket.connect", "order": 0})
async def websocket_handshake(self, request, subprotocols=None):
# let the websockets package do the handshake with the client
headers = {}
try:
key = handshake.check_request(request.headers)
handshake.build_response(headers, key)
except InvalidHandshake:
raise InvalidUsage("Invalid websocket request")
subprotocol = None
if subprotocols and "Sec-Websocket-Protocol" in request.headers:
# select a subprotocol
client_subprotocols = [
p.strip()
for p in request.headers["Sec-Websocket-Protocol"].split(",")
]
for p in client_subprotocols:
if p in subprotocols:
subprotocol = p
headers["Sec-Websocket-Protocol"] = subprotocol
break
async def websocket_handshake(self, request, subprotocols=None):
headers = {}
try:
key = handshake.check_request(request.headers)
handshake.build_response(headers, key)
except InvalidHandshake:
msg = "Invalid websocket request received."
if self.debug:
msg += "\n" + traceback.format_exc()
self.logger.error(msg)
self.on_response(msg)
raise RuntimeError(msg)
subprotocol = None
if subprotocols and "Sec-Websocket-Protocol" in request.headers:
# select a subprotocol
client_subprotocols = [
p.strip()
for p in request.headers["Sec-Websocket-Protocol"].split(",")
]
for p in client_subprotocols:
def handshake(self, response):
"""Websocket handshake, handled by `websockets`
"""
try:
headers = websockets.http.Headers(**self.request.headers)
key = websockets.handshake.check_request(headers)
websockets.handshake.build_response(response.headers, key)
except websockets.InvalidHandshake:
raise RuntimeError('Invalid websocket request')
subprotocol = None
ws_protocol = ','.join(headers.get_all('Sec-Websocket-Protocol'))
subprotocols = self.request.route.payload.get('subprotocols')
if subprotocols and ws_protocol:
# select a subprotocol
client_subprotocols = tuple(
(p.strip() for p in ws_protocol.split(',')))
for p in client_subprotocols:
if p in subprotocols:
subprotocol = p
response.headers['Sec-Websocket-Protocol'] = subprotocol
break
def websocket_handshake(self, request, subprotocols: set=None):
"""Websocket handshake, handled by `websockets`
"""
headers = []
def get_header(k):
return request.headers.get(k.upper(), '')
def set_header(k, v):
headers.append((k, v))
try:
key = handshake.check_request(get_header)
handshake.build_response(set_header, key)
except InvalidHandshake:
raise RuntimeError('Invalid websocket request')
subprotocol = None
ws_protocol = get_header('Sec-Websocket-Protocol')
if subprotocols and ws_protocol:
# select a subprotocol
client_subprotocols = tuple(
(p.strip() for p in ws_protocol.split(',')))
for p in client_subprotocols:
if p in subprotocols:
subprotocol = p
set_header('Sec-Websocket-Protocol', subprotocol)
break
# write the 101 response back to the client
def __init__(self, environ, switch_protocols):
super().__init__()
http_1_1 = environ['SERVER_PROTOCOL'] == 'HTTP/1.1'
get_header = lambda k: environ['HTTP_' + k.upper().replace('-', '_')]
key = handshake.check_request(get_header)
if not http_1_1 or key is None:
self.status_code = 400
self.content = "Invalid WebSocket handshake.\n"
else:
self._headers = {} # Reset headers (private API!)
set_header = self.__setitem__
handshake.build_response(set_header, key)
self.close = switch_protocols
def __init__(self, environ, switch_protocols):
super().__init__()
self.status_int = 101
http_1_1 = environ['SERVER_PROTOCOL'] == 'HTTP/1.1'
def get_header(k):
key_map = {k.upper(): k for k in environ}
return environ[key_map['HTTP_' + k.upper().replace('-', '_')]]
key = websockets.handshake.check_request(get_header)
if not http_1_1 or key is None:
self.status_int = 400
self.content = "Invalid WebSocket handshake.\n"
else:
set_header = self.headers.__setitem__
websockets.handshake.build_response(set_header, key)
self.app_iter = HandshakeInterator(self.app_iter)
self.app_iter.close = switch_protocols