Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@orm.db_session
def get(self, topic_id):
    """Render one page of replies for a topic.

    Query arguments read from the request:

    * ``page`` -- page number; ``0`` (the default) selects the last page.
    * ``category`` -- ``'hot'`` selects the "hot" reply listing; a missing,
      empty or unrecognised value falls back to the default ``'all'``
      listing.

    :param topic_id: identifier of the topic taken from the URL; converted
                     with ``int`` before the lookup.
    :raises tornado.web.HTTPError: 404 when no topic has that id.
    """
    topic_id = int(topic_id)
    page = force_int(self.get_argument('page', 0), 0)
    topic = Topic.get(id=topic_id)
    if not topic:
        raise tornado.web.HTTPError(404)

    if self.get_argument('category', None) == 'hot':
        category = 'hot'
        reply_count = orm.count(topic.get_replies(page=None,
                                                  category=category))
        url = topic.url + '?category=hot'
    else:
        # Any other value is normalised to 'all'. Previously an unknown
        # category skipped both branches and left reply_count/url unbound,
        # which crashed the request with UnboundLocalError.
        category = 'all'
        reply_count = topic.reply_count
        url = topic.url

    # Ceiling division: number of pages needed to show every reply.
    page_count = (reply_count + config.reply_paged - 1) // config.reply_paged
    if page == 0:
        # No explicit page requested: jump to the last page.
        page = page_count
    replies = topic.get_replies(page=page, category=category)
    form = ReplyForm()
    return self.render("topic/index.html", topic=topic, replies=replies,
                       form=form, category=category, page=page,
                       page_count=page_count, url=url)
url += '/topics'
elif view == 'replies':
items = user.get_replies(page=page, category=category)
item_count = orm.count(user.get_replies(page=None, category=category))
url += '/replies'
elif view == 'followings':
items = user.get_followings(page=page)
item_count = orm.count(user.get_followings(page=None))
url += '/followings'
elif view == 'followers':
items = user.get_followers(page=page)
item_count = orm.count(user.get_followers(page=None))
url += '/followers'
elif view == 'albums':
items = user.get_albums(page=page)
item_count = orm.count(user.get_albums(page=None))
url += '/albums'
page_count = (item_count + config.paged - 1) // config.paged
return self.render("user/index.html", user=user, items=items,
view=view, category=category, page=page,
page_count=page_count, url=url)
@orm.db_session
def get(self):
    """Render the node overview page.

    The "hot" and "new" node lists (8 entries each) are always fetched.
    The paginated full listing is only loaded when the ``category`` query
    argument equals ``'all'``; otherwise the listing is empty and the page
    count is 0.
    """
    page = force_int(self.get_argument('page', 1), 1)
    category = self.get_argument('category', None)
    hot_nodes = Node.get_nodes(category='hot', limit=8)
    new_nodes = Node.get_nodes(category='new', limit=8)

    if category == 'all':
        nodes = Node.get_nodes(category='all', page=page)
        total_nodes = orm.count(Node.get_nodes(page=None))
        # Ceiling division: pages needed to list every node.
        page_count = (total_nodes + config.node_paged - 1) // config.node_paged
        url = '/nodes?category=' + category
    else:
        # No full listing requested: only the hot/new lists are shown.
        nodes, page_count, url = [], 0, '/nodes'

    return self.render(
        "node/show.html",
        hot_nodes=hot_nodes,
        new_nodes=new_nodes,
        nodes=nodes,
        category=category,
        page=page,
        page_count=page_count,
        url=url,
    )
