Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return rows
def _compose_table(self, rows, majorusersN, activeusersN, totalusersN):
newtext = (self.INTRO).format(majorusersN, self.MINTOTEDITS,
activeusersN, self.MINRECEDITS,
"edits" if self.MINRECEDITS > 1 else "edit",
self.DAYS, self.modules.format_first_date(),
self.modules.format_last_date(), totalusersN)
newtext += Wikitable.assemble(self.FIELDS_FORMAT, rows)
self.text.replace(self.text, newtext, recursive=False)
if __name__ == "__main__":
import sys
import ws.config
statistics = ws.config.object_from_argparser(Statistics, description="Update the statistics page on ArchWiki")
sys.exit(statistics.run())
time_taken = time.time() - self._start_time
_logger.debug("%(type)s request from %(mac)s processed in %(seconds).4f seconds" % {
'type': self._packet_type,
'mac': self.mac,
'seconds': time_taken,
})
if self._definition:
ip = self._definition.ip
subnet = self._definition.subnet
serial = self._definition.serial
else:
subnet = serial = None
ip = self._associated_ip
statistics.emit(statistics.Statistics(
self.source_address, self.mac, ip, subnet, serial, self._packet_type, time_taken, not self._discarded, self.port,
))
from jsonrpc import AsyncJsonRpc
from asyncio import get_event_loop, gather, Task, sleep, ensure_future, iscoroutine
from config import cg_tcp_addr, cg_wsocket_addr, cg_public_ip_port, cg_node_name
from statistics import Statistics, get_timestamp
from glog import tcp_logger
# route_tree.create_node('node',cg_public_ip_port, data={Deposit:xx,Fee:xx,Balance:4 IP:xx,Publickey:xx,SpvList:[]}) # root node
node_list = set()
node = {
"wallet_info": None,
"route_tree": RouteTree(),
"spv_table": SPVHashTable(),
# configurable
"name": cg_node_name
}
global_statistics = Statistics()
class Gateway():
"""
gateway 类 定义了所有直接相关的行为与属性
"""
def __init__(self):
"""Counstruct"""
self.websocket = None
self.tcpserver = None
self.loop = None
self.tcp_pk_dict = {}
self.ws_pk_dict = {}
def _create_service_coros(self):
"""
创建tcp wsocket service coros\n
它们进入event_loop执行后最终返回tcp、wsocket server
"""
from asyncio import get_event_loop, gather, Task, sleep, ensure_future, iscoroutine
from config import cg_tcp_addr, cg_wsocket_addr, cg_public_ip_port, cg_node_name
from statistics import Statistics, get_timestamp
from glog import tcp_logger, wst_logger
# route_tree.create_node('node',cg_public_ip_port, data={Deposit:xx,Fee:xx,Balance:4 IP:xx,Publickey:xx,SpvList:[]}) # root node
node_list = set()
node = {
"wallet_info": None,
"route_tree": RouteTree(),
"route_graph": RouterGraph(),
"spv_table": SPVHashTable(),
# configurable
"name": cg_node_name
}
global_statistics = Statistics()
class Gateway():
"""
gateway class
"""
TransMessageType=["Rsmc",
"FounderSign",
"Founder",
"RsmcSign",
"FounderFail",
"Settle",
"SettleSign",
"SettleFail",
"RsmcFail",
"Htlc",
"HtlcSign",
"HtlcFail"]
def append2pushed_raffle(type, area_id=0, num=1):
inst = Statistics.instance
if '摩天' in type or '金人' in type or '/' in type:
inst.pushed_raffle[type] = inst.pushed_raffle.get(type, 0) + int(num)
else:
if area_id == 1:
inst.pushed_raffle[type] = inst.pushed_raffle.get(type, 0) + int(num)
Printer().printer(f"检测到房间 {real_roomid} 的广播抽奖 @[{self.area}分区]{self._roomId}", "Lottery", "cyan")
Rafflehandler().append2list_TV(real_roomid)
Statistics().append2pushed_TVlist(real_roomid, self.area[0])
else:
Printer().printer(f"{dic['msg_common']} @[{self.area}分区]{self._roomId}", "Info", "green")
except Exception:
Printer().printer(f"NOTICE_MSG出错,请联系开发者 {dic}", "Warning", "red")
elif cmd == 'SYS_MSG':
if set(dic) in [set(self.dic_bulletin), {'cmd', 'msg', 'msg_text'}, {'cmd', 'msg', 'url'}]:
Printer().printer(f"{dic['msg']} @[{self.area}分区]{self._roomId}", "Info", "green")
else:
try:
real_roomid = dic['real_roomid']
Printer().printer(f"检测到房间 {real_roomid} 的广播抽奖 @[{self.area}分区]{self._roomId}", "Lottery", "cyan")
Rafflehandler().append2list_TV(real_roomid)
Statistics().append2pushed_TVlist(real_roomid, self.area[0])
except:
Printer().printer(f"SYS_MSG出错,请联系开发者 {dic}", "Warning", "red")
# 观众相关 [欢迎入场,送礼,发弹幕]
elif cmd in ["WELCOME", "SEND_GIFT", "DANMU_MSG"]:
pass
# 各种通知 [通知(当前房间开奖 活动小时榜 各种SYS_MSG都会同时有NOTICE_MSG),系统通知(友爱社 心愿达成 绘马 主播招募 直播间强推)]
elif cmd in ["NOTICE_MSG", "SYS_MSG"]:
pass
# 各种高能 [节奏风暴(开始 结束),高能广播(无抽奖 活动高能 全频风暴),抽奖通知(现在广播全在这里了),总督广播]
elif cmd in ["SPECIAL_GIFT", "SYS_GIFT", "SYS_MSG", "GUARD_MSG", "GUIARD_MSG"]:
pass
# 礼物效果 [连击开始,连击结束,使用积分加成卡]
elif cmd in ["COMBO_SEND", "COMBO_END", "SCORE_CARD"]:
pass
# PK相关
async def query(self):
while 1:
await Statistics().clean_TV()
await asyncio.sleep(30)
def __init__(self, n, meta_db):
self.node = n
self.infrastructure = None
self.statistics = statistics.Statistics(self)
self.niu = None
self._cached_keys = CachedGzipResponse()
self.cached_keys_timer = periodic_timer(seconds=config.get('nodes_reload_period', 60))
self.statistics_monitor_enabled = bool(monitor.CoupleFreeEffectiveSpaceMonitor.STAT_CFG)
if self.statistics_monitor_enabled:
self.couple_free_eff_space_monitor = monitor.CoupleFreeEffectiveSpaceMonitor(meta_db)
self.couples_free_eff_space_collect_timer = periodic_timer(
seconds=self.couple_free_eff_space_monitor.DATA_COLLECT_PERIOD
)
try:
keys_db_uri = config['metadata']['cache']['db']
except KeyError:
date, days = __get_date_and_days(GarminDB.GarminDB(db_params_dict), latest, GarminDB.Sleep, GarminDB.Sleep.total_sleep, 'sleep')
if days > 0:
sleep_dir = GarminDBConfigManager.get_or_create_sleep_dir()
root_logger.info("Date range to update: %s (%d) to %s", date, days, sleep_dir)
download.get_sleep(sleep_dir, date, days, overwite)
root_logger.info("Saved sleep files for %s (%d) to %s for processing", date, days, sleep_dir)
if Statistics.weight in stats:
date, days = __get_date_and_days(GarminDB.GarminDB(db_params_dict), latest, GarminDB.Weight, GarminDB.Weight.weight, 'weight')
if days > 0:
weight_dir = GarminDBConfigManager.get_or_create_weight_dir()
root_logger.info("Date range to update: %s (%d) to %s", date, days, weight_dir)
download.get_weight(weight_dir, date, days, overwite)
root_logger.info("Saved weight files for %s (%d) to %s for processing", date, days, weight_dir)
if Statistics.rhr in stats:
date, days = __get_date_and_days(GarminDB.GarminDB(db_params_dict), latest, GarminDB.RestingHeartRate, GarminDB.RestingHeartRate.resting_heart_rate, 'rhr')
if days > 0:
rhr_dir = GarminDBConfigManager.get_or_create_rhr_dir()
root_logger.info("Date range to update: %s (%d) to %s", date, days, rhr_dir)
download.get_rhr(rhr_dir, date, days, overwite)
root_logger.info("Saved rhr files for %s (%d) to %s for processing", date, days, rhr_dir)
def DBStatis(keyword):
db = DB(Config.dbOpts)
sql = 'SELECT * FROM cve WHERE `desc` like "%%%s%%"' % keyword
ret = db.select(sql)
s = Statistics()
for kd in ret:
s.update(kd[1])
'''
if 'field' in kd[1]:
s.update(kd[1])
# print(kd[0], highlight(kd[1], ['field', keyword]))
'''
print(len(ret))
return s