Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def subscribe(self, descr, callback, eventMask):
    """Register ``descr`` with the underlying ``select.poll`` object.

    The library-level ``eventMask`` (a bitwise OR of ``POLL_EVENT_TYPE``
    flags) is translated into the matching ``select.POLL*`` bits, the
    callback is remembered under ``descr`` and the descriptor is then
    registered with the poll object.

    :param descr: file descriptor to watch.
    :param callback: object stored for ``descr``; it is not invoked here.
    :param eventMask: combination of ``POLL_EVENT_TYPE.READ`` / ``WRITE`` /
        ``ERROR`` flags.
    """
    # READ -> POLLIN, WRITE -> POLLOUT, ERROR -> POLLERR.
    wantsRead = eventMask & POLL_EVENT_TYPE.READ
    registration = select.POLLIN if wantsRead else 0
    if eventMask & POLL_EVENT_TYPE.WRITE:
        registration = registration | select.POLLOUT
    if eventMask & POLL_EVENT_TYPE.ERROR:
        registration = registration | select.POLLERR

    # Store the callback before registering, as the original does.
    self.__descrToCallbacks[descr] = callback
    self.__poll.register(descr, registration)
# NOTE(review): headerless fragment with its indentation stripped. These
# statements read like the body of a connection constructor; `socket`,
# `timeout`, `poller`, `onMessageReceived`, `onConnected`, `onDisconnected`,
# `sendBufferSize` and `recvBufferSize` are presumably its parameters -- confirm
# against the full file.
self.recvRandKey = None
self.encryptor = None
# This first assignment is overwritten below: both the `socket is not None`
# path and the fallback path assign self.__socket again.
self.__socket = socket
# Read and write buffers both start out as empty byte strings.
self.__readBuffer = bytes()
self.__writeBuffer = bytes()
# Initialised to "now" at construction time.
self.__lastReadTime = time.time()
self.__timeout = timeout
self.__poller = poller
# A supplied socket is treated as already connected: its descriptor is cached
# and subscribed to the poller for read, write and error events.
if socket is not None:
self.__socket = socket
self.__fileno = socket.fileno()
self.__state = CONNECTION_STATE.CONNECTED
self.__poller.subscribe(self.__fileno,
self.__processConnection,
POLL_EVENT_TYPE.READ | POLL_EVENT_TYPE.WRITE | POLL_EVENT_TYPE.ERROR)
# Without a socket the object starts disconnected, with no descriptor.
else:
self.__state = CONNECTION_STATE.DISCONNECTED
self.__fileno = None
self.__socket = None
# The callbacks and buffer sizes are only stored here; none is used in this
# fragment.
self.__onMessageReceived = onMessageReceived
self.__onConnected = onConnected
self.__onDisconnected = onDisconnected
self.__sendBufferSize = sendBufferSize
self.__recvBufferSize = recvBufferSize
def __init__(self, poller, callback = None):
    """Create a non-blocking pipe and watch its read end through ``poller``.

    :param poller: object exposing ``subscribe(descr, callback, eventMask)``;
        the pipe's read end is subscribed for ``POLL_EVENT_TYPE.READ``.
    :param callback: stored on the instance; it is not invoked here.
    :raises OSError: if creating the pipe or changing its flags fails.
    """
    self.__callback = callback
    self.__pipeR, self.__pipeW = os.pipe()
    for pipeEnd in (self.__pipeR, self.__pipeW):
        # F_SETFL replaces the file *status* flags, so the current value must
        # be read with F_GETFL. F_GETFD returns the descriptor flags
        # (FD_CLOEXEC), a different flag namespace that must not be fed back
        # into F_SETFL.
        statusFlags = fcntl.fcntl(pipeEnd, fcntl.F_GETFL)
        fcntl.fcntl(pipeEnd, fcntl.F_SETFL, statusFlags | os.O_NONBLOCK)
    poller.subscribe(self.__pipeR, self.__onNewNotification, POLL_EVENT_TYPE.READ)
