Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
repositories:
- hostname.borg
retention:
!include include.yaml
'''
)
builtins = flexmock(sys.modules['builtins'])
builtins.should_receive('open').with_args('include.yaml').and_return(
'''
keep_daily: 7
keep_hourly: 24
'''
)
result = module.parse_configuration('config.yaml', 'schema.yaml')
assert result == {
'location': {'source_directories': ['/home'], 'repositories': ['hostname.borg']},
'retention': {'keep_daily': 7, 'keep_hourly': 24},
}
repositories:
- hostname.borg
retention:
keep_minutely: 60
keep_hourly: 24
keep_daily: 7
consistency:
checks:
- repository
- archives
'''
)
result = module.parse_configuration('config.yaml', 'schema.yaml')
assert result == {
'location': {'source_directories': ['/home', '/etc'], 'repositories': ['hostname.borg']},
'retention': {'keep_daily': 7, 'keep_hourly': 24, 'keep_minutely': 60},
'consistency': {'checks': ['repository', 'archives']},
}
def test_parse_configuration_raises_for_syntax_error():
    '''
    Parsing a configuration whose contents are not valid YAML should raise ValueError.
    '''
    # A mapping key followed by a bare scalar on the next line does not parse as YAML.
    malformed_config = 'foo:\nbar'
    mock_config_and_schema(malformed_config)

    with pytest.raises(ValueError):
        module.parse_configuration('config.yaml', 'schema.yaml')
- hostname.borg
retention:
keep_daily: 1
<<: !include include.yaml
'''
)
builtins = flexmock(sys.modules['builtins'])
builtins.should_receive('open').with_args('include.yaml').and_return(
'''
keep_daily: 7
keep_hourly: 24
'''
)
result = module.parse_configuration('config.yaml', 'schema.yaml')
assert result == {
'location': {'source_directories': ['/home'], 'repositories': ['hostname.borg']},
'retention': {'keep_daily': 1, 'keep_hourly': 24},
}
escaped_punctuation = string.punctuation.replace('\\', r'\\').replace('"', r'\"')
mock_config_and_schema(
'''
location:
source_directories:
- /home
repositories:
- "{}.borg"
'''.format(
escaped_punctuation
)
)
result = module.parse_configuration('config.yaml', 'schema.yaml')
assert result == {
'location': {
'source_directories': ['/home'],
'repositories': ['{}.borg'.format(string.punctuation)],
}
map:
location:
required: true
map:
source_directories:
required: true
seq:
- type: scalar
repositories:
required: true
seq:
- type: scalar
''',
)
module.parse_configuration('config.yaml', 'schema.yaml')
def main():  # pragma: no cover
    '''
    Command-line entry point: validate every configuration file found under the configured paths.

    Parse the command-line arguments, collect the configuration filenames from
    args.config_paths, and parse each one against the schema. Exit with status 1 if no
    configuration files are found or if any file fails to parse; otherwise log the list of
    files that were validated.
    '''
    args = parse_arguments(*sys.argv[1:])

    logging.basicConfig(level=logging.INFO, format='%(message)s')

    config_filenames = tuple(collect.collect_config_filenames(args.config_paths))
    if not config_filenames:
        logger.critical('No files to validate found')
        sys.exit(1)

    found_issues = False
    for config_filename in config_filenames:
        try:
            validate.parse_configuration(config_filename, validate.schema_filename())
        except (ValueError, OSError, validate.Validation_error) as error:
            # Report through the module logger, like every other message in this function,
            # rather than the root logging module. Keep going so that problems in all files
            # are reported in a single run.
            logger.critical('{}: Error parsing configuration file'.format(config_filename))
            logger.critical(error)
            found_issues = True

    if found_issues:
        sys.exit(1)

    logger.info(
        'All given configuration files are valid: {}'.format(', '.join(config_filenames))
    )
def load_configurations(config_filenames, overrides=None):
'''
Given a sequence of configuration filenames, load and validate each configuration file. Return
the results as a tuple of: dict of configuration filename to corresponding parsed configuration,
and sequence of logging.LogRecord instances containing any parse errors.
'''
# Dict mapping from config filename to corresponding parsed config dict.
configs = collections.OrderedDict()
logs = []
# Parse and load each configuration file.
for config_filename in config_filenames:
try:
configs[config_filename] = validate.parse_configuration(
config_filename, validate.schema_filename(), overrides
)
except (ValueError, OSError, validate.Validation_error) as error:
logs.extend(
[
logging.makeLogRecord(
dict(
levelno=logging.CRITICAL,
levelname='CRITICAL',
msg='{}: Error parsing configuration file'.format(config_filename),
)
),
logging.makeLogRecord(
dict(levelno=logging.CRITICAL, levelname='CRITICAL', msg=error)
),
]