def test_pull_api(self):
    try:
        from cStringIO import StringIO
    except ImportError:
        from io import StringIO
    from vcstool.commands.pull import main
    stdout_stderr = StringIO()
    # change and restore cwd
    cwd_bck = os.getcwd()
    os.chdir(TEST_WORKSPACE)
    try:
        # change and restore USE_COLOR flag
        from vcstool import executor
        use_color_bck = executor.USE_COLOR
        executor.USE_COLOR = False
        try:
            # change and restore os.environ
            env_bck = os.environ
            os.environ = dict(os.environ)
            os.environ.update(
                LANG='en_US.UTF-8',
                PYTHONPATH=(
                    os.path.dirname(os.path.dirname(__file__)) +
                    os.pathsep + os.environ.get('PYTHONPATH', '')))
            try:
                rc = main(
                    args=['--workers', '1'],
                    stdout=stdout_stderr, stderr=stdout_stderr)
            finally:
                os.environ = env_bck
        finally:
            executor.USE_COLOR = use_color_bck
    finally:
        os.chdir(cwd_bck)
    assert rc == 1
    expected = get_expected_output('pull').decode()
    assert stdout_stderr.getvalue() == expected
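
The same entry point can also be driven from regular application code. The sketch below only mirrors the pattern used in the test above (capture combined output in a StringIO, run the command from the workspace directory); the workspace path is a placeholder and not part of the original test.

import os
from io import StringIO

from vcstool.commands.pull import main


def pull_workspace(workspace):
    # capture combined stdout/stderr instead of writing to the terminal
    buffer = StringIO()
    cwd_bck = os.getcwd()
    # the command operates on the current working directory
    os.chdir(workspace)
    try:
        rc = main(args=['--workers', '1'], stdout=buffer, stderr=buffer)
    finally:
        os.chdir(cwd_bck)
    return rc, buffer.getvalue()


# placeholder path for illustration only
rc, output = pull_workspace('/path/to/workspace')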
def main(args=None, stdout=None, stderr=None):
    set_streams(stdout=stdout, stderr=stderr)
    parser = get_parser()
    add_common_arguments(parser, skip_hide_empty=True, path_nargs='?')
    args = parser.parse_args(args)
    command = ExportCommand(args)
    clients = find_repositories(command.paths, nested=command.nested)
    if command.output_repos:
        output_repositories(clients)
    jobs = generate_jobs(clients, command)
    results = execute_jobs(jobs, number_of_workers=args.workers)
    # check if at least one repo was found in the client directory
    basename = None
    for result in results:
        result['path'] = get_relative_path_of_result(result)
def main(args=None, stdout=None, stderr=None):
    set_streams(stdout=stdout, stderr=stderr)
    parser = get_parser()
    add_common_arguments(parser)
    # separate anything following --args so it does not confuse argparse
    if args is None:
        args = sys.argv[1:]
    try:
        index = args.index('--args') + 1
    except ValueError:
        # should generate an error due to the missing --args
        parser.parse_known_args(args)
    client_args = args[index:]
    args = parser.parse_args(args[0:index])
    args.args = client_args
def main(args=None, stdout=None, stderr=None):
    set_streams(stdout=stdout, stderr=stderr)
    # no help option yet, so the command can be extracted first
    # (it might be followed by --help)
    parser = get_parser(add_help=False)
    ns, _ = parser.parse_known_args(args)
    # help for a specific command
    if ns.command:
        # relay the help request to the specific command
        entrypoint = get_entrypoint(ns.command)
        if not entrypoint:
            return 1
        return entrypoint(['--help'])
    # regular parsing, validating options and arguments
    parser = get_parser()
    ns = parser.parse_args(args)
def main(args=None, stdout=None, stderr=None):
    set_streams(stdout=stdout, stderr=stderr)
    parser = get_parser()
    add_common_arguments(
        parser, skip_hide_empty=True, skip_nested=True, path_nargs='?',
        path_help='Base path to clone repositories to')
    args = parser.parse_args(args)
    try:
        repos = get_repositories(args.input)
    except RuntimeError as e:
        print(ansi('redf') + str(e) + ansi('reset'), file=sys.stderr)
        return 1
    jobs = generate_jobs(repos, args)
    add_dependencies(jobs)
    if args.repos:
        output_repositories([job['client'] for job in jobs])
    results = execute_jobs(
        jobs, show_progress=True, number_of_workers=args.workers,
        debug_jobs=args.debug)
    output_results(results)
    any_error = any(r['returncode'] for r in results)
    return 1 if any_error else 0
    # errors are handled by a separate function
    if result['returncode']:
        return
    try:
        lines = []
        lines.append(' %s:' % result['path'])
        lines.append(' type: ' + result['client'].__class__.type)
        export_data = result['export_data']
        lines.append(' url: ' + export_data['url'])
        if 'version' in export_data and export_data['version']:
            lines.append(' version: ' + export_data['version'])
        print('\n'.join(lines))
    except KeyError as e:
        print(
            ansi('redf') + (
                "Command '%s' failed for path '%s': %s: %s" % (
                    result['command'].__class__.command,
                    result['client'].path, e.__class__.__name__, e)) +
            ansi('reset'),
            file=sys.stderr)
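
The fragment above formats one entry of the export output. For orientation, here is an illustrative (not authoritative) sketch of the result shape it reads, with the keys inferred from the attribute accesses in the code:

# Illustrative only: approximate shape of one result entry as consumed above.
example_result = {
    'returncode': 0,    # non-zero results are reported by a separate function and skipped here
    'path': 'vcstool',  # relative path filled in by the calling code
    'client': None,     # in vcstool this is a client object; its class exposes a `type` attribute
    'command': None,    # command object; its class exposes a `command` attribute
    'export_data': {
        'url': 'https://github.com/dirk-thomas/vcstool.git',
        'version': 'master',  # optional; only printed when present and non-empty
    },
}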
        print(
            ansi('yellowf') + 'List of repositories is empty' + ansi('reset'),
            file=sys.stderr)
        return repos
    for path in repositories:
        repo = {}
        attributes = repositories[path]
        try:
            repo['type'] = attributes['type']
            repo['url'] = attributes['url']
            if 'version' in attributes:
                repo['version'] = attributes['version']
        except AttributeError as e:
            print(
                ansi('yellowf') + (
                    "Repository '%s' does not provide the necessary "
                    'information: %s' % (path, e)) + ansi('reset'),
                file=sys.stderr)
            continue
        repos[path] = repo
    return repos
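
The loop above normalizes a mapping of path to attributes into the reduced repos dict. An illustrative input (assumed, not taken from the project) and the corresponding output:

# Illustrative only: input mapping of relative path -> repository attributes,
# roughly as it would come from a parsed repositories file.
repositories = {
    'vcstool': {
        'type': 'git',
        'url': 'https://github.com/dirk-thomas/vcstool.git',
        'version': 'master',  # optional
    },
}
# After the loop, repos contains the same paths with only the recognized keys:
#   {'vcstool': {'type': 'git',
#                'url': 'https://github.com/dirk-thomas/vcstool.git',
#                'version': 'master'}}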