Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_make_database_dump_path_joins_arguments():
    # The source directory and the hook name are expected to be joined into a single path.
    dump_path = module.make_database_dump_path('/tmp', 'super_databases')

    assert dump_path == '/tmp/super_databases'
def test_make_database_dump_filename_uses_name_and_hostname():
    # Have os.path.expanduser() hand back 'databases' whenever the code under test calls it.
    flexmock(module.os.path).should_receive('expanduser').and_return('databases')

    dump_filename = module.make_database_dump_filename('databases', 'test', 'hostname')

    # Expected layout: <dump path>/<hostname>/<database name>.
    assert dump_filename == 'databases/hostname/test'
def dump_databases(databases, log_prefix, location_config, dry_run):
'''
Dump the given MySQL/MariaDB databases to disk. The databases are supplied as a sequence of
dicts, one dict describing each database as per the configuration schema. Use the given log
prefix in any log entries. Use the given location configuration dict to construct the
destination path. If this is a dry run, then don't actually dump anything.
'''
# Suffix appended to log messages so that dry runs are clearly marked in the output.
dry_run_label = ' (dry run; not actually dumping anything)' if dry_run else ''
logger.info('{}: Dumping MySQL databases{}'.format(log_prefix, dry_run_label))
for database in databases:
name = database['name']
# The dump filename is derived from the hook's dump path, the database name, and the
# database's hostname; database.get() yields None when no hostname is configured.
dump_filename = dump.make_database_dump_filename(
make_dump_path(location_config), name, database.get('hostname')
)
# Assemble the mysqldump command as a tuple of arguments. Each optional flag is only added
# when the corresponding key is present in the database dict.
command = (
('mysqldump', '--add-drop-database')
+ (('--host', database['hostname']) if 'hostname' in database else ())
+ (('--port', str(database['port'])) if 'port' in database else ())
# The TCP protocol is requested whenever a hostname or a port is configured.
+ (('--protocol', 'tcp') if 'hostname' in database or 'port' in database else ())
+ (('--user', database['username']) if 'username' in database else ())
# NOTE(review): split(' ') produces empty-string arguments when the options contain
# consecutive spaces and does not honor shell-style quoting; shlex.split() may be a
# better fit here -- confirm against how 'options' is documented for users.
+ (tuple(database['options'].split(' ')) if 'options' in database else ())
# The special database name 'all' selects every database instead of a single named one.
+ (('--all-databases',) if name == 'all' else ('--databases', name))
)
# The password, if any, is supplied via the MYSQL_PWD environment variable instead of being
# placed in the command arguments.
extra_environment = {'MYSQL_PWD': database['password']} if 'password' in database else None
logger.debug(
'{}: Dumping MySQL database {} to {}{}'.format(
log_prefix, name, dump_filename, dry_run_label
def make_dump_path(location_config):  # pragma: no cover
    '''
    Given a location configuration dict, return the dump path for this hook. The path is built
    from the dict's "borgmatic_source_directory" value (None if that key is absent) and this
    hook's name, "postgresql_databases".
    '''
    source_directory = location_config.get('borgmatic_source_directory')

    return dump.make_database_dump_path(source_directory, 'postgresql_databases')
def make_dump_path(location_config):  # pragma: no cover
    '''
    Given a location configuration dict, return the dump path for this hook. The path is built
    from the dict's "borgmatic_source_directory" value (None if that key is absent) and this
    hook's name, "mysql_databases".
    '''
    source_directory = location_config.get('borgmatic_source_directory')

    return dump.make_database_dump_path(source_directory, 'mysql_databases')
# Extract dumps for the named databases from the archive. Each database hook is first asked for
# the dump patterns that correspond to the requested restore names.
dump_patterns = dispatch.call_hooks(
    'make_database_dump_patterns',
    hooks,
    repository,
    dump.DATABASE_HOOK_NAMES,
    location,
    restore_names,
)

# The per-hook patterns are flattened and converted from glob patterns to Borg patterns before
# being handed to the extract call. Dumps are extracted relative to the filesystem root.
borg_extract.extract_archive(
    global_arguments.dry_run,
    repository,
    arguments['restore'].archive,
    dump.convert_glob_patterns_to_borg_patterns(
        dump.flatten_dump_patterns(dump_patterns, restore_names)
    ),
    location,
    storage,
    local_path=local_path,
    remote_path=remote_path,
    destination_path='/',
    progress=arguments['restore'].progress,
    # We don't want glob patterns that don't match to error.
    error_on_warnings=False,
)

# Map the restore names or detected dumps to the corresponding database configurations.
restore_databases = dump.get_per_hook_database_configurations(
    hooks, restore_names, dump_patterns
)

# Finally, restore the databases and cleanup the dumps. Both hook calls receive the mapped
# database configurations rather than the full hooks configuration.
dispatch.call_hooks(
    'restore_database_dumps',
    restore_databases,
    repository,
    dump.DATABASE_HOOK_NAMES,
    location,
    global_arguments.dry_run,
)
dispatch.call_hooks(
    'remove_database_dumps',
    restore_databases,
    repository,
    dump.DATABASE_HOOK_NAMES,
    location,
    global_arguments.dry_run,
)
# Handle the "list" action, if it was requested.
if 'list' in arguments:
# Proceed when no specific repository was requested for the list action, or when the requested
# repository matches the one currently being processed.
if arguments['list'].repository is None or validate.repositories_match(
repository, arguments['list'].repository
):
logger.info('{}: Listing archives'.format(repository))
json_output = borg_list.list_archives(
repository,
storage,
list_arguments=arguments['list'],
local_path=local_path,
remote_path=remote_path,
)
# NOTE(review): the body of this branch is not visible here. The lines that follow look like
# the tail of a separate archive-extraction call from the restore action rather than a
# continuation of the list action -- confirm against the complete file.
if json_output:
arguments['restore'].archive,
dump.convert_glob_patterns_to_borg_patterns(
dump.flatten_dump_patterns(dump_patterns, restore_names)
),
location,
storage,
local_path=local_path,
remote_path=remote_path,
destination_path='/',
progress=arguments['restore'].progress,
# We don't want glob patterns that don't match to error.
error_on_warnings=False,
)
# Map the restore names or detected dumps to the corresponding database configurations.
restore_databases = dump.get_per_hook_database_configurations(
hooks, restore_names, dump_patterns
)
# Finally, restore the databases and cleanup the dumps.
dispatch.call_hooks(
'restore_database_dumps',
restore_databases,
repository,
dump.DATABASE_HOOK_NAMES,
location,
global_arguments.dry_run,
)
# NOTE(review): this call is cut off before its remaining arguments and closing parenthesis.
dispatch.call_hooks(
'remove_database_dumps',
restore_databases,
repository,
def remove_database_dumps(databases, log_prefix, location_config, dry_run):  # pragma: no cover
    '''
    Delete the dumps belonging to the given databases, which arrive as a sequence of dicts (one
    per database, shaped as per the configuration schema). The dump location is derived from the
    given location configuration dict, and the log prefix is used in any log entries. When this is
    a dry run, nothing is actually removed.
    '''
    dump_path = make_dump_path(location_config)

    dump.remove_database_dumps(dump_path, databases, 'PostgreSQL', log_prefix, dry_run)