Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get(self, request, app, name, queue=None, nowait=False):
    """Return whatever ``nodes.consuming_from`` reports for ``name``.

    Only ``name`` is used; ``request``, ``app``, ``queue`` and ``nowait``
    are accepted but ignored by this handler.
    """
    consuming = nodes.consuming_from(name)
    return consuming
def put(self, request, app, name=None, nowait=False):
    """Add a node via ``nodes.add`` and wrap the result in ``self.Created``.

    ``name``, ``app`` and ``nowait`` are passed through as keyword
    arguments; whatever ``self.params("broker", "pool")`` returns is
    expanded into additional keyword arguments of the same call.
    ``request`` is not used directly here.
    """
    added = nodes.add(
        name=name,
        app=app,
        nowait=nowait,
        **self.params("broker", "pool")
    )
    return self.Created(added)

# ``post`` is bound to the very same function object as ``put``.
post = put
def put(self, request, app, name, queue, nowait=False):
    """Call ``nodes.add_consumer`` for ``name``/``queue`` and report Created.

    ``nowait`` is forwarded as a keyword argument; the value returned by
    ``nodes.add_consumer`` is wrapped in ``self.Created``. ``request`` and
    ``app`` are accepted but not used.
    """
    consumer = nodes.add_consumer(name, queue, nowait=nowait)
    return self.Created(consumer)

# ``post`` is bound to the very same function object as ``put``.
post = put
def get(self, request, app, name=None, nowait=False):
    """Return one node when ``name`` is truthy, otherwise every node of ``app``.

    A truthy ``name`` is looked up with ``nodes.get(name)``; a falsy one
    (the default ``None``, or an empty string) falls back to
    ``nodes.all(app=app)``. ``request`` and ``nowait`` are not used.
    """
    if name:
        return nodes.get(name)
    return nodes.all(app=app)
def get(self, request, app, name):
    """Return the concurrency bounds of the node called ``name``.

    The node is fetched with ``nodes.get(name)`` and its
    ``"max_concurrency"`` / ``"min_concurrency"`` items are returned
    under the keys ``"max"`` and ``"min"``. ``request`` and ``app`` are
    not used.
    """
    found = nodes.get(name)
    # Read "max" before "min", matching the original lookup order.
    upper, lower = found["max_concurrency"], found["min_concurrency"]
    return {"max": upper, "min": lower}
def delete(self, request, app, name, queue, nowait=False):
    """Call ``nodes.cancel_consumer`` for ``name``/``queue`` and report Ok.

    ``nowait`` is forwarded as a keyword argument; the value returned by
    ``nodes.cancel_consumer`` is wrapped in ``self.Ok``. ``request`` and
    ``app`` are accepted but not used.
    """
    cancelled = nodes.cancel_consumer(name, queue, nowait=nowait)
    return self.Ok(cancelled)
def post(self, request, app, name, nowait=False):
    """Call ``nodes.autoscale`` for ``name`` and wrap the result in ``self.Ok``.

    The specs ``("max", int)`` and ``("min", int)`` are handed to
    ``self.params``; whatever it returns is expanded into keyword
    arguments of ``nodes.autoscale`` next to ``nowait``.
    NOTE(review): presumably ``self.params`` converts both values with
    ``int`` -- confirm against its definition. ``request`` and ``app``
    are not used directly.
    """
    scaled = nodes.autoscale(
        name,
        nowait=nowait,
        **self.params(("max", int), ("min", int))
    )
    return self.Ok(scaled)