Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
@author
@brief
@note
"""
import time
import random
import cup
import logging
import threading
import test_msg_center
# Builds cup.net.async.CNetMsg objects (see get_msg). The class is wrapped by
# cup.decorators.Singleton.
# NOTE(review): `async` is a reserved keyword from Python 3.7 on, so the
# attribute access `cup.net.async` only parses on older interpreters.
@cup.decorators.Singleton
class MsgGenerator(object):
# Creates the flag manager, registers the 'URGENT' (0) and 'NORMAL' (1) flag
# names on it, creates the type manager and stores is_ack.
def __init__(self, is_ack=False):
self._flagman = cup.net.async.CMsgFlag()
self._flagman.register_flags({
'URGENT': 0,
'NORMAL': 1,
})
self._type_man = cup.net.async.CMsgType()
self._is_ack = is_ack
# Starts building a post message from self_ipport to to_ipport.
# NOTE(review): `i` and self._is_ack are not read in the visible lines, and
# the rest of this method (including any return) is not visible here.
def get_msg(self, self_ipport, to_ipport, i):
msg = cup.net.async.CNetMsg(is_postmsg=True)
# flag number is looked up by the registered name 'NORMAL'
msg.set_flag(self._flagman.getnumber_byflag('NORMAL'))
# NOTE(review): meaning of the fixed tuples (1, 2) / (3, 4) is not shown
# here -- confirm against CNetMsg.set_from_addr / set_to_addr.
msg.set_from_addr(self_ipport, (1, 2))
msg.set_to_addr(to_ipport, (3, 4))
# NOTE(review): no 'ACK' type is registered in the visible code -- presumably
# registered elsewhere; verify.
msg.set_msg_type(self._type_man.getnumber_bytype('ACK'))
:param exec_cwd:
exec working directory. Plz use
:param tostr:
recipient list, separated by ','
:param subject:
subject
:param body:
email content
:param attach:
email attachment
:param content_is_html:
whether to send the body as HTML (adds a text/html Content-Type header)
:return:
return True on success, False otherwise
"""
decorators.needlinux(mutt_sendmail)
shellobj = shell.ShellExec()
temp_cwd = os.getcwd()
str_att = ''
cmdstr = ''
if attach == '':
if content_is_html is True:
cmdstr = 'echo "%s"|/usr/bin/mutt -e "my_hdr Content-Type:'\
'text/html" -s "%s" %s' \
% (body, subject, tostr)
else:
cmdstr = 'echo "%s"|/usr/bin/mutt -s "%s" %s' % (
body, subject, tostr
)
else:
attlist = attach.strip().split(',')
# Collects the PIDs of processes whose `ps -ef` line matches `name`, then
# lists /proc/<pid>/cwd and /proc/<pid>/exe for each candidate.
# NOTE(review): how those results are compared with `path` is not visible in
# this excerpt.
@decorators.needlinux
def _real_is_proc_exist(path, name):
"""
_real_is_proc_exist
"""
# canonical absolute form of the given path, symlinks resolved
path = os.path.realpath(os.path.abspath(path))
# NOTE(review): `name` is interpolated into the shell pipeline unquoted --
# confirm callers never pass untrusted or whitespace-containing values.
cmd = 'ps -ef|grep %s|grep -v "^grep "|grep -v "^vim "|grep -v "^less "|\
grep -v "^vi "|grep -v "^cat "|grep -v "^more "|grep -v "^tail "|\
awk \'{print $2}\'' % (name)
ret = cup.shell.ShellExec().run(cmd, 10)
pids = ret['stdout'].strip().split('\n')
# `and` binds tighter than `or`: no entries at all, or a single empty entry.
# Empty output splits to [''], so the second clause is the one that catches it.
if len(pids) == 0 or len(pids) == 1 and len(pids[0]) == 0:
return False
for pid in pids:
for sel_path in ["cwd", "exe"]:
# field 11 of `ls -l` on these links is presumably the link target -- confirm
cmd = 'ls -l /proc/%s/%s|awk \'{print $11}\' ' % (pid, sel_path)
ret = cup.shell.ShellExec().run(cmd, 10)
def get_net_transmit_speed(str_interface, intvl_in_sec=1):
    """
    Measure how fast a byte counter of a network interface grows.

    Element 0 of ``get_net_through(str_interface)`` is sampled twice, with a
    sleep of ``intvl_in_sec`` seconds in between, and the difference is
    divided by the interval.

    :param str_interface:
        interface name, e.g. 'eth1'
    :param intvl_in_sec:
        sampling interval in seconds; asserted to be greater than 0
    :return:
        counter growth per second over the sampling interval

    NOTE(review): which counter element 0 holds is decided by
    ``get_net_through`` -- confirm it matches "transmit".

    E.g.
    ::

        import cup
        print(cup.res.linux.get_net_transmit_speed('eth1', 5))
    """
    decorators.needlinux(True)
    cup.unittest.assert_gt(intvl_in_sec, 0)

    def _sample():
        # one reading of the counter this function tracks
        return get_net_through(str_interface)[0]

    before = _sample()
    time.sleep(intvl_in_sec)
    after = _sample()
    delta = after - before
    return delta / intvl_in_sec
