Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_yield_seq_children_direct(self, tmp_path_factory, quiet_logger):
    """Check the paths yield_children() reports for a top-level sequence.

    The YAML document is a three-element sequence: an anchored scalar, a
    plain scalar, and an alias of that anchor.  yield_children() is called
    with an EQUALS search for "value", ``search_anchors=True`` and
    ``include_aliases=False``; the test expects exactly two paths, in
    order: ``/&value`` and ``/[1]``.

    Parameters:
        tmp_path_factory: fixture handed to create_temp_yaml_file() to
            materialize the YAML content as a file.
        quiet_logger: logger fixture passed to get_yaml_data() and
            yield_children().
    """
    from yamlpath.enums import PathSeperators, PathSearchMethods
    from yamlpath.path import SearchTerms
    from yamlpath.func import get_yaml_data, get_yaml_editor
    from yamlpath.commands.yaml_paths import yield_children
    from itertools import zip_longest

    content = """---
- &value Test value
- value
- *value
"""
    processor = get_yaml_editor()
    yaml_file = create_temp_yaml_file(tmp_path_factory, content)
    yaml_data = get_yaml_data(processor, quiet_logger, yaml_file)
    seen_anchors = []
    assertions = ["/&value", "/[1]"]

    # zip_longest() pads the shorter side with None, so a surplus or a
    # shortfall of yielded paths fails the comparison instead of being
    # silently truncated the way plain zip() would.
    for assertion, path in zip_longest(assertions, yield_children(
        quiet_logger, yaml_data,
        SearchTerms(False, PathSearchMethods.EQUALS, "*", "value"),
        PathSeperators.FSLASH, "", seen_anchors, search_anchors=True,
        include_aliases=False
    )):
        assert assertion == str(path)
def test_get_yaml_data_filenotfound_error(
    self, capsys, quiet_logger,
    force_ruamel_load_keyboardinterrupt
):
    """Check get_yaml_data() on a path that does not exist.

    The call must return None and the captured stderr must contain
    "File not found".

    Parameters:
        capsys: pytest fixture used to capture stderr.
        quiet_logger: logger fixture passed to get_yaml_data().
        force_ruamel_load_keyboardinterrupt: requested but never referenced
            in the body.  NOTE(review): presumably it patches the YAML
            loader; confirm whether this test really needs it.
    """
    yp = get_yaml_editor()
    # Identity check: None is a singleton, and `is` cannot be fooled by a
    # return value that defines its own __eq__.
    assert get_yaml_data(yp, quiet_logger, "no-such.file") is None
    captured = capsys.readouterr()
    assert "File not found" in captured.err
def test_get_yaml_data_construction_error(
    self, capsys, quiet_logger, tmp_path_factory
):
    """Check get_yaml_data() on a document that cannot be constructed.

    The YAML content holds a merge key (``<<``) with no value.  The call
    must return None and the captured stderr must contain
    "YAML construction error".

    Parameters:
        capsys: pytest fixture used to capture stderr.
        quiet_logger: logger fixture passed to get_yaml_data().
        tmp_path_factory: fixture handed to create_temp_yaml_file() to
            materialize the YAML content as a file.
    """
    yp = get_yaml_editor()
    content = """---
missing:
<<:
"""
    yaml_file = create_temp_yaml_file(tmp_path_factory, content)
    # Identity check: None is a singleton, and `is` cannot be fooled by a
    # return value that defines its own __eq__.
    assert get_yaml_data(yp, quiet_logger, yaml_file) is None
    captured = capsys.readouterr()
    assert "YAML construction error" in captured.err
