Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return self.GET(self.path)
def add(self, name=None):
    """Issue a PUT for ``self.path / name`` and return the result.

    ``name`` defaults to ``None``; in that case ``None`` itself is joined
    onto the path, and how that is rendered is up to the path type
    (not visible here).
    """
    target = self.path / name
    return self.PUT(target)
def get(self, name):
    """Issue a GET for ``self.path / name`` and return the result."""
    target = self.path / name
    return self.GET(target)
def delete(self, name):
    """Issue a DELETE for ``self.path / name`` and return the result."""
    target = self.path / name
    return self.DELETE(target)
def __repr__(self):
    """Return the ``repr`` of whatever ``self.all()`` returns.

    Note that this calls ``all()`` every time the object is repr'd.
    """
    listing = self.all()
    return repr(listing)
class Queues(Section):
    """Section exposing queue operations.

    Every method builds a target below ``self.path`` with the ``/``
    operator and hands it to one of the verb helpers (``GET``, ``PUT``,
    ``POST``, ``DELETE``) that are expected on the instance, returning
    whatever that helper returns.
    """

    def all(self):
        """GET ``self.path`` itself."""
        return self.GET(self.path)

    def get(self, name):
        """GET ``self.path / name``."""
        return self.GET(self.path / name)

    def delete(self, name):
        """DELETE ``self.path / name``."""
        return self.DELETE(self.path / name)

    def add(self, name, exchange=None, exchange_type=None,
            routing_key=None, **options):
        """PUT ``self.path / name`` with the queue declaration as ``data``.

        :param name: queue name, appended to ``self.path``.
        :param exchange: sent as the ``"exchange"`` field.
        :param exchange_type: sent as the ``"exchange_type"`` field.
        :param routing_key: sent as the ``"routing_key"`` field.
        :param options: any further keyword arguments; serialized with
            ``anyjson.serialize`` into the single ``"options"`` field.
        """
        # With no extra keyword arguments the field is sent as None
        # rather than as a serialized empty mapping.
        options = anyjson.serialize(options) if options else None
        # Each key appears exactly once; the literal used to list
        # "exchange_type" twice with the same value.
        return self.PUT(self.path / name,
                        data={"exchange": exchange,
                              "exchange_type": exchange_type,
                              "routing_key": routing_key,
                              "options": options})

    def stats(self, name):
        """GET ``self.path / name / "stats"``."""
        return self.GET(self.path / name / "stats")

    def autoscale(self, name, max=None, min=None):
        """POST ``self.path / name / "autoscale"`` with ``max``/``min`` params.

        The parameter names shadow the builtins of the same name; they are
        kept because they are part of the public keyword interface.
        """
        return self.POST(self.path / name / "autoscale",
                         params={"max": max, "min": min})

    def __repr__(self):
        """Return the ``repr`` of whatever :meth:`all` returns."""
        return repr(self.all())
class Consumers(Section):
    """Section managing which queues an instance consumes from.

    Targets have the form ``instances / <instance> / queues / <queue>``.
    """

    path = Path("instances")

    def add(self, instance, queue):
        """PUT the queue under the instance's ``queues`` path."""
        return self.PUT(self.path / instance / "queues" / queue)

    def remove(self, instance, queue):
        """DELETE the queue from the instance's ``queues`` path.

        This used to issue the same PUT as :meth:`add`, so calling
        ``remove`` repeated the add request instead of undoing it.
        """
        return self.DELETE(self.path / instance / "queues" / queue)
class Client(Base):
    """Client class that carries the section classes as class attributes."""

    # Application name; presumably consumed by ``Base`` -- confirm there.
    app = "scs"

    # Section classes re-exported under their own names so they can be
    # reached through the client class.
    Instances = Instances
    Queues = Queues
    Consumers = Consumers
self.path = Path(self.name)
def GET(self, *args, **kwargs):
    """Forward the call unchanged to ``self.client.GET``."""
    client = self.client
    return client.GET(*args, **kwargs)
def PUT(self, *args, **kwargs):
    """Forward the call unchanged to ``self.client.PUT``."""
    client = self.client
    return client.PUT(*args, **kwargs)
def POST(self, *args, **kwargs):
    """Forward the call unchanged to ``self.client.POST``."""
    client = self.client
    return client.POST(*args, **kwargs)
def DELETE(self, *args, **kwargs):
    """Forward the call unchanged to ``self.client.DELETE``."""
    client = self.client
    return client.DELETE(*args, **kwargs)
class Instances(Section):
    """Section exposing instance operations.

    Each method addresses either ``self.path`` or ``self.path / name`` and
    returns whatever the corresponding verb helper returns.
    """

    def all(self):
        """GET ``self.path`` itself."""
        root = self.path
        return self.GET(root)

    def add(self, name=None):
        """PUT ``self.path / name``; ``name`` defaults to ``None``."""
        target = self.path / name
        return self.PUT(target)

    def get(self, name):
        """GET ``self.path / name``."""
        target = self.path / name
        return self.GET(target)

    def delete(self, name):
        """DELETE ``self.path / name``."""
        target = self.path / name
        return self.DELETE(target)

    def __repr__(self):
        """Return the ``repr`` of whatever :meth:`all` returns."""
        listing = self.all()
        return repr(listing)