'ACK_FAILURE': 5,
'ACK_HEART_BEAT': 6,
'ACK_CREATE': 7,
'NEED_ACK': 8
}
# Message flag name -> numeric value; every flag is a distinct single bit.
MSG_FLAG2NUM = {
    'FLAG_URGENT': 1 << 0,
    'FLAG_NORMAL': 1 << 1,
    'FLAG_NEEDACK': 1 << 2,
    'FLAG_ACK': 1 << 3,
}
@cup.decorators.Singleton
class CMsgType(object):
    """
    Two-way registry between netmsg type names and their numbers.
    """

    def __init__(self):
        # type name -> number
        self._type2number = {}
        # str(number) -> type name
        self._number2type = {}

    def register_types(self, kvs):
        """
        Add every ``name: number`` pair of ``kvs`` to both lookup tables.

        :param kvs:
            mapping of type name to type number; the reverse table is keyed
            by ``str(number)``.
        """
        for type_name, type_number in kvs.items():
            self._type2number[type_name] = type_number
            self._number2type[str(type_number)] = type_name
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
:authors:
Guannan Ma maguannan @mythmgn
:create_date:
2016.5.10
:description:
generate unique id for services
"""
from cup import decorators
# Singleton-wrapped id generator.
# NOTE(review): in this excerpt __init__ ignores low/high/increment (its body
# is `pass`) and only the docstring of next() is visible.
@decorators.Singleton
class UniqueID(object):
"""
generate unique id
"""
def __init__(self, low, high, increment):
pass
def next(self):
"""get next unique id"""
import string
import socket
import struct
import threading
import cup
from cup import decorators
# Names exported by a star-import of this module.
# NOTE(review): 'CycleIDGenerator' is not defined in the visible part of the
# file -- confirm it exists further down.
__all__ = [
'CGeneratorMan',
'CycleIDGenerator'
]
@decorators.Singleton
class CGeneratorMan(object):
    """
    Generate unique integers, strings and auto incremental uint.
    """

    def __init__(self, str_prefix='localhost'):
        """
        Set up the prefix, the lock and the counter used for generation.

        :param str_prefix:
            prefix source. The default 'localhost' means "use
            ``cup.net.get_local_hostname()``"; any other value is converted
            with ``str()``. The current process id is appended in both cases.
        """
        use_hostname = (str_prefix == 'localhost')
        base = cup.net.get_local_hostname() if use_hostname else str(str_prefix)
        self._prefix = base + str(os.getpid())
        self._lock = threading.Lock()
        # counter starts at zero
        self._ind = 0
# Helper guarded by decorators.needmac: asks sysconf for SC_NPROCESSORS_ONLN,
# i.e. the number of processors currently online.
@decorators.needmac
def _get_cpu_nums():
return os.sysconf("SC_NPROCESSORS_ONLN")
# The helper's result is handed back by the enclosing function, whose header
# is not visible in this excerpt.
return _get_cpu_nums()
# NOTE(review): the docstring documents both a False return and a
# cup.err.LockFileError for a failed lock -- confirm which one applies to
# which mode.
@decorators.needposix
def lock(self, blocking=True):
"""
lock the file
:param blocking:
If blocking is True, will block there until cup gets the lock.
True by default.
:return:
return False if locking fails
:raise Exception:
raise cup.err.LockFileError if blocking is False and
the lock action failed
"""
# Platform-specific branch; the Linux implementation is not visible in this
# excerpt.
if platforms.is_linux():
import sys
import signal
# Directory containing this file, with a trailing '/'.
_NOW_PATH = os.path.dirname(os.path.abspath(__file__)) + '/'
# Absolute path two directory levels above _NOW_PATH.
_TOP_PATH = os.path.abspath(_NOW_PATH + '/../../')
from cup import decorators
from cup import log
from cup import net
from cup.util import conf
from arrow.master import control
from arrow.common import settings
@decorators.Singleton
class Master(object):
"""
Master class
"""
# Class-level defaults; __init__ assigns _control_service on the instance.
_heartbeat_service = None
_control_service = None
_conf_dict = None
def __init__(self, conf_file):
# load conf
# NOTE(review): _load_conf is not visible here; presumably it fills
# self._conf_dict, which is read right below -- confirm.
self._load_conf(conf_file)
# control service
# Constructed with this host's IP (net.get_hostip()), the integer value of
# conf['control']['port'], and the whole conf dict.
self._control_service = control.ControlService(
net.get_hostip(), int(self._conf_dict['control']['port']),
self._conf_dict
)