Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from unittest.mock import sentinel
import pytest
from jsonrpcclient.async_client import AsyncClient
from jsonrpcclient.requests import Request
from jsonrpcclient.response import Response
class DummyClient(AsyncClient):
    """AsyncClient stub whose transport always answers with a result of 1."""

    async def send_message(self, request, response_expected, **kwargs):
        """Ignore the arguments and return a fixed JSON-RPC success response.

        The response text is hard-coded; ``raw`` is set to the
        ``unittest.mock.sentinel`` object itself.
        """
        canned_reply = '{"jsonrpc": "2.0", "result": 1, "id": 1}'
        return Response(canned_reply, raw=sentinel)
class Test:
    """Check ``send`` on ``DummyClient`` for both accepted request forms."""

    @pytest.mark.asyncio
    async def test_json_encoded(self):
        """A request given as a JSON string yields the stub's result of 1."""
        request = '{"jsonrpc": "2.0", "method": "foo", "id": 1}'
        response = await DummyClient().send(request)
        assert response.data.result == 1

    @pytest.mark.asyncio
    async def test_json_decoded(self, *_):
        """A request given as a dict yields the stub's result of 1."""
        request = {"jsonrpc": "2.0", "method": "foo", "id": 1}
        response = await DummyClient().send(request)
        # Without this check the test only showed that send() did not raise.
        assert response.data.result == 1
class MethodCall(namedtuple('MethodCall', ['dest', 'method', 'source'])):
    """Immutable ``(dest, method, source)`` triple with a colon-joined text form."""

    __slots__ = ()

    def tos(self):
        """Return the triple as the string ``dest:method:source``."""
        # The tuple iterates as (dest, method, source), so joining the
        # instance itself produces the fields in the required order.
        return ':'.join(self)

    def __repr__(self):
        """Return the same colon-joined form as :meth:`tos`."""
        return self.tos()

    @classmethod
    def make(cls, method):
        """Build a MethodCall from a ``dest:method:source`` string.

        The string is split on every ``':'``; ``namedtuple._make`` raises
        ``TypeError`` unless that yields exactly three parts.
        """
        return cls._make(method.split(':'))
class RedisPubSubRPC(AsyncClient):
    """
    Band RPC interface.
    Used to interact with other microservices

    Class constants:

    RPC_TIMEOUT     Default timeout for RPC request (seconds)
    """

    RPC_TIMEOUT = 2

    def __init__(self, name, rpc_params=None, redis_params=None,
                 **kwargs):
        # The base client is always initialised with the literal 'noop'.
        super().__init__('noop')
        # NOTE(review): rpc_params, redis_params and kwargs are not used in
        # the visible part of this initializer - confirm against the rest.
        self.pending = {}
        self.name = name
        # TODO: remove redis_params
def build_funcmap(klass,
                  pred=None) -> Dict[str, Transforms]:
    """ Inspect a class object and build a map of transforms for each function
    (ignoring self)

    :param klass: object whose members are enumerated with inspect.getmembers
    :param pred: optional member filter passed to inspect.getmembers; when
        falsy, plain functions whose name does not start with '_' are kept
    :return: dict mapping each selected member name to the value returned by
        _build_function_type_transforms(name, member)
    """
    def _is_public_function(member):
        if not inspect.isfunction(member):
            return False
        return not member.__name__.startswith('_')

    member_filter = pred or _is_public_function

    funcmap = {}
    for fname, fobj in inspect.getmembers(klass, member_filter):
        funcmap[fname] = _build_function_type_transforms(fname, fobj)
    return funcmap
class UnixSocketClient(AsyncClient):
    """
    jsonrpc async client implementation based on jsonrpc.clients.socket_client
    """

    class _Connection:
        """Holds the socket path, the reader/writer pair and a stream decoder."""

        def __init__(self, socket_path, reader, writer):
            self.socket_path = socket_path
            self.reader, self.writer = reader, writer
            # The decoder is constructed over the same reader stored above.
            self.decoder = JsonStreamDecoder(reader)

    def __init__(self) -> None:
        # NOTE(review): the base-class initializer is not called here -
        # confirm that AsyncClient does not require it.
        # Starts out with no connection object.
        self._connection: Optional[UnixSocketClient._Connection] = None
        # Transform map built once from the API object.
        self._funcmap = build_funcmap(API)
