Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import anyconfig.backends
import anyconfig.backend.json
import anyconfig.compat
import anyconfig.dicts
import anyconfig.processors
import anyconfig.schema
import anyconfig.template
import tests.common
from tests.common import (
CNF_0, SCM_0, CNF_1, dicts_equal, resdir, skip_test
)
# Raise the threshold of TT's logger to CRITICAL so that lower-severity
# log messages are suppressed.
TT.LOGGER.setLevel(logging.CRITICAL)
# Sample configuration tree: a single 'config' root holding '@attrs',
# '@text' and '@children' entries next to plain values, nested mappings
# and lists.
CNF_XML_1 = {
    'config': {
        '@attrs': {'name': 'foo'},
        'a': '0',
        'b': {
            '@attrs': {'id': 'b0'},
            '@text': 'bbb',
        },
        'c': None,
        'sect0': {'d': 'x, y, z'},
        'list1': [
            {'item': '0'},
            {'item': '1'},
            {'item': '2'},
        ],
        'list2': {
            '@attrs': {'id': 'list2'},
            '@children': [
                {'item': 'i'},
                {'item': 'j'},
            ],
        },
    },
}
# Result of passing an empty dict through anyconfig.dicts.convert_to
# (reached via TT); presumably the "empty container" value used for
# comparisons elsewhere -- TODO confirm against the users of this name.
NULL_CNTNR = TT.anyconfig.dicts.convert_to({})
class MyODict(anyconfig.compat.OrderedDict):
def test_10(self):
    """Dump a small config to a.json, run TT.main with '-o b.json' on it
    and check that both files exist afterwards.
    """
    cnf = {"name": "a", "a": 1, "b": {"b": [1, 2], "c": "C"}}
    src = os.path.join(self.workdir, "a.json")
    dst = os.path.join(self.workdir, "b.json")

    # Prepare the input file first and make sure it was really written.
    anyconfig.api.dump(cnf, src)
    self.assertTrue(os.path.exists(src))

    # The first argv item is only a placeholder for the program name.
    argv = ["dummy", "-o", dst, src]
    TT.main(argv)
    self.assertTrue(os.path.exists(dst))
def _show_psrs():
    """Build a report of the available parsers and pass it to
    _exit_with_output.

    The report lists, in this order, the supported types, the parser IDs
    found for each file extension, and all parser IDs.
    """
    newline = os.linesep

    type_line = "Supported types: " + ", ".join(API.list_types())
    cid_line = "IDs: " + ", ".join(cid for cid, _psrs in API.list_by_cid())

    # One line per file extension with the IDs of the parsers handling it.
    ext_lines = []
    for ext, psrs in API.list_by_extension():
        psr_ids = ", ".join(psr.cid() for psr in psrs)
        ext_lines.append(" %s: %s" % (ext, psr_ids))
    ext_block = "File extensions:" + newline + newline.join(ext_lines)

    report = newline.join([type_line, ext_block, cid_line])
    _exit_with_output(report)
def _show_psrs():
    """Build a report of the available parsers and pass it to
    _exit_with_output.

    The report lists, in this order, the supported types, the parser IDs
    found for each file extension, and all parser IDs.
    """
    newline = os.linesep

    type_line = "Supported types: " + ", ".join(API.list_types())
    cid_line = "IDs: " + ", ".join(cid for cid, _psrs in API.list_by_cid())

    # One line per file extension with the IDs of the parsers handling it.
    ext_lines = []
    for ext, psrs in API.list_by_extension():
        psr_ids = ", ".join(psr.cid() for psr in psrs)
        ext_lines.append(" %s: %s" % (ext, psr_ids))
    ext_block = "File extensions:" + newline + newline.join(ext_lines)

    report = newline.join([type_line, ext_block, cid_line])
    _exit_with_output(report)
