Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self) -> None:
# Set up a "parent" repository with an empty initial commit that we'll operate on
upstream_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("upstream_dir preserved at: {}".format(upstream_dir)))
else:
self.addCleanup(lambda: shutil.rmtree(upstream_dir))
self.upstream_sh = ghstack.shell.Shell(cwd=upstream_dir, testing=True)
# I plan to fix this type error soon
self.github = ghstack.github_fake.FakeGitHubEndpoint(self.upstream_sh)
local_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("local_dir preserved at: {}".format(local_dir)))
else:
self.addCleanup(lambda: shutil.rmtree(local_dir))
self.sh = ghstack.shell.Shell(cwd=local_dir, testing=True)
self.sh.git("clone", upstream_dir, ".")
self.rev_map = {}
self.substituteRev(GitCommitHash("HEAD"), SubstituteRev("rINI0"))
# Set up a "parent" repository with an empty initial commit that we'll operate on
upstream_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("upstream_dir preserved at: {}".format(upstream_dir)))
else:
self.addCleanup(lambda: shutil.rmtree(upstream_dir))
self.upstream_sh = ghstack.shell.Shell(cwd=upstream_dir, testing=True)
# I plan to fix this type error soon
self.github = ghstack.github_fake.FakeGitHubEndpoint(self.upstream_sh)
local_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("local_dir preserved at: {}".format(local_dir)))
else:
self.addCleanup(lambda: shutil.rmtree(local_dir))
self.sh = ghstack.shell.Shell(cwd=local_dir, testing=True)
self.sh.git("clone", upstream_dir, ".")
self.rev_map = {}
self.substituteRev(GitCommitHash("HEAD"), SubstituteRev("rINI0"))
def strip_trailing_whitespace(text: str) -> str:
return re.sub(r' +$', '', text, flags=re.MULTILINE)
def indent(text: str, prefix: str) -> str:
return ''.join(prefix + line if line.strip() else line
for line in text.splitlines(True))
class TestGh(expecttest.TestCase):
proc: ClassVar[subprocess.Popen]
github: ghstack.github.GitHubEndpoint
rev_map: Dict[SubstituteRev, GitCommitHash]
upstream_sh: ghstack.shell.Shell
sh: ghstack.shell.Shell
def setUp(self) -> None:
# Set up a "parent" repository with an empty initial commit that we'll operate on
upstream_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("upstream_dir preserved at: {}".format(upstream_dir)))
else:
self.addCleanup(lambda: shutil.rmtree(upstream_dir))
self.upstream_sh = ghstack.shell.Shell(cwd=upstream_dir, testing=True)
# I plan to fix this type error soon
self.github = ghstack.github_fake.FakeGitHubEndpoint(self.upstream_sh)
local_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("local_dir preserved at: {}".format(local_dir)))
else:
def setUp(self) -> None:
self.sh = ghstack.shell.Shell()
def strip_trailing_whitespace(text: str) -> str:
return re.sub(r' +$', '', text, flags=re.MULTILINE)
def indent(text: str, prefix: str) -> str:
return ''.join(prefix + line if line.strip() else line
for line in text.splitlines(True))
class TestGh(expecttest.TestCase):
proc: ClassVar[subprocess.Popen]
github: ghstack.github.GitHubEndpoint
rev_map: Dict[SubstituteRev, GitCommitHash]
upstream_sh: ghstack.shell.Shell
sh: ghstack.shell.Shell
def setUp(self) -> None:
# Set up a "parent" repository with an empty initial commit that we'll operate on
upstream_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("upstream_dir preserved at: {}".format(upstream_dir)))
else:
self.addCleanup(lambda: shutil.rmtree(upstream_dir))
self.upstream_sh = ghstack.shell.Shell(cwd=upstream_dir, testing=True)
# I plan to fix this type error soon
self.github = ghstack.github_fake.FakeGitHubEndpoint(self.upstream_sh)
local_dir = tempfile.mkdtemp()
if GH_KEEP_TMP:
self.addCleanup(lambda: print("local_dir preserved at: {}".format(local_dir)))
def gh_land(self, pull_request: str) -> None:
return ghstack.land.main(
pull_request=pull_request,
github=self.github,
sh=self.sh
)
def gh(self, msg: str = 'Update',
update_fields: bool = False,
short: bool = False,
no_skip: bool = False) -> List[ghstack.submit.DiffMeta]:
return ghstack.submit.main(
msg=msg,
username='ezyang',
github=self.github,
sh=self.sh,
update_fields=update_fields,
stack_header='Stack',
repo_owner='pytorch',
repo_name='pytorch',
short=short,
no_skip=no_skip)
def gh_unlink(self) -> None:
ghstack.unlink.main(
sh=self.sh
)
def main(msg=None, github=None, github_rest=None, sh=None, repo_owner=None,
repo_name=None, username="ezyang"):
if sh is None:
# Use CWD
sh = ghstack.shell.Shell()
if repo_owner is None or repo_name is None:
# Grovel in remotes to figure it out
origin_url = sh.git("remote", "get-url", "origin")
while True:
m = re.match(r'^git@github.com:([^/]+)/([^.]+)\.git$', origin_url)
if m:
repo_owner = m.group(1)
repo_name = m.group(2)
break
m = re.match(r'https://github.com/([^/]+)/([^.]+).git', origin_url)
if m:
repo_owner = m.group(1)
repo_name = m.group(2)
break
raise RuntimeError(
def main(pull_request: str,
github: ghstack.github.GitHubEndpoint,
sh: ghstack.shell.Shell) -> None:
# We land the entire stack pointed to by a URL.
# Local state is ignored; PR is source of truth
# Furthermore, the parent commits of PR are ignored: we always
# take the canonical version of the patch from any given pr
params = ghstack.github_utils.parse_pull_request(pull_request)
orig_ref = lookup_pr_to_orig_ref(github, **params)
if sh is None:
# Use CWD
sh = ghstack.shell.Shell()
# Get up-to-date
sh.git("fetch", "origin")
remote_orig_ref = "origin/" + orig_ref
base = GitCommitHash(sh.git("merge-base", "origin/master", remote_orig_ref))
# compute the stack of commits in chronological order (does not
# include base)
stack = ghstack.git.parse_header(
sh.git("rev-list", "--reverse", "--header", "^" + base, remote_orig_ref))
# Switch working copy
try:
prev_ref = sh.git("symbolic-ref", "--short", "HEAD")
except RuntimeError:
prev_ref = sh.git("rev-parse", "HEAD")