import asyncio


def test_async_fetch(*_):
    # `*_` absorbs the arguments injected by the patch decorators on this
    # test; 'fetch' should be dispatched asynchronously once per registered repo.
    __main__.main(['fetch'])
    mock_run = utils.run_async.mock
    assert mock_run.call_count == 2
    cmds = ['git', 'fetch']
    # print(mock_run.call_args_list)
    mock_run.assert_any_call('repo1', '/a/bc', cmds)
    mock_run.assert_any_call('repo2', '/d/efg', cmds)
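

# `utils.run_async.mock` above implies the real coroutine was replaced by an
# awaitable stand-in that records its calls. A minimal sketch of such a patch
# helper (an assumption for illustration, not necessarily the project's actual
# test fixture):
from unittest.mock import MagicMock


def async_mock():
    """Return an awaitable stand-in whose calls can be asserted via `.mock`."""
    mock = MagicMock()

    async def runner(*args, **kwargs):
        # record the call instead of spawning a real subprocess
        return mock(*args, **kwargs)

    runner.mock = mock
    return runner

# It would typically be installed with something like
# @patch('...utils.run_async', new=async_mock()) on top of the test.
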
def test_async_output(capfd):
    tasks = [
        utils.run_async('myrepo', '.', [
            'python3', '-c',
            f"print({i});import time; time.sleep({i});print({i})"
        ]) for i in range(4)
    ]
    # I don't fully understand why a new loop is needed here. Without a new
    # loop, "pytest" fails but "pytest tests/test_utils.py" works. Maybe pytest
    # itself uses asyncio (or maybe pytest-xdist)?
    asyncio.set_event_loop(asyncio.new_event_loop())
    utils.exec_async_tasks(tasks)
    out, err = capfd.readouterr()
    assert err == ''
    assert out == (
        'myrepo: 0\nmyrepo: 0\n\n'
        'myrepo: 1\nmyrepo: 1\n\n'
        'myrepo: 2\nmyrepo: 2\n\n'
        'myrepo: 3\nmyrepo: 3\n\n')
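

# The expected output above only makes sense given what run_async and
# exec_async_tasks do: run_async spawns the command asynchronously and prefixes
# every output line with the repo name, while exec_async_tasks drives the
# coroutines to completion on the current event loop and returns their results.
# The following is a minimal sketch under those assumptions, not necessarily
# the project's exact implementation.
import asyncio


async def run_async(repo_name, path, cmds):
    """Run `cmds` in `path`, prefixing each output line with `repo_name`."""
    process = await asyncio.create_subprocess_exec(
        *cmds,
        stdin=asyncio.subprocess.DEVNULL,   # no interactive user input
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=path)
    stdout, stderr = await process.communicate()
    for pipe in (stdout, stderr):
        if pipe:
            # keeping the line endings makes print() add the blank separator
            # line seen in the expected output ('myrepo: 0\nmyrepo: 0\n\n')
            print(''.join(f'{repo_name}: {line}'
                          for line in pipe.decode().splitlines(keepends=True)))
    # a failing command reports its path back for a synchronous retry
    if process.returncode != 0:
        return path


def exec_async_tasks(tasks):
    """Drive the coroutines to completion and collect their return values."""
    loop = asyncio.get_event_loop()
    try:
        errors = loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        loop.close()
    return errors
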
                chosen[k] = repos[k]
            if k in groups:
                for r in groups[k]:
                    chosen[r] = repos[r]
        repos = chosen
    cmds = ['git'] + args.cmd
    if len(repos) == 1 or cmds[1] in args.async_blacklist:
        for path in repos.values():
            print(path)
            subprocess.run(cmds, cwd=path)
    else:  # run concurrent subprocesses
        # Async execution cannot deal with multiple repos' user names and
        # passwords. Here we shut off any user input in the async execution,
        # and re-run the failed ones synchronously (see the sketch below).
        errors = utils.exec_async_tasks(
            utils.run_async(repo_name, path, cmds)
            for repo_name, path in repos.items())
        for path in errors:
            if path:
                print(path)
                subprocess.run(cmds, cwd=path)
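

# "Shut off any user input" above is assumed to mean launching git with stdin
# disconnected and terminal credential prompting disabled (GIT_TERMINAL_PROMPT=0
# is honored by git >= 2.3), so a repo that needs a username/password fails fast
# and its path lands in `errors` for the synchronous re-run. This is an
# illustrative sketch; the helper name is hypothetical.
import asyncio
import os


async def run_async_noninteractive(repo_name, path, cmds):
    process = await asyncio.create_subprocess_exec(
        *cmds,
        stdin=asyncio.subprocess.DEVNULL,                # no credential input
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=path,
        env={**os.environ, 'GIT_TERMINAL_PROMPT': '0'})  # never prompt on a tty
    await process.communicate()
    # a non-zero exit code hands the repo path back to the caller for a retry
    return path if process.returncode else None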