Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def check_termination(self, method):
    """
    Helper method for process termination tests.

    Runs ``sleep 60`` through :class:`ExternalCommand`, calls the named
    method on the command object with a very short timeout and then checks
    that the child process is gone long before it would have ended by itself.

    :param method: The name of the method to call on the command object
                   (it is called with a ``timeout`` keyword argument).
    :raises: :exc:`AssertionError` when the child is still running or when
             ten seconds or more elapsed before it was stopped.
    """
    with ExternalCommand('sleep', '60', check=False) as cmd:
        timer = Timer()
        # We use a positive but very low timeout so that all of the code
        # involved gets a chance to run, but without slowing us down.
        getattr(cmd, method)(timeout=0.1)
        # Gotcha: Call wait() so that the process (our own subprocess) is
        # reclaimed because until we do so cmd.is_running will be True!
        cmd.wait()
        # Now we can verify our assertions.
        assert not cmd.is_running, "Child still running despite graceful termination request!"
        # The child would sleep for 60 seconds, so finishing in under 10
        # seconds shows that it was stopped rather than left to finish.
        assert timer.elapsed_time < 10, "It took too long to terminate the child!"
def test_vcs_control_field(self):
    """Test that Debian ``Vcs-*`` control file fields can be generated."""
    with TemporaryDirectory() as directory:
        repository = self.get_instance(bare=False, local=directory)
        self.create_initial_commit(repository)
        # Ask the command line interface for the control field.
        returncode, output = run_cli(
            main, '--repository=%s' % repository.local,
            '--vcs-control-field',
        )
        # assertEqual() is used because the assertEquals() alias is
        # deprecated and no longer exists in Python 3.12 and newer.
        self.assertEqual(returncode, 0)
        # The output should mention the name of the control field as well
        # as the global revision id of the commit that was just created.
        assert repository.control_field in output
        assert repository.find_revision_id() in output
def test_find_revision_id(self):
    """Test querying the command line interface for global revision ids."""
    with TemporaryDirectory() as directory:
        repository = self.get_instance(bare=False, local=directory)
        repository.create()
        self.create_initial_commit(repository)
        # Check the global revision id of the initial commit: it should be
        # a nonempty string.
        revision_id = repository.find_revision_id()
        self.assertIsInstance(revision_id, string_types)
        self.assertTrue(revision_id)
        # Get the global revision id using the command line interface.
        returncode, output = run_cli(
            main, '--repository=%s' % repository.local,
            '--find-revision-id',
        )
        # assertEqual() is used because the assertEquals() alias is
        # deprecated and no longer exists in Python 3.12 and newer.
        self.assertEqual(returncode, 0)
        # Surrounding whitespace in the output is ignored in the comparison.
        self.assertEqual(output.strip(), revision_id)
def test_find_revision_number(self):
"""
Test querying the command line interface for local revision numbers.

NOTE(review): this copy of the test is cut off inside the final
``run_cli(`` call: the closing parenthesis and the assertions on
``returncode`` and ``output`` are not part of this block.
"""
with TemporaryDirectory() as directory:
repository = self.get_instance(bare=False, local=directory)
repository.create()
self.create_initial_commit(repository)
# Check the revision number of the initial commit (either 0 or 1 is
# accepted - presumably this differs between version control systems).
initial_revision_number = repository.find_revision_number()
assert initial_revision_number in (0, 1)
# Create a second commit.
self.create_followup_commit(repository)
# Check the revision number of the second commit: it has to be exactly
# one of the accepted values and higher than the initial number.
second_revision_number = repository.find_revision_number()
assert second_revision_number in (1, 2)
assert second_revision_number > initial_revision_number
# Get the local revision number of a revision using the command line interface.
returncode, output = run_cli(
main, '--repository=%s' % repository.local,
'--find-revision-number',
def test_checkout(self):
    """
    Test checking out of branches.

    The ``README`` file is expected to differ between the default branch
    and the ``dev`` branch, which makes it possible to tell from its
    contents what the working tree was updated to.
    """
    contents_on_default_branch = b"This will be part of the initial commit.\n"
    contents_on_dev_branch = b"Not the same contents.\n"
    unversioned_contents = b"This was never committed.\n"
    with TemporaryDirectory() as directory:
        repository = self.get_instance(bare=False, local=directory)
        self.create_initial_commit(repository)
        # Commit a change to README on another branch.
        repository.create_branch('dev')
        self.create_followup_commit(repository)
        # Make sure the working tree can be updated to the default branch.
        # (assertEqual() is used throughout because the assertEquals() alias
        # is deprecated and no longer exists in Python 3.12 and newer.)
        repository.checkout()
        self.assertEqual(repository.context.read_file('README'), contents_on_default_branch)
        # Make sure the working tree can be updated to the `dev' branch.
        repository.checkout('dev')
        self.assertEqual(repository.context.read_file('README'), contents_on_dev_branch)
        # Make sure changes in the working tree can be discarded.
        repository.context.write_file('README', unversioned_contents)
        repository.checkout(clean=True)
        self.assertEqual(repository.context.read_file('README'), contents_on_default_branch)
def test_is_clean(self):
    """Verify that a clean or dirty working tree is detected and enforced."""
    with TemporaryDirectory() as workdir:
        # Instantiate and create a repository of the parametrized type.
        repo = self.get_instance(bare=False, local=workdir)
        repo.create()
        # Right after creation the working tree should be reported as clean.
        assert repo.is_clean is True
        # Committing a file should not change that.
        self.create_initial_commit(repo)
        assert repo.is_clean is True
        # Overwriting the previously committed file without committing the
        # change should make the working tree dirty.
        repo.context.write_file('README', "Not the same contents.\n")
        assert repo.is_clean is False
        # At this point ensure_clean() is expected to complain.
        with self.assertRaises(WorkingTreeNotCleanError):
            repo.ensure_clean()
"""Test querying the command line interface for local revision numbers."""
with TemporaryDirectory() as directory:
    repository = self.get_instance(bare=False, local=directory)
    repository.create()
    self.create_initial_commit(repository)
    # Check the revision number of the initial commit (either 0 or 1 is
    # accepted - presumably this differs between version control systems).
    initial_revision_number = repository.find_revision_number()
    assert initial_revision_number in (0, 1)
    # Create a second commit.
    self.create_followup_commit(repository)
    # Check the revision number of the second commit: it has to be one of
    # the accepted values and higher than the initial revision number.
    second_revision_number = repository.find_revision_number()
    assert second_revision_number in (1, 2)
    assert second_revision_number > initial_revision_number
    # Get the local revision number of a revision using the command line interface.
    returncode, output = run_cli(
        main, '--repository=%s' % repository.local,
        '--find-revision-number',
    )
    # assertEqual() is used because the assertEquals() alias is deprecated
    # and no longer exists in Python 3.12 and newer.
    self.assertEqual(returncode, 0)
    # The command line interface reports the number as text.
    self.assertEqual(int(output), second_revision_number)
def fudge_factor_hammer():
    """
    Run the command line interface once with ``--fudge-factor``.

    The wrapped command is a Python snippet that exits with status zero
    (built by ``python_golf()``). The run has to succeed and it has to
    take longer than half of ``fudge_factor``, a value that is taken from
    the enclosing scope.
    """
    stopwatch = Timer()
    fudge_option = '--fudge-factor=%i' % fudge_factor
    wrapped_command = python_golf('import sys', 'sys.exit(0)')
    exit_status, _ = run_cli(main, fudge_option, *wrapped_command)
    assert exit_status == 0
    assert stopwatch.elapsed_time > fudge_factor / 2.0
# Hand the check to retry() together with the value 60 (presumably a
# timeout in seconds - TODO confirm against the definition of retry()).
retry(fudge_factor_hammer, 60)
def test_tags(self):
    """Check that newly created tags show up in the repository's tags."""
    with TemporaryDirectory() as workdir:
        repo = self.get_instance(bare=False, local=workdir)
        # First the initial commit and then a follow up commit each get a
        # tag of their own, checked in exactly the same way.
        for make_commit in (self.create_initial_commit, self.create_followup_commit):
            make_commit(repo)
            tag_name = random_string(10)
            # The tag must be unknown before and known after its creation.
            assert tag_name not in repo.tags
            repo.create_tag(tag_name)
            assert tag_name in repo.tags
def commit_file(self, repository, filename=None, contents=None, message=None):
    """
    Write a file in the given repository and commit it.

    :param repository: The repository object to commit to.
    :param filename: The name of the file to write (when this is empty or
                     :data:`None` a name is generated with ``random_string()``).
    :param contents: The contents to write (when this is empty or
                     :data:`None` it is generated with ``random_string()``).
    :param message: The commit message (when this is empty or :data:`None`
                    a message is composed that says whether the file is new
                    or changed).
    """
    if not filename:
        filename = random_string(15)
    if not contents:
        contents = random_string(1024)
    # Remember whether the file was already there *before* writing to it,
    # because the default commit message depends on this.
    already_present = repository.context.exists(filename)
    repository.context.write_file(filename, contents)
    repository.add_files(filename)
    if not message:
        kind = "changed" if already_present else "new"
        message = "Committing %s file '%s'" % (kind, filename)
    repository.commit(message=message)