Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if self.get_listened_peer() is None:
listened_peer = self._recving_msg.get_from_addr()[0]
self.set_listened_peer(listened_peer)
log.info(
'set listened peer {0} for this context({1})'.format(
listened_peer, self._peerinfo)
)
self._recving_msg = None
if self._conn.get_recv_queue().qsize() >= 500:
time.sleep(0.1)
self.move2recving_msg()
# the pushed data spans more than one msg; handle the remainder recursively
if data_len > ret:
return self.do_recv_data(data[ret:], (data_len - ret))
else:
log.error(
'Socket error. We cannot get more than pushed data length'
)
assert False
return
ret = 0
try:
self._epoll.register(
sock.fileno(), self._epoll_write_params()
)
except Exception as error: # pylint: disable=W0703
log.warn(
'failed to register the socket fileno, err_msg:%s,'
'perinfo:%s:%s. To epoll modify it' %
(str(error), peer[0], peer[1])
)
self._epoll.modify(
sock.fileno(), self._epoll_write_params()
)
else:
log.error(
'failed to post msg. Connect failed. peer info:{0}.'
' msg_type:{1}'.format(
str(peer), msg.get_msg_type()
)
)
ret = -1
else:
context = self._peer2context[peer]
self._mlock.release()
else:
context = self._peer2context[peer]
if ret != 0:
return ret
if not context.is_detroying():
if context.put_msg(flag, msg) == 0:
ret = 0
elif err == errno.EWOULDBLOCK:
log.debug(
'EWOULDBLOCK happend, context info %s' %
context.get_context_info()
)
return context
else:
log.warn(
'Socket error happend. But its not eagin,error:%s,\
context info %s, errno:%s' %
(str(error), context.get_context_info(), err)
)
context.to_destroy()
break
except Exception as error:
log.error(
'Socket error happend, error:%s, context info %s, trace:%s' %
(str(error), context.get_context_info(), traceback.format_exc())
)
context.to_destroy()
break
finally:
del data
if msg.is_msg_already_sent():
log.info(
'sent out a msg uniqid:{0}'.format(
async_msg.netmsg_tostring(msg))
)
# if we have successfully send out a msg. Then move to next one
msg = context.try_move2next_sending_msg()
if msg is None:
break
def setup(self):
"""
setup the message center
"""
try:
self._bind_port()
return True
except socket.error as error:
log.error('bind error:{0}'.format(error))
return False
def push_msg2sendqueue(self, msg):
"""
push msg into the send queue
"""
ret = 0
if msg is None:
log.warn('put a None into msg send queue. return')
ret = -1
return ret
valid, errmsg = msg.is_valid4send(msg)
if not valid:
log.error('failed to send msg as msg is not valid to send')
return -1
flag = msg.get_flag()
peer = msg.get_to_addr()[0]
new_created = False
context = None
sock = None
if isinstance(msg, async_msg.CNeedAckMsg):
log.debug('CNeedAckMsg is to be sent. msg_type:%d,'
'msg_flag:%d, msg_dest:%s, uniqid:%d' %
(
msg.get_msg_type(),
msg.get_flag(),
str(msg.get_to_addr()),
msg.get_uniq_id()
)
)
'got less than data len from stream:{0}'.format(
stream.name)
)
return (LOGFILE_BAD_RECORD, None)
log_id = self.convert_bytes2uint(str_data[0: 16])
log_type = self.convert_bytes2uint(str_data[16: 16 + 2])
log_mode = self.convert_bytes2uint(str_data[18: 18 + 2])
log_binary = str_data[20:]
return (
LOGFILE_GOOD, LogRecord(
datalen, log_id, log_type, log_mode, log_binary
)
)
except Exception as err:
log.error('failed to parse log record:{0}'.format(err))
log.error(traceback.format_exc())
return (LOGFILE_BAD_RECORD, None)
log.warn(
'got less than data len from stream:{0}'.format(
stream.name)
)
return (LOGFILE_BAD_RECORD, None)
log_id = self.convert_bytes2uint(str_data[0: 16])
log_type = self.convert_bytes2uint(str_data[16: 16 + 2])
log_mode = self.convert_bytes2uint(str_data[18: 18 + 2])
log_binary = str_data[20:]
return (
LOGFILE_GOOD, LogRecord(
datalen, log_id, log_type, log_mode, log_binary
)
)
except Exception as err:
log.error('failed to parse log record:{0}'.format(err))
log.error(traceback.format_exc())
return (LOGFILE_BAD_RECORD, None)
log.debug('to fetch a msg from recv_queue for handle function')
try:
# should use block-mode, otherwise the while loop in the upper
# code scope will busy-spin and occupy a full cpu core.
msg = self._recv_queue.get(block=True, timeout=0.5)[1]
except queue.Empty as error:
msg = None
except TypeError as error:
log.error('type error, seems received SIGTERM, err:{0}'.format(
error)
)
msg = None
except Exception as error:
msg = 'Catch a error that I cannot handle, err_msg:%s' % error
log.error(msg)
log.error(type(error))
raise CConnectionManager.QueueError(msg)
return msg