Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_search_keyword(self):
    """
    Check keyword search against a database holding a handful of torrents
    """
    metadata_cls = self.mds.TorrentMetadata

    def insert(title):
        # Store a random torrent whose title is replaced by the given one
        return metadata_cls.from_dict(dict(rnd_torrent(), title=title))

    foo_torrent = insert("foo bar 123")
    eee_torrent = insert("eee 123")
    # Entries that none of the single-hit keywords below should match,
    # including titles made of nothing but a quote character
    for filler_title in ("xoxoxo bar", "xoxoxo bar", u"\"", u"\'"):
        insert(filler_title)
    orm.flush()

    # 'foo' occurs in exactly one title
    foo_hits = metadata_cls.search_keyword("foo")[:]
    self.assertEqual(len(foo_hits), 1)
    self.assertEqual(foo_hits[0].rowid, foo_torrent.rowid)

    # 'eee' occurs in exactly one title
    eee_hits = metadata_cls.search_keyword("eee")[:]
    self.assertEqual(len(eee_hits), 1)
    self.assertEqual(eee_hits[0].rowid, eee_torrent.rowid)

    # '123' occurs in two titles
    number_hits = metadata_cls.search_keyword("123")[:]
    self.assertEqual(len(number_hits), 2)
# NOTE(review): this excerpt begins mid-function; the enclosing `def` and the
# `if` that this `else` belongs to are not visible here.
else:
LOGGER.warning(
"Failed to link to unknown entry '%s -> %s'; ignoring", relpath, attach)
# Collect the attachments currently on the record that are absent from
# set_attach. They are gathered first and removed in a separate pass below
# (presumably to avoid mutating record.attachments while iterating it).
remove_attach = []
for attach in record.attachments:
if attach not in set_attach:
remove_attach.append(attach)
LOGGER.debug("set_attach %s remove_attach %s", set_attach, remove_attach)
# Drop the stale attachments, then add every attachment in set_attach
for attach in remove_attach:
record.attachments.remove(attach)
for attach in set_attach:
record.attachments.add(attach)
orm.commit()
# Final fixups: draft entries are only logged and left alone; any other
# entry is re-saved through save_file() when fixup_needed is set.
if record.status == model.PublishStatus.DRAFT.value:
LOGGER.info("Not touching draft entry %s", fullpath)
elif fixup_needed:
LOGGER.info("Fixing up entry %s", fullpath)
# save_file()'s return value replaces `result`, which is what gets returned
result = save_file(fullpath, entry, check_fingerprint)
return result
@orm.db_session
def expire_record(record):
    """ Retire the record of an entry that is missing """
    # Path aliases pointing at the vanished entry depend on it, so they go too
    orm.delete(alias for alias in model.PathAlias if alias.entry == record)

    # Flagging the record as GONE takes it out of the indexes
    gone_status = model.PublishStatus.GONE.value
    record.status = gone_status
    orm.commit()
