Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _verify_all(self, force=False):
if self._last_update and self._last_update.ready():
try:
self._last_update.wait() # collect result
except self.GreenletExit:
pass
force = True
if not self._last_update or force:
self._last_update = self.verify(Node.objects.all(), ratelimit=True)
for component in reversed(self.components):
if self._components_ready[component.thread]:
try:
component.stop()
except KeyboardInterrupt:
pass
except BaseException, exc:
component.error("Error in shutdown: %r", exc)
def maybe_wait(g, nowait):
not nowait and g.wait()
class Cluster(object):
Nodes = Node._default_manager
Brokers = Broker._default_manager
def get(self, nodename):
return self.Nodes.get(name=nodename)
def add(self, nodename=None, queues=None,
max_concurrency=1, min_concurrency=1, broker=None,
pool=None, app=None, nowait=False, **kwargs):
broker = self.Brokers.get_or_create(url=broker)[0] if broker else None
node = self.Nodes.add(nodename, queues, max_concurrency,
min_concurrency, broker, pool, app)
maybe_wait(self.sup.verify([node]), nowait)
return node
def remove(self, nodename, nowait=False):
node = self.Nodes.remove(nodename)
def gather_stats(self):
now = rfc2822(datetime.utcnow())
return {"agents": {self.id: {
"loadavg": [now] + list(metrics.load_average()),
"instances": [now, self.Nodes.all().count(),
self.Nodes.enabled().count()],
"drive_used": [now, metrics.df(Node.cwd).capacity]},
"instances": self.get_instance_stats()}}
return self._cache[name]
def _get(self, name):
try:
return self.state.get(name)
except self.Next:
replies = self.scatter("get", {"name": name}, propagate=False)
for reply in replies:
if not isinstance(reply, Exception):
return reply
apps = App()
class Node(ModelActor):
"""Actor for managing the Node model."""
model = models.Node
exchange = Exchange("xscs.Node")
default_timeout = 60
types = ("direct", "scatter", "round-robin")
meta_lookup_section = "nodes"
class state:
def all(self, app=None):
fun = self.objects.all
if app:
fun = partial(self.objects.filter, app=apps.get(app))
return [node.name for node in fun()]
def get(self, name, app=None):
try:
x = self.objects.get(name=name)
"processes": [list of pids],
"put-guarded-by-semaphore": True,
"timeouts": [seconds soft_timeout, seconds hard_timeout],
},
"total": {task_types_and_total_count},
}
}}
Where ``now`` is an RFC2822 formatted timestamp in the UTC timezone,
and ``procs`` is a number of either processes, threads or green threads
depending on the pool type used.
"""
#: Manager object for all nodes managed by this agent.
Nodes = Node._default_manager
#: Exchange used to query available instances.
query_exchange = Exchange("srs.agent.query-instances",
"fanout", auto_delete=True)
#: Exchange used to request instance updates.
update_exchange = Exchange("srs.instance.update",
"topic", auto_delete=True)
#: Exchange we publish statistics to.
stats_exchange = Exchange("srs.statistics",
"fanout", auto_delete=True)
#: Exchange we publish replies to.
reply_exchange = Exchange("reply", "direct")