diff = _load_diff(args, extra_opts)
if cnf:
API.merge(cnf, diff)
else:
cnf = diff
if args.args:
diff = anyconfig.parser.parse(args.args)
API.merge(cnf, diff)
if args.validate:
_exit_with_output("Validation succeds")
cnf = API.gen_schema(cnf) if args.gen_schema else _do_filter(cnf, args)
_output_result(cnf, args.output, args.otype, args.inputs, args.itype,
extra_opts=extra_opts)
%(prog)s '/etc/xyz/conf.d/*.json' -o xyz.yml \\
--atype json -A '{"obsoletes": "syscnf", "conflicts": "syscnf-old"}'
%(prog)s '/etc/xyz/conf.d/*.json' -o xyz.yml \\
-A obsoletes:syscnf;conflicts:syscnf-old
%(prog)s /etc/foo.json /etc/foo/conf.d/x.json /etc/foo/conf.d/y.json
%(prog)s '/etc/foo.d/*.json' -M noreplace
# Query/Get/set part of input config
%(prog)s '/etc/foo.d/*.json' --query 'locs[?state == 'T'].name | sort(@)'
%(prog)s '/etc/foo.d/*.json' --get a.b.c
%(prog)s '/etc/foo.d/*.json' --set a.b.c=1
# Validate with JSON schema or generate JSON schema:
%(prog)s --validate -S foo.conf.schema.yml '/etc/foo.d/*.xml'
%(prog)s --gen-schema '/etc/foo.d/*.xml' -o foo.conf.schema.yml"""
# Default option values, keyed by option name (presumably fed to the
# argument parser as its defaults -- TODO confirm at the point of use).
DEFAULTS = {
    "loglevel": 0,
    "list": False,
    "output": None,
    "itype": None,
    "otype": None,
    "atype": None,
    "merge": API.MS_DICTS,
    "ignore_missing": False,
    "template": False,
    "env": False,
    "schema": None,
    "validate": False,
    "gen_schema": False,
    "extra_opts": None,
}
def to_log_level(level):
"""
:param level: Logging level in int = 0 .. 2
>>> to_log_level(0) == logging.WARN
True
>>> to_log_level(5) # doctest: +IGNORE_EXCEPTION_DETAIL, +ELLIPSIS
Traceback (most recent call last):
...
ValueError: wrong log level passed: 5
>>>
def _load_diff(args, extra_opts):
    """
    Load the inputs given in 'args' with API.load and return the result.

    Unknown processor / file type errors raised by API.load are reported
    through _exit_with_output with status 1, and the loaded object is
    checked with _exit_if_load_failure before it is returned.

    :param args: :class:`argparse.Namespace` object
    :param extra_opts: Map object given to API.load as extra options
    :return: Object returned by API.load
    """
    try:
        loaded = API.load(
            args.inputs,
            args.itype,
            ac_ignore_missing=args.ignore_missing,
            ac_merge=args.merge,
            ac_template=args.template,
            ac_schema=args.schema,
            **extra_opts
        )
    except API.UnknownProcessorTypeError:
        _exit_with_output("Wrong input type '%s'" % args.itype, 1)
    except API.UnknownFileTypeError:
        joined_inputs = ", ".join(args.inputs)
        errmsg = ("No appropriate backend was found for given file "
                  "type='%s', inputs=%s" % (args.itype, joined_inputs))
        _exit_with_output(errmsg, 1)

    _exit_if_load_failure(
        loaded, "Failed to load: args=%s" % ", ".join(args.inputs)
    )
    return loaded
def _output_type_by_input_path(inpaths, itype, fmsg):
"""
:param inpaths: List of input file paths
:param itype: Input type or None
:param fmsg: message if it cannot detect otype by 'inpath'
:return: Output type :: str
"""
msg = ("Specify inpath and/or outpath type[s] with -I/--itype "
"or -O/--otype option explicitly")
if itype is None:
try:
otype = API.find(inpaths[0]).type()
except API.UnknownFileTypeError:
_exit_with_output((fmsg % inpaths[0]) + msg, 1)
except (ValueError, IndexError):
_exit_with_output(msg, 1)
else:
otype = itype
return otype