import collections
import io
import json
import logging
import sys

from flexmock import flexmock


# Note: `module` below refers to the borgmatic module under test, as aliased
# by the surrounding test file.
def mock_config_and_schema(config_yaml, schema_yaml=None):
    '''
    Set up mocks for the given config YAML string and the schema YAML string, or the default
    schema if no schema is provided. The idea is that the code under test consumes these mocks
    when parsing the configuration.
    '''
    config_stream = io.StringIO(config_yaml)

    if schema_yaml is None:
        schema_stream = open(module.schema_filename())
    else:
        schema_stream = io.StringIO(schema_yaml)

    builtins = flexmock(sys.modules['builtins'])
    builtins.should_receive('open').with_args('config.yaml').and_return(config_stream)
    builtins.should_receive('open').with_args('schema.yaml').and_return(schema_stream)
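

# Usage sketch (not from the original file): a test that routes a config
# string through mock_config_and_schema() and then exercises parsing. The
# parse_configuration() signature mirrors the validate.parse_configuration()
# call in the loading fragment further below; treat the exact names as
# assumptions.
def test_parse_configuration_with_mocked_files():
    mock_config_and_schema('location:\n    repositories:\n        - test.borg')

    config = module.parse_configuration('config.yaml', 'schema.yaml')

    assert config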


def test_schema_filename_returns_plausible_path():
    schema_path = module.schema_filename()

    assert schema_path.endswith('/schema.yaml')


# Fragment from inside a generator that runs borg actions; the call that
# produced json_output is elided here.
if json_output:
    yield json.loads(json_output)

if 'check' in arguments and checks.repository_enabled_for_checks(repository, consistency):
    logger.info('{}: Running consistency checks'.format(repository))
    borg_check.check_archives(
        repository,
        storage,
        consistency,
        local_path=local_path,
        remote_path=remote_path,
        repair=arguments['check'].repair,
        only_checks=arguments['check'].only,
    )

if 'extract' in arguments:
    if arguments['extract'].repository is None or validate.repositories_match(
        repository, arguments['extract'].repository
    ):
        logger.info(
            '{}: Extracting archive {}'.format(repository, arguments['extract'].archive)
        )
        borg_extract.extract_archive(
            global_arguments.dry_run,
            repository,
            arguments['extract'].archive,
            arguments['extract'].paths,
            location,
            storage,
            local_path=local_path,
            remote_path=remote_path,
            destination_path=arguments['extract'].destination,
            progress=arguments['extract'].progress,
        )


def main():  # pragma: no cover
    try:
        args = parse_arguments(*sys.argv[1:])
        generate.generate_sample_configuration(
            args.source_filename, args.destination_filename, validate.schema_filename()
        )

        print('Generated a sample configuration file at {}.'.format(args.destination_filename))
        print()

        if args.source_filename:
            print(
                'Merged in the contents of configuration file at {}.'.format(args.source_filename)
            )
            print('To review the changes made, run:')
            print()
            print(
                '    diff --unified {} {}'.format(args.source_filename, args.destination_filename)
            )
            print()

        print('Please edit the file to suit your needs. The values are representative.')
        print('All fields are optional except where indicated.')
    except (ValueError, OSError) as error:
        # This except clause is reconstructed to close the truncated try block.
        print(error, file=sys.stderr)
        sys.exit(1)
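

# Direct-call sketch (assumed destination path): generate a sample
# configuration without the CLI wrapper, using the same
# generate_sample_configuration() signature seen in main() above. Passing
# None as the source filename mirrors main()'s optional-source handling.
generate.generate_sample_configuration(
    None, '/etc/borgmatic/config.yaml', validate.schema_filename()
)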


# The enclosing generator signature below is reconstructed (its name is an
# assumption); the opening lines of its docstring are truncated in the
# original fragment.
def collect_configuration_run_summary_logs(configs, arguments):
    '''
    As a side effect of running through these configuration files, output their JSON results, if
    any, to stdout.
    '''
    # Run cross-file validation checks.
    if 'extract' in arguments:
        repository = arguments['extract'].repository
    elif 'list' in arguments and arguments['list'].archive:
        repository = arguments['list'].repository
    elif 'mount' in arguments:
        repository = arguments['mount'].repository
    else:
        repository = None

    if repository:
        try:
            validate.guard_configuration_contains_repository(repository, configs)
        except ValueError as error:
            yield from make_error_log_records(str(error))
            return

    if not configs:
        yield from make_error_log_records(
            '{}: No configuration files found'.format(' '.join(arguments['global'].config_paths))
        )
        return

    if 'create' in arguments:
        try:
            for config_filename, config in configs.items():
                hooks = config.get('hooks', {})
                command.execute_hook(
                    hooks.get('before_everything'),
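

# Hypothetical sketch of the make_error_log_records() helper called above,
# consistent with the logging.makeLogRecord() pattern in the loading code
# below; the real helper may differ (e.g. it may also accept an exception).
def make_error_log_records(message):
    yield logging.makeLogRecord(
        dict(levelno=logging.CRITICAL, levelname='CRITICAL', msg=message)
    )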


# The def line below is reconstructed from the parameters used in the body
# (config_filenames, overrides); the function name is an assumption, as the
# original fragment starts at the docstring.
def load_configurations(config_filenames, overrides=None):
    '''
    Given a sequence of configuration filenames, load and validate each configuration file. Return
    the results as a tuple of: dict of configuration filename to corresponding parsed configuration,
    and sequence of logging.LogRecord instances containing any parse errors.
    '''
    # Dict mapping from config filename to corresponding parsed config dict.
    configs = collections.OrderedDict()
    logs = []

    # Parse and load each configuration file.
    for config_filename in config_filenames:
        try:
            configs[config_filename] = validate.parse_configuration(
                config_filename, validate.schema_filename(), overrides
            )
        except (ValueError, OSError, validate.Validation_error) as error:
            logs.extend(
                [
                    logging.makeLogRecord(
                        dict(
                            levelno=logging.CRITICAL,
                            levelname='CRITICAL',
                            msg='{}: Error parsing configuration file'.format(config_filename),
                        )
                    ),
                    logging.makeLogRecord(
                        dict(levelno=logging.CRITICAL, levelname='CRITICAL', msg=error)
                    ),
                ]
            )

    return (configs, logs)
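

# Usage sketch (assumed config path): load configuration files and forward
# any parse error records to a logger, per the tuple contract documented in
# load_configurations() above.
configs, parse_logs = load_configurations(('/etc/borgmatic/config.yaml',))

for record in parse_logs:
    logging.getLogger(__name__).handle(record)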


# The if branch preceding this else is truncated in the original fragment; a
# plausible archive-mount log line is reconstructed here.
if arguments['mount'].archive:
    logger.info('{}: Mounting archive {}'.format(repository, arguments['mount'].archive))
else:
    logger.info('{}: Mounting repository'.format(repository))
borg_mount.mount_archive(
    repository,
    arguments['mount'].archive,
    arguments['mount'].mount_point,
    arguments['mount'].paths,
    arguments['mount'].foreground,
    arguments['mount'].options,
    storage,
    local_path=local_path,
    remote_path=remote_path,
)

if 'restore' in arguments:
    if arguments['restore'].repository is None or validate.repositories_match(
        repository, arguments['restore'].repository
    ):
        logger.info(
            '{}: Restoring databases from archive {}'.format(
                repository, arguments['restore'].archive
            )
        )
        restore_names = arguments['restore'].databases or []
        if 'all' in restore_names:
            restore_names = []

        # Extract dumps for the named databases from the archive.
        dump_patterns = dispatch.call_hooks(
            'make_database_dump_patterns',
            hooks,