Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_22_stream(self):
    """Check that a file object opened on fd 0 is inspected as a stream.

    The first item of the inspection result must be IOI_STREAM and the
    third item must be the no-op opener.
    """
    # Keep the file object bound to a local for the duration of the test.
    stream = os.fdopen(0)
    inspected = TT.inspect_io_obj(stream)

    io_type = inspected[0]
    self.assertEqual(io_type, IOI_STREAM)

    opener = inspected[2]
    self.assertEqual(opener, anyconfig.utils.noop)
>>> apath = "/path/to/a_conf.ext"
>>> assert guess_io_type(apath) == IOI_PATH_STR
>>> from anyconfig.compat import pathlib
>>> if pathlib is not None:
... assert guess_io_type(pathlib.Path(apath)) == IOI_PATH_OBJ
>>> assert guess_io_type(open(__file__)) == IOI_STREAM
>>> guess_io_type(1) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ValueError: ...
"""
if obj is None:
return IOI_NONE
if anyconfig.utils.is_path(obj):
return IOI_PATH_STR
if anyconfig.utils.is_path_obj(obj):
return IOI_PATH_OBJ
if anyconfig.utils.is_file_stream(obj):
return IOI_STREAM
raise ValueError("Unknown I/O type object: %r" % obj)
def load_from_stream(self, stream, container, **opts):
    """
    Load config data from the XML document read from ``stream``.

    :param stream: XML file or file-like object
    :param container: callable to make a container object
    :param opts: optional keyword parameters, forwarded to root_to_container
    :return: Dict-like object holding config parameters
    """
    tree = ET.parse(stream)
    root_elem = tree.getroot()

    # Namespaces are gathered from the stream's path, separately from the
    # parse above.
    src_path = anyconfig.utils.get_path_from_stream(stream)
    namespaces = _namespaces_from_file(src_path)

    return root_to_container(
        root_elem, container=container, nspaces=namespaces, **opts
    )
:return: A tuple of (objtype, objpath, objopener)
:raises: UnknownFileTypeError
"""
itype = guess_io_type(obj)
if itype == IOI_PATH_STR:
ipath = anyconfig.utils.normpath(obj)
ext = anyconfig.utils.get_file_extension(ipath)
opener = open
elif itype == IOI_PATH_OBJ:
ipath = anyconfig.utils.normpath(obj.as_posix())
ext = anyconfig.utils.get_file_extension(ipath)
opener = obj.open
elif itype == IOI_STREAM:
ipath = anyconfig.utils.get_path_from_stream(obj)
ext = anyconfig.utils.get_file_extension(ipath) if ipath else None
opener = anyconfig.utils.noop
elif itype == IOI_NONE:
ipath = ext = None
opener = anyconfig.utils.noop
else:
raise UnknownFileTypeError("%r" % obj)
return (itype, ipath, opener, ext)
def list_parsers_by_extension(cps=None):
    """
    Group parsers by the file extensions they declare.

    :param cps: Iterable of parsers, each providing an extensions() method;
        the module-level PARSERS is used when None
    :return: List (generator) of (config_ext, [config_parser])
    """
    parsers = PARSERS if cps is None else cps

    # One list of (extension, parser) pairs per parser, handed to concat.
    pairs_per_parser = (
        [(ext, psr) for ext in psr.extensions()] for psr in parsers
    )
    ext_parser_pairs = anyconfig.utils.concat(pairs_per_parser)

    # Grouping is done here, before the result generator is returned.
    grouped = groupby_key(ext_parser_pairs, fst)
    return ((ext, _list_xppairs(pairs)) for ext, pairs in grouped)
def _parsed_items(items, sep=_SEP, **options):
    """
    Yield each (key, value) pair of ``items``, passing the value through
    _parse when the 'ac_parse_value' option is truthy.

    :param items: List of pairs, [(key, value)], or generator yields pairs
    :param sep: Separator string
    :param options: Keyword options; might have 'ac_parse_value' key
    :return: Generator to yield (key, value) pairs
    """
    if options.get("ac_parse_value"):
        convert = _parse
    else:
        convert = anyconfig.utils.noop

    for item_key, raw_val in items:
        yield (item_key, convert(raw_val, sep))
def _to_str_fn(**options):
"""
:param options: Keyword options might have 'ac_parse_value' key
:param to_str: Callable to convert value to string
"""
return str if options.get("ac_parse_value") else anyconfig.utils.noop
def yml_dump(data, stream, yml_fnc=yml_fnc, **options):
"""A wrapper of yaml.safe_dump and yaml.dump.

Adjusts the keyword options according to 'ac_safe', 'Dumper' and 'ac_dict',
then delegates to ``yml_fnc("dump", data, stream, **options)``.

:param data: Some data to dump
:param stream: a file or file-like object to dump YAML data
:param yml_fnc: Callable invoked as yml_fnc("dump", data, stream, **options)
:param options: Keyword options; 'ac_safe', 'Dumper' and 'ac_dict' are read here
:return: The result of the yml_fnc call
"""
_is_dict = anyconfig.utils.is_dict_like(data)
# A truthy 'ac_safe' discards every other option.
if options.get("ac_safe", False):
options = {}
# Otherwise, if no 'Dumper' was given and data is dict-like, one is built
# from 'ac_dict', falling back to the type of data.
elif not options.get("Dumper", False) and _is_dict:
# TODO: Any other way to get its constructor?
maybe_container = options.get("ac_dict", type(data))
options["Dumper"] = _customized_dumper(maybe_container)
if _is_dict:
# Type information and the order of items are lost on dump currently.
data = anyconfig.dicts.convert_to(data, ac_dict=dict)
# Options go through common.filter_from_options with the "ac_dict" key
# before being handed to yml_fnc.
options = common.filter_from_options("ac_dict", options)
return yml_fnc("dump", data, stream, **options)
def _to_s(val, sep=", "):
    """Render a value as a string, joining the items of iterables.

    :param val: An object
    :param sep: separator between values
    :return: str(val), or the str() of each item joined by sep when
        anyconfig.utils.is_iterable(val) is true

    >>> _to_s([1, 2, 3])
    '1, 2, 3'
    >>> _to_s("aaa")
    'aaa'
    """
    if not anyconfig.utils.is_iterable(val):
        return str(val)

    return sep.join(map(str, val))