# NOTE(review): this span begins mid-function -- the enclosing `def` is not
# visible.  `assertions`, `include_aliases`, `tmp_path_factory`,
# `quiet_logger`, `SearchTerms`, `PathSearchMethods` and `PathSeperators` are
# all used below without being defined in the visible lines; presumably they
# come from the missing header/imports -- confirm against the full file.
from yamlpath.func import get_yaml_data, get_yaml_editor
from yamlpath.commands.yaml_paths import yield_children
from itertools import zip_longest
# YAML input: one anchored value (&aValue) and one alias (*aValue) to it.
content = """---
aliases:
- &aValue val2
hash:
key1: val1
key2: *aValue
key3: val3
"""
# Write the content to a temporary file and load it back.
processor = get_yaml_editor()
yaml_file = create_temp_yaml_file(tmp_path_factory, content)
yaml_data = get_yaml_data(processor, quiet_logger, yaml_file)
seen_anchors = []
# NOTE(review): `results` is never read in the visible lines.
results = []
# zip_longest() pads the shorter side with None, so a mismatch between the
# number of expected and yielded paths fails the comparison below.
for assertion, path in zip_longest(assertions, yield_children(
quiet_logger, yaml_data,
SearchTerms(False, PathSearchMethods.EQUALS, "*", "anchor"),
PathSeperators.FSLASH, "", seen_anchors, search_anchors=True,
include_value_aliases=include_aliases
)):
# Each yielded path is compared by its string form.
assert assertion == str(path)
def test_get_yaml_data_keyboardinterrupt_error(
    self, capsys, quiet_logger, tmp_path_factory,
    force_ruamel_load_keyboardinterrupt
):
    """Check get_yaml_data() when loading is interrupted.

    With the ``force_ruamel_load_keyboardinterrupt`` fixture active, loading
    a small YAML file must return None and the captured stderr must contain
    "keyboard interrupt".

    Parameters:
        capsys: pytest fixture used to capture stderr.
        quiet_logger: logger fixture passed to get_yaml_data().
        tmp_path_factory: fixture handed to create_temp_yaml_file() to
            materialize the YAML content as a file.
        force_ruamel_load_keyboardinterrupt: never referenced in the body.
            NOTE(review): presumably it makes the YAML loader raise
            KeyboardInterrupt; confirm in the fixture definition.
    """
    yp = get_yaml_editor()
    content = """---
no: ''
"""
    yaml_file = create_temp_yaml_file(tmp_path_factory, content)
    # Identity check: None is a singleton, and `is` cannot be fooled by a
    # return value that defines its own __eq__.
    assert get_yaml_data(yp, quiet_logger, yaml_file) is None
    captured = capsys.readouterr()
    assert "keyboard interrupt" in captured.err
