def filter_interesting_bugs(self, bugs):
    """Keep only the bugs with at most self.ncomments comments."""

    def comment_handler(bug, bugid, data):
        if len(bug["comments"]) <= self.ncomments:
            data.append(bugid)

    bugids = list(bugs.keys())
    data = []
    Bugzilla(
        bugids=bugids,
        commenthandler=comment_handler,
        commentdata=data,
        comment_include_fields=["count"],
    ).get_data().wait()

    bugs = {bugid: bugs[bugid] for bugid in data}

    return bugs
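All of these snippets drive the same libmozdata pattern: build a Bugzilla query with a handler plus a shared data object, then block on get_data().wait() until every chunk has been processed. A minimal standalone sketch of that pattern, assuming the usual libmozdata import and using only arguments that already appear in these snippets (the function itself is illustrative, not part of the original code):

from libmozdata.bugzilla import Bugzilla

def count_comments(bugids):
    # Illustrative only: map each bug id to its number of comments.
    counts = {}

    def comment_handler(bug, bugid, data):
        # The handler receives the per-bug comment payload, the bug id and the
        # shared object passed as commentdata.
        data[bugid] = len(bug["comments"])

    Bugzilla(
        bugids=bugids,
        commenthandler=comment_handler,
        commentdata=counts,
        comment_include_fields=["count"],
    ).get_data().wait()

    return counts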
"cf_status_firefox66",
"cf_status_firefox65",
"cf_status_firefox64",
"cf_status_firefox63",
"cf_status_firefox62",
],
"equals": ["commenter", "setters.login_name"],
}
queries = []
bugids = set()
for op, fs in fields.items():
for f in fs:
params = {"include_fields": "id", "f1": f, "o1": op, "v1": History.BOT}
queries.append(
Bugzilla(params, bughandler=bug_handler, bugdata=bugids, timeout=20)
)
for q in queries:
q.get_data().wait()
logger.info("History: get bugs: end.")
return bugids
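The params built in the loop above use Bugzilla's advanced-search triplets: fN names the field, oN the operator and vN the value to match. For instance, the "equals" entry for "commenter" would expand to something like the following (the address is only a placeholder for History.BOT):

params = {
    "include_fields": "id",
    "f1": "commenter",                 # field
    "o1": "equals",                    # operator
    "v1": "autonag-bot@example.org",   # placeholder value for History.BOT
}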
        attachmentdata=data,
        attachment_include_fields=[
            "bug_id",
            "creator",
            "data",
            "is_obsolete",
            "is_patch",
            "content_type",
            "flags",
        ],
    ).get_data().wait()
    data = {bugid: v for bugid, v in data.items() if v["patch"] is not None}
    splinter_bugs = [bugid for bugid, v in data.items() if v["patch"] == "splinter"]

    Bugzilla(
        bugids=splinter_bugs,
        commenthandler=comment_handler,
        commentdata=data,
        comment_include_fields=["text"],
    ).get_data().wait()

    data = {
        bugid: {"authors": v["author"], "patch_count": v["count"]}
        for bugid, v in data.items()
        if v["patch"] == "phab" or not v["landed"]
    }

    return data
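The final comprehension keeps only bugs whose patch went through Phabricator, or whose patch has not landed yet, and reduces each entry to its patch authors and patch count. Roughly, the returned dict looks like this (ids and counts are illustrative; the author values come from the attachment handler, which is not shown here):

{
    "1500000": {"authors": ..., "patch_count": 2},
    "1500001": {"authors": ..., "patch_count": 1},
}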
def get_not_landed(self, bugs):
    not_landed = set()

    def comment_handler(bug, bugid, data):
        # Collect the revisions mentioned in landing comments; bugs without any
        # landing comment are flagged as not landed right away.
        r = Bugzilla.get_landing_comments(bug["comments"], [], self.channel_pat)
        if not r:
            not_landed.add(bugid)
            return
        data[bugid]["land"] = {
            i["revision"]: {"ok": False, "bugid": bugid} for i in r
        }

    bugids = list(bugs.keys())
    Bugzilla(
        bugids=bugids,
        commenthandler=comment_handler,
        commentdata=bugs,
        comment_include_fields=["text"],
    ).get_data().wait()

    # Check on hg which of the collected revisions really landed.
    not_landed |= self.get_hg(bugs)

    for bugid, info in bugs.items():
        if "land" in info:
            del info["land"]
        info["landed"] = "No" if bugid in not_landed else "Yes"

    return bugs
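During the call, comment_handler records one entry per landing revision it finds in the bug's comments; once get_hg has checked those revisions, the bookkeeping is dropped and replaced by a human-readable flag. A sketch of the shape before and after (bug id and revision hash are placeholders):

# While get_not_landed is running, after comment_handler has processed the comments:
bugs = {"1500000": {"land": {"abcdef123456": {"ok": False, "bugid": "1500000"}}}}

# After get_not_landed returns:
# bugs == {"1500000": {"landed": "Yes"}}   # or "No" if nothing checked out on hg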
def get_ids(params):
    assert "include_fields" not in params or params["include_fields"] == "id"

    old_CHUNK_SIZE = Bugzilla.BUGZILLA_CHUNK_SIZE
    try:
        Bugzilla.BUGZILLA_CHUNK_SIZE = 7000

        all_ids = []

        def bughandler(bug):
            all_ids.append(bug["id"])

        params["include_fields"] = "id"

        Bugzilla(params, bughandler=bughandler).get_data().wait()
    finally:
        Bugzilla.BUGZILLA_CHUNK_SIZE = old_CHUNK_SIZE

    return all_ids
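A hedged usage sketch for get_ids, reusing the "commenter"/"equals" search fields shown earlier (the account address is a placeholder):

# Hypothetical example: ids of all bugs commented on by a given account.
params = {"f1": "commenter", "o1": "equals", "v1": "someone@example.org"}
ids = get_ids(params)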
def get_nightly_version_from_bz():
    def bug_handler(bug, data):
        # Collect the NN from every cf_status_firefoxNN field present on the bug;
        # fields such as cf_status_firefox_esr60 are skipped by the isdigit() check.
        status = "cf_status_firefox"
        N = len(status)
        for k in bug.keys():
            if k.startswith(status):
                k = k[N:]
                if k.isdigit():
                    data.append(int(k))

    data = []
    Bugzilla(bugids=["1234567"], bughandler=bug_handler, bugdata=data).get_data().wait()

    return max(data)
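The handler derives the nightly version purely from the names of the cf_status_firefoxNN flags present on an arbitrary bug; the same extraction can be checked in isolation on a fake payload:

status = "cf_status_firefox"
fake_bug = {"id": 1, "cf_status_firefox66": "fixed", "cf_status_firefox_esr60": "---"}
versions = [
    int(k[len(status):])
    for k in fake_bug
    if k.startswith(status) and k[len(status):].isdigit()
]
assert versions == [66]  # the ESR flag is ignored because "_esr60" is not all digits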
def get_bugs(self, date="today", bug_ids=[], chunk_size=None):
    """Get the bugs"""
    bugs = self.get_data()
    params = self.get_bz_params(date)
    self.amend_bzparams(params, bug_ids)
    self.query_url = utils.get_bz_search_url(params)

    if isinstance(self, Nag):
        self.query_params = params

    old_CHUNK_SIZE = Bugzilla.BUGZILLA_CHUNK_SIZE
    try:
        if chunk_size:
            Bugzilla.BUGZILLA_CHUNK_SIZE = chunk_size

        Bugzilla(
            params,
            bughandler=self.bughandler,
            bugdata=bugs,
            timeout=self.get_config("bz_query_timeout"),
        ).get_data().wait()
    finally:
        Bugzilla.BUGZILLA_CHUNK_SIZE = old_CHUNK_SIZE

    self.get_comments(bugs)

    return bugs
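Both get_ids and get_bugs temporarily override the module-level Bugzilla.BUGZILLA_CHUNK_SIZE and restore it in a finally block. If that pattern were needed in more places, it could be factored into a small context manager; a sketch under that assumption (not part of the original code):

import contextlib

@contextlib.contextmanager
def bugzilla_chunk_size(size):
    # Temporarily override the chunk size used for Bugzilla requests,
    # restoring the previous value even if the query raises.
    old = Bugzilla.BUGZILLA_CHUNK_SIZE
    Bugzilla.BUGZILLA_CHUNK_SIZE = size
    try:
        yield
    finally:
        Bugzilla.BUGZILLA_CHUNK_SIZE = old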
data["bugid"] = ""
for info in bugs.values():
for rev, i in info["land"].items():
queries.append(Query(url, {"node": rev}, handler_rev, i))
if queries:
hgmozilla.Revision(queries=queries).wait()
# clean
bug_torm = []
for bug, info in bugs.items():
torm = []
for rev, i in info["land"].items():
if not i["bugid"] or not (
self.date <= lmdutils.get_date_ymd(i["date"]) < self.tomorrow
):
torm.append(rev)
for x in torm:
del info["land"][x]
if not info["land"]:
bug_torm.append(bug)
for x in bug_torm:
del bugs[x]
self.get_hg_patches(bugs)
        # Tail of the per-revision handler: record the (name, e-mail) pairs parsed
        # from the hg user strings for this bug.
        data[bugid] = set()
        m = HG_MAIL.match(user)
        if m:
            hgname = m.group(1).strip()
            hgmail = m.group(2).strip()
            data[bugid].add((hgname, hgmail))

    url = hgmozilla.Revision.get_url("nightly")
    queries = []
    for bugid, info in bzdata.items():
        hdler = functools.partial(handler_rev, bugid)
        for rev in info["revisions"]:
            queries.append(Query(url, {"node": rev}, hdler, self.hgdata))

    if queries:
        hgmozilla.Revision(queries=queries).wait()

    self.set_autofixable(bzdata, user_info)

    return self.hgdata
def get_hg(self, bugs):
    url = hgmozilla.Revision.get_url(self.channel)
    queries = []
    not_landed = set()

    def handler_rev(json, data):
        # A revision counts as landed only if it references the expected bug
        # and was not backed out.
        info = utils.get_info_from_hg(json)
        if info["bugid"] == data["bugid"] and not info["backedout"]:
            data["ok"] = True

    for info in bugs.values():
        for rev, i in info.get("land", {}).items():
            queries.append(Query(url, {"node": rev}, handler_rev, i))

    if queries:
        hgmozilla.Revision(queries=queries).wait()

    for bugid, info in bugs.items():
        # all() over an empty mapping is True, so bugs with no "land" entries
        # are reported as not landed as well.
        if all(not i["ok"] for i in info.get("land", {}).values()):
            not_landed.add(bugid)

    return not_landed
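The hg side mirrors the Bugzilla pattern: build Query objects against the revision endpoint and let hgmozilla.Revision drain them. A minimal hedged sketch for a single revision, assuming the libmozdata imports used by these snippets (the revision hash passed by the caller is a placeholder):

from libmozdata import hgmozilla
from libmozdata.connection import Query

def fetch_revision_desc(rev, channel="nightly"):
    # Illustrative only: fetch one revision's JSON and keep its description.
    result = {}

    def handler_rev(json, data):
        data["desc"] = json.get("desc")

    url = hgmozilla.Revision.get_url(channel)
    hgmozilla.Revision(queries=[Query(url, {"node": rev}, handler_rev, result)]).wait()

    return result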