Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@with_server(extra_headers=Headers({"X-Spam": "Eggs"}))
@with_client("/headers")
def test_protocol_custom_response_headers(self):
self.loop.run_until_complete(self.client.recv())
resp_headers = self.loop.run_until_complete(self.client.recv())
self.assertIn("('X-Spam', 'Eggs')", resp_headers)
def assertInvalidRequestHeaders(self, exc_type):
"""
Provide request headers for modification.
Assert that the transformation made them invalid.
"""
headers = Headers()
build_request(headers)
yield headers
assert issubclass(exc_type, InvalidHandshake)
with self.assertRaises(exc_type):
check_request(headers)
def test_round_trip(self):
request_headers = Headers()
request_key = build_request(request_headers)
response_key = check_request(request_headers)
self.assertEqual(request_key, response_key)
response_headers = Headers()
build_response(response_headers, response_key)
check_response(response_headers, request_key)
def assertValidResponseHeaders(self, key="CSIRmL8dWYxeAdr/XpEHRw=="):
"""
Provide response headers for modification.
Assert that the transformation kept them valid.
"""
headers = Headers()
build_response(headers, key)
yield headers
check_response(headers, key)
def assertInvalidResponseHeaders(self, exc_type, key="CSIRmL8dWYxeAdr/XpEHRw=="):
"""
Provide response headers for modification.
Assert that the transformation made them invalid.
"""
headers = Headers()
build_response(headers, key)
yield headers
assert issubclass(exc_type, InvalidHandshake)
with self.assertRaises(exc_type):
check_response(headers, key)
def assertValidRequestHeaders(self):
"""
Provide request headers for modification.
Assert that the transformation kept them valid.
"""
headers = Headers()
build_request(headers)
yield headers
check_request(headers)
def handshake(self, response):
"""Websocket handshake, handled by `websockets`
"""
try:
headers = websockets.http.Headers(**self.request.headers)
key = websockets.handshake.check_request(headers)
websockets.handshake.build_response(response.headers, key)
except websockets.InvalidHandshake:
raise RuntimeError('Invalid websocket request')
subprotocol = None
ws_protocol = ','.join(headers.get_all('Sec-Websocket-Protocol'))
subprotocols = self.request.route.payload.get('subprotocols')
if subprotocols and ws_protocol:
# select a subprotocol
client_subprotocols = tuple(
(p.strip() for p in ws_protocol.split(',')))
for p in client_subprotocols:
if p in subprotocols:
subprotocol = p
response.headers['Sec-Websocket-Protocol'] = subprotocol