Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if result is True:
yield event
if event[0] is START:
depth = 1
while depth > 0:
subevent = stream.next()
if subevent[0] is START:
depth += 1
elif subevent[0] is END:
depth -= 1
yield subevent
test(subevent, namespaces, variables,
updateonly=True)
elif result:
yield result
return Stream(_generate(),
serializer=getattr(stream, 'serializer', None))
if not isinstance(data, str):
raise UnicodeError("source returned bytes, but no encoding specified")
self.feed(data)
for kind, data, pos in self._queue:
yield kind, data, pos
self._queue = []
if done:
open_tags = self._open_tags
open_tags.reverse()
for tag in open_tags:
yield END, QName(tag), pos
break
except html.HTMLParseError as e:
msg = '%s: line %d, column %d' % (e.msg, e.lineno, e.offset)
raise ParseError(msg, self.filename, e.lineno, e.offset)
return Stream(_generate()).filter(_coalesce)
def _parse(self, source, encoding):
streams = [[]] # stacked lists of events of the "compiled" template
dirmap = {} # temporary mapping of directives to elements
ns_prefix = {}
depth = 0
fallbacks = []
includes = []
if not isinstance(source, Stream):
source = XMLParser(source, filename=self.filename,
encoding=encoding)
for kind, data, pos in source:
stream = streams[-1]
if kind is START_NS:
# Strip out the namespace declaration for template directives
prefix, uri = data
ns_prefix[prefix] = uri
if uri not in (self.DIRECTIVE_NAMESPACE,
self.XINCLUDE_NAMESPACE):
stream.append((kind, data, pos))
elif kind is END_NS:
uri = ns_prefix.pop(data, None)
def __str__(self):
return self.render()
def __unicode__(self):
return self.render(encoding=None)
def __html__(self):
return self
START = Stream.START
END = Stream.END
TEXT = Stream.TEXT
XML_DECL = Stream.XML_DECL
DOCTYPE = Stream.DOCTYPE
START_NS = Stream.START_NS
END_NS = Stream.END_NS
START_CDATA = Stream.START_CDATA
END_CDATA = Stream.END_CDATA
PI = Stream.PI
COMMENT = Stream.COMMENT
def _ensure(stream):
"""Ensure that every item on the stream is actually a markup event."""
stream = iter(stream)
try:
event = stream.next()
except StopIteration:
return
# Check whether the iterable is a real markup event stream by examining the
done = True
else:
self.feed(data)
for kind, data, pos in self._queue:
yield kind, data, pos
self._queue = []
if done:
open_tags = self._open_tags
open_tags.reverse()
for tag in open_tags:
yield END, QName(tag), pos
break
except html.HTMLParseError, e:
msg = '%s: line %d, column %d' % (e.msg, e.lineno, e.offset)
raise ParseError(msg, self.filename, e.lineno, e.offset)
return Stream(_generate()).filter(_coalesce)
Unlike with `XMLParser`, the returned stream is reusable, meaning it can be
iterated over multiple times:
>>> xml = XML('FooBar')
>>> print xml
FooBar
>>> print xml.select('elem')
FooBar
>>> print xml.select('elem/text()')
FooBar
:param text: the XML source
:return: the parsed XML event stream
:raises ParseError: if the XML text is not well-formed
"""
return Stream(list(XMLParser(StringIO(text))))
def append(self, node):
"""Append an element or string as child node.
:param node: the node to append; can be an `Element`, `Fragment`, or a
`Stream`, or a Python string or number
"""
if isinstance(node, (Stream, Element, basestring, int, float, long)):
# For objects of a known/primitive type, we avoid the check for
# whether it is iterable for better performance
self.children.append(node)
elif isinstance(node, Fragment):
self.children.extend(node.children)
elif node is not None:
try:
self.append(Stream(iter(node)))
except TypeError:
self.children.append(node)
def __unicode__(self):
return self.render(encoding=None)
def __html__(self):
return self
START = Stream.START
END = Stream.END
TEXT = Stream.TEXT
XML_DECL = Stream.XML_DECL
DOCTYPE = Stream.DOCTYPE
START_NS = Stream.START_NS
END_NS = Stream.END_NS
START_CDATA = Stream.START_CDATA
END_CDATA = Stream.END_CDATA
PI = Stream.PI
COMMENT = Stream.COMMENT
def _ensure(stream):
"""Ensure that every item on the stream is actually a markup event."""
stream = iter(stream)
event = stream.next()
# Check whether the iterable is a real markup event stream by examining the
# first item it yields; if it's not we'll need to do some conversion
if type(event) is not tuple or len(event) != 3:
for event in chain([event], stream):
if hasattr(event, 'totuple'):
event = event.totuple()
def __call__(self, stream, keep_marks=False):
"""Apply the transform filter to the marked stream.
:param stream: the marked event stream to filter
:param keep_marks: Do not strip transformer selection marks from the
stream. Useful for testing.
:return: the transformed stream
:rtype: `Stream`
"""
transforms = self._mark(stream)
for link in self.transforms:
transforms = link(transforms)
if not keep_marks:
transforms = self._unmark(transforms)
return Stream(transforms,
serializer=getattr(stream, 'serializer', None))
def _parse(self, source, encoding):
if not isinstance(source, Stream):
source = XMLParser(source, filename=self.filename,
encoding=encoding)
stream = []
for kind, data, pos in source:
if kind is TEXT:
for kind, data, pos in interpolate(data, self.filepath, pos[1],
pos[2], lookup=self.lookup):
stream.append((kind, data, pos))
elif kind is PI and data[0] == 'python':
if not self.allow_exec:
raise TemplateSyntaxError('Python code blocks not allowed',
self.filepath, *pos[1:])
try: