Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
raise YAMLPathException(
"{} is not an integer array slice"
.format(str_stripped),
str(yaml_path),
str(unstripped_attrs)
)
if intmin == intmax and len(data) > intmin:
yield NodeCoords([data[intmin]], data, intmin)
else:
yield NodeCoords(data[intmin:intmax], data, intmin)
elif isinstance(data, dict):
for key, val in data.items():
if min_match <= key <= max_match:
yield NodeCoords(val, data, key)
else:
try:
idx: int = int(str_stripped)
except ValueError:
raise YAMLPathException(
"{} is not an integer array index"
.format(str_stripped),
str(yaml_path),
str(unstripped_attrs)
)
if isinstance(data, list) and len(data) > idx:
yield NodeCoords(data[idx], data, idx)
def unwrap_node_coords(data: Any) -> Any:
"""Recursively strips all DOM tracking data off of a NodeCoords wrapper."""
if isinstance(data, NodeCoords):
return unwrap_node_coords(data.node)
if isinstance(data, list):
stripped_nodes = []
for ele in data:
stripped_nodes.append(unwrap_node_coords(ele))
return stripped_nodes
return data
for lstidx, ele in enumerate(data):
if attr == '.':
matches = search_matches(method, term, ele)
elif isinstance(ele, dict) and attr in ele:
matches = search_matches(method, term, ele[attr])
if (matches and not invert) or (invert and not matches):
yield NodeCoords(ele, data, lstidx)
elif isinstance(data, dict):
# Allow . to mean "each key's name"
if attr == '.':
for key, val in data.items():
matches = search_matches(method, term, key)
if (matches and not invert) or (invert and not matches):
yield NodeCoords(val, data, key)
elif attr in data:
value = data[attr]
matches = search_matches(method, term, value)
if (matches and not invert) or (invert and not matches):
yield NodeCoords(value, data, attr)
else:
# Check the passed data itself for a match
matches = search_matches(method, term, data)
if (matches and not invert) or (invert and not matches):
yield NodeCoords(data, parent, parentref)
for subnode_coord in self._get_required_nodes(
segment_node_coords.node, yaml_path, depth + 1,
segment_node_coords.parent,
segment_node_coords.parentref):
yield subnode_coord
else:
self.logger.debug(
("Processor::_get_required_nodes: "
+ "Finally returning data of"
+ " type {} at parentref {}:")
.format(type(data), parentref)
)
self.logger.debug(data)
self.logger.debug("")
yield NodeCoords(data, parent, parentref)
.format(terms)
)
invert = terms.inverted
method = terms.method
attr = terms.attribute
term = terms.term
if isinstance(data, list):
for lstidx, ele in enumerate(data):
if attr == '.':
matches = search_matches(method, term, ele)
elif isinstance(ele, dict) and attr in ele:
matches = search_matches(method, term, ele[attr])
if (matches and not invert) or (invert and not matches):
yield NodeCoords(ele, data, lstidx)
elif isinstance(data, dict):
# Allow . to mean "each key's name"
if attr == '.':
for key, val in data.items():
matches = search_matches(method, term, key)
if (matches and not invert) or (invert and not matches):
yield NodeCoords(val, data, key)
elif attr in data:
value = data[attr]
matches = search_matches(method, term, value)
if (matches and not invert) or (invert and not matches):
yield NodeCoords(value, data, attr)
else:
"Cannot add {} subreference to scalars".format(
str(segment_type)
),
str(yaml_path),
except_segment
)
else:
self.logger.debug(
("Processor::_get_optional_nodes: Finally returning data of"
+ " type {}:"
).format(type(data))
)
self.logger.debug(data)
yield NodeCoords(data, parent, parentref)
for key, val in data.items():
if min_match <= key <= max_match:
yield NodeCoords(val, data, key)
else:
try:
idx: int = int(str_stripped)
except ValueError:
raise YAMLPathException(
"{} is not an integer array index"
.format(str_stripped),
str(yaml_path),
str(unstripped_attrs)
)
if isinstance(data, list) and len(data) > idx:
yield NodeCoords(data[idx], data, idx)
# pylint: disable=too-many-nested-blocks
while next_segment_idx < len(segments):
(peek_type, peek_attrs) = segments[next_segment_idx]
if (
peek_type is PathSegmentTypes.COLLECTOR
and isinstance(peek_attrs, CollectorTerms)
):
peek_path: YAMLPath = YAMLPath(peek_attrs.expression)
if peek_attrs.operation == CollectorOperators.ADDITION:
for node_coord in self._get_required_nodes(
data, peek_path, 0, parent, parentref):
if (isinstance(node_coord, NodeCoords)
and isinstance(node_coord.node, list)):
for coord_idx, coord in enumerate(node_coord.node):
if not isinstance(coord, NodeCoords):
coord = NodeCoords(
coord, node_coord.node, coord_idx)
node_coords.append(coord)
else:
node_coords.append(node_coord)
elif peek_attrs.operation == CollectorOperators.SUBTRACTION:
rem_data = []
for node_coord in self._get_required_nodes(
data, peek_path, 0, parent, parentref):
unwrapped_data = unwrap_node_coords(node_coord)
if isinstance(unwrapped_data, list):
for unwrapped_datum in unwrapped_data:
rem_data.append(unwrapped_datum)
else:
rem_data.append(unwrapped_data)
node_coords = [e for e in node_coords
Returns: (Generator[NodeCoords, None, None]) Each NodeCoords as they
are matched
Raises: N/A
"""
(_, stripped_attrs) = yaml_path.escaped[segment_index]
str_stripped = str(stripped_attrs)
self.logger.debug(
"Processor::_get_nodes_by_key: Seeking KEY node at {}."
.format(str_stripped)
)
if isinstance(data, dict):
if stripped_attrs in data:
yield NodeCoords(data[stripped_attrs], data, stripped_attrs)
else:
# Check for a string/int type mismatch
try:
intkey = int(str(stripped_attrs))
if intkey in data:
yield NodeCoords(data[intkey], data, intkey)
except ValueError:
pass
elif isinstance(data, list):
try:
# Try using the ref as a bare Array index
idx = int(str_stripped)
if len(data) > idx:
yield NodeCoords(data[idx], data, idx)
except ValueError:
# Pass-through search against possible Array-of-Hashes