Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Args:
jsonnet (str): Either a path to a jsonnet file or the jsonnet content.
ext_vars (dict): External variables. Values can be strings or any other basic type.
Returns:
SimpleNamespace: The parsed jsonnet object.
Raises:
TypeError: If the input is neither a path to an existent file nor a jsonnet.
"""
ext_vars, ext_codes = self.split_ext_vars(ext_vars)
fpath = None
fname = 'snippet'
snippet = jsonnet
try:
fpath = Path(jsonnet, mode=config_read_mode)
except:
pass
else:
fname = jsonnet(absolute=False) if isinstance(jsonnet, Path) else jsonnet
snippet = fpath.get_content()
try:
values = yaml.safe_load(_jsonnet.evaluate_snippet(fname, snippet, ext_vars=ext_vars, ext_codes=ext_codes))
except Exception as ex:
raise ParserError('Problems evaluating jsonnet "'+fname+'" :: '+str(ex))
if self._validator is not None:
self._validator.validate(values)
if with_meta and isinstance(values, dict) and fpath is not None:
values['__path__'] = fpath
return dict_to_namespace(values)
Args:
cfg_path (str or Path): Path to the configuration file to parse.
ext_vars (dict): Optional external variables used for parsing jsonnet.
env (bool or None): Whether to merge with the parsed environment. None means use the ArgumentParser's default.
defaults (bool): Whether to merge with the parser's defaults.
nested (bool): Whether the namespace should be nested.
with_meta (bool): Whether to include metadata in config object.
Returns:
types.SimpleNamespace: An object with all parsed values as nested attributes.
Raises:
ParserError: If there is a parsing error and error_handler=None.
"""
fpath = Path(cfg_path, mode=config_read_mode)
if not fpath.is_url:
cwd = os.getcwd()
os.chdir(os.path.abspath(os.path.join(fpath(absolute=False), os.pardir)))
try:
cfg_str = fpath.get_content()
parsed_cfg = self.parse_string(cfg_str, cfg_path, ext_vars, env, defaults, nested, with_meta=with_meta,
_skip_logging=True, _skip_check=_skip_check, _base=_base)
if with_meta or (with_meta is None and self._default_meta):
parsed_cfg.__path__ = fpath
finally:
if not fpath.is_url:
os.chdir(cwd)
self._logger.info('Parsed %s from path: %s', self.parser_mode, cfg_path)
return parsed_cfg
def __init__(self, **kwargs):
"""Initializer for ActionPath instance.
Args:
mode (str): The required type and access permissions among [fdrwxcuFDRWX] as a keyword argument, e.g. ActionPath(mode='drw').
skip_check (bool): Whether to skip path checks (def.=False).
Raises:
ValueError: If the mode parameter is invalid.
"""
if 'mode' in kwargs:
_check_unknown_kwargs(kwargs, {'mode', 'skip_check'})
Path._check_mode(kwargs['mode'])
self._mode = kwargs['mode']
self._skip_check = kwargs['skip_check'] if 'skip_check' in kwargs else False
elif '_mode' not in kwargs:
raise ValueError('Expected mode keyword argument.')
else:
self._mode = kwargs.pop('_mode')
self._skip_check = kwargs.pop('_skip_check')
kwargs['type'] = str
super().__init__(**kwargs)
def _check_type(self, value, cfg=None, islist=None):
islist = _is_action_value_list(self) if islist is None else islist
if not islist:
value = [value]
elif not isinstance(value, list):
raise TypeError('For ActionPath with nargs='+str(self.nargs)+' expected value to be list, received: value='+str(value)+'.')
try:
for num, val in enumerate(value):
if isinstance(val, str):
val = Path(val, mode=self._mode, skip_check=self._skip_check)
elif isinstance(val, Path):
val = Path(val(absolute=False), mode=self._mode, skip_check=self._skip_check, cwd=val.cwd)
else:
raise TypeError('expected either a string or a Path object, received: value='+str(val)+' type='+str(type(val))+'.')
value[num] = val
except TypeError as ex:
raise TypeError('Parser key "'+self.dest+'": '+str(ex))
return value if islist else value[0]
for action in self._actions:
if action.default != SUPPRESS and action.dest != SUPPRESS:
if isinstance(action, ActionParser):
cfg.update(namespace_to_dict(action._parser.get_defaults(nested=False)))
else:
cfg[action.dest] = action.default
cfg = namespace_to_dict(_dict_to_flat_namespace(cfg))
self._logger.info('Loaded default values from parser.')
default_config_files = [] # type: List[str]
for pattern in self._default_config_files:
default_config_files += glob.glob(os.path.expanduser(pattern))
if len(default_config_files) > 0:
default_config = Path(default_config_files[0], mode=config_read_mode).get_content()
cfg_file = self._load_cfg(default_config)
cfg = self._merge_config(cfg_file, cfg)
self._logger.info('Parsed configuration from default path: %s', default_config_files[0])
if nested:
cfg = _flat_namespace_to_dict(SimpleNamespace(**cfg))
except TypeError as ex:
self.error(str(ex))
return dict_to_namespace(cfg)
Args:
cfg (types.SimpleNamespace or dict): The configuration object to save.
path (str): Path to the location where to save config.
format (str): The output format: "yaml", "json", "json_indented" or "parser_mode".
skip_none (bool): Whether to exclude checking values that are None.
skip_check (bool): Whether to skip parser checking.
overwrite (bool): Whether to overwrite existing files.
multifile (bool): Whether to save multiple config files by using the __path__ metas.
Raises:
TypeError: If any of the values of cfg is invalid according to the parser.
"""
if not overwrite and os.path.isfile(path):
raise ValueError('Refusing to overwrite existing file: '+path)
path = Path(path, mode='fc')
if format not in {'parser_mode', 'yaml', 'json_indented', 'json'}:
raise ValueError('Unknown output format '+str(format))
if format == 'parser_mode':
format = 'yaml' if self.parser_mode == 'yaml' else 'json_indented'
dump_kwargs = {'format': format, 'skip_none': skip_none, 'skip_check': skip_check}
if not multifile:
with open(path(), 'w') as f:
f.write(self.dump(cfg, **dump_kwargs)) # type: ignore
else:
cfg = deepcopy(cfg)
if not isinstance(cfg, dict):
cfg = namespace_to_dict(cfg)
def _check_type(self, value, cfg=None):
islist = _is_action_value_list(self)
if not islist:
value = [value]
elif not isinstance(value, list):
raise TypeError('For ActionJsonSchema with nargs='+str(self.nargs)+' expected value to be list, received: value='+str(value)+'.')
for num, val in enumerate(value):
try:
fpath = None
if isinstance(val, str):
val = yaml.safe_load(val)
if isinstance(val, str):
try:
fpath = Path(val, mode=config_read_mode)
except:
pass
else:
val = yaml.safe_load(fpath.get_content())
if isinstance(val, SimpleNamespace):
val = namespace_to_dict(val)
path_meta = val.pop('__path__') if isinstance(val, dict) and '__path__' in val else None
self._validator.validate(val)
if path_meta is not None:
val['__path__'] = path_meta
if isinstance(val, dict) and fpath is not None:
val['__path__'] = fpath
value[num] = val
except (TypeError, yaml.parser.ParserError, jsonschema.exceptions.ValidationError) as ex:
elem = '' if not islist else ' element '+str(num+1)
raise TypeError('Parser key "'+self.dest+'"'+elem+': '+str(ex))
if isinstance(value, list) and all(isinstance(v, str) for v in value):
path_list_files = value
value = []
for path_list_file in path_list_files:
try:
with sys.stdin if path_list_file == '-' else open(path_list_file, 'r') as f:
path_list = [x.strip() for x in f.readlines()]
except:
raise TypeError('Problems reading path list: '+path_list_file)
cwd = os.getcwd()
if self._rel == 'list' and path_list_file != '-':
os.chdir(os.path.abspath(os.path.join(path_list_file, os.pardir)))
try:
for num, val in enumerate(path_list):
try:
path_list[num] = Path(val, mode=self._mode)
except TypeError as ex:
raise TypeError('Path number '+str(num+1)+' in list '+path_list_file+', '+str(ex))
finally:
os.chdir(cwd)
value += path_list
return value
else:
return ActionPath._check_type(self, value, islist=True)
def save_paths(cfg, base=None):
replace_keys = {}
for key, val in cfg.items():
kbase = key if base is None else base+'.'+key
if isinstance(val, dict):
if '__path__' in val:
val_path = Path(os.path.join(dirname, os.path.basename(val['__path__']())), mode='fc')
if not overwrite and os.path.isfile(val_path()):
raise ValueError('Refusing to overwrite existing file: '+val_path)
action = _find_action(self, kbase)
if isinstance(action, ActionParser):
replace_keys[key] = val_path
action._parser.save(val, val_path(), branch=action.dest, **save_kwargs)
elif isinstance(action, (ActionJsonSchema, ActionJsonnet)):
replace_keys[key] = val_path
val_out = strip_meta(val)
if format == 'json_indented' or isinstance(action, ActionJsonnet):
val_str = json.dumps(val_out, indent=2, sort_keys=True)
elif format == 'yaml':
val_str = yaml.dump(val_out, default_flow_style=False, allow_unicode=True)
elif format == 'json':
val_str = json.dumps(val_out, sort_keys=True)
with open(val_path(), 'w') as f:
def error(self, message):
"""Logs error message if a logger is set, calls the error handler and raises a ParserError."""
self._logger.error(message)
if self._error_handler is not None:
with redirect_stderr(self._stderr):
self._error_handler(self, message)
raise ParserError(message)