Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def yamlpath(quiet_logger):
    """Build a YAMLPath from the supplied quiet logger.

    Parameters:
    1. quiet_logger: the object handed to the YAMLPath constructor as its
       sole positional argument

    Returns: the newly constructed YAMLPath
    """
    quiet_path = YAMLPath(quiet_logger)
    return quiet_path
def test_get_nodes_by_unknown_path_segment_error(self, quiet_logger):
    """Expect NotImplementedError for a path segment of an unknown type.

    A stand-in enumeration is built from the member names of the real
    PathSegmentTypes plus one extra member, DNF.  A segment tagged with
    that extra member is then written directly into the parsed path
    before the processor is asked to resolve segment 0.
    """
    from collections import deque
    from enum import Enum
    from yamlpath.enums import PathSegmentTypes as KnownSegmentTypes

    # Same member names as the real enumeration, plus the bogus DNF entry.
    member_names = [member.name for member in KnownSegmentTypes]
    member_names.append('DNF')
    ExtendedSegmentTypes = Enum('PathSegmentTypes', member_names)

    document = """---
key: value
"""
    loader = YAML()
    data = loader.load(document)
    processor = Processor(quiet_logger, data)

    path = YAMLPath("abc")
    str(path)  # Force Path to parse

    # Overwrite the parsed segments with a single segment of the extra type.
    bogus_segments = deque()
    bogus_segments.append((ExtendedSegmentTypes.DNF, "abc"))
    path._escaped = bogus_segments

    with pytest.raises(NotImplementedError):
        # list() drains the generator so its body actually runs.
        list(processor._get_nodes_by_path_segment(data, path, 0))
