Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_skip_tag(tmpdir):
make_repo(
tmpdir,
bundles={
"test": {
'files': {
join(str(tmpdir), "foo"): {
'content': "nope",
'tags': ["nope"],
},
},
},
},
nodes={
"localhost": {
'bundles': ["test"],
'os': host_os(),
},
def test_only_bundle_with_dep(tmpdir):
make_repo(
tmpdir,
bundles={
"test": {
'files': {
join(str(tmpdir), "foo"): {
'content_type': 'any',
'needs': ["file:" + join(str(tmpdir), "bar")],
},
},
},
"test2": {
'files': {
join(str(tmpdir), "bar"): {
'content_type': 'any',
},
join(str(tmpdir), "baz"): {
def test_groups_node(tmpdir):
make_repo(
tmpdir,
groups={
"group1": {},
"group2": {},
},
nodes={
"node1": {'groups': {"group1"}},
"node2": {'groups': {"group1"}},
"node3": {'groups': {"group2"}},
},
)
stdout, stderr, rcode = run("bw hash -g node1", path=str(tmpdir))
assert rcode == 0
assert stdout == b"6f4615dc71426549e22df7961bd2b88ba95ad1fc\n"
def test_metadatapy_reactor_keyerror_from_dict(tmpdir):
make_repo(
tmpdir,
bundles={"test": {}},
nodes={
"node1": {
'bundles': ["test"],
},
},
)
with open(join(str(tmpdir), "bundles", "test", "metadata.py"), 'w') as f:
f.write(
"""
@metadata_reactor
def foo_reactor(metadata):
x = {}['baz']
return {'x': x}
""")
def test_action_pipe_utf8(tmpdir):
make_repo(
tmpdir,
bundles={
"test": {
'actions': {
"pipe": {
'command': "cat",
'data_stdin': "hello 🐧\n",
'expected_stdout': "hello 🐧\n",
},
},
},
},
nodes={
"localhost": {
'bundles': ["test"],
'os': host_os(),
def test_fix(tmpdir):
symlink(join(str(tmpdir), "bar"), join(str(tmpdir), "foo"))
make_repo(
tmpdir,
bundles={
"test": {
'symlinks': {
join(str(tmpdir), "foo"): {
'target': "/dev/null",
},
},
},
},
nodes={
"localhost": {
'bundles': ["test"],
'os': host_os(),
},
},
def test_metadatapy_defaults_atomic(tmpdir):
make_repo(
tmpdir,
bundles={"test": {}},
)
with open(join(str(tmpdir), "nodes.py"), 'w') as f:
f.write(
"""
from bundlewrap.metadata import atomic
nodes = {
"node1": {
'bundles': ["test"],
'metadata': {"foo": atomic({"bar": "baz"})},
},
}
""")
with open(join(str(tmpdir), "bundles", "test", "metadata.py"), 'w') as f:
def test_fault_content_skipped_mako(tmpdir):
make_repo(
tmpdir,
bundles={
"test": {
'files': {
join(str(tmpdir), "secret"): {
'content': "${repo.vault.password_for('test', key='unavailable')}",
'content_type': 'mako',
},
},
},
},
nodes={
"localhost": {
'bundles': ["test"],
'os': host_os(),
},
def test_fault_unavailable(tmpdir):
make_repo(
tmpdir,
nodes={
"node1": {
'bundles': ["bundle1"],
},
},
bundles={
"bundle1": {
'files': {
"/test": {
'content': "${repo.vault.password_for('test', key='404')}",
'content_type': 'mako',
},
},
},
},
def test_encrypt_file(tmpdir):
make_repo(tmpdir)
source_file = join(str(tmpdir), "data", "source")
with open(source_file, 'w') as f:
f.write("ohai")
stdout, stderr, rcode = run(
"bw debug -c 'repo.vault.encrypt_file(\"{}\", \"{}\")'".format(
source_file,
"encrypted",
),
path=str(tmpdir),
)
assert stderr == b""
assert rcode == 0
stdout, stderr, rcode = run(