Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Set up a "parent" repository with an empty initial commit that we'll operate on
upstream_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("upstream_dir preserved at: {}".format(upstream_dir)))
else:
self.addCleanup(lambda: shutil.rmtree(upstream_dir))
self.upstream_sh = ghstack.shell.Shell(cwd=upstream_dir, testing=True)
# I plan to fix this type error soon
self.github = ghstack.github_fake.FakeGitHubEndpoint(self.upstream_sh)
local_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("local_dir preserved at: {}".format(local_dir)))
else:
self.addCleanup(lambda: shutil.rmtree(local_dir))
self.sh = ghstack.shell.Shell(cwd=local_dir, testing=True)
self.sh.git("clone", upstream_dir, ".")
self.rev_map = {}
self.substituteRev(GitCommitHash("HEAD"), SubstituteRev("rINI0"))
def setUp(self) -> None:
    # Build the shell wrapper with no arguments; the sibling call sites in
    # this file annotate the same construction with "Use CWD".
    self.sh = ghstack.shell.Shell()
def strip_trailing_whitespace(text: str) -> str:
    """Remove runs of trailing spaces from every line of *text*.

    Only the space character is stripped; tabs and other whitespace are
    left untouched, and line terminators are preserved.
    """
    # With re.MULTILINE, `$` matches just before each newline as well as at
    # the very end of the string, so the pattern is applied line by line.
    return re.sub(r' +$', '', text, flags=re.MULTILINE)
def indent(text: str, prefix: str) -> str:
    """Prepend *prefix* to every non-blank line of *text*.

    Lines that are empty or contain only whitespace are passed through
    unchanged. Line endings are kept exactly as they appear in the input.
    """
    return ''.join(
        # `line.strip()` is falsy for blank/whitespace-only lines, which
        # are emitted without the prefix.
        prefix + line if line.strip() else line
        # keepends=True so the original line terminators survive the join.
        for line in text.splitlines(True)
    )
class TestGh(expecttest.TestCase):
proc: ClassVar[subprocess.Popen]
github: ghstack.github.GitHubEndpoint
rev_map: Dict[SubstituteRev, GitCommitHash]
upstream_sh: ghstack.shell.Shell
sh: ghstack.shell.Shell
def setUp(self) -> None:
# Set up a "parent" repository with an empty initial commit that we'll operate on
upstream_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("upstream_dir preserved at: {}".format(upstream_dir)))
else:
self.addCleanup(lambda: shutil.rmtree(upstream_dir))
self.upstream_sh = ghstack.shell.Shell(cwd=upstream_dir, testing=True)
# I plan to fix this type error soon
self.github = ghstack.github_fake.FakeGitHubEndpoint(self.upstream_sh)
local_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("local_dir preserved at: {}".format(local_dir)))
def main(msg=None, github=None, github_rest=None, sh=None, repo_owner=None,
repo_name=None, username="ezyang"):
if sh is None:
# Use CWD
sh = ghstack.shell.Shell()
if repo_owner is None or repo_name is None:
# Grovel in remotes to figure it out
origin_url = sh.git("remote", "get-url", "origin")
while True:
m = re.match(r'^git@github.com:([^/]+)/([^.]+)\.git$', origin_url)
if m:
repo_owner = m.group(1)
repo_name = m.group(2)
break
m = re.match(r'https://github.com/([^/]+)/([^.]+).git', origin_url)
if m:
repo_owner = m.group(1)
repo_name = m.group(2)
break
raise RuntimeError(
def main(pull_request: str,
github: ghstack.github.GitHubEndpoint,
sh: ghstack.shell.Shell) -> None:
# We land the entire stack pointed to by a URL.
# Local state is ignored; PR is source of truth
# Furthermore, the parent commits of PR are ignored: we always
# take the canonical version of the patch from any given pr
params = ghstack.github_utils.parse_pull_request(pull_request)
orig_ref = lookup_pr_to_orig_ref(github, **params)
if sh is None:
# Use CWD
sh = ghstack.shell.Shell()
# Get up-to-date
sh.git("fetch", "origin")
remote_orig_ref = "origin/" + orig_ref
base = GitCommitHash(sh.git("merge-base", "origin/master", remote_orig_ref))
# compute the stack of commits in chronological order (does not
# include base)
stack = ghstack.git.parse_header(
sh.git("rev-list", "--reverse", "--header", "^" + base, remote_orig_ref))
# Switch working copy
try:
prev_ref = sh.git("symbolic-ref", "--short", "HEAD")
except RuntimeError:
prev_ref = sh.git("rev-parse", "HEAD")
def main(commits: Optional[List[str]] = None,
sh: Optional[ghstack.shell.Shell] = None) -> GitCommitHash:
# If commits is empty, we unlink the entire stack
#
# For now, we only process commits on our current
# stack, because we have no way of knowing how to
# "restack" for other commits.
if sh is None:
# Use CWD
sh = ghstack.shell.Shell()
# Parse the commits
parsed_commits: Optional[Set[GitCommitHash]] = None
if commits:
parsed_commits = set()
for c in commits:
parsed_commits.add(GitCommitHash(sh.git("rev-parse", c)))
base = GitCommitHash(sh.git("merge-base", "origin/master", "HEAD"))
# compute the stack of commits in chronological order (does not
# include base)
stack = ghstack.git.split_header(
sh.git("rev-list", "--reverse", "--header", "^" + base, "HEAD"))
# sanity check the parsed_commits
def main(msg: Optional[str],
username: str,
github: ghstack.github.GitHubEndpoint,
update_fields: bool = False,
sh: Optional[ghstack.shell.Shell] = None,
stack_header: str = STACK_HEADER,
repo_owner: Optional[str] = None,
repo_name: Optional[str] = None,
short: bool = False,
force: bool = False,
no_skip: bool = False,
) -> List[DiffMeta]:
if sh is None:
# Use CWD
sh = ghstack.shell.Shell()
if repo_owner is None or repo_name is None:
# Grovel in remotes to figure it out
origin_url = sh.git("remote", "get-url", "origin")
while True:
m = re.match(r'^git@github.com:([^/]+)/([^.]+)(?:\.git)?$', origin_url)
if m:
repo_owner_nonopt = m.group(1)
repo_name_nonopt = m.group(2)
break
m = re.search(r'github.com/([^/]+)/([^.]+)', origin_url)
if m:
repo_owner_nonopt = m.group(1)
repo_name_nonopt = m.group(2)
break
raise RuntimeError(
# TODO: support number as well
status.add_argument('pull_request', metavar='PR',
help='GitHub pull request URL to perform action on')
args = parser.parse_args()
if args.version:
print("ghstack {}".format(ghstack.__version__))
return
if args.cmd is None:
args.cmd = 'submit'
with ghstack.logging.manager(debug=args.debug):
sh = ghstack.shell.Shell()
conf = ghstack.config.read_config()
ghstack.logging.formatter.redact(conf.github_oauth, '')
github = ghstack.github_real.RealGitHubEndpoint(
oauth_token=conf.github_oauth,
proxy=conf.proxy
)
if args.cmd == 'rage':
ghstack.rage.main(latest=args.latest)
elif args.cmd == 'submit':
ghstack.submit.main(
msg=args.message,
username=conf.github_username,
sh=sh,
github=github,
update_fields=args.update_fields,
'maintainer_can_modify': bool,
})
CreatePullRequestPayload = TypedDict('CreatePullRequestPayload', {
'number': int,
})
# The "database" for our mock instance
class GitHubState:
    """In-memory record store backing the mock GitHub instance.

    Holds the known repositories and pull requests and offers lookups by
    human-facing identifiers (owner/name, PR number).
    """

    # Stored objects, keyed by GraphQLId (per the annotations).
    repositories: Dict[GraphQLId, 'Repository']
    pull_requests: Dict[GraphQLId, 'PullRequest']
    # NOTE(review): presumably counters used to mint new ids and per-repo
    # PR numbers; their use is not visible in this block -- confirm.
    _next_id: int
    _next_pull_request_number: Dict[GraphQLId, int]
    root: 'Root'
    upstream_sh: Optional[ghstack.shell.Shell]

    def repository(self, owner: str, name: str) -> 'Repository':
        """Return the repository whose ``nameWithOwner`` is ``owner/name``.

        Raises:
            RuntimeError: if no stored repository has that name.
        """
        wanted = "{}/{}".format(owner, name)
        for candidate in self.repositories.values():
            if candidate.nameWithOwner != wanted:
                continue
            return candidate
        raise RuntimeError("unknown repository {}".format(wanted))

    def pull_request(self, repo: 'Repository', number: GitHubNumber
                     ) -> 'PullRequest':
        """Return the pull request numbered *number* that belongs to *repo*.

        A pull request matches when its ``_repository`` equals ``repo.id``
        and its ``number`` equals *number*.

        Raises:
            RuntimeError: if no stored pull request matches.
        """
        for candidate in self.pull_requests.values():
            if repo.id != candidate._repository:
                continue
            if candidate.number != number:
                continue
            return candidate
        raise RuntimeError(
            "unrecognized pull request #{} in repository {}"
            .format(number, repo.nameWithOwner))