Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def start_transmit(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)',
self, fd, data)
self._wfds[fd] = (data or fd, self._generation)
self._control(fd)
def _control(self, fd, filters, flags):
mitogen.core._vv and IOLOG.debug(
'%r._control(%r, %r, %r)', self, fd, filters, flags)
# TODO: at shutdown it is currently possible for KQ_EV_ADD/KQ_EV_DEL
# pairs to be pending after the associated file descriptor has already
# been closed. Fixing this requires maintaining extra state, or perhaps
# making fd closure the poller's responsibility. In the meantime,
# simply apply changes immediately.
# self._changelist.append(select.kevent(fd, filters, flags))
changelist = [select.kevent(fd, filters, flags)]
events, _ = mitogen.core.io_op(self._kqueue.control, changelist, 0, 0)
assert not events
def start_receive(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)',
self, fd, data)
if fd not in self._rfds:
self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
self._rfds[fd] = (data or fd, self._generation)
def on_receive(self, broker):
s = self.receive_side.read()
IOLOG.debug('%r.on_receive() -> len %r', self, len(s))
if s:
mitogen.core.fire(self, 'receive', s)
else:
self.on_disconnect(broker)
def _control(self, fd):
mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd)
mask = (((fd in self._rfds) and select.EPOLLIN) |
((fd in self._wfds) and select.EPOLLOUT))
if mask:
if fd in self._registered_fds:
self._epoll.modify(fd, mask)
else:
self._epoll.register(fd, mask)
self._registered_fds.add(fd)
elif fd in self._registered_fds:
self._epoll.unregister(fd)
self._registered_fds.remove(fd)
changelist, 32, timeout)
for event in events:
fd = event.ident
if event.flags & select.KQ_EV_ERROR:
LOG.debug('ignoring stale event for fd %r: errno=%d: %s',
fd, event.data, errno.errorcode.get(event.data))
elif event.filter == select.KQ_FILTER_READ:
data, gen = self._rfds.get(fd, (None, None))
# Events can still be read for an already-discarded fd.
if gen and gen < self._generation:
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
yield data
elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds:
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
yield data