Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for stream, state in self._state_by_stream.items():
state.lock.acquire()
try:
for sender, fp in reversed(state.jobs):
sender.close()
fp.close()
state.jobs.pop()
finally:
state.lock.release()
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this,
# odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Message.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(b(' ') * mitogen.core.CHUNK_SIZE)
).data
) - mitogen.core.CHUNK_SIZE
))
def _schedule_pending_unlocked(self, state):
"""
Consider the pending transfers for a stream, pumping new chunks while
the unacknowledged byte count is below :attr:`window_size_bytes`. Must
be called with the FileStreamState lock held.
:param FileStreamState state:
Stream to schedule chunks for.
"""
while state.jobs and state.unacked < self.window_size_bytes:
sender, fp = state.jobs[0]
im_self = getattr(fn, IM_SELF_ATTR)
if not inspect.isclass(im_self):
raise TypeError(self.method_msg)
klass = mitogen.core.to_text(im_self.__name__)
else:
klass = None
tup = (
self.chain_id,
mitogen.core.to_text(fn.__module__),
klass,
mitogen.core.to_text(fn.__name__),
args,
mitogen.core.Kwargs(kwargs)
)
return mitogen.core.Message.pickled(tup,
handle=mitogen.core.CALL_FUNCTION)
def call_service_async(self, service_name, method_name, **kwargs):
_v and LOG.debug('calling service %s.%s of %r, args: %r',
service_name, method_name, self, kwargs)
if isinstance(service_name, BytesType):
service_name = service_name.encode('utf-8')
elif not isinstance(service_name, UnicodeType):
service_name = service_name.name() # Service.name()
tup = (service_name, to_text(method_name), Kwargs(kwargs))
msg = Message.pickled(tup, handle=CALL_SERVICE)
return self.send_async(msg)
def send(self, data):
"""
Send `data` to the remote end.
"""
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send(Message.pickled(data, handle=self.dst_handle))
def _send_module_load_failed(self, stream, fullname):
self.bad_load_module_count += 1
stream.protocol.send(
mitogen.core.Message.pickled(
self._make_negative_response(fullname),
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
def _send_load_module(self, stream, fullname):
if fullname not in stream.protocol.sent_modules:
tup = self._build_tuple(fullname)
msg = mitogen.core.Message.pickled(
tup,
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
self._log.debug('sending %s (%.2f KiB) to %s',
fullname, len(msg.data) / 1024.0, stream.name)
self._router._async_route(msg)
stream.protocol.sent_modules.add(fullname)
if tup[2] is not None:
self.good_load_module_count += 1
self.good_load_module_size += len(msg.data)
else:
self.bad_load_module_count += 1