Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param int handle:
:data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE`
:param int target_id:
ID of the connecting or disconnecting context.
:param str name:
Context name or :data:`None`.
"""
if not stream:
# We may not have a stream during shutdown.
return
data = str(target_id)
if name:
data = '%s:%s' % (target_id, name)
stream.protocol.send(
mitogen.core.Message(
handle=handle,
data=data.encode('utf-8'),
dst_id=stream.protocol.remote_id,
)
def _receive_one(self, broker):
if self._input_buf_len < Message.HEADER_LEN:
return False
msg = Message()
msg.router = self._router
(magic, msg.dst_id, msg.src_id, msg.auth_id,
msg.handle, msg.reply_to, msg_len) = struct.unpack(
Message.HEADER_FMT,
self._input_buf[0][:Message.HEADER_LEN],
)
if magic != Message.HEADER_MAGIC:
LOG.error(self.corrupt_msg, self.stream.name, self._input_buf[0][:2048])
self.stream.on_disconnect(broker)
return False
if msg_len > self._router.max_message_size:
LOG.error('Maximum message size exceeded (got %d, max %d)',
msg_len, self._router.max_message_size)
self.stream.on_disconnect(broker)
return False
total_len = msg_len + Message.HEADER_LEN
if self._input_buf_len < total_len:
_vv and IOLOG.debug(
'%r: Input too short (want %d, got %d)',
self, msg_len, self._input_buf_len - Message.HEADER_LEN
)
def _send_one_module(self, stream, tup):
if tup[0] not in stream.protocol.sent_modules:
stream.protocol.sent_modules.add(tup[0])
self.router._async_route(
mitogen.core.Message.pickled(
tup,
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
def on_shutdown(self, broker):
"""
Called during :meth:`Broker.shutdown`, informs callbacks registered
with :meth:`add_handle_cb` the connection is dead.
"""
_v and LOG.debug('%r: shutting down', self, broker)
fire(self, 'shutdown')
for handle, (persist, fn) in self._handle_map.iteritems():
_v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(Message.dead(self.broker_shutdown_msg))
def on_shutdown(self, broker):
"""
Respond to the broker's request for the stream to shut down by sending
SHUTDOWN to the child.
"""
LOG.debug('%r: requesting child shutdown', self)
self._send(
mitogen.core.Message(
src_id=mitogen.context_id,
dst_id=self.remote_id,
handle=mitogen.core.SHUTDOWN,
)
def reply(self, msg, router=None, **kwargs):
"""
Compose a reply to this message and send it using :attr:`router`, or
`router` is :attr:`router` is :data:`None`.
:param obj:
Either a :class:`Message`, or an object to be serialized in order
to construct a new message.
:param router:
Optional router to use if :attr:`router` is :data:`None`.
:param kwargs:
Optional keyword parameters overriding message fields in the reply.
"""
if not isinstance(msg, Message):
msg = Message.pickled(msg)
msg.dst_id = self.src_id
msg.handle = self.reply_to
vars(msg).update(kwargs)
if msg.handle:
(self.router or router).route(msg)
else:
LOG.debug('dropping reply to message with no return address: %r',
msg)
:param bool throw_dead:
If :data:`True`, raise exceptions, otherwise it is the caller's
responsibility.
:raises CallError:
The serialized data contained CallError exception.
:raises ChannelError:
The `is_dead` field was set.
"""
_vv and IOLOG.debug('%r.unpickle()', self)
if throw_dead and self.is_dead:
self._throw_dead()
obj = self._unpickled
if obj is Message._unpickled:
fp = BytesIO(self.data)
unpickler = _Unpickler(fp, **self.UNPICKLER_KWARGS)
unpickler.find_global = self._find_global
try:
# Must occur off the broker thread.
try:
obj = unpickler.load()
except:
LOG.error('raw pickle was: %r', self.data)
raise
self._unpickled = obj
except (TypeError, ValueError):
e = sys.exc_info()[1]
raise StreamError('invalid message: %s', e)
if throw:
def _receive_one(self, broker):
if self._input_buf_len < Message.HEADER_LEN:
return False
msg = Message()
msg.router = self._router
(magic, msg.dst_id, msg.src_id, msg.auth_id,
msg.handle, msg.reply_to, msg_len) = struct.unpack(
Message.HEADER_FMT,
self._input_buf[0][:Message.HEADER_LEN],
)
if magic != Message.HEADER_MAGIC:
LOG.error(self.corrupt_msg, self.stream.name, self._input_buf[0][:2048])
self.stream.on_disconnect(broker)
return False
if msg_len > self._router.max_message_size:
LOG.error('Maximum message size exceeded (got %d, max %d)',
msg_len, self._router.max_message_size)
self.stream.on_disconnect(broker)
return False
total_len = msg_len + Message.HEADER_LEN
if self._input_buf_len < total_len: