Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_cannot_add_websocket_more_than_once(self, MockSelectPoller):
    """Adding the same websocket twice leaves the manager's length at 1."""
    manager = WebSocketManager(poller=MockSelectPoller())
    websocket = MagicMock()
    websocket.sock.fileno.return_value = 1

    # The length must be 1 after the first add and still 1 after the repeat.
    for _ in range(2):
        manager.add(websocket)
        self.assertEqual(len(manager), 1)
def test_mainloop_can_be_stopped_when_no_websocket_were_registered(
        self, MockSelectPoller):
    """``running`` follows start() and stop() when no websocket was added."""
    manager = WebSocketManager(poller=MockSelectPoller())

    # A freshly built manager is not running yet.
    self.assertFalse(manager.running)

    manager.start()
    self.assertTrue(manager.running)

    manager.stop()
    self.assertFalse(manager.running)
def test_add_and_remove_websocket(self, MockSelectPoller):
    """add() registers the socket's fileno with the poller; remove() unregisters it."""
    fileno = 1
    manager = WebSocketManager(poller=MockSelectPoller())
    websocket = MagicMock()
    websocket.sock.fileno.return_value = fileno

    manager.add(websocket)
    manager.poller.register.assert_called_once_with(fileno)

    manager.remove(websocket)
    manager.poller.unregister.assert_called_once_with(fileno)
'This feature requires the optional ws4py library.')
# Reject a bare string up front: the command must be passed as a list.
if isinstance(commands, six.string_types):
raise TypeError("First argument must be a list.")
# POST the exec request with 'wait-for-websocket' set and 'interactive' off.
response = self.api['exec'].post(json={
'command': commands,
'environment': environment,
'wait-for-websocket': True,
'interactive': False,
})
# 'fds' is indexed with '0', '1' and '2' below; each value is sent as the
# ?secret= of the stdin, stdout and stderr websocket respectively.
fds = response.json()['metadata']['metadata']['fds']
# The operation id is the last '/'-separated segment of the 'operation' value.
operation_id = response.json()['operation'].split('/')[-1]
# Only the path component of the operation's websocket endpoint is used below.
parsed = parse.urlparse(
self.client.api.operations[operation_id].websocket._api_endpoint)
# One manager is shared by all three websockets.
manager = WebSocketManager()
# Each websocket's resource is the endpoint path plus that stream's secret.
stdin = _StdinWebsocket(manager, self.client.websocket_url)
stdin.resource = '{}?secret={}'.format(parsed.path, fds['0'])
stdin.connect()
stdout = _CommandWebsocketClient(manager, self.client.websocket_url)
stdout.resource = '{}?secret={}'.format(parsed.path, fds['1'])
stdout.connect()
stderr = _CommandWebsocketClient(manager, self.client.websocket_url)
stderr.resource = '{}?secret={}'.format(parsed.path, fds['2'])
stderr.connect()
# The manager is started only after connect() was called on all three.
manager.start()
while True: # pragma: no cover
for websocket in manager.websockets.values():
if not websocket.terminated:
def initialize_websockets_manager(self):
    """
    Create the underlying websockets manager and start it.

    Call this once your server has been created. The new manager
    is stored on ``self.manager``.
    """
    manager = WebSocketManager()
    self.manager = manager
    manager.start()
def __init__(self, bus):
    """
    Initialise the plugin and attach a new websocket manager.

    :param bus: passed through to ``plugins.SimplePlugin.__init__``.
    """
    plugins.SimplePlugin.__init__(self, bus)
    # The manager is only created here, not started.
    self.manager = WebSocketManager()
environment = {}
# POST the exec request with 'wait-for-websocket' set and 'interactive' off.
response = self.api['exec'].post(json={
'command': commands,
'environment': environment,
'wait-for-websocket': True,
'interactive': False,
})
# 'fds' is indexed with '0', '1' and '2' below; each value is sent as the
# ?secret= of the stdin, stdout and stderr websocket respectively.
fds = response.json()['metadata']['metadata']['fds']
# The operation id is derived from the 'operation' value by the helper.
operation_id = \
Operation.extract_operation_id(response.json()['operation'])
# Only the path component of the operation's websocket endpoint is used below.
parsed = parse.urlparse(
self.client.api.operations[operation_id].websocket._api_endpoint)
# The manager is wrapped in a context manager for the websocket setup.
with managers.web_socket_manager(WebSocketManager()) as manager:
# NOTE(review): stdin is built without `manager`, unlike stdout/stderr
# below - presumably _StdinWebsocket has a different signature; confirm.
stdin = _StdinWebsocket(
self.client.websocket_url, payload=stdin_payload,
encoding=stdin_encoding
)
stdin.resource = '{}?secret={}'.format(parsed.path, fds['0'])
stdin.connect()
# stdout and stderr differ only in the secret and the handler passed in.
stdout = _CommandWebsocketClient(
manager, self.client.websocket_url,
encoding=encoding, decode=decode, handler=stdout_handler)
stdout.resource = '{}?secret={}'.format(parsed.path, fds['1'])
stdout.connect()
stderr = _CommandWebsocketClient(
manager, self.client.websocket_url,
encoding=encoding, decode=decode, handler=stderr_handler)
stderr.resource = '{}?secret={}'.format(parsed.path, fds['2'])
stderr.connect()
'tools.auth_digest.realm': 'Auto-Subliminal website',
'tools.auth_digest.get_ha1': auth_digest.get_ha1_dict_plain(users),
'tools.auth_digest.key': 'yek.tsegid_htua.lanimilbuS-otuA' # Can be any random string
})
# Configure our custom json_out_handler (Uncomment if it should be used for any @cherrypy.tools.json_out())
# cherrypy.config.update({'tools.json_out.handler': json_out_handler})
# Websocket setup differs between the first start and a restart.
if not restarting:
# Enable websocket plugin
# The plugin is built on cherrypy.engine and subscribed, then WebSocketTool
# is installed as cherrypy.tools.websocket.
websocket_plugin = WebSocketPlugin(cherrypy.engine)
websocket_plugin.subscribe()
cherrypy.tools.websocket = WebSocketTool()
else:
# When restarting we need to create a new websocket manager thread (you cannot start the same thread twice!)
# NOTE(review): websocket_plugin is not assigned in this branch of the
# visible code - presumably it survives from the earlier non-restart call
# (e.g. as a global); confirm.
websocket_plugin.manager = WebSocketManager()
# When restarting we need to clear the httpserver to force the creation of a new one (needed for ip/port change)
cherrypy.server.httpserver = None
def initialize_websockets_manager(self):
    """
    Create the underlying websockets manager and start it.

    Call this once your server has been created. The new manager
    is stored on ``self.manager``.
    """
    manager = WebSocketManager()
    self.manager = manager
    manager.start()
from ws4py.client import WebSocketBaseClient
from ws4py.manager import WebSocketManager
import time
import random
# Module-level manager; EchoClient.handshake_ok below adds each client to it.
m = WebSocketManager()
class EchoClient(WebSocketBaseClient):
    """Client that joins the module-level manager ``m`` and records received messages."""

    def handshake_ok(self):
        """Add this client to the module-level manager ``m``."""
        m.add(self)

    def add_writer(self, link):
        """Store ``link`` as the object that received messages are appended to."""
        self.writer = link

    def received_message(self, msg):
        """Append the string form of ``msg`` to the writer set by ``add_writer``."""
        text = str(msg)
        self.writer.append(text)
def run_ws(ws_addr, delegate):
messages = []