Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
updated during the iteration.
"""
if not self._lazy:
if path is None:
yield self._root
else:
for e in iter_select(self._root, path, namespaces, strict=False):
yield e
return
elif self.seek(0) == 0:
resource = self.source
elif self._url is not None:
resource = urlopen(self._url, timeout=self.timeout)
else:
self.load()
resource = StringIO(self._text)
if namespaces or namespaces is None:
events = ('start', 'end')
nsmap = None
else:
events = ('start-ns', 'end-ns', 'start', 'end')
nsmap = []
try:
if path is None:
level = 0
for event, node in self.iterparse(resource, events):
if event == "start":
if level == 0:
self._root.clear()
self._root = node
:returns: an ElementTree instance.
"""
if self.defuse == 'always' or self.defuse == 'remote' and \
hasattr(source, 'read') and is_remote_url(self.base_url):
if hasattr(source, 'read'):
text = source.read()
else:
with open(source) as f:
text = f.read()
if isinstance(text, bytes):
self.defusing(BytesIO(text))
return ElementTree.parse(BytesIO(text))
else:
self.defusing(StringIO(text))
return ElementTree.parse(StringIO(text))
else:
return ElementTree.parse(source)
"""
if self.defuse == 'always' or self.defuse == 'remote' and \
hasattr(source, 'read') and is_remote_url(self.base_url):
if hasattr(source, 'read'):
text = source.read()
else:
with open(source) as f:
text = f.read()
if isinstance(text, bytes):
self.defusing(BytesIO(text))
return ElementTree.parse(BytesIO(text))
else:
self.defusing(StringIO(text))
return ElementTree.parse(StringIO(text))
else:
return ElementTree.parse(source)
def fromstring(self, text):
    """
    An equivalent of *ElementTree.fromstring()* that can protect from XML entities attacks.
    The protection applied is based on resource *defuse* attribute and *base_url* property.

    The defusing check is run when *defuse* is 'always', or when it is 'remote' and the
    *base_url* is a remote URL. In every case the data is then parsed with
    *ElementTree.fromstring()*.

    :param text: a string or a bytes object containing XML data.
    :returns: the root Element instance.
    """
    if self.defuse == 'always' or (self.defuse == 'remote' and is_remote_url(self.base_url)):
        # StringIO rejects binary data with a TypeError, so bytes input is wrapped
        # in a BytesIO instead (same distinction made by the file-based parsing code).
        if isinstance(text, bytes):
            stream = BytesIO(text)
        else:
            stream = StringIO(text)
        self.defusing(stream)
    return ElementTree.fromstring(text)
def _fromsource(self, source):
url = None
if hasattr(source, 'tag') and hasattr(source, 'attrib'):
self._lazy = False
return source, None, None # Source is already an Element --> nothing to load
elif isinstance(source, string_base_type):
_url, self._url = self._url, None
try:
if self._lazy:
# check if source is a string containing a valid XML root
for _, root in self.iterparse(StringIO(source), events=('start',)):
return root, source, None
else:
return self.fromstring(source), source, None
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
if '\n' in source:
raise
finally:
self._url = _url
url = normalize_url(source) if '\n' not in source else None
elif isinstance(source, StringIO):
_url, self._url = self._url, None
try:
if self._lazy:
for _, root in self.iterparse(source, events=('start',)):
index = int(match.group()) + 1
prefix = prefix[:match.span()[0]] + str(index)
else:
prefix += '0'
nsmap[prefix] = uri
nsmap = {}
if not self.namespace:
nsmap[''] = ''
if namespaces:
nsmap.update(namespaces)
if self._url is not None or hasattr(self.source, 'read'):
resource = self.open()
elif isinstance(self._text, string_base_type):
resource = StringIO(self._text)
else:
if hasattr(self._root, 'nsmap'):
# Can extract namespace mapping information only from lxml etree structures
for elem in self._root.iter():
for k, v in elem.nsmap.items():
update_nsmap(k if k is not None else '', v)
if nsmap.get('') == '':
del nsmap['']
return nsmap
try:
for event, node in self.iterparse(resource, events=('start-ns', 'end')):
if event == 'end':
node.clear()
else:
try:
if self._lazy:
# check if source is a string containing a valid XML root
for _, root in self.iterparse(StringIO(source), events=('start',)):
return root, source, None
else:
return self.fromstring(source), source, None
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
if '\n' in source:
raise
finally:
self._url = _url
url = normalize_url(source) if '\n' not in source else None
elif isinstance(source, StringIO):
_url, self._url = self._url, None
try:
if self._lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, source.getvalue(), None
else:
return self.parse(source).getroot(), source.getvalue(), None
finally:
self._url = _url
elif hasattr(source, 'read'):
try:
# Save remote urls for open new resources (non seekable)
if is_remote_url(source.url):
url = source.url
except AttributeError:
def iter(self, tag=None):
    """
    XML resource tree iterator.

    For a non-lazy resource this delegates to the *iter()* method of the loaded root.
    For a lazy resource the XML data is parsed incrementally: each element is yielded
    on its 'end' event and cleared afterwards, so the iteration order changes (the top
    level element is the last one yielded).

    :param tag: if not `None`, only the elements with a matching tag are yielded.
    """
    if not self._lazy:
        for node in self._root.iter(tag):
            yield node
        return

    # Pick the stream to parse: the rewound source, a newly opened URL or the text.
    if self.seek(0) == 0:
        stream = self.source
    elif self._url is None:
        stream = StringIO(self._text)
    else:
        stream = urlopen(self._url, timeout=self.timeout)

    any_tag = tag is None
    try:
        for _, node in self.iterparse(stream, events=('end',)):
            if any_tag or node.tag == tag:
                yield node
            node.clear()
    finally:
        # Close only the streams that have been created here, never the original source.
        if stream is not self.source:
            stream.close()