Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def dispatch(self, request, app, rest):
gd = lambda m: getattr(request, m)
queue, url = self.prepare_path(rest)
app = apps.get(app)
broker = app.get_broker()
method = request.method.upper()
pargs = {}
if queue:
queue = queues.get(queue)
pargs.update(exchange=queue["exchange"],
exchange_type=queue["exchange_type"],
routing_key=queue["routing_key"])
params = gd(method) if method in self.get_methods else gd("GET")
data = gd(method) if method not in self.get_methods else None
with producers[broker.connection()].acquire(block=True) as producer:
publisher = celery.amqp.TaskPublisher(
connection=producer.connection,
channel=producer.channel)
result = webhook.apply_async((url, method, params, data),
def delete(self, request, app):
return apps.delete(app)
def get(self, request, app=None):
return apps.get(app).as_dict() if app else apps.all()