@orm.db_session
@tornado.web.authenticated
def put(self, tweet_id):
# Apply the requested 'action' argument to the tweet identified by tweet_id.
# NOTE(review): this excerpt stops inside the 'up' branch; any further
# branches and the final response are not visible here.
tweet_id = int(tweet_id)
tweet = Tweet.get(id=tweet_id)
if not tweet:
# No tweet was found for this id: answer with HTTP 404
raise tornado.web.HTTPError(404)
action = self.get_argument('action', None)
user = self.current_user
if not action:
# No 'action' argument was supplied: send back an error result
# (the message text means "missing parameter")
result = {'status': 'error', 'message':
'缺少参数'}
return self.send_result(result)
if action == 'up':
# user.up() is only called when the tweet's user_id differs from the
# current user's id
if tweet.user_id != user.id:
result = user.up(tweet_id=tweet.id)
# NOTE(review): this excerpt begins mid-function; the enclosing `def` and the
# `if` that this `else` belongs to are not visible here.
else:
# We're not assigning IDs yet
return None
if not entry_id:
# See if we already have an entry with this file path
by_filepath = model.Entry.select(lambda e: e.file_path == fullpath).first()
if by_filepath:
entry_id = by_filepath.id
if not entry_id:
# We still don't have an ID; generate one pseudo-randomly, based on the
# entry file path. This approach averages around 0.25 collisions per ID
# generated while keeping the entry ID reasonably short. In general,
# count*N averages 1/(N-1) collisions per ID.
# The ID space is five times the current Entry count, but never below 10.
limit = max(10, orm.get(orm.count(e)
for e in model.Entry) * 5) # type:ignore
attempt = 0
# Retry with an incremented attempt counter until the candidate is both
# non-zero (a falsy ID re-enters the loop) and not used by an existing Entry.
while not entry_id or model.Entry.get(id=entry_id):
# Stably generate a quasi-random entry ID from the file path
# (the MD5 digest is only reduced modulo `limit` to pick an ID here)
md5 = hashlib.md5()
md5.update(f"{fullpath} {attempt}".encode('utf-8'))
entry_id = int.from_bytes(md5.digest(), byteorder='big') % limit
attempt = attempt + 1
# NOTE(review): other_entry is assigned outside this excerpt; when it is
# truthy, the reassignment to the new ID is logged as a warning.
if other_entry:
LOGGER.warning("Entry '%s' had ID %d, which belongs to '%s'. Reassigned to %d",
fullpath, other_entry.id, other_entry.file_path, entry_id)
return entry_id
def garbage_collect(self):
    """Delete every entry in ``self.older_entries`` whose type is DELETED."""
    deleted_type = MetadataTypes.DELETED.value
    orm.delete(entry for entry in self.older_entries if entry.type == deleted_type)
# coding: utf-8
import time
from pony import orm
from ._base import db, BaseModel
import collipa.models
from collipa.helpers import cached_property
class Follow(db.Entity, BaseModel):
    """A follow record created by the user identified by ``who_id``.

    ``who_id`` is required; ``whom_id``, ``topic_id``, ``node_id`` and
    ``follow_class_id`` are optional integers.
    NOTE(review): presumably one of ``whom_id`` / ``topic_id`` / ``node_id``
    names the followed target -- confirm against the callers.
    """
    who_id = orm.Required(int)
    whom_id = orm.Optional(int)
    topic_id = orm.Optional(int)
    node_id = orm.Optional(int)
    # The default must be a callable so the timestamp is taken when a row is
    # created. A plain ``int(time.time())`` is evaluated once, when the class
    # is defined, and would stamp every row with that same import-time value.
    created_at = orm.Required(int, default=lambda: int(time.time()))
    follow_class_id = orm.Optional(int)

    def __str__(self):
        """Return the id as text; ``__str__`` has to return a ``str``."""
        return str(self.id)

    def __repr__(self):
        """Return a short debug representation that includes the id."""
        # The previous empty format string had no placeholder, so applying
        # ``%`` to it raised TypeError instead of producing a representation.
        return '<Follow: %s>' % self.id

    @cached_property
    def who(self):
        """The User whose id equals ``who_id`` (looked up via ``User.get``)."""
        return collipa.models.User.get(id=self.who_id)

    @cached_property
    def whom(self):
        """The User whose id equals ``whom_id`` (looked up via ``User.get``)."""
        return collipa.models.User.get(id=self.whom_id)
def make_migration_entity(db):
# Flag `db` as having a migration in progress and declare a Migration entity
# on `db.Entity`.
# NOTE(review): indentation is lost in this excerpt and no return statement
# is visible; presumably the class is nested in this function -- confirm.
db.migration_in_progress = True
class Migration(db.Entity):
# Each Migration requires a `name` (str) and an `applied` value (datetime)
name = orm.Required(str)
applied = orm.Required(datetime)
@orm.db_session
def put(self, tweet_id):
tweet_id = int(tweet_id)
tweet = Tweet.get(id=tweet_id)
if not tweet:
raise tornado.web.HTTPError(404)
action = self.get_argument('action', None)
user = self.current_user
if action and user:
if action == 'up':
if tweet.user_id != user.id:
result = user.up(tweet_id=tweet.id)
else:
result = {'status': 'info', 'message':
'不能为自己的推文投票'}
if action == 'down':
if tweet.user_id != user.id: