Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _check_needack_queue(self):
"""
check needack_queue
"""
log.debug('start check needack_queue')
msg_item = None
ack_flag = async_msg.MSG_FLAG2NUM['FLAG_ACK']
while True:
msg_item = None
try:
msg_item = self._needack_context_queue.get_nowait()
except queue.Empty:
log.debug('no need ack msg found yet')
break
ack_success = False
toaddr = None
uniq_id = msg_item.get_uniq_id()
toaddr = msg_item.get_to_addr()[0]
if msg_item.get_flag() & ack_flag == ack_flag:
# if msg_item is a ack msg
log.info(
'msgack received, stop resending '
'msguniq_id:{0}'.format(uniq_id)
:TODO:
If the msg queue is too big, consider close the network link
"""
succ = None
self._lock.acquire()
if self._is_1st_send_msg:
msg.set_need_head(True)
# pylint: disable=W0212
msg._set_msg_len()
self._is_1st_send_msg = False
else:
msg.set_need_head(False)
msg._set_msg_len()
urgency = 1
is_urgent = flag & async_msg.MSG_FLAG2NUM['FLAG_URGENT']
if is_urgent == async_msg.MSG_FLAG2NUM['FLAG_URGENT']:
urgency = 0
try:
self._send_queue.put_nowait((urgency, self._msgind_in_sendque, msg))
self._msgind_in_sendque += 1
succ = 0
except queue.Full:
log.debug(
'network is busy. send_msg_queue is full, peerinfo:{0}'.format(
msg.get_to_addr()[0]
)
)
succ = 1
self._lock.release()
return succ
"""
ret = 0
if msg is None:
log.warn('put a None into msg send queue. return')
ret = -1
return ret
valid, errmsg = msg.is_valid4send(msg)
if not valid:
log.error('failed to send msg as msg is not valid to send')
return -1
flag = msg.get_flag()
peer = msg.get_to_addr()[0]
new_created = False
context = None
sock = None
if isinstance(msg, async_msg.CNeedAckMsg):
log.debug('CNeedAckMsg is to be sent. msg_type:%d,'
'msg_flag:%d, msg_dest:%s, uniqid:%d' %
(
msg.get_msg_type(),
msg.get_flag(),
str(msg.get_to_addr()),
msg.get_uniq_id()
)
)
# no need head by default
# msg.set_need_head(b_need=False)
if msg.get_last_retry_time() is None:
msg.set_last_retry_time(time.time())
# if not in the self._needack_context_dict
if msg.get_retry_times() <= 0:
self._needack_context_queue.put(msg)
def __init__(self, ip, port, thdpool_param=None, stat_intvl=20):
    """
    set up the connection manager, the stat-related state and the
    msg type registry

    :param ip:
        handed over unchanged to conn.CConnectionManager
    :param port:
        handed over unchanged to conn.CConnectionManager
    :param thdpool_param:
        handed over to conn.CConnectionManager. (3, 5) is used
        when it is None
    :param stat_intvl:
        kept as self._stat_intvl
    """
    pool_param = (3, 5) if thdpool_param is None else thdpool_param
    self._conn_mgr = conn.CConnectionManager(ip, port, pool_param)
    self._stat_intvl = stat_intvl
    self._stat_cond = threading.Condition()
    self._stop = False
    # publish the registry on self first, then fill it with the
    # msg types declared in async_msg.MSG_TYPE2NUM
    type_registry = async_msg.CMsgType()
    self._type_man = type_registry
    type_registry.register_types(async_msg.MSG_TYPE2NUM)
continue
# not ack_success + not in context_dict
else:
if msg_key not in self._needack_context_dict:
self._needack_context_dict[msg_key] = msg_item
time_out_list = []
for key in self._needack_context_dict.keys():
msg_item = self._needack_context_dict[key]
msg_flag = msg_item.get_resend_flag()
msg_info = 'msg_type:%d, msg_flag:%d, msg_dest:%s,uniqid:%d' % (
msg_item.get_msg_type(),
msg_item.get_flag(),
str(msg_item.get_to_addr()),
msg_item.get_uniq_id()
)
if msg_flag == async_msg.MSG_RESEND_SUCCESS:
time_out_list.append(key)
log.debug(
'del succ-msg from resending queue: {0}'.format(msg_info)
)
elif msg_flag == async_msg.MSG_RESENDING_FLAG:
msg_total_timeout = msg_item.get_total_timeout()
# if msg resending timeouts
if msg_total_timeout <= 0:
msg_item.set_resend_flag(async_msg.MSG_TIMEOUT_TO_DELETE)
log.error(
'timeout, failed to get ack for netmsg:{0}'.format(
msg_info)
)
time_out_list.append(key)
else:
msg_last_retry_time = msg_item.get_last_retry_time()
def move2recving_msg(self):
"""
make sure self._recving_msg refers to a net msg that is being received

When self._recving_msg is None, a new async_msg.CNetMsg is created
with is_postmsg=False and this object is set as its msg context.
The msg's need_head setting is True when self._is_1st_recv_msg is
truthy, False otherwise.

NOTE(review): this excerpt has lost its indentation and shows no
return statement. Whether the need_head branch is nested under the
"is None" check, and what the method returns, cannot be confirmed
from here -- verify against the complete source.
"""
# if no recving msg pending there, create one.
if self._recving_msg is None:
# is_postmsg=False: this msg is created for receiving, not for posting
self._recving_msg = async_msg.CNetMsg(is_postmsg=False)
# the new msg keeps a reference back to this object as its context
self._recving_msg.set_msg_context(self)
# need_head mirrors the self._is_1st_recv_msg flag
if self._is_1st_recv_msg:
self._recving_msg.set_need_head(True)
else:
self._recving_msg.set_need_head(False)
self._recv_queue = queue.PriorityQueue(0)
self._stopsign = False
self._recv_msg_ind = 0
self._mlock = threading.Lock()
# _needack_context_queue
# infinite queue TODO: may change it in the future
self._needack_context_queue = queue.Queue()
self._dict_lock = threading.Lock()
self._needack_context_dict = {}
self._executor = executor.ExecutionService(
#int('queue_exec_thdnum'), # todo num?
#int('queue_delay_exe_thdnum') # todo num?
3,
4
)
self._type_man = async_msg.CMsgType()
self._type_man.register_types(async_msg.MSG_TYPE2NUM)
def _post_ackok_msg(self, to_addr, from_addr, uniq_id):
    """
    build an ACK_OK msg carrying uniq_id and hand it to self.post_msg

    :param to_addr:
        indexable. items 0 and 1 are passed to set_to_addr
    :param from_addr:
        indexable. items 0 and 1 are passed to set_from_addr
    :param uniq_id:
        id that is copied onto the ack msg
    """
    log.info('post ack ok msg.')
    ack_msg = async_msg.CNetMsg(is_postmsg=True)

    # addressing
    dest_head, dest_tail = to_addr[0], to_addr[1]
    ack_msg.set_to_addr(dest_head, dest_tail)
    src_head, src_tail = from_addr[0], from_addr[1]
    ack_msg.set_from_addr(src_head, src_tail)

    # type / flag / id / body, applied in the same order as before
    ack_type = self._type_man.getnumber_bytype('ACK_OK')
    ack_msg.set_msg_type(ack_type)
    normal_flag = async_msg.MSG_FLAG2NUM['FLAG_NORMAL']
    ack_msg.set_flag(normal_flag)
    ack_msg.set_uniq_id(uniq_id)
    ack_msg.set_body('0')

    self.post_msg(ack_msg)
msg_info = 'msg_type:%d, msg_flag:%d, msg_dest:%s,uniqid:%d' % (
msg_item.get_msg_type(),
msg_item.get_flag(),
str(msg_item.get_to_addr()),
msg_item.get_uniq_id()
)
if msg_flag == async_msg.MSG_RESEND_SUCCESS:
time_out_list.append(key)
log.debug(
'del succ-msg from resending queue: {0}'.format(msg_info)
)
elif msg_flag == async_msg.MSG_RESENDING_FLAG:
msg_total_timeout = msg_item.get_total_timeout()
# if msg resending timeouts
if msg_total_timeout <= 0:
msg_item.set_resend_flag(async_msg.MSG_TIMEOUT_TO_DELETE)
log.error(
'timeout, failed to get ack for netmsg:{0}'.format(
msg_info)
)
time_out_list.append(key)
else:
msg_last_retry_time = msg_item.get_last_retry_time()
msg_retry_interval = msg_item.get_retry_interval()
now = time.time()
elapse_time = now - msg_last_retry_time
if elapse_time >= msg_retry_interval:
# update total_timeout
msg_item.set_total_timeout(
msg_total_timeout - elapse_time
)
msg_item.set_last_retry_time(now)