NodeCoords(flatten_node, node_coord.parent, flatten_idx))
node_coords = flat_nodes
# As long as each next segment is an ADDITION or SUBTRACTION
# COLLECTOR, keep combining the results.
segments = yaml_path.escaped
next_segment_idx = segment_index + 1
# pylint: disable=too-many-nested-blocks
while next_segment_idx < len(segments):
(peek_type, peek_attrs) = segments[next_segment_idx]
if (
peek_type is PathSegmentTypes.COLLECTOR
and isinstance(peek_attrs, CollectorTerms)
):
peek_path: YAMLPath = YAMLPath(peek_attrs.expression)
if peek_attrs.operation == CollectorOperators.ADDITION:
for node_coord in self._get_required_nodes(
data, peek_path, 0, parent, parentref):
if (isinstance(node_coord, NodeCoords)
and isinstance(node_coord.node, list)):
for coord_idx, coord in enumerate(node_coord.node):
if not isinstance(coord, NodeCoords):
coord = NodeCoords(
coord, node_coord.node, coord_idx)
node_coords.append(coord)
else:
node_coords.append(node_coord)
elif peek_attrs.operation == CollectorOperators.SUBTRACTION:
rem_data = []
for node_coord in self._get_required_nodes(
data, peek_path, 0, parent, parentref):
escape_path_section(ele.anchor.value, pathsep)
)
if (not include_value_aliases
and anchor_matched in exclude_alias_matchers):
continue
if isinstance(ele, (CommentedMap, CommentedSeq)):
for path in yield_children(
logger, ele, terms, pathsep, tmp_path, seen_anchors,
search_anchors=search_anchors,
include_key_aliases=include_key_aliases,
include_value_aliases=include_value_aliases):
yield path
else:
yield YAMLPath(tmp_path)
elif isinstance(data, CommentedMap):
if build_path:
build_path += str(pathsep)
elif pathsep is PathSeperators.FSLASH:
build_path = str(pathsep)
pool = data.non_merged_items()
if include_key_aliases or include_value_aliases:
pool = data.items()
for key, val in pool:
tmp_path = build_path + escape_path_section(key, pathsep)
key_anchor_matched = search_anchor(
key, terms, seen_anchors, search_anchors=search_anchors,
default = PathSeperators.AUTO
Returns: (Generator) The requested YAML nodes as they are matched
Raises:
- `YAMLPathException` when YAML Path is invalid
"""
mustexist: bool = kwargs.pop("mustexist", False)
default_value: Any = kwargs.pop("default_value", None)
pathsep: PathSeperators = kwargs.pop("pathsep", PathSeperators.AUTO)
if self.data is None:
return
if isinstance(yaml_path, str):
yaml_path = YAMLPath(yaml_path, pathsep)
elif pathsep is not PathSeperators.AUTO:
yaml_path.seperator = pathsep
if mustexist:
matched_nodes: int = 0
for node_coords in self._get_required_nodes(self.data, yaml_path):
matched_nodes += 1
self.logger.debug(
"Processor::get_nodes: Relaying required node {}:"
.format(type(node_coords))
)
self.logger.debug(node_coords)
yield node_coords
if matched_nodes < 1:
raise YAMLPathException(
if anchor_matched in [AnchorMatches.MATCH,
AnchorMatches.ALIAS_INCLUDED]:
logger.debug(
("yaml_paths::search_for_paths:"
+ "yielding an Anchor/Alias match, {}.")
.format(tmp_path)
)
if expand_children:
for path in yield_children(
logger, ele, terms, pathsep, tmp_path,
seen_anchors, search_anchors=search_anchors,
include_key_aliases=include_key_aliases,
include_value_aliases=include_value_aliases):
yield path
else:
yield YAMLPath(tmp_path)
continue
if isinstance(ele, (CommentedSeq, CommentedMap)):
logger.debug(
"yaml_paths::search_for_paths:"
+ "recursing into complex data:"
)
logger.debug(ele)
logger.debug(">>>> >>>> >>>> >>>> >>>> >>>> >>>>")
for subpath in search_for_paths(
logger, processor, ele, terms, pathsep, tmp_path,
seen_anchors, search_values=search_values,
search_keys=search_keys, search_anchors=search_anchors,
include_key_aliases=include_key_aliases,
include_value_aliases=include_value_aliases,
decrypt_eyaml=decrypt_eyaml,
if (anchor_matched is AnchorMatches.UNSEARCHABLE_ALIAS
and not include_value_aliases):
continue
check_value = ele
if decrypt_eyaml and processor.is_eyaml_value(ele):
check_value = processor.decrypt_eyaml(ele)
matches = search_matches(method, term, check_value)
if (matches and not invert) or (invert and not matches):
logger.debug(
("yaml_paths::search_for_paths:"
+ "yielding VALUE match, {}: {}."
).format(check_value, tmp_path)
)
yield YAMLPath(tmp_path)
# pylint: disable=too-many-nested-blocks
elif isinstance(data, CommentedMap):
if build_path:
build_path += strsep
elif pathsep is PathSeperators.FSLASH:
build_path = strsep
pool = data.non_merged_items()
if include_key_aliases or include_value_aliases:
pool = data.items()
for key, val in pool:
tmp_path = build_path + escape_path_section(key, pathsep)
# Search the value anchor to have it on record, in case the key
if (val_anchor_matched is AnchorMatches.UNSEARCHABLE_ALIAS
and not include_value_aliases):
continue
check_value = val
if decrypt_eyaml and processor.is_eyaml_value(val):
check_value = processor.decrypt_eyaml(val)
matches = search_matches(method, term, check_value)
if (matches and not invert) or (invert and not matches):
logger.debug(
("yaml_paths::search_for_paths:"
+ "yielding VALUE match, {}: {}."
).format(check_value, tmp_path)
)
yield YAMLPath(tmp_path)
(not include_key_aliases
and key_anchor_matched in exclude_alias_matchers)
or (not include_value_aliases
and val_anchor_matched in exclude_alias_matchers)
):
continue
if isinstance(val, (CommentedSeq, CommentedMap)):
for path in yield_children(
logger, val, terms, pathsep, tmp_path, seen_anchors,
search_anchors=search_anchors,
include_key_aliases=include_key_aliases,
include_value_aliases=include_value_aliases):
yield path
else:
yield YAMLPath(tmp_path)
else:
if not build_path and pathsep is PathSeperators.FSLASH:
build_path = str(pathsep)
yield YAMLPath(build_path)