Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class DummyServerTransport(ServerTransport):
def __init__(self):
self.messages = []
self.clients = {}
def receive_message(self):
return self.messages.pop()
def send_reply(self, context, message):
if not isinstance(message, str):
raise TypeError('Message must be str().')
self.clients[context].messages.append(message)
class DummyClientTransport(ClientTransport):
def __init__(self, server):
self.server = server
self.id = id(self)
self.server.clients[self.id] = self
self.messages = []
def send_message(self, message):
if not isinstance(message, str):
raise TypeError('Message must be str().')
self.server.messages.append((self.id, message))
def receive_reply(self):
return self.messages.pop()
ZMQ_ENDPOINT = 'inproc://example2'
def mock_transport():
return Mock(ClientTransport)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import Dict
import geventwebsocket as websocket
from . import ClientTransport
class HttpWebSocketClientTransport(ClientTransport):
"""HTTP WebSocket based client transport.
Requires :py:mod:`websocket-python`. Submits messages to a server using the body of
an ``HTTP`` ``WebSocket`` message. Replies are taken from the response of the websocket.
The connection is establish on the ``__init__`` because the protocol is connection oriented,
you need to close the connection calling the close method.
:param endpoint: The URL to connect the websocket.
:param kwargs: Additional parameters for :py:func:`websocket.send`.
"""
def __init__(self, endpoint: str, **kwargs: Dict):
self.endpoint = endpoint
self.request_kwargs = kwargs
self.ws = websocket.create_connection(self.endpoint, **kwargs)
Instead of creating the socket yourself, you can call this function and
merely pass the :py:class:`zmq.core.context.Context` instance.
By passing a context imported from :py:mod:`zmq.green`, you can use
green (gevent) 0mq sockets as well.
:param zmq_context: A 0mq context.
:param endpoint: The endpoint clients will connect to.
"""
socket = zmq_context.socket(zmq.ROUTER)
socket.bind(endpoint)
return cls(socket)
class ZmqClientTransport(ClientTransport):
"""Client transport based on a :py:const:`zmq.REQ` socket.
:param socket: A :py:const:`zmq.REQ` socket instance, connected to the
server socket.
"""
def __init__(self, socket):
self.socket = socket
def send_message(self, message, expect_reply=True):
self.socket.send(message)
if expect_reply:
return self.socket.recv()
@classmethod
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from Queue import Queue
import threading
import requests
import websocket
from . import ServerTransport, ClientTransport
class HttpPostClientTransport(ClientTransport):
"""HTTP POST based client transport.
Requires :py:mod:`requests`. Submits messages to a server using the body of
an ``HTTP`` ``POST`` request. Replies are taken from the responses body.
:param endpoint: The URL to send ``POST`` data to.
:param kwargs: Additional parameters for :py:func:`requests.post`.
"""
def __init__(self, endpoint, **kwargs):
self.endpoint = endpoint
self.request_kwargs = kwargs
def send_message(self, message, expect_reply=True):
if not isinstance(message, str):
raise TypeError('str expected')
def __init__(self, endpoint, **kwargs):
self.endpoint = endpoint
self.request_kwargs = kwargs
def send_message(self, message, expect_reply=True):
if not isinstance(message, str):
raise TypeError('str expected')
r = requests.post(self.endpoint, data=message, **self.request_kwargs)
if expect_reply:
return r.content
class HttpWebSocketClientTransport(ClientTransport):
"""HTTP WebSocket based client transport.
Requires :py:mod:`websocket-python`. Submits messages to a server using the body of
an ``HTTP`` ``WebSocket`` message. Replies are taken from the response of the websocket.
The connection is establish on the ``__init__`` because the protocol is connection oriented,
you need to close the connection calling the close method.
:param endpoint: The URL to connect the websocket.
:param kwargs: Additional parameters for :py:func:`websocket.send`.
"""
def __init__(self, endpoint, **kwargs):
self.endpoint = endpoint
self.request_kwargs = kwargs
self.ws = websocket.create_connection(self.endpoint, **kwargs)
WebSocketServerTransport(ws),
JSONRPCProtocol(),
dispatcher,
)
def serve_forever(self):
try:
super(WebSocketRPCServer, self).serve_forever()
except WebSocketDisconnectedError:
return
def _spawn(self, func, *args, **kwargs):
hub.spawn(func, *args, **kwargs)
class WebSocketClientTransport(ClientTransport):
def __init__(self, ws, queue):
self.ws = ws
self.queue = queue
def send_message(self, message, expect_reply=True):
self.ws.send(six.text_type(message))
if expect_reply:
return self.queue.get()
class WebSocketRPCClient(RPCClient):
def __init__(self, ws):
self.ws = ws