Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Store a note for one (repository, branch, upstream, sha1) combination in the
# checkbranchnotes table, replacing any note already stored for that
# combination.
#
# req  -- request object; parameters are read with getParameter() and the note
#         text is taken from the request body via read().
# db   -- database connection; provides cursor() and commit().
# user -- the acting user; user.id is recorded as the note's uid.
#
# NOTE(review): this excerpt has lost its indentation and stops before
# `response` is used, so the exact nesting and the tail of the function are
# not visible here -- confirm against the full file.
def addNote(req, db, user):
# "repository" is converted with filter=int; the remaining parameters are used
# as received.
repository_id = req.getParameter("repository", filter=int)
branch = req.getParameter("branch")
upstream = req.getParameter("upstream")
sha1 = req.getParameter("sha1")
# "review" appears to be optional: the second argument looks like a default
# value, and None is handled explicitly below -- TODO confirm getParameter().
review_id = req.getParameter("review", None)
# The note text is the stripped request body.
text = req.read().strip()
# Resolve the review only when a review id was supplied.
if review_id is not None:
review = dbutils.Review.fromId(db, review_id)
else:
review = None
cursor = db.cursor()
# Delete-then-insert: drop any existing note for this exact combination before
# adding the new one.
cursor.execute("DELETE FROM checkbranchnotes WHERE repository=%s AND branch=%s AND upstream=%s AND sha1=%s",
(repository_id, branch, upstream, sha1))
# `text or None` passes None instead of an empty string when the body was
# empty after stripping.
cursor.execute("INSERT INTO checkbranchnotes (repository, branch, upstream, sha1, uid, review, text) VALUES (%s, %s, %s, %s, %s, %s, %s)",
(repository_id, branch, upstream, sha1, user.id, review_id, text or None))
db.commit()
response = "ok"
# Extra work only when a review was given and it lives in the same repository
# as the note: load the repository, the commit, and a CommitSet of the review
# branch's commits.
if review and review.repository.id == repository_id:
repository = gitutils.Repository.fromId(db, repository_id)
commit = gitutils.Commit.fromSHA1(db, repository, sha1)
commitset = log.commitset.CommitSet(review.branch.getCommits(db))
# Page handler that starts by loading a review and preparing a nested
# directory/file tree; when both "first" and "last" SHA-1s are supplied it
# goes on to query commits joined with changesets and reviewchangesets.
#
# req  -- request object; "review" is read with filter=int, "first" and "last"
#         are read with None as the second argument (presumably a default --
#         TODO confirm getParameter()).
# db   -- database connection; provides cursor().
# user -- not referenced in the visible part of the function.
#
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the SQL string literal below, so the rest of the function is not visible.
def renderFilterChanges(req, db, user):
review_id = req.getParameter("review", filter=int)
first_sha1 = req.getParameter("first", None)
last_sha1 = req.getParameter("last", None)
cursor = db.cursor()
review = dbutils.Review.fromId(db, review_id)
# Tree roots: one dict for sub-directories and one for files.
root_directories = {}
root_files = {}
# Insert one file into the tree. The path from dbutils.describe_file() is
# split on "/"; each directory name maps to a (sub-directories, files) pair of
# dicts, created on demand by setdefault(); the final path component is
# recorded in a files dict with file_id as its value.
def processFile(file_id):
components = dbutils.describe_file(db, file_id).split("/")
directories, files = root_directories, root_files
for directory_name in components[:-1]:
directories, files = directories.setdefault(directory_name, ({}, {}))
files[components[-1]] = file_id
# The commit query below is only issued when both SHA-1s were supplied.
if first_sha1 and last_sha1:
cursor.execute("""SELECT commits.sha1
FROM commits
JOIN changesets ON (changesets.child=commits.id)
JOIN reviewchangesets ON (reviewchangesets.changeset=changesets.id)
# Page handler that loads a review and arranges every file listed for it in
# the reviewfiles table into a nested directory/file tree.
#
# req  -- request object; "review" is read with filter=int.
# db   -- database connection; provides cursor().
# user -- not referenced in the visible part of the function.
#
# NOTE(review): this excerpt has lost its indentation and ends right after the
# tree is populated; whatever renders the tree is not visible here.
def renderManageReviewers(req, db, user):
review_id = req.getParameter("review", filter=int)
cursor = db.cursor()
review = dbutils.Review.fromId(db, review_id)
# Tree roots: one dict for sub-directories and one for files.
root_directories = {}
root_files = {}
# Insert one file into the tree. The path from dbutils.describe_file() is
# split on "/"; each directory name maps to a (sub-directories, files) pair of
# dicts, created on demand by setdefault(); the final path component is
# recorded in a files dict with file_id as its value.
def processFile(file_id):
components = dbutils.describe_file(db, file_id).split("/")
directories, files = root_directories, root_files
for directory_name in components[:-1]:
directories, files = directories.setdefault(directory_name, ({}, {}))
files[components[-1]] = file_id
# Feed every file id recorded for this review into the tree.
cursor.execute("SELECT file FROM reviewfiles WHERE review=%s", (review.id,))
for (file_id,) in cursor:
processFile(file_id)
def __call__(self, value, context):
    """Check ``value`` via the parent class, then resolve it to a review.

    The review loaded for ``value`` is stored on ``context.review`` and its
    repository on ``context.repository``; the review itself is returned.
    """
    super(Review, self).__call__(value, context)

    resolved = dbutils.Review.fromId(context.db, value)
    context.review = resolved
    context.repository = resolved.repository
    return resolved
# Check a proposed new head for a review's branch: look among the listed
# commits for one whose tree is identical to the tree of the branch's current
# head, and record the outcome in `valid`.
#
# db            -- database connection.
# user          -- not referenced in the visible part of the method.
# review_id     -- id passed to dbutils.Review.fromId().
# new_head_sha1 -- SHA-1 of the proposed new head commit.
#
# NOTE(review): this excerpt has lost its indentation and ends before anything
# is returned, so the method's result is not visible here.
def process(self, db, user, review_id, new_head_sha1):
review = dbutils.Review.fromId(db, review_id)
old_head = review.branch.getHead(db)
new_head = gitutils.Commit.fromSHA1(db, review.repository, new_head_sha1)
mergebase = review.repository.mergebase([old_head, new_head])
# Presumably the commits reachable from new_head but not from the merge base
# -- TODO confirm the argument order of revlist().
sha1s = review.repository.revlist([new_head], [mergebase])
valid = True
for sha1 in sha1s:
commit = gitutils.Commit.fromSHA1(db, review.repository, sha1)
# Stop at the first commit whose tree matches the old head's tree.
if commit.tree == old_head.tree:
break
# NOTE(review): with indentation lost it is unclear whether this `else` pairs
# with the `if` or with the `for` (a for/else would clear `valid` only when no
# matching commit was found) -- confirm against the full file.
else:
valid = False
@staticmethod
def fromArgument(db, argument):
    """Resolve ``argument`` to a review, trying a review id before a branch name.

    ``argument`` is first converted with ``int()`` and looked up as a review
    id.  If that conversion or lookup raises, ``argument`` is converted with
    ``str()`` and looked up as a branch name, and the review attached to that
    branch is returned.

    Returns None when no branch with that name is found; otherwise returns
    whatever ``Review.fromId()`` / ``Review.fromBranch()`` return.
    """
    try:
        return Review.fromId(db, int(argument))
    except Exception:
        # Intentionally broad so that any failure of the id lookup still falls
        # back to the branch-name lookup, but no longer a bare ``except:``,
        # which would also swallow KeyboardInterrupt and SystemExit.
        branch = Branch.fromName(db, str(argument))
        if not branch:
            return None
        return Review.fromBranch(db, branch)
@staticmethod
def fromBranch(db, branch):
if branch:
cursor = db.cursor()
cursor.execute("SELECT id FROM reviews WHERE branch=%s", [branch.id])
row = cursor.fetchone()
if not row: return None
else: return Review.fromId(db, row[0], branch)
else:
return None
def process(self, db, user, review_id):
    """Apply filters, including global ones, for ``user`` on a review.

    The review is loaded from ``review_id`` and handed to
    ``reviewing.utils.applyFilters()`` with ``globalfilters=True``; a plain
    ``OperationResult`` is returned afterwards.
    """
    reviewing.utils.applyFilters(
        db, user, dbutils.Review.fromId(db, review_id), globalfilters=True)
    return OperationResult()
[user.id])
profiler.check("query: open")
for review_id, summary, branch_id in cursor:
if includeReview(review_id):
other_open[review_id] = summary, branch_id, None, None
profiler.check("processing: open")
if other_open:
accepted = []
pending = []
for review_id, (summary, branch_id, lines, comments) in sortedReviews(other_open):
if dbutils.Review.isAccepted(db, review_id):
accepted.append((review_id, (summary, branch_id, lines, comments)))
else:
pending.append((review_id, (summary, branch_id, lines, comments)))
table = target.table("paleyellow reviews", id="open", align="center", cellspacing=0)
table.col(width="30%")
table.col(width="70%")
header = table.tr().td("h1", colspan=4).h1()
header.text("Open Reviews" if user.isAnonymous() else "Other Open Reviews")
header.span("right").a(href=hidden("open")).text("[hide]")
if accepted:
table.tr().td("h2", colspan=4).h2().text("Accepted")
renderReviews(table, accepted, False)
if pending: