Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_execute(self):
    """Testing "execute" method.

    Runs the current interpreter with ``-V`` through ``process.execute``
    and checks that the returned output contains the running
    interpreter's ``major.minor.micro`` version number.
    """
    # The dots are escaped (and the pattern is a raw string) so they only
    # match the literal separators of the version number rather than any
    # character, which would let malformed output pass the check.
    version_pattern = r'.*?%d\.%d\.%d' % sys.version_info[:3]
    output = process.execute([sys.executable, '-V'])

    self.assertTrue(re.match(version_pattern, output))
'ClearCase backend cannot set label when some elements are '
'checked out:\n%s' % ''.join(checkedout_elements))
# First create label in vob database.
execute(['cleartool', 'mklbtype', '-c', 'label created for rbtools',
label],
with_errors=True)
# We ignore return code 1 in order to omit files that ClearCase cannot
# read.
recursive_option = ''
if cpath.isdir(path):
recursive_option = '-recurse'
# Apply label to path.
execute(['cleartool', 'mklabel', '-nc', recursive_option, label, path],
extra_ignore_errors=(1,),
with_errors=False)
else:
sys.stderr.write('Failed to determine SVN '
'tracking branch. Defaulting'
'to "master"\n')
self.upstream_branch = 'master'
return SVNRepositoryInfo(path=path,
base_path=base_path,
uuid=uuid,
supports_parent_diffs=True)
else:
# Versions of git-svn before 1.5.4 don't (appear to) support
# 'git svn info'. If we fail because of an older git install,
# here, figure out what version of git is installed and give
# the user a hint about what to do next.
version = execute([self.git, "svn", "--version"],
ignore_errors=True)
version_parts = re.search('version (\d+)\.(\d+)\.(\d+)',
version)
svn_remote = execute(
[self.git, "config", "--get", "svn-remote.svn.url"],
ignore_errors=True)
if (version_parts and svn_remote and
not is_valid_version((int(version_parts.group(1)),
int(version_parts.group(2)),
int(version_parts.group(3))),
(1, 5, 4))):
die("Your installation of git-svn must be upgraded to "
"version 1.5.4 or later")
# Okay, maybe Perforce (git-p4).
cmdline.append(args[0])
if self.options:
if self.options.debug:
cmdline.append('--debug')
if getattr(self.options, 'tfs_shelveset_owner', None):
cmdline += ['--shelveset-owner',
self.options.tfs_shelveset_owner]
if getattr(self.options, 'tfs_login', None):
cmdline += ['--login', self.options.tfs_login]
cmdline += args[1:]
return execute(cmdline,
with_errors=False,
results_unicode=False,
return_error_code=True,
return_errors=True,
**kwargs)
old_tmp = make_tempfile(content=old_content)
new_tmp = make_tempfile(content=new_content)
diff_cmd = ['diff', '-uN', old_tmp, new_tmp]
dl = execute(diff_cmd,
extra_ignore_errors=(1, 2),
results_unicode=False,
split_lines=True)
# Replace temporary filenames with real directory names and add ids
if dl:
dl[0] = dl[0].replace(old_tmp.encode('utf-8'),
old_dir.encode('utf-8'))
dl[1] = dl[1].replace(new_tmp.encode('utf-8'),
new_dir.encode('utf-8'))
old_oid = execute(['cleartool', 'describe', '-fmt', '%On',
old_dir],
results_unicode=False)
new_oid = execute(['cleartool', 'describe', '-fmt', '%On',
new_dir],
results_unicode=False)
dl.insert(2, b'==== %s %s ====\n' % (old_oid, new_oid))
return dl
parentrevspec (bytes):
The revid spec of the old file.
changetype (bytes):
The change type as a single character string.
Returns:
list of bytes:
The computed diff.
"""
if filename.startswith(self.workspacedir):
filename = filename[len(self.workspacedir):]
# Diff returns "1" if differences were found.
dl = execute(['diff', '-urN', old_file, new_file],
extra_ignore_errors=(1, 2),
results_unicode=False)
# If the input file has ^M characters at end of line, let's ignore them.
dl = dl.replace(b'\r\r\n', b'\r\n')
dl = dl.splitlines(True)
# Special handling for the output of the diff tool on binary files:
# diff outputs "Files a and b differ"
# and the code below expects the output to start with
# "Binary files "
if (len(dl) == 1 and
dl[0].startswith(b'Files %s and %s differ'
% (old_file.encode('utf-8'),
new_file.encode('utf-8')))):
dl = [b'Binary files %s and %s differ\n'
def _get_current_branch(self):
    """Return the current branch of this repository.

    Runs ``hg branch`` with ``self._hg_env`` as the environment and
    returns the command's output with surrounding whitespace stripped.
    """
    branch_output = execute(['hg', 'branch'], env=self._hg_env)
    return branch_output.strip()
Args:
filename (bytes):
The filename to fetch.
filespec (bytes):
The revision of the file to fetch.
tmpfile (unicode):
The name of the temporary file to write to.
"""
logging.debug('Writing "%s" (rev %s) to "%s"',
filename.decode('utf-8'),
filespec.decode('utf-8'),
tmpfile)
execute(['cm', 'cat', filespec, '--file=' + tmpfile])
if revert:
added_files = self.DELETED_FILES_RE.findall(patch)
deleted_files = self.ADDED_FILES_RE.findall(patch)
else:
added_files = self.ADDED_FILES_RE.findall(patch)
deleted_files = self.DELETED_FILES_RE.findall(patch)
# Prepend the root of the Perforce client to each file name.
p4_info = self.p4.info()
client_root = p4_info.get('Client root')
added_files = ['%s/%s' % (client_root, f) for f in added_files]
deleted_files = ['%s/%s' % (client_root, f) for f in deleted_files]
if added_files:
make_empty_files(added_files)
result = execute(['p4', 'add'] + added_files, ignore_errors=True,
none_on_ignored_error=True)
if result is None:
logging.error('Unable to execute "p4 add" on: %s',
', '.join(added_files))
else:
patched_empty_files = True
if deleted_files:
result = execute(['p4', 'delete'] + deleted_files,
ignore_errors=True, none_on_ignored_error=True)
if result is None:
logging.error('Unable to execute "p4 delete" on: %s',
', '.join(deleted_files))
else:
# 2014-01-02 First Name
#
# line 1
# line 2
# ...
#
# 2014-01-02 First Name
#
# ...
log_cmd = ['bzr', 'log', '-r',
'%s..%s' % (revisions['base'], revisions['tip'])]
# Find out how many commits there are, then log limiting to one fewer.
# This is because diff treats the range as (r1, r2] while log treats
# the range as [r1, r2].
lines = execute(log_cmd + ['--line'],
ignore_errors=True, split_lines=True)
n_revs = len(lines) - 1
lines = execute(log_cmd + ['--gnu-changelog', '-l', str(n_revs)],
ignore_errors=True, split_lines=True)
message = []
for line in lines:
# We only care about lines that start with a tab (commit message
# lines) or blank lines.
if line.startswith('\t'):
message.append(line[1:])
elif not line.strip():
message.append(line)