Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Leave follow up comments if best practises were not followed."""
from gidgethub import routing
router = routing.Router()
REPLACE_GH_NUMBER_MESSAGE = "@{committer}: Please replace `#` with `GH-` in the commit message next time. Thanks!"
@router.register("pull_request", action="closed")
async def remind_replace_gh_number(event, gh, *args, **kwargs):
"""Remind core dev to replace # with GH-."""
if event.data["pull_request"]["merged"]:
pr_number = event.data["pull_request"]["number"]
commit_hash = event.data["pull_request"]["merge_commit_sha"]
commit = await gh.getitem(
event.data["repository"]["commits_url"],
{"sha": commit_hash})
commit_message = commit["commit"]["message"]
committer = event.data["pull_request"]["merged_by"]["login"]
if f"(#{pr_number})" in commit_message:
"""Automatically close PR that tries to merge maintenance branch into master."""
import re
import gidgethub.routing
# Matches a pull-request head label of the form "<owner>:<major>.<minor>"
# (for example "someuser:3.7"); the pattern is anchored at both ends.
PYTHON_MAINT_BRANCH_RE = re.compile(r'^\w+:\d+\.\d+$')
# Explanatory message text for PRs that try to merge a maintenance branch into
# master. The trailing backslashes inside the literal join the source lines, so
# the text is a single line at runtime.
INVALID_PR_COMMENT = """\
PRs attempting to merge a maintenance branch into the \
master branch are deemed to be spam and automatically closed. \
If you were attempting to report a bug, please go to bugs.python.org; \
see devguide.python.org for further instruction as needed."""
router = gidgethub.routing.Router()
@router.register("pull_request", action="opened")
@router.register("pull_request", action="synchronize")
async def close_invalid_pr(event, gh, *args, **kwargs):
"""Close the invalid PR, add 'invalid' label, and post a message.
PR is considered invalid if:
* base_label is 'python:master'
* head_label is '<username>:<maint_branch>' (e.g. 'someuser:3.7')
"""
head_label = event.data["pull_request"]["head"]["label"]
base_label = event.data["pull_request"]["base"]["label"]
if PYTHON_MAINT_BRANCH_RE.match(head_label) and \
base_label == "python:master":
data = {'state': 'closed'}
import asyncio
import os
import random
import gidgethub.routing
from kombu import exceptions as kombu_ex
from redis import exceptions as redis_ex
from . import tasks, util
EASTER_EGG = "I'm not a witch! I'm not a witch!"
router = gidgethub.routing.Router()
@router.register("pull_request", action="closed")
@router.register("pull_request", action="labeled")
async def backport_pr(event, gh, *args, **kwargs):
if event.data["pull_request"]["merged"]:
issue_number = event.data["pull_request"]["number"]
merged_by = event.data["pull_request"]["merged_by"]["login"]
created_by = event.data["pull_request"]["user"]["login"]
commit_hash = event.data["pull_request"]["merge_commit_sha"]
pr_labels = []
if event.data["action"] == "labeled":
pr_labels = [event.data["label"]]
import gidgethub.routing
from . import tasks
router = gidgethub.routing.Router()
# Issue titles that issue_opened() reacts to. The title is compared after
# .strip().lower(), so every entry here must be lower-case with no
# surrounding whitespace.
SECRET_CODES = [
"turn off the sun",
"take down the moon",
"switch off the stars",
"paint the sky black",
"black out the sun",
]
@router.register("issues", action="reopened")
@router.register("issues", action="opened")
async def issue_opened(event, gh, *args, **kwargs):
if event.data["issue"]["title"].strip().lower() in SECRET_CODES:
issue_number = event.data["issue"]["number"]
import gidgethub.routing
router = gidgethub.routing.Router()
@router.register("pull_request", action="closed")
async def delete_branch(event, gh, *args, **kwargs):
    """Remove the head branch of a closed pull request opened by miss-islington.

    Pull requests authored by any other account are left untouched. The
    branch is removed by issuing a DELETE for its git ref in the
    miss-islington/cpython repository.
    """
    pull_request = event.data["pull_request"]
    # Guard clause: only clean up after the bot's own pull requests.
    if pull_request["user"]["login"] != "miss-islington":
        return

    head_ref = pull_request["head"]["ref"]
    await gh.delete(f"/repos/miss-islington/cpython/git/refs/heads/{head_ref}")
