Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def handle_message(request):
response = dispatch(request)
if response.wanted:
send(response, json=True)
def jsonrpc(request):
response = dispatch(request.body.decode())
return JsonResponse(
response.deserialized(), status=response.http_status, safe=False
)
def jsonrpc():
req = request.get_data().decode()
response = jsonrpcserver.dispatch(req, methods=methods, debug=True, convert_camel_case=False)
response_str = str(response)
return Response(response_str, response.http_status, mimetype="application/json")
import zmq
from jsonrpcserver import method, dispatch
socket = zmq.Context().socket(zmq.REP)
@method
def ping():
return 'pong'
if __name__ == '__main__':
socket.bind('tcp://*:5000')
while True:
request = socket.recv().decode()
response = dispatch(request)
socket.send_string(str(response))
def index():
response = dispatch(request.get_data().decode())
return Response(str(response), response.http_status, mimetype="application/json")
def index():
"""Dispatch requests to the handling methods."""
return dispatch(request.get_json(), HandleRequests)
async def dispatch(self, request):
return await dispatch(self, request)
def do_POST(self):
# Process request
request = self.rfile.read(int(self.headers['Content-Length'])).decode()
response = dispatch(request)
# Return response
self.send_response(response.http_status)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(str(response).encode())