# NOTE(review): this span begins inside an if/elif chain whose opening
# branch is not visible, and ends inside a `try` whose handlers are not
# visible.  `args`, `log` and `change_path` are defined outside this view.
# Branch shown here: take the new value from everything readable on stdin.
new_value = ''.join(sys.stdin.readlines())
elif args.file:
# Take the new value from a file; trailing whitespace is stripped.
with open(args.file, 'r') as fhnd:
new_value = fhnd.read().rstrip()
elif args.random is not None:
# Build a string of args.random characters, each drawn via secrets.choice()
# from ASCII upper-case letters, lower-case letters and digits.
new_value = ''.join(
secrets.choice(
string.ascii_uppercase + string.ascii_lowercase + string.digits
) for _ in range(args.random)
)
# Prep the YAML parser
yaml = get_yaml_editor()
# Attempt to open the YAML file; check for parsing errors
yaml_data = get_yaml_data(yaml, log, args.yaml_file)
if yaml_data is None:
# An error message has already been logged
sys.exit(1)
# Load the present value at the specified YAML Path
change_node_coordinates = []
# NOTE(review): `old_format` is not read again in the visible lines.
old_format = YAMLValueFormats.DEFAULT
processor = EYAMLProcessor(
log, yaml_data, binary=args.eyaml,
publickey=args.publickey, privatekey=args.privatekey)
try:
# mustexist is truthy when either args.mustexist or args.saveto is truthy.
# default_value is "" when new_value is truthy and a single space otherwise.
# NOTE(review): the reason for the single-space default is not visible here.
for node_coordinate in processor.get_nodes(
change_path, mustexist=(args.mustexist or args.saveto),
default_value=("" if new_value else " ")):
log.debug('Got "{}" from {}.'.format(node_coordinate, change_path))
# Collect every matching node coordinate for later processing.
change_node_coordinates.append(node_coordinate)
# Entry point: parses and validates the command-line arguments, loads
# args.yaml_file, and collects the values found at the path in args.query.
# NOTE(review): the function is cut off in this view -- the body of the final
# `except EYAMLCommandException` handler and anything after it is not shown.
def main():
"""Main code."""
args = processcli()
log = ConsolePrinter(args)
validateargs(args, log)
# The query is parsed using the path separator from args.pathsep.
yaml_path = YAMLPath(args.query, pathsep=args.pathsep)
# Prep the YAML parser
yaml = get_yaml_editor()
# Attempt to open the YAML file; check for parsing errors
yaml_data = get_yaml_data(yaml, log, args.yaml_file)
if yaml_data is None:
# An error message has already been logged
sys.exit(1)
# Seek the queried value(s)
discovered_nodes = []
processor = EYAMLProcessor(
log, yaml_data, binary=args.eyaml,
publickey=args.publickey, privatekey=args.privatekey)
try:
# mustexist=True: the queried path is required to exist.
for node in processor.get_eyaml_values(yaml_path, mustexist=True):
log.debug("Got {} from {}.".format(repr(node), yaml_path))
# Each result is passed through unwrap_node_coords() before it is stored.
discovered_nodes.append(unwrap_node_coords(node))
except YAMLPathException as ex:
# NOTE(review): the second argument is presumably an exit code -- confirm
# against ConsolePrinter.critical().
log.critical(ex, 1)
except EYAMLCommandException as ex:
# NOTE(review): this span is the interior of a per-file loop (it uses
# `continue`) whose header is not visible, and it ends inside an unclosed
# parenthesis.  `yaml_file`, `yaml`, `log`, `processor`, `in_file_count` and
# `exit_state` are defined outside this view.
# NOTE(review): `backup_file` and `seen_anchors` are not read again in the
# visible lines.
backup_file = yaml_file + ".bak"
seen_anchors = []
# Each YAML_FILE must actually be a file
if not isfile(yaml_file):
log.error("Not a file: {}".format(yaml_file))
# exit_state 2 records a non-file input; processing moves to the next file.
exit_state = 2
continue
# Don't bother with the file change update when there's only one input
# file.
if in_file_count > 1:
log.info("Processing {}...".format(yaml_file))
# Try to open the file
yaml_data = get_yaml_data(yaml, log, yaml_file)
if yaml_data is None:
# An error message has already been logged
# exit_state 3 records a file that could not be loaded.
exit_state = 3
continue
# Process all EYAML values
# The processor object is pointed at the newly loaded document.
processor.data = yaml_data
for yaml_path in processor.find_eyaml_paths():
# Use ::get_nodes() instead of ::get_eyaml_values() here in order
# to ignore values that have already been decrypted via their
# Anchors.
for node_coordinate in processor.get_nodes(yaml_path):
node = node_coordinate.node
# Ignore values which are Aliases for those already decrypted
# anchor_name is None for nodes that have no `anchor` attribute.
anchor_name = (
node.anchor.value if hasattr(node, "anchor") else None
# NOTE(review): this span begins at an `elif` whose leading `if` is not
# visible and ends inside the search-expression loop.  `args` and `log` are
# defined outside this view.
elif args.include_aliases is IncludeAliases.INCLUDE_VALUE_ALIASES:
include_value_aliases = True
# Prepare the YAML processor
yaml = get_yaml_editor()
# The processor is created without data (None); each file's data is assigned
# to it inside the loop below.
processor = EYAMLProcessor(
log, None, binary=args.eyaml,
publickey=args.publickey, privatekey=args.privatekey)
# Process the input file(s)
exit_state = 0
# pylint: disable=too-many-nested-blocks
for yaml_file in args.yaml_files:
# Try to open the file
yaml_data = get_yaml_data(yaml, log, yaml_file)
if yaml_data is None:
# An error message has already been logged
# exit_state 3 records a file that could not be loaded; the loop moves on
# to the next file.
exit_state = 3
continue
# Process all searches
processor.data = yaml_data
yaml_paths = []
for expression in args.search:
exterm = get_search_term(log, expression)
# The conversion result is logged before the None check, so a failed
# conversion appears in the debug output as None.
log.debug(("yaml_paths::main:"
+ "converting search expression '{}' into '{}'"
).format(expression, exterm))
if exterm is None:
# A search expression that could not be converted sets exit_state to 1 and
# is skipped.
exit_state = 1
continue