Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _stream_wbopen(self, fname):
    """Open a new binary write stream and store it on self._writestream.

    Parent directories of fname are created if missing.

    :param fname:
        path of the stream file to open
    :return:
        True on success, False if the stream could not be opened
        (the failure is logged, not raised)
    """
    ret = False
    try:
        parent = os.path.dirname(fname)
        if not os.path.exists(parent):
            # The exists()/makedirs() pair races with any concurrent
            # creator of the same directory: makedirs raises OSError
            # (EEXIST) even though the directory is now usable.  Only
            # re-raise when the directory really is absent, so the race
            # no longer turns a successful setup into a False return.
            try:
                os.makedirs(parent)
            except OSError:
                if not os.path.isdir(parent):
                    raise
        self._writestream = open(fname, 'w+b')
        log.debug('open new stream succeed')
        ret = True
    except IOError as err:
        log.error(
            'IOError, failed to open stream, err:{0}, file:{1}'.format(
                err, fname
            )
        )
    except OSError as err:
        log.error(
            'OSError, failed to open stream, err:{0}, file:{1}'.format(
                err, fname
            )
        )
    return ret
ret = 0
if msg is None:
log.warn('put a None into msg send queue. return')
ret = -1
return ret
valid, errmsg = msg.is_valid4send(msg)
if not valid:
log.error('failed to send msg as msg is not valid to send')
return -1
flag = msg.get_flag()
peer = msg.get_to_addr()[0]
new_created = False
context = None
sock = None
if isinstance(msg, async_msg.CNeedAckMsg):
log.debug('CNeedAckMsg is to be sent. msg_type:%d,'
'msg_flag:%d, msg_dest:%s, uniqid:%d' %
(
msg.get_msg_type(),
msg.get_flag(),
str(msg.get_to_addr()),
msg.get_uniq_id()
)
)
# no need head by default
# msg.set_need_head(b_need=False)
if msg.get_last_retry_time() is None:
msg.set_last_retry_time(time.time())
# if not in the self._needack_context_dict
if msg.get_retry_times() <= 0:
self._needack_context_queue.put(msg)
try:
def dump_stats(self, print_stdout=False):
    """Report the pool's current statistics via the log (and optionally stdout).

    The statistics come from the class method [get_stats].

    :param print_stdout:
        when True, also print the stats to standard output
    :return:
        the stats object produced by get_stats
    """
    stats_info = self.get_stats()
    if print_stdout:
        print(stats_info)
    log.info('ThreadPool Stat %s: %s' % (self._name, stats_info))
    log.debug('queue: %s' % self._jobqueue.queue)
    log.debug('waiters: %s' % self._waiters)
    log.debug('workers: %s' % self._working)
    log.debug('total: %s' % self._threads)
    return stats_info
if bol:
debug(msg, back_trace_len)
if __name__ == '__main__':
    # Manual smoke test for cup.log.
    # A debug call before init_comlog — exercises the uninitialized path.
    cup.log.debug('中文')
    # NOTE(review): init_comlog is invoked twice with identical arguments,
    # presumably to verify that repeated initialization is harmless — confirm.
    cup.log.init_comlog(
        'test', cup.log.DEBUG, './test.log',
        cup.log.ROTATION, 102400000, False
    )
    cup.log.init_comlog(
        'test', cup.log.DEBUG, './test.log',
        cup.log.ROTATION, 102400000, False
    )
    cup.log.info('test info')
    cup.log.debug('test debug')
    # assumes Python 2: str.decode() only exists on py2 byte strings —
    # TODO confirm; this line raises AttributeError under Python 3.
    cup.log.info('中文'.decode('utf8'))
    # reinit_comlog switches logging to a new file; also called twice,
    # mirroring the double-init check above.
    cup.log.reinit_comlog(
        're-test', cup.log.DEBUG, './re.test.log',
        cup.log.ROTATION, 102400000, False
    )
    cup.log.reinit_comlog(
        're-test', cup.log.DEBUG, './re.test.log',
        cup.log.ROTATION, 102400000, False
    )
    cup.log.info('re:test info')
    cup.log.debug('re:test debug')
    cup.log.debug('re:中文')