@orm.db_session
def get(self):
    """Render the user overview page.

    The "hot" and "new" user lists (12 entries each) are always fetched.
    The ``category`` query argument selects the paginated main listing:
    ``'all'`` pages through ``User.get_users``, ``'online'`` pages through
    the ids returned by ``User.get_online_members``; any other value leaves
    the listing empty with a page count of 0.
    """
    page = force_int(self.get_argument('page', 1), 1)
    category = self.get_argument('category', None)

    sidebar_limit = 12
    hot_users = User.get_users(category='hot', limit=sidebar_limit)
    new_users = User.get_users(category='new', limit=sidebar_limit)

    # Defaults used when no recognised category is requested.
    users, page_count, url = [], 0, '/users'

    if category == 'all':
        total_users = orm.count(User.get_users(page=None))
        page_count = (total_users + config.user_paged - 1) // config.user_paged
        users = User.get_users(page=page)
        url = '/users?category=all'
    elif category == 'online':
        member_ids = [int(member) for member in User.get_online_members()]
        total_users = len(member_ids)
        # Keep only the ids that belong on the requested page.
        page_ids = member_ids[(page - 1) * config.user_paged: page * config.user_paged]
        users = User.select(lambda rv: rv.id in page_ids)
        page_count = (total_users + config.user_paged - 1) // config.user_paged
        url = '/users?category=online'

    return self.render(
        "user/show.html",
        users=users,
        hot_users=hot_users,
        new_users=new_users,
        page=page,
        page_count=page_count,
        url=url,
        category=category,
    )
def normalize(self):
    """Rescale by the average activity per channel and reset the bump amount.

    Does nothing beyond refreshing ``self.total_activity`` when there are no
    non-legacy channels or when the total activity is not positive.
    """
    activity = self.total_activity
    if not activity:
        # First normalization during this runtime: gather the activity from the DB.
        activity = orm.sum(g.votes for g in db.ChannelMetadata)
    self.total_activity = activity

    non_legacy_channels = db.ChannelMetadata.select(lambda ch: ch.status != LEGACY_ENTRY)
    channel_count = orm.count(non_legacy_channels)

    # NOTE(review): the bump reset is assumed to belong to the rescale branch;
    # the original indentation was lost -- confirm against the upstream source.
    if channel_count and self.total_activity > 0.0:
        self.rescale(self.total_activity / channel_count)
        self.bump_amount = 1.0
def history_count(self):
    """Return the result of ``orm.count`` applied to ``self.histories``."""
    histories = self.histories
    return orm.count(histories)
stats = get(
(
# Last 60 minutes
sum(
r.points
for r in Round
if r.solver == self and r.start_time > now - timedelta(hours=1)
),
count(r.solver == self and r.start_time > now - timedelta(hours=1)),
# This day
sum(
r.points
for r in Round
if r.solver == self and r.start_time.date() == now.date()
),
count(r.solver == self and r.start_time.date() == now.date()),
)
for r in Round
)
return {
"points-1h": "{} ({})".format(stats[0], stats[1]),
"points-day": "{} ({})".format(stats[2], stats[3]),
}
def get_num_torrents(self):
    """Count the ``TorrentMetadata`` entries whose ``metadata_type`` is ``REGULAR_TORRENT``."""
    regular_torrents = self.TorrentMetadata.select(
        lambda torrent: torrent.metadata_type == REGULAR_TORRENT
    )
    return orm.count(regular_torrents)
def add_tracker(self, tracker_url):
    """
    Store a tracker in the database unless it is invalid or already present.

    :param tracker_url: URL of the tracker to add; it is normalised with
                        ``get_uniformed_tracker_url`` before lookup and storage.
    """
    clean_url = get_uniformed_tracker_url(tracker_url)
    if clean_url is None:
        self._logger.warning(u"skip invalid tracker: %s", repr(tracker_url))
        return

    with db_session:
        matches = count(tracker for tracker in self.tracker_store if tracker.url == clean_url)
        if matches > 0:
            self._logger.debug(u"skip existing tracker: %s", repr(tracker_url))
            return

        # No stored tracker carries this URL yet: create one with fresh bookkeeping values.
        initial_fields = dict(url=clean_url, last_check=0, failures=0, alive=True, torrents={})
        self.tracker_store(**initial_fields)
def contents_len(self):
    """Return the result of ``orm.count`` applied to ``self.contents``."""
    contents = self.contents
    return orm.count(contents)