Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_extensions_error_no_extensions(self, _process_extensions):
    """
    Check that the handshake fails for the stubbed extension result.

    ``_process_extensions`` is made to return an ``x-no-op`` header value
    together with a single ``NoOpExtension``; starting a client against
    ``/extensions`` must then raise ``InvalidHandshake``.
    """
    # NOTE(review): _process_extensions is presumably a mock injected by a
    # patch decorator that is not visible in this excerpt — confirm.
    stubbed_result = ("x-no-op", [NoOpExtension()])
    _process_extensions.return_value = stubbed_result

    with self.assertRaises(InvalidHandshake):
        self.start_client("/extensions")
def test_infinite_redirect(self):
    """
    Check that connecting to the redirecting server fails the handshake.

    A redirecting server answering with ``http.HTTPStatus.FOUND`` is started
    and installed as ``self.server``; opening a client against it is expected
    to raise ``InvalidHandshake``.
    """
    # NOTE(review): presumably the server redirects back to itself, hence
    # "infinite" — confirm against temp_test_redirecting_server.
    with temp_test_redirecting_server(self, http.HTTPStatus.FOUND):
        self.server = self.redirecting_server
        # The client context manager is entered inside assertRaises, so a
        # handshake failure during connection is what satisfies the check.
        with self.assertRaises(InvalidHandshake), temp_test_client(self):
            self.fail("Did not raise")  # pragma: no cover
def test_reject_connection():
    """
    Check that the client handshake fails when the app refuses to accept.

    The app replies with ``{'accept': False}`` on the ``reply`` channel;
    ``websockets.connect`` against the server URL is then expected to raise
    ``websockets.exceptions.InvalidHandshake``.
    """

    async def app(message, channels):
        await channels['reply'].send({'accept': False})

    with run_server(app) as url:
        loop = asyncio.new_event_loop()
        try:
            with pytest.raises(websockets.exceptions.InvalidHandshake):
                loop.run_until_complete(websockets.connect(url))
        finally:
            # Close the loop even when the expected exception is not raised,
            # so a failing assertion does not leak the event loop.
            loop.close()
def assertInvalidResponseHeaders(self, exc_type, key="CSIRmL8dWYxeAdr/XpEHRw=="):
    """
    Yield freshly built response headers, then assert they became invalid.

    Response headers are built for ``key`` and yielded so the caller can
    modify them. Once control returns, ``check_response`` must raise
    ``exc_type``, which itself must be a subclass of ``InvalidHandshake``.

    NOTE(review): presumably wrapped by a context-manager decorator that is
    outside this excerpt — confirm.
    """
    response_headers = Headers()
    build_response(response_headers, key)

    yield response_headers

    # Both checks run only after the caller has finished with the headers.
    assert issubclass(exc_type, InvalidHandshake)
    self.assertRaises(exc_type, check_response, response_headers, key)
def assertInvalidResponseHeaders(self, exc_type, key="CSIRmL8dWYxeAdr/XpEHRw=="):
    """
    Yield freshly built response headers, then assert they became invalid.

    Response headers are built for ``key`` and yielded so the caller can
    modify them. Once control returns, ``check_response`` must raise
    ``exc_type``, which itself must be a subclass of ``InvalidHandshake``.

    NOTE(review): presumably wrapped by a context-manager decorator that is
    outside this excerpt — confirm.
    """
    response_headers = Headers()
    build_response(response_headers, key)

    yield response_headers

    # Both checks run only after the caller has finished with the headers.
    assert issubclass(exc_type, InvalidHandshake)
    self.assertRaises(exc_type, check_response, response_headers, key)
def test_checking_origin_fails(self):
    """
    Check that a client with a disallowed origin is rejected.

    A server restricted to ``origins=["http://localhost"]`` is started on
    an ephemeral port; connecting with ``origin="http://otherhost"`` must
    raise ``InvalidHandshake`` with a message reporting HTTP 403.
    """
    server = self.loop.run_until_complete(
        serve(handler, "localhost", 0, origins=["http://localhost"])
    )
    try:
        with self.assertRaisesRegex(
            InvalidHandshake, "server rejected WebSocket connection: HTTP 403"
        ):
            self.loop.run_until_complete(
                connect(get_server_uri(server), origin="http://otherhost")
            )
    finally:
        # Shut the server down even if the assertion above fails, so a
        # failing test does not leave a listening server behind.
        server.close()
        self.loop.run_until_complete(server.wait_closed())
elif msg.startswith('ROOM_PEER_MSG'):
_, peer_id, msg = msg.split(maxsplit=2)
if peer_id in sent_offers:
print('Got answer from {!r}: {}'.format(peer_id, msg))
continue
print('Got offer from {!r}, replying'.format(peer_id))
await ws.send(get_answer_sdp(msg, peer_id))
continue
print('Unknown msg: {!r}, exiting'.format(msg))
return
# Print the value of PEER_ID before connecting.
print('Our uid is {!r}'.format(PEER_ID))

try:
    # Run the hello() coroutine to completion on the current event loop.
    asyncio.get_event_loop().run_until_complete(hello())
except websockets.exceptions.InvalidHandshake:
    # Print a hint first; the bare raise keeps the original exception
    # propagating unchanged.
    print('Invalid handshake: are you sure this is a websockets server?\n')
    raise
except ssl.SSLError:
    # Same pattern for TLS failures: hint, then re-raise.
    print('SSL Error: are you sure the server is using TLS?\n')
    raise
headers = {
'Authorization': 'Bearer ' + self.auth_token,
'X-ConnectionId': self.connection_id
}
# record the Connection metric telemetry data
self.metrics.append({
'Name': 'Connection',
'Id': self.connection_id,
'Start': utils.generate_timestamp()
})
try:
# request a WebSocket connection to the speech API
self.ws = await websockets.client.connect(url, extra_headers=headers)
except websockets.exceptions.InvalidHandshake as err:
print('Handshake error: {0}'.format(err))
return
# TODO: add Connection failure telemetry for error cases
# record the Connection metric telemetry data
self.metrics[-1]['End'] = utils.generate_timestamp()
# send the speech.config message
await self.send_speech_config_msg()
if CALLEE_ID:
if msg == 'SESSION_OK':
await ws.send(send_sdp_ice())
sent_sdp = True
else:
print('Unknown reply: {!r}, exiting'.format(msg))
return
else:
await ws.send(reply_sdp_ice(msg))
return # Done
# Print the value of PEER_ID before connecting.
print('Our uid is {!r}'.format(PEER_ID))

try:
    # Run the hello() coroutine to completion on the current event loop.
    asyncio.get_event_loop().run_until_complete(hello())
except websockets.exceptions.InvalidHandshake:
    # Print a hint first; the bare raise keeps the original exception
    # propagating unchanged.
    print('Invalid handshake: are you sure this is a websockets server?\n')
    raise
except ssl.SSLError:
    # Same pattern for TLS failures: hint, then re-raise.
    print('SSL Error: are you sure the server is using TLS?\n')
    raise