files = self._get_ordered_logfiles(folder)
length = len(files)
ind = -1
try:
ind = files.index(fname)
except ValueError:
log.error('cannot find current log stream:{0}'.format(fname))
return LOGFILE_BAD_RECORD
newfile = None
if ind < (length - 2):
newfile = '{0}/{1}'.format(folder, files[ind + 1])
elif ind == (length - 2):
if files[length - 1].find('writing') < 0:
newfile = '{0}/{1}'.format(folder, files[length - 1])
else:
log.debug('does not have more finished log edits to read')
return LOGFILE_EOF
elif ind == (length - 1):
log.info('does not have more log edits to read, return')
return LOGFILE_EOF
try:
self._load_stream.close()
self._load_stream = open(newfile, 'rb')
return LOGFILE_GOOD
except StandardError as err:
log.error('failed to move to next load stream:{0}'.format(newfile))
log.error('err:{0}'.format(err))
return LOGFILE_BAD_RECORD
'test', cup.log.DEBUG, './test.log',
cup.log.ROTATION, 102400000, False
)
cup.log.info('test info')
cup.log.debug('test debug')
cup.log.info('中文'.decode('utf8'))
cup.log.reinit_comlog(
're-test', cup.log.DEBUG, './re.test.log',
cup.log.ROTATION, 102400000, False
)
cup.log.reinit_comlog(
're-test', cup.log.DEBUG, './re.test.log',
cup.log.ROTATION, 102400000, False
)
cup.log.info('re:test info')
cup.log.debug('re:test debug')
cup.log.debug('re:中文')
def cleanup_expired(self):
    """Remove every expired item from the store under the write lock.

    :return:
        the list of keys that were expired and deleted
    """
    with self._lock_release(b_rw_lock=True):
        expired = self._get_expired_keys()
        for expired_key in expired:
            del self._kv_data[expired_key]
            cup.log.debug('key:%s cleaned up' % expired_key)
    return expired
else:
if msg_key not in self._needack_context_dict:
self._needack_context_dict[msg_key] = msg_item
time_out_list = []
for key in self._needack_context_dict.keys():
msg_item = self._needack_context_dict[key]
msg_flag = msg_item.get_resend_flag()
msg_info = 'msg_type:%d, msg_flag:%d, msg_dest:%s,uniqid:%d' % (
msg_item.get_msg_type(),
msg_item.get_flag(),
str(msg_item.get_to_addr()),
msg_item.get_uniq_id()
)
if msg_flag == async_msg.MSG_RESEND_SUCCESS:
time_out_list.append(key)
log.debug(
'del succ-msg from resending queue: {0}'.format(msg_info)
)
elif msg_flag == async_msg.MSG_RESENDING_FLAG:
msg_total_timeout = msg_item.get_total_timeout()
# if msg resending timeouts
if msg_total_timeout <= 0:
msg_item.set_resend_flag(async_msg.MSG_TIMEOUT_TO_DELETE)
log.error(
'timeout, failed to get ack for netmsg:{0}'.format(
msg_info)
)
time_out_list.append(key)
else:
msg_last_retry_time = msg_item.get_last_retry_time()
msg_retry_interval = msg_item.get_retry_interval()
now = time.time()
)
ret = -1
else:
context = self._peer2context[peer]
self._mlock.release()
else:
context = self._peer2context[peer]
if ret != 0:
return ret
if not context.is_detroying():
if context.put_msg(flag, msg) == 0:
ret = 0
# set up last modify
else:
ret = -1
log.debug('start handle new send.')
self._handle_new_send(context)
return ret
def _do_write(self, context):
"""write into interface sending buffer"""
sock = context.get_sock()
msg = context.try_move2next_sending_msg()
if msg is None:
log.debug('send queue is empty, quit the _do_write thread')
return context
# log.debug('To enter write loop until eagin')
# pylint:disable=w0212
while not self._stopsign:
data = msg.get_write_bytes(self.NET_RW_SIZE)
log.debug('msg get_write_bytes_len to be sent: %d' % len(data))
try:
succ_len = sock.send(data)
msg.seek_write(succ_len)
except cuperr.AsyncMsgError as error:
log.debug('has seek out of msg len, continue')
except socket.error as error:
err = error.args[0]
if err == errno.EAGAIN:
log.debug(
'EAGAIN happend, context info %s' %
context.get_context_info()
)
return context
elif err == errno.EWOULDBLOCK:
log.debug(