Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
that the result failed and later receive the actual result.
"""
outstanding = self.queues[eid]
for msg_id in outstanding:
self.pending.remove(msg_id)
self.all_completed.add(msg_id)
try:
raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
except:
content = error.wrap_exception()
# build a fake header:
header = {}
header['engine'] = uuid
header['date'] = util.utcnow()
rec = dict(result_content=content, result_header=header, result_buffers=[])
rec['completed'] = util.ensure_timezone(header['date'])
rec['engine_uuid'] = uuid
try:
self.db.update_record(msg_id, rec)
except Exception:
self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
@util.log_errors
@coroutine
def dispatch_query(self, msg):
"""Route registration requests and queries from clients."""
try:
idents, msg = self.session.feed_identities(msg)
except ValueError:
idents = []
if not idents:
self.log.error("Bad Query Message: %r", msg)
return
client_id = idents[0]
try:
msg = self.session.deserialize(msg, content=True)
except Exception:
content = error.wrap_exception()
self.log.error("Bad Query Message: %r", msg, exc_info=True)
else:
# in a process, don't use instance()
# for safety with multiprocessing
ctx = zmq.Context()
loop = ioloop.IOLoop()
ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
util.set_hwm(ins, 0)
ins.setsockopt(zmq.IDENTITY, identity + b'_in')
ins.bind(in_addr)
outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
util.set_hwm(outs, 0)
outs.setsockopt(zmq.IDENTITY, identity + b'_out')
outs.bind(out_addr)
mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
util.set_hwm(mons, 0)
mons.connect(mon_addr)
nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
nots.setsockopt(zmq.SUBSCRIBE, b'')
nots.connect(not_addr)
querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
querys.connect(reg_addr)
# setup logging.
if in_thread:
log = Application.instance().log
else:
if log_url:
log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
else:
log = local_logger(logname, loglevel)
----------
ns : dict
dict of keys with which to update engine namespace(s)
block : bool [default : self.block]
whether to wait to be notified of engine receipt
"""
block = block if block is not None else self.block
track = track if track is not None else self.track
targets = targets if targets is not None else self.targets
# applier = self.apply_sync if block else self.apply_async
if not isinstance(ns, dict):
raise TypeError("Must be a dict, not %s"%type(ns))
return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
content = msg['content']
md = {'msg_id' : parent['msg_id'],
'received' : util.utcnow(),
'engine_uuid' : msg_meta.get('engine', None),
'follow' : msg_meta.get('follow', []),
'after' : msg_meta.get('after', []),
'status' : content['status'],
}
if md['engine_uuid'] is not None:
md['engine_id'] = self._engines.get(md['engine_uuid'], None)
if 'date' in parent:
md['submitted'] = parent['date']
if 'started' in msg_meta:
md['started'] = util._parse_date(msg_meta['started'])
if 'date' in header:
md['completed'] = header['date']
return md
@util.log_errors
def dispatch_notification(self, msg):
"""dispatch register/unregister events."""
try:
idents,msg = self.session.feed_identities(msg)
except ValueError:
self.log.warn("task::Invalid Message: %r",msg)
return
try:
msg = self.session.deserialize(msg)
except ValueError:
self.log.warn("task::Unauthorized message from: %r"%idents)
return
msg_type = msg['header']['msg_type']
handler = self._notification_handlers.get(msg_type, None)
@util.interactive
def remote_import(name, fromlist, level):
"""the function to be passed to apply, that actually performs the import
on the engine, and loads up the user namespace.
"""
import sys
user_ns = globals()
mod = __import__(name, fromlist=fromlist, level=level)
if fromlist:
for key in fromlist:
user_ns[key] = getattr(mod, key)
else:
user_ns[name] = sys.modules[name]
"""get object(s) by `name` from remote namespace
will return one object if it is a key.
can also take a list of keys, in which case it will return a list of objects.
"""
block = block if block is not None else self.block
targets = targets if targets is not None else self.targets
if isinstance(names, string_types):
pass
elif isinstance(names, (list,tuple,set)):
for key in names:
if not isinstance(key, string_types):
raise TypeError("keys must be str, not type %r"%type(key))
else:
raise TypeError("names must be strs, not %r"%names)
return self._really_apply(util._pull, (names,), block=block, targets=targets)