Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_execute_command_captures_output():
full_command = ['foo', 'bar']
expected_output = '[]'
flexmock(module.os, environ={'a': 'b'})
flexmock(module.subprocess).should_receive('check_output').with_args(
full_command, shell=False, env=None, cwd=None
).and_return(flexmock(decode=lambda: expected_output)).once()
output = module.execute_command(full_command, output_log_level=None)
assert output == expected_output
def test_execute_command_calls_full_command():
full_command = ['foo', 'bar']
flexmock(module).should_receive('execute_and_log_output').with_args(
full_command, output_log_level=logging.INFO, shell=False
).once()
output = module.execute_command(full_command)
assert output is None
def test_execute_command_captures_output_with_shell():
full_command = ['foo', 'bar']
expected_output = '[]'
flexmock(module.subprocess).should_receive('check_output').with_args(
full_command, stderr=module.subprocess.STDOUT, shell=True
).and_return(flexmock(decode=lambda: expected_output)).once()
output = module.execute_command(full_command, output_log_level=None, shell=True)
assert output == expected_output
def test_execute_command_calls_full_command_with_working_directory():
full_command = ['foo', 'bar']
flexmock(module.os, environ={'a': 'b'})
flexmock(module.subprocess).should_receive('Popen').with_args(
full_command,
stdin=None,
stdout=module.subprocess.PIPE,
stderr=module.subprocess.STDOUT,
shell=False,
env=None,
cwd='/working',
).and_return(flexmock(stdout=None)).once()
flexmock(module).should_receive('log_output')
output = module.execute_command(full_command, working_directory='/working')
assert output is None
def test_execute_command_captures_output_with_working_directory():
full_command = ['foo', 'bar']
expected_output = '[]'
flexmock(module.os, environ={'a': 'b'})
flexmock(module.subprocess).should_receive('check_output').with_args(
full_command, shell=False, env=None, cwd='/working'
).and_return(flexmock(decode=lambda: expected_output)).once()
output = module.execute_command(
full_command, output_log_level=None, shell=False, working_directory='/working'
)
assert output == expected_output
+ (('--port', str(database['port'])) if 'port' in database else ())
+ (('--username', database['username']) if 'username' in database else ())
+ (() if all_databases else ('--format', database.get('format', 'custom')))
+ (tuple(database['options'].split(' ')) if 'options' in database else ())
+ (() if all_databases else (name,))
)
extra_environment = {'PGPASSWORD': database['password']} if 'password' in database else None
logger.debug(
'{}: Dumping PostgreSQL database {} to {}{}'.format(
log_prefix, name, dump_filename, dry_run_label
)
)
if not dry_run:
os.makedirs(os.path.dirname(dump_filename), mode=0o700, exist_ok=True)
execute_command(command, extra_environment=extra_environment)
analyze_command = (
('psql', '--no-password', '--quiet')
+ (('--host', database['hostname']) if 'hostname' in database else ())
+ (('--port', str(database['port'])) if 'port' in database else ())
+ (('--username', database['username']) if 'username' in database else ())
+ ('--dbname', database['name'])
+ ('--command', 'ANALYZE')
)
logger.debug(
'{}: Restoring PostgreSQL database {}{}'.format(
log_prefix, database['name'], dry_run_label
)
)
if not dry_run:
execute_command(restore_command, extra_environment=extra_environment)
execute_command(analyze_command, extra_environment=extra_environment)
'{}: Running {} commands for {} hook{}'.format(
config_filename, len(commands), description, dry_run_label
)
)
if umask:
parsed_umask = int(str(umask), 8)
logger.debug('{}: Set hook umask to {}'.format(config_filename, oct(parsed_umask)))
original_umask = os.umask(parsed_umask)
else:
original_umask = None
try:
for command in commands:
if not dry_run:
execute.execute_command(
[command],
output_log_level=logging.ERROR
if description == 'on-error'
else logging.WARNING,
shell=True,
)
finally:
if original_umask:
os.umask(original_umask)
'''
Given a local or remote repository path, a storage configuration dict, a Borg encryption mode,
whether the repository should be append-only, and the storage quota to use, initialize the
repository. If the repository already exists, then log and skip initialization.
'''
info_command = (
(local_path, 'info')
+ (('--info',) if logger.getEffectiveLevel() == logging.INFO else ())
+ (('--debug',) if logger.isEnabledFor(logging.DEBUG) else ())
+ (('--remote-path', remote_path) if remote_path else ())
+ (repository,)
)
logger.debug(' '.join(info_command))
try:
execute_command(info_command, output_log_level=None)
logger.info('Repository already exists. Skipping initialization.')
return
except subprocess.CalledProcessError as error:
if error.returncode != INFO_REPOSITORY_NOT_FOUND_EXIT_CODE:
raise
extra_borg_options = storage_config.get('extra_borg_options', {}).get('init', '')
init_command = (
(local_path, 'init')
+ (('--encryption', encryption_mode) if encryption_mode else ())
+ (('--append-only',) if append_only else ())
+ (('--storage-quota', storage_quota) if storage_quota else ())
+ (('--info',) if logger.getEffectiveLevel() == logging.INFO else ())
+ (('--debug',) if logger.isEnabledFor(logging.DEBUG) else ())
+ (('--remote-path', remote_path) if remote_path else ())