Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, filename, target, task=None):
    """
    Sets up the object from one or several configuration files.

    Arguments:
    filename -- a single path or several paths joined by ':'; every
                path is made absolute and stored in self.filenames
    target   -- stored as self._override_target
    task     -- stored as self._override_task (None by default)

    Raises an IncludeException if Repo.get_root_path() does not return
    the same value for the directories of all given files.
    """
    self._override_target = target
    self._override_task = task
    self._config = {}
    self.filenames = []

    for entry in filename.split(':'):
        abs_entry = os.path.abspath(entry)
        entry_root = Repo.get_root_path(os.path.dirname(abs_entry),
                                        fallback=False)
        if not self.filenames:
            # The first file defines the root all others are compared to
            reference_root = entry_root
        elif entry_root != reference_root:
            raise IncludeException('All concatenated config files must '
                                   'belong to the same repository or all '
                                   'must be outside of versioning control')
        self.filenames.append(abs_entry)

    self.handler = IncludeHandler(self.filenames)
    self.repo_dict = self._get_repo_dict()
def _internal_dict_merge(dest, upd, recursive_merge=True):
"""
Merges upd recursively into a copy of dest as OrderedDict
If recursive_merge is False, it will use the classic dict.update,
otherwise it will fall back on a manual merge (helpful for non-dict
types like FunctionWrapper)
"""
if (not isinstance(dest, Mapping)) \
or (not isinstance(upd, Mapping)):
raise IncludeException('Cannot merge using non-dict')
dest = OrderedDict(dest)
updkeys = list(upd.keys())
if not set(list(dest.keys())) & set(updkeys):
recursive_merge = False
if recursive_merge:
for key in updkeys:
val = upd[key]
try:
dest_subkey = dest.get(key, None)
except AttributeError:
dest_subkey = None
if isinstance(dest_subkey, Mapping) \
and isinstance(val, Mapping):
ret = _internal_dict_merge(dest_subkey, val)
dest[key] = ret
else:
(cfg, rep) = _internal_include_handler(includefile)
configs.extend(cfg)
missing_repos.extend(rep)
elif isinstance(include, Mapping):
includerepo = include.get('repo', None)
if includerepo is not None:
includedir = repos.get(includerepo, None)
else:
raise IncludeException(
'"repo" is not specified: {}'
.format(include))
if includedir is not None:
try:
includefile = include['file']
except KeyError:
raise IncludeException(
'"file" is not specified: {}'
.format(include))
(cfg, rep) = _internal_include_handler(
os.path.abspath(
os.path.join(
includedir,
includefile)))
configs.extend(cfg)
missing_repos.extend(rep)
else:
missing_repos.append(includerepo)
configs.append((filename, current_config))
# Remove all possible duplicates in missing_repos
missing_repos = list(OrderedDict.fromkeys(missing_repos))
return (configs, missing_repos)
- include3.yml
-------
Includes are merged in this order:
['include1.yml', 'include2.yml', 'include-repo1.yml',
'include-repo2.yml', 'include-repo2.yml', 'topfile.yml']
On conflict, later includes overwrite earlier ones, and the
current file overwrites every include (evaluation is depth-first
and from top to bottom).
"""
missing_repos = []
configs = []
current_config = load_config(filename)
if not isinstance(current_config, Mapping):
raise IncludeException('Configuration file does not contain a '
'dictionary as base type')
header = current_config.get('header', {})
for include in header.get('includes', []):
if isinstance(include, str):
includefile = ''
if include.startswith(os.path.pathsep):
includefile = include
else:
includefile = os.path.abspath(
os.path.join(
os.path.dirname(filename),
include))
(cfg, rep) = _internal_include_handler(includefile)
configs.extend(cfg)
missing_repos.extend(rep)
if include.startswith(os.path.pathsep):
includefile = include
else:
includefile = os.path.abspath(
os.path.join(
os.path.dirname(filename),
include))
(cfg, rep) = _internal_include_handler(includefile)
configs.extend(cfg)
missing_repos.extend(rep)
elif isinstance(include, Mapping):
includerepo = include.get('repo', None)
if includerepo is not None:
includedir = repos.get(includerepo, None)
else:
raise IncludeException(
'"repo" is not specified: {}'
.format(include))
if includedir is not None:
try:
includefile = include['file']
except KeyError:
raise IncludeException(
'"file" is not specified: {}'
.format(include))
(cfg, rep) = _internal_include_handler(
os.path.abspath(
os.path.join(
includedir,
includefile)))
configs.extend(cfg)
missing_repos.extend(rep)
def execute(self, ctx):
    """
    Looks up the repos named in ctx.missing_repo_names, hands them to
    repos_fetch() and calls checkout() on each of them.

    Returns False right away if ctx.missing_repo_names is empty and
    raises an IncludeException if it equals ctx.missing_repo_names_old.

    TODO refactor protected-access
    """
    wanted = ctx.missing_repo_names
    if not wanted:
        return False

    if wanted == ctx.missing_repo_names_old:
        raise IncludeException('Could not fetch all repos needed by '
                               'includes.')

    logging.debug('Missing repos for complete config:\n%s',
                  pprint.pformat(wanted))

    ctx.config.repo_dict = ctx.config._get_repo_dict()

    # Names without an entry in the refreshed repo_dict are left out
    resolved = []
    for name in wanted:
        if name in ctx.config.repo_dict:
            resolved.append(ctx.config.repo_dict[name])
    ctx.missing_repos = resolved

    repos_fetch(ctx.missing_repos)

    for missing_repo in ctx.missing_repos:
        missing_repo.checkout()