def bind(self):
    """Open a non-blocking listening TCP socket and subscribe it to the poller.

    A stream socket of family ``self.__hostAddrType`` is created, its buffer
    sizes, ``TCP_NODELAY`` and ``SO_REUSEADDR`` are set, and it is bound to
    ``(self.__host, self.__port)`` with a listen backlog of 5.  The socket's
    descriptor is then subscribed for READ and ERROR events with
    ``self.__onNewConnection`` as the callback, and the state becomes
    ``SERVER_STATE.BINDED``.
    """
    # Store the socket on the instance first so it is reachable even if one
    # of the following calls raises.
    listener = self.__socket = socket.socket(self.__hostAddrType, socket.SOCK_STREAM)

    setOption = listener.setsockopt
    setOption(socket.SOL_SOCKET, socket.SO_SNDBUF, self.__sendBufferSize)
    setOption(socket.SOL_SOCKET, socket.SO_RCVBUF, self.__recvBufferSize)
    setOption(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    setOption(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.setblocking(False)

    listener.bind((self.__host, self.__port))
    listener.listen(5)

    self.__fileno = listener.fileno()
    self.__poller.subscribe(
        self.__fileno,
        self.__onNewConnection,
        POLL_EVENT_TYPE.READ | POLL_EVENT_TYPE.ERROR,
    )
    self.__state = SERVER_STATE.BINDED
def subscribe(self, descr, callback, eventMask):
    """Track ``descr`` in the read/write/error sets selected by ``eventMask``.

    ``self.unsubscribe(descr)`` is called first (presumably to clear any
    earlier registration -- it is defined elsewhere).  The descriptor is then
    added to one set per ``POLL_EVENT_TYPE`` flag present in ``eventMask``,
    and ``callback`` is stored under ``descr``.

    :param descr: file descriptor to watch.
    :param callback: object stored for ``descr``; it is not invoked here.
    :param eventMask: combination of ``POLL_EVENT_TYPE.READ`` / ``WRITE`` /
        ``ERROR`` flags.
    """
    self.unsubscribe(descr)

    wantsRead = eventMask & POLL_EVENT_TYPE.READ
    if wantsRead:
        self.__descrsRead.add(descr)

    wantsWrite = eventMask & POLL_EVENT_TYPE.WRITE
    if wantsWrite:
        self.__descrsWrite.add(descr)

    wantsError = eventMask & POLL_EVENT_TYPE.ERROR
    if wantsError:
        self.__descrsError.add(descr)

    self.__descrToCallbacks[descr] = callback
def subscribe(self, descr, callback, eventMask):
    """Register ``descr`` in the descriptor sets named by ``eventMask``.

    Steps, in order: call ``self.unsubscribe(descr)``; add ``descr`` to the
    read, write and/or error set according to which ``POLL_EVENT_TYPE`` bits
    are set in ``eventMask``; store ``callback`` under ``descr``.

    :param descr: file descriptor to watch.
    :param callback: object stored for ``descr``; it is not invoked here.
    :param eventMask: bitwise OR of ``POLL_EVENT_TYPE`` flags.
    """
    fd = descr
    self.unsubscribe(fd)

    readRequested = eventMask & POLL_EVENT_TYPE.READ
    if readRequested:
        self.__descrsRead.add(fd)

    writeRequested = eventMask & POLL_EVENT_TYPE.WRITE
    if writeRequested:
        self.__descrsWrite.add(fd)

    errorRequested = eventMask & POLL_EVENT_TYPE.ERROR
    if errorRequested:
        self.__descrsError.add(fd)

    # The callback is stored last, after set membership is updated.
    self.__descrToCallbacks[fd] = callback
def __processConnection(self, descr, eventType):
poller = self.__poller
if descr != self.__fileno:
poller.unsubscribe(descr)
return
if eventType & POLL_EVENT_TYPE.ERROR:
self.disconnect()
return
if time.time() - self.__lastReadTime > self.__timeout:
self.disconnect()
return
if eventType & POLL_EVENT_TYPE.READ or eventType & POLL_EVENT_TYPE.WRITE:
if self.__socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
self.disconnect()
return
if self.__state == CONNECTION_STATE.CONNECTING:
if self.__onConnected is not None:
self.__onConnected()
self.__state = CONNECTION_STATE.CONNECTED