Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_close(self):
    """Run ``closeme.py`` in a child interpreter, then try to reach its endpoint.

    The script sitting next to this file is executed with the current
    interpreter; stderr is folded into stdout. The child must exit with
    status 0, and its decoded output is used as the websocket endpoint.
    Connecting to that endpoint is expected to raise ``OSError``.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    script = os.path.join(here, 'closeme.py')
    command = [sys.executable, script]
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    self.assertEqual(completed.returncode, 0)

    # Everything the child printed is treated as the endpoint URL.
    wsEndPoint = completed.stdout.decode()

    # Chrome should already be closed by now, so the websocket connect
    # attempt has to fail.
    with self.assertRaises(OSError):
        await websockets.client.connect(wsEndPoint)
# NOTE(review): this is the tail of a function whose header is not visible
# in this excerpt; `res`, `conn`, `host`, `mac` and `ssl_context` are
# defined somewhere above it.
# Split the WWW-Authenticate challenge header on commas (at most 3 splits)
nonce = res.headers["WWW-Authenticate"].split(',', 3)
# Read the response
res.read()
# Close the HTTPS connection
conn.close()
# Build the websocket headers: Authorization comes from build_digest_headers
websocketHeaders = {'Authorization': build_digest_headers(nonce)}
# Use the caller-supplied SSL context when there is one, otherwise pass True
if ssl_context is not None:
websocket_ssl_context = ssl_context
else:
websocket_ssl_context = True # Verify certificate
async with websockets.client.connect('wss://{}:443/mediation/client?mac={}&appli=1'.format(host, mac),
extra_headers=websocketHeaders, ssl=websocket_ssl_context) as websocket:
# Get information (not very useful); call currently disabled
#await get_info(websocket)
# Get all moments stored on Tydom; call currently disabled
#await get_moments(websocket)
# Get scenario ids
await get_scenarios(websocket)
# Run a scenario using a scenario id returned by the previous command
# NOTE(review): the id 15 is hard-coded here — confirm it is intentional
await put_scenarios(websocket, 15)
async def ws_connect(host_ip, session_token):
    """
    Open a websocket to the Eva data stream and return it.

    This coroutine has to be awaited from a running asyncio event loop.
    The connection object it returns is handed over as-is: call .recv()
    and .send() on it yourself, and call .close() once you are finished
    so the websocket connection is cleaned up.
    """
    stream_uri = 'ws://{host}/api/v1/data/stream'.format(host=host_ip)
    token_protocol = 'SessionToken_{token}'.format(token=session_token)
    return await websockets.client.connect(
        stream_uri,
        subprotocols=[token_protocol, "object"],
    )
async def connect(self, project_id=None):
    """Open the cloud data websocket and send the initial handshake.

    A truthy ``project_id`` replaces the id stored on the instance;
    otherwise the stored one is kept. An id must have been supplied either
    at construction or here, and the session must be authenticated —
    both are checked with assertions before anything is sent.
    """
    self.project_id = project_id if project_id else self.project_id
    assert self.project_id is not None
    assert self.session.authenticated()

    # Present a regular browser user agent so the client looks like a
    # normal one (module-level setting on the websockets client).
    _websockets_client.USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36"

    self._debug("connect: establish")
    cloud_url = "ws://" + self.session.CLOUD + "/"
    loop = self.event_loop
    header_pairs = [
        (name, content)
        for name, content in self.session._get_headers(cookie=True).items()
    ]
    self.socket = await _websockets_client.connect(
        cloud_url,
        loop=loop,
        extra_headers=header_pairs,
    )

    # The server-side limit is unknown; this value is what marks the socket
    # as timed out after a short period without reads.
    self.socket.timeout = 30
    self._connected = True
    self.watchdog.reset_read()

    self._debug("connect: handshake")
    await AIOCloudSession._write_packet(self, self._create_packet('handshake', {}))

    self._debug("connect: write()")
    await AIOCloudSession._write(self)

    self._debug("connect: complete")
async def connect(self, project_id=None):
    """Connect to the cloud data server and perform the handshake.

    Passing ``project_id`` overrides whatever id the instance already
    holds. One of the two must be set, and the session has to be
    authenticated; otherwise an assertion fails.
    """
    if project_id:
        selected_id = project_id
    else:
        selected_id = self.project_id
    self.project_id = selected_id
    assert self.project_id is not None
    assert self.session.authenticated()

    # Masquerade as an ordinary browser client.
    browser_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36"
    _websockets_client.USER_AGENT = browser_agent

    self._debug("connect: establish")
    endpoint = "ws://" + self.session.CLOUD + "/"
    event_loop = self.event_loop
    extra_headers = [(key, value) for key, value in
                     self.session._get_headers(cookie=True).items()]
    connection = await _websockets_client.connect(
        endpoint, loop=event_loop, extra_headers=extra_headers)
    self.socket = connection
    # Used to flag the socket as timed out after a short stretch with no
    # reads; the real server-side value is not known.
    self.socket.timeout = 30
    self._connected = True
    self.watchdog.reset_read()

    self._debug("connect: handshake")
    send_packet = AIOCloudSession._write_packet
    handshake = self._create_packet('handshake', {})
    await send_packet(self, handshake)

    self._debug("connect: write()")
    await AIOCloudSession._write(self)

    self._debug("connect: complete")
# NOTE(review): tail of a method whose header (and the construction of
# `payload`) is not visible in this excerpt.
# Log the payload at debug level, then pass it to send_as_json.
log.debug('Updating our voice state to %s.', payload)
await self.send_as_json(payload)
async def close(self, code=1000, reason=''):
    """Stop ``self._keep_alive`` when it is set, then defer to the parent ``close``.

    ``code`` and ``reason`` are forwarded unchanged to the parent class.
    """
    keep_alive = self._keep_alive
    if keep_alive:
        keep_alive.stop()

    await super().close(code, reason)
async def close_connection(self, *args, **kwargs):
    """Stop ``self._keep_alive`` when it is set, then defer to the parent.

    All positional and keyword arguments are forwarded unchanged to the
    parent class's ``close_connection``.
    """
    keep_alive = self._keep_alive
    if keep_alive:
        keep_alive.stop()

    await super().close_connection(*args, **kwargs)
class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
"""Implements the websocket protocol for handling voice connections.
Attributes
-----------
IDENTIFY
Send only. Starts a new voice session.
SELECT_PROTOCOL
Send only. Tells discord what encryption mode and how to connect for voice.
READY
Receive only. Tells the websocket that the initial connection has completed.
HEARTBEAT
Send only. Keeps your websocket connection alive.
SESSION_DESCRIPTION
Receive only. Gives you the secret key required for voice.
SPEAKING
Send only. Notifies the client if you are currently speaking.
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialise the parent class, then prepare the websocket connection.

    All arguments go straight to the parent initialiser. The object
    returned by ``websockets.client.connect`` is stored on ``self._ws``
    without being awaited here; ``max_size``, ``ping_interval`` and
    ``ping_timeout`` are all passed as ``None``.
    """
    super().__init__(*args, **kwargs)

    url = self._url
    connect_options = {
        "loop": self._loop,
        "max_size": None,
        "ping_interval": None,
        "ping_timeout": None,
    }
    self._ws = websockets.client.connect(url, **connect_options)