Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def handle(request, logging=False, debug=False):
request = await request.text()
response = await async_dispatch(request, basic_logging=logging, debug=debug)
if response.wanted:
return web.json_response(response.deserialized(), headers=res_headers, status=response.http_status)
else:
return web.Response(headers=res_headers, content_type="text/plain")
async def api_handle(request: web.Request) -> web.Response:
request_text = await request.text()
response = await async_dispatch(request_text,
context=request,
debug=config.debug,
basic_logging=False,
trim_log_values=True)
if isinstance(response, ExceptionResponse):
logging.error("Server Error", exc_info=response.exc)
if response.wanted:
return web.json_response(response.deserialized(),
status=response.http_status,
dumps=compat_dumps)
else:
return web.Response()
return api_handle
async def handle(self, request):
async with self.auth_lock:
try:
await self.authenticate(request.headers)
except AuthenticationInvalidOrMissing:
return web.Response(headers={"WWW-Authenticate": "Basic realm=Electrum"},
text='Unauthorized', status=401)
except AuthenticationCredentialsInvalid:
return web.Response(text='Forbidden', status=403)
request = await request.text()
response = await jsonrpcserver.async_dispatch(request, methods=self.methods)
if isinstance(response, jsonrpcserver.response.ExceptionResponse):
self.logger.error(f"error handling request: {request}", exc_info=response.exc)
# this exposes the error message to the client
response.message = str(response.exc)
if response.wanted:
return web.json_response(response.deserialized(), status=response.http_status)
else:
return web.Response()
async def handle(self, request):
try:
self.authenticate(request.headers)
except AuthenticationError:
return web.Response(text='Forbidden', status='403')
request = await request.text()
self.logger.info(f'request: {request}')
response = await jsonrpcserver.async_dispatch(request, methods=self.methods)
if response.wanted:
return web.json_response(response.deserialized(), status=response.http_status)
else:
return web.Response()
async def handle(request):
request = await request.text()
response = await dispatch(request)
if response.wanted:
return web.json_response(response.deserialized(), status=response.http_status)
else:
return web.Response()
async def main():
rep = await aiozmq.create_zmq_stream(zmq.REP, bind="tcp://*:5000")
while True:
request = await rep.read()
response = await dispatch(request[0].decode())
rep.write((str(response).encode(),))
async def home(request: Request):
request = await request.stream.read()
response = await dispatch(request.decode())
return JsonResponse(response.deserialized())