Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_op_hosts_limit(self):
    # Queues one op without a hosts argument and one restricted with hosts=,
    # then checks that both show up in the op order.
    inv = make_inventory()
    deploy_state = State(inv, Config())
    connect_all(deploy_state)

    # Shell op with no host restriction (added to both hosts)
    add_op(deploy_state, server.shell, 'echo "hi"')

    # User op restricted to the first host only
    add_op(deploy_state, server.user, 'somehost_user', hosts=inv['somehost'])

    # The op order must contain exactly the two ops added above
    op_order = deploy_state.get_op_order()
    self.assertEqual(len(op_order), 2)

    # Ensure somehost has two ops and anotherhost only has the one
    # NOTE(review): the assertions for this check are not visible in this excerpt.
def test_ignore_errors_op_fail(self):
# Queues a shell op with ignore_errors=True, patches the SSH connector's
# run_shell_command with a mock and runs the ops.
# NOTE(review): indentation was lost in this excerpt and the test appears to
# be cut off after its final line; any follow-up assertions are not visible.
inventory = make_inventory()
state = State(inventory, Config())
connect_all(state)
add_op(state, server.shell, 'echo "hi"', ignore_errors=True)
with patch('pyinfra.api.connectors.ssh.run_shell_command') as fake_run_command:
# The mock returns a (False, FakeBuffer) pair built around FakeChannel(1);
# presumably this simulates a failed command - confirm against the connector.
fake_channel = FakeChannel(1)
fake_run_command.return_value = (
False,
FakeBuffer('', fake_channel),
)
# This should run OK
run_ops(state)
# Host object looked up for checks that are not visible in this excerpt
somehost = inventory.get_host('somehost')
def test_op(self):
# Builds a state with all print flags enabled, connects, and starts adding a
# files.file op.
# NOTE(review): this excerpt ends inside the add_op(...) call below - the
# closing parenthesis and the remainder of the test are not visible here.
inventory = make_inventory()
somehost = inventory.get_host('somehost')
anotherhost = inventory.get_host('anotherhost')
state = State(inventory, Config())
# Enable printing on this test to catch any exceptions in the formatting
state.print_output = True
state.print_fact_info = True
state.print_fact_output = True
connect_all(state)
# files.file op for /var/log/pyinfra.log with user, group, mode and sudo kwargs
add_op(
state, files.file,
'/var/log/pyinfra.log',
user='pyinfra',
group='pyinfra',
mode='644',
sudo=True,
sudo_user='test_sudo',
def test_no_invalid_op_call(self):
    # Calling an operation directly is expected to raise PyinfraError while
    # the state is flagged in_op, and again while it is flagged in_deploy.
    current_state = State(make_inventory(), Config())
    connect_all(current_state)
    pseudo_state.set(current_state)

    # Case 1: in_op flag set
    current_state.in_op = True
    self.assertRaises(PyinfraError, server.user, 'someuser')

    # Case 2: in_op cleared, in_deploy flag set
    current_state.in_op = False
    current_state.in_deploy = True
    self.assertRaises(PyinfraError, server.user, 'someuser')
def test_connect_with_ssh_key_password(self):
# Builds a state with a single host whose data carries an ssh_key and an
# ssh_key_password, then patches key-file handling in the SSH connector.
# NOTE(review): indentation was lost in this excerpt and the test appears to
# be cut off after its final line; the connect call and assertions are not visible.
state = State(make_inventory(hosts=(
('somehost', {'ssh_key': 'testkey', 'ssh_key_password': 'testpass'}),
)), Config())
# path.isfile is replaced with a function that always returns True, and
# RSAKey.from_private_key_file is replaced with a mock bound to fake_key_open.
with patch(
'pyinfra.api.connectors.ssh.path.isfile',
lambda *args, **kwargs: True,
), patch(
'pyinfra.api.connectors.ssh.RSAKey.from_private_key_file',
) as fake_key_open:
# Raises PasswordRequiredException unless a 'password' keyword is passed;
# otherwise falls through and returns None.
def fake_key_open_fail(*args, **kwargs):
if 'password' not in kwargs:
raise PasswordRequiredException()
fake_key_open.side_effect = fake_key_open_fail
fake_key = FakeRSAKey()
# NOTE(review): if `patch` comes from unittest.mock, the value returned by a
# side_effect function (None here) is used instead of return_value unless it
# returns mock.DEFAULT - confirm fake_key is actually what callers receive.
fake_key_open.return_value = fake_key
def load_config(deploy_dir):
    '''
    Builds a Config, applying overrides from a local config.py if present.

    A fresh ``Config`` is created. When ``config.py`` exists inside
    ``deploy_dir`` it is passed to ``exec_file`` with ``return_locals=True``,
    and every returned name that is already an attribute of the config is
    copied onto it; other names are ignored.
    '''

    config = Config()
    config_filename = path.join(deploy_dir, 'config.py')

    # No config.py alongside the deploy: nothing to override
    if not path.exists(config_filename):
        return config

    file_locals = exec_file(config_filename, return_locals=True)

    # Lazily filter to names the config already has, then apply them in order
    overrides = (
        (name, override)
        for name, override in six.iteritems(file_locals)
        if hasattr(config, name)
    )
    for name, override in overrides:
        setattr(config, name, override)

    return config
def load_deploy_config(deploy_filename, config=None):
    '''
    Loads any local config overrides in the deploy file.

    A new ``Config`` is created when ``config`` is not supplied (or is falsy).
    If ``deploy_filename`` is given and exists on disk, it is passed together
    with the config to ``extract_file_config``.

    Returns the config object in every case, including when no deploy
    filename is given (previously that path returned ``None``).
    '''

    if not config:
        config = Config()

    # Only consult the deploy file when one was named and it actually exists
    if deploy_filename and path.exists(deploy_filename):
        extract_file_config(deploy_filename, config)

    return config
def make_command(
command,
env=None,
su_user=None,
sudo=False,
sudo_user=None,
preserve_sudo_env=False,
shell_executable=Config.SHELL,
):
'''
Builds a shell command with various kwargs.
'''
debug_meta = {}
for key, value in (
('shell_executable', shell_executable),
('sudo', sudo),
('sudo_user', sudo_user),
('su_user', su_user),
('env', env),
):
if value:
debug_meta[key] = value