Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# See https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
use_annotations = True if output_format == "annotated_yaml" else False
input_docs = []
for input_stream in input_streams:
if input_format == "yaml":
loader = get_loader(use_annotations=use_annotations)
input_docs.extend(yaml.load_all(input_stream, Loader=loader))
elif input_format == "xml":
import xmltodict
input_docs.append(xmltodict.parse(input_stream.read(), disable_entities=True))
elif input_format == "toml":
import toml
input_docs.append(toml.load(input_stream))
else:
raise Exception("Unknown input format")
input_payload = "\n".join(json.dumps(doc, cls=JSONDateTimeEncoder) for doc in input_docs)
jq_out, jq_err = jq.communicate(input_payload)
json_decoder = json.JSONDecoder(object_pairs_hook=OrderedDict)
if output_format == "yaml" or output_format == "annotated_yaml":
yaml.dump_all(decode_docs(jq_out, json_decoder), stream=output_stream,
Dumper=get_dumper(use_annotations=use_annotations, indentless=indentless_lists),
width=width, allow_unicode=True, default_flow_style=False)
elif output_format == "xml":
import xmltodict
for doc in decode_docs(jq_out, json_decoder):
if xml_root:
doc = {xml_root: doc}
elif not isinstance(doc, OrderedDict):
msg = ("{}: Error converting JSON to XML: cannot represent non-object types at top level. "
"Use --xml-root=name to envelope your output with a root element.")
exit_func(msg.format(program_name))
full_document = True if xml_dtd else False