"""Check for a news entry."""
import functools
import pathlib
import re
import gidgethub.routing
from . import util
router = gidgethub.routing.Router()
# util.create_status with the 'bedevere/news' status context pre-bound as its
# first positional argument.
create_status = functools.partial(util.create_status, 'bedevere/news')
# NOTE(review): presumably the web tool offered to contributors for writing a
# news entry -- confirm against the code that uses this URL.
BLURB_IT_URL = 'https://blurb-it.herokuapp.com'
# Pattern for a news-entry file name, compiled with re.VERBOSE (so whitespace
# and the "#" remarks inside the pattern are ignored):
#   <date or date-time>.bpo-<number>[,<number>...].<nonce>.rst
FILENAME_RE = re.compile(r"""# YYYY-mm-dd or YYYY-mm-dd-HH-MM-SS
\d{4}-\d{2}-\d{2}(?:-\d{2}-\d{2}-\d{2})?\.
bpo-\d+(?:,\d+)*\. # Issue number(s)
[A-Za-z0-9_=-]+\. # Nonce (URL-safe base64)
rst # File extension""",
re.VERBOSE)
# NOTE(review): presumably the "skip news" label name, judging by the status
# description below -- confirm against util.skip_label.
SKIP_NEWS_LABEL = util.skip_label("news")
# Pre-built SUCCESS status for this check, with a description saying the
# "skip news" label was found.
SKIP_LABEL_STATUS = create_status(util.StatusState.SUCCESS,
description='"skip news" label found')
post = await request.post()
sha = post['sha'].strip()
async with dbpool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute('INSERT INTO authorized_shas (sha) VALUES (%s);', sha)
log.info(f'authorized sha: {sha}')
session = await aiohttp_session.get_session(request)
set_message(session, f'SHA {sha} authorized.', 'info')
raise web.HTTPFound('/')
@routes.get('/healthcheck')
async def healthcheck(request):  # pylint: disable=unused-argument
    """Respond to GET /healthcheck with a bare HTTP 200.

    The request object is not inspected.
    """
    ok_response = web.Response(status=200)
    return ok_response
gh_router = gh_routing.Router()
@gh_router.register('pull_request')
async def pull_request_callback(event):
    """Notify the watched branches affected by a pull_request event.

    A watched branch is notified when it already tracks the pull request's
    number, or when it is the pull request's base (target) branch.
    """
    pr_payload = event.data['pull_request']
    pr_number = pr_payload['number']
    base_branch = FQBranch.from_gh_json(pr_payload['base'])

    for watched in watched_branches:
        # `prs` may be empty/None, so test it before the membership check.
        tracks_pr = watched.prs and pr_number in watched.prs
        if tracks_pr or watched.branch == base_branch:
            await watched.notify_github_changed(event.app)
@gh_router.register('push')
async def push_callback(event):
data = event.data
ref = data['ref']
while these are processed. Commands should not do anything taking
more than milliseconds.
"""
import logging
import re
import gidgethub.routing
from .commands import command_routes
from . import tasks
from .config import APP_ID
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
event_routes = gidgethub.routing.Router() # pylint: disable=invalid-name
BOT_ALIAS_RE = re.compile(r'@bioconda[- ]?bot', re.IGNORECASE)
@event_routes.register("issue_comment", action="created")
async def handle_comment_created(event, ghapi, *args, **_kwargs):
"""Handles comments on issues
- dispatches @biocondabot commands
- re-iterates comments from non-members attempting to @mention @bioconda/xxx
"""
issue_number = event.get('issue/number', "NA")
comment_author = event.get("comment/user/login", "")
comment_body = event.get("comment/body", "")
# Ignore self mentions. This is important not only to avoid loops,
"""Check if a bugs.python.org issue number is specified in the pull request's title."""
import re
from gidgethub import routing
from . import util
router = routing.Router()
# Name used inside the HTML-comment markers that delimit the section appended
# to a pull request body.
TAG_NAME = "issue-number"
# Marker that ends the appended section. It is an HTML comment, so it is not
# shown when the body is rendered. (The marker text had been lost, leaving an
# empty f-string and TAG_NAME unused.)
CLOSING_TAG = f"<!-- /{TAG_NAME} -->"
# Template for the updated pull request body. It is an f-string, so the doubled
# braces survive as `{body}` / `{issue_number}` placeholders to be filled in
# later with str.format(); TAG_NAME and CLOSING_TAG are substituted right here.
BODY = f"""\
{{body}}

<!-- {TAG_NAME} -->
https://bugs.python.org/issue{{issue_number}}
{CLOSING_TAG}
"""
# Finds a "bpo-<digits>" reference and captures the digits in the named group
# "issue". The group name had been lost, which left "(?P\d+)" -- an invalid
# construct that makes re.compile() raise re.error at import time.
ISSUE_RE = re.compile(r"bpo-(?P<issue>\d+)")
SKIP_ISSUE_LABEL = util.skip_label("issue")
STATUS_CONTEXT = "bedevere/issue-number"
# Try to keep descriptions at or below 50 characters, else GitHub's CSS will truncate it.
SKIP_ISSUE_STATUS = util.create_status(STATUS_CONTEXT, util.StatusState.SUCCESS,
description="Issue report skipped")
import aiohttp
from aiohttp import web
from celery import Celery
from celery import Task as _Task
from celery.signals import worker_process_init, celeryd_init
import gidgethub.routing
from . import utils
from . import __version__ as VERSION
from .githubhandler import GitHubAppHandler, GitHubHandler, Event
from .githandler import TempGitHandler
from .recipe import Recipe
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
event_routes = gidgethub.routing.Router() # pylint: disable=invalid-name
web_routes = web.RouteTableDef() # pylint: disable=invalid-name
BOT_NAME = "BiocondaBot"
BOT_ALIAS_RE = re.compile(r'@bioconda[- ]?bot', re.IGNORECASE)
BOT_EMAIL = "47040946+BiocondaBot@users.noreply.github.com"
class Task(_Task):
"""Task class with support for async tasks
We override celery.Task with our own version, with some extra
features and defaults:
- Since we already use a lot of async stuff elsewhere, it's useful
to allow the ``run`` method of tasks be ``async``. This Task