Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main(args=None, stdout=None, stderr=None):
    """Parse the command line, build the jobs, execute them and report.

    :param args: argument list handed to the parser's ``parse_args``
    :param stdout: forwarded to ``set_streams``
    :param stderr: forwarded to ``set_streams``
    :returns: 1 if ``get_repositories`` raises ``RuntimeError`` or if any
      result carries a truthy ``returncode``, otherwise 0
    """
    set_streams(stdout=stdout, stderr=stderr)

    arg_parser = get_parser()
    add_common_arguments(
        arg_parser,
        skip_hide_empty=True,
        skip_nested=True,
        path_nargs='?',
        path_help='Base path to clone repositories to')
    options = arg_parser.parse_args(args)

    try:
        repositories = get_repositories(options.input)
    except RuntimeError as exc:
        # report the problem in red on stderr and signal failure
        message = ansi('redf') + str(exc) + ansi('reset')
        print(message, file=sys.stderr)
        return 1

    job_list = generate_jobs(repositories, options)
    add_dependencies(job_list)

    if options.repos:
        clients = []
        for entry in job_list:
            clients.append(entry['client'])
        output_repositories(clients)

    outcomes = execute_jobs(
        job_list,
        show_progress=True,
        number_of_workers=options.workers,
        debug_jobs=options.debug)
    output_results(outcomes)

    # the exit status is non-zero as soon as one job reports a failure
    for outcome in outcomes:
        if outcome['returncode']:
            return 1
    return 0
# errors are handled by a separate function
if result['returncode']:
return
try:
lines = []
lines.append(' %s:' % result['path'])
lines.append(' type: ' + result['client'].__class__.type)
export_data = result['export_data']
lines.append(' url: ' + export_data['url'])
if 'version' in export_data and export_data['version']:
lines.append(' version: ' + export_data['version'])
print('\n'.join(lines))
except KeyError as e:
print(
ansi('redf') + (
"Command '%s' failed for path '%s': %s: %s" % (
result['command'].__class__.command,
result['client'].path, e.__class__.__name__, e)) +
ansi('reset'),
file=sys.stderr)
ansi('yellowf') + 'List of repositories is empty' + ansi('reset'),
file=sys.stderr)
return repos
for path in repositories:
repo = {}
attributes = repositories[path]
try:
repo['type'] = attributes['type']
repo['url'] = attributes['url']
if 'version' in attributes:
repo['version'] = attributes['version']
except AttributeError as e:
print(
ansi('yellowf') + (
"Repository '%s' does not provide the necessary "
'information: %s' % (path, e)) + ansi('reset'),
file=sys.stderr)
continue
repos[path] = repo
return repos
if repositories is None:
print(
ansi('yellowf') + 'List of repositories is empty' + ansi('reset'),
file=sys.stderr)
return repos
for path in repositories:
repo = {}
attributes = repositories[path]
try:
repo['type'] = attributes['type']
repo['url'] = attributes['url']
if 'version' in attributes:
repo['version'] = attributes['version']
except AttributeError as e:
print(
ansi('yellowf') + (
"Repository '%s' does not provide the necessary "
'information: %s' % (path, e)) + ansi('reset'),
file=sys.stderr)
continue
repos[path] = repo
return repos
try:
lines = []
lines.append(' %s:' % result['path'])
lines.append(' type: ' + result['client'].__class__.type)
export_data = result['export_data']
lines.append(' url: ' + export_data['url'])
if 'version' in export_data and export_data['version']:
lines.append(' version: ' + export_data['version'])
print('\n'.join(lines))
except KeyError as e:
print(
ansi('redf') + (
"Command '%s' failed for path '%s': %s: %s" % (
result['command'].__class__.command,
result['client'].path, e.__class__.__name__, e)) +
ansi('reset'),
file=sys.stderr)
def output_error_information(result, hide_empty=False):
    """Print the output of an unsuccessful result to stderr.

    A ``returncode`` of ``NotImplemented`` is printed in yellow, any other
    truthy ``returncode`` in red. Results with a falsy ``returncode`` are
    ignored here.

    :param result: mapping providing the keys ``returncode``, ``path`` and
      ``output``
    :param hide_empty: accepted for interface compatibility, not used by
      this function
    """
    returncode = result['returncode']
    # Check for the NotImplemented sentinel *before* any truth test:
    # evaluating NotImplemented in a boolean context is deprecated since
    # Python 3.9 and raises TypeError starting with Python 3.14.
    if returncode is NotImplemented:
        color = 'yellow'
    elif not returncode:
        # successful results are handled by a separate function
        return
    else:
        color = 'red'
    line = '%s: %s' % (result['path'], result['output'])
    print(ansi('%sf' % color) + line + ansi('reset'), file=sys.stderr)
def get_repos_in_rosinstall_format(root):
    """Convert rosinstall-style entries into a dict of repositories.

    Each item of ``root`` must be a mapping with exactly one key (used as
    the repository ``type``) whose value is a mapping of attributes. The
    attribute ``local-name`` becomes the key of the returned dict, ``uri``
    becomes ``url`` and an optional ``version`` is copied as is.

    Entries lacking ``local-name`` or ``uri`` are reported on stderr and
    skipped.

    :param root: iterable of single-key mappings
    :returns: dict mapping each local path to a dict with the keys
      ``type``, ``url`` and optionally ``version``
    :raises RuntimeError: if an item does not have exactly one key
    """
    repos = {}
    for i, item in enumerate(root):
        if len(item.keys()) != 1:
            raise RuntimeError('Input data is not valid format')
        repo = {'type': list(item.keys())[0]}
        attributes = list(item.values())[0]
        try:
            path = attributes['local-name']
        # a missing key raises KeyError; AttributeError is still caught to
        # remain a superset of what was handled before
        except (AttributeError, KeyError) as e:
            print(
                ansi('yellowf') + (
                    'Repository #%d does not provide the necessary '
                    'information: %s' % (i, e)) + ansi('reset'),
                file=sys.stderr)
            continue
        try:
            repo['url'] = attributes['uri']
            if 'version' in attributes:
                repo['version'] = attributes['version']
        except (AttributeError, KeyError) as e:
            print(
                ansi('yellowf') + (
                    "Repository '%s' does not provide the necessary "
                    'information: %s' % (path, e)) + ansi('reset'),
                file=sys.stderr)
            continue
        repos[path] = repo
    # hand the collected repositories back to the caller
    return repos
def get_repos_in_vcstool_format(repositories):
repos = {}
if repositories is None:
print(
ansi('yellowf') + 'List of repositories is empty' + ansi('reset'),
file=sys.stderr)
return repos
for path in repositories:
repo = {}
attributes = repositories[path]
try:
repo['type'] = attributes['type']
repo['url'] = attributes['url']
if 'version' in attributes:
repo['version'] = attributes['version']
except AttributeError as e:
print(
ansi('yellowf') + (
"Repository '%s' does not provide the necessary "
'information: %s' % (path, e)) + ansi('reset'),
file=sys.stderr)