@classmethod
"""ZeroMQ Client"""
from typing import Any
import zmq # type: ignore
import zmq.asyncio # type: ignore
from ..async_client import AsyncClient
from ..response import Response
class ZeroMQAsyncClient(AsyncClient):
    """
    :param endpoint: Address of the server to connect to.
    :param socket_type: ZeroMQ socket type; defaults to zmq.REQ.
    :param **kwargs: Passed through to Client class.
    """

    def __init__(
        self, endpoint: str, *args: Any, socket_type: int = zmq.REQ, **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)
        self.context = zmq.asyncio.Context()
        self.socket = self.context.socket(socket_type)
        # A client connects to the endpoint that the server has bound.
        # Binding here would claim the address on the client side and
        # collide with a server listening on the same endpoint.
        self.socket.connect(endpoint)
async def send_message( # type: ignore
self, request: str, **kwargs: Any
"""
Websockets client.
http://websockets.readthedocs.io/
"""
from typing import Any
from websockets import WebSocketCommonProtocol # type: ignore
from ..async_client import AsyncClient
from ..response import Response
class WebSocketsClient(AsyncClient):
    """AsyncClient subclass that keeps a reference to a websocket."""

    def __init__(
        self,
        socket: WebSocketCommonProtocol,
        *args: Any,
        **kwargs: Any
    ) -> None:
        """
        Args:
            socket: Connected websocket (websockets.connect("ws://localhost:5000"))
            *args: Forwarded unchanged to the base class initializer.
            **kwargs: Forwarded unchanged to the base class initializer.
        """
        super().__init__(*args, **kwargs)
        self.socket = socket
async def send_message(
self, request: str, response_expected: bool, **kwargs: Any
) -> Response:
"""
Transport the message to the server and return the response.
"""
Tornado Client.
Represents an endpoint to communicate with using Tornado asynchronous HTTP client.
"""
from typing import Any, Optional
from tornado.httpclient import AsyncHTTPClient # type: ignore
from ..async_client import AsyncClient
from ..response import Response
class TornadoClient(AsyncClient):
    """
    Note: Tornado raises its own HTTP response status code exceptions, so there's no
    need to raise ReceivedNon2xxResponseError.
    """

    # Class-level defaults: JSON request headers and the response log format.
    DEFAULT_HEADERS = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    DEFAULT_RESPONSE_LOG_FORMAT = "<-- %(message)s (%(http_code)s %(http_reason)s)"

    def __init__(
        self,
        endpoint: str,
        *args: Any,
        client: Optional[AsyncHTTPClient] = None,
        **kwargs: Any
    ) -> None:
        # Positional and keyword extras go straight to the base class.
        # NOTE(review): endpoint and client are not used in the visible part
        # of this initializer - confirm against the rest of the definition.
        super().__init__(*args, **kwargs)
Requires aiohttp >= 3.0.
http://aiohttp.readthedocs.io/
"""
from ssl import SSLContext
from typing import Any, Optional
import async_timeout # type: ignore
from aiohttp import ClientSession # type: ignore
from ..async_client import AsyncClient
from ..exceptions import ReceivedNon2xxResponseError
from ..response import Response
class AiohttpClient(AsyncClient):
DEFAULT_RESPONSE_LOG_FORMAT = "<-- %(message)s (%(http_code)s %(http_reason)s)"
def __init__(
self,
session: ClientSession,
endpoint: str,
*args: Any,
ssl: Optional[SSLContext] = None,
timeout: int = 10,
**kwargs: Any
) -> None:
super().__init__(*args, **kwargs)
self.endpoint = endpoint
self.session = session
self.ssl = ssl