Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _test(event, namespaces, variables, updateonly=False):
kind, data, pos = event[:3]
retval = None
for steps, size, cursors, cutoff, counter in paths:
# Manage the stack that tells us "where we are" in the stream
if kind is END:
if cursors:
cursors.pop()
continue
elif kind is START:
cursors.append(cursors and cursors[-1] or 0)
elif kind is START_NS or kind is END_NS \
or kind is START_CDATA or kind is END_CDATA:
continue
if updateonly or retval or not cursors:
continue
cursor = cursors[-1]
depth = len(cursors)
if cutoff and depth + int(kind is not START) > cutoff[0]:
continue
for mark, event in stream:
if mark is None:
yield mark, event
continue
result = test(event, namespaces, variables)
# XXX This is effectively genshi.core._ensure() for transform
# streams.
if result is True:
if event[0] is START:
yield ENTER, event
depth = 1
while depth > 0:
mark, subevent = next()
if subevent[0] is START:
depth += 1
elif subevent[0] is END:
depth -= 1
if depth == 0:
yield EXIT, subevent
else:
yield INSIDE, subevent
test(subevent, namespaces, variables, updateonly=True)
else:
yield OUTSIDE, event
elif isinstance(result, Attrs):
# XXX Selected *attributes* are given a "kind" of None to
# indicate they are not really part of the stream.
yield ATTR, (ATTR, (QName(event[1][0] + '@*'), result), event[2])
yield None, event
elif isinstance(result, tuple):
yield OUTSIDE, result
elif result:
def _generate(stream=stream, ns=namespaces, vs=variables):
next = stream.__next__
test = self.test()
for event in stream:
result = test(event, ns, vs)
if result is True:
yield event
if event[0] is START:
depth = 1
while depth > 0:
subevent = next()
if subevent[0] is START:
depth += 1
elif subevent[0] is END:
depth -= 1
yield subevent
test(subevent, ns, vs, updateonly=True)
elif result:
yield result
return Stream(_generate(),
name = "{%s}%s" % (token["namespace"], token["name"])
else:
name = token["name"]
attrs = Attrs([(QName("{%s}%s" % attr if attr[0] is not None else attr[1]), value)
for attr, value in token["data"].items()])
yield (START, (QName(name), attrs), (None, -1, -1))
if type == "EmptyTag":
type = "EndTag"
if type == "EndTag":
if token["namespace"]:
name = "{%s}%s" % (token["namespace"], token["name"])
else:
name = token["name"]
yield END, QName(name), (None, -1, -1)
elif type == "Comment":
yield COMMENT, token["data"], (None, -1, -1)
elif type == "Doctype":
yield DOCTYPE, (token["name"], token["publicId"],
token["systemId"]), (None, -1, -1)
else:
pass # FIXME: What to do?
if text:
yield TEXT, "".join(text), (None, -1, -1)
def _handle_end(self, tag):
self._enqueue(END, QName(tag))
attrns = attr.namespace
if attrns:
if attrns not in namespaces:
prefix = _gen_prefix()
_push_ns(prefix, attrns)
_push_ns_attr(('xmlns:%s' % prefix, attrns))
else:
prefix = namespaces[attrns][-1]
if prefix:
attrname = u'%s:%s' % (prefix, attrname)
new_attrs.append((attrname, value))
yield kind, (tagname, Attrs(ns_attrs + new_attrs)), pos
del ns_attrs[:]
elif kind is END:
tagname = data.localname
tagns = data.namespace
if tagns:
prefix = namespaces[tagns][-1]
if prefix:
tagname = u'%s:%s' % (prefix, tagname)
yield kind, tagname, pos
elif kind is START_NS:
prefix, uri = data
if uri not in namespaces:
prefix = prefixes.get(uri, [prefix])[-1]
_push_ns_attr(_make_ns_attr(prefix, uri))
_push_ns(prefix, uri)
elif kind is END_NS:
name = "{%s}%s" % (token["namespace"], token["name"])
else:
name = token["name"]
attrs = Attrs([(QName("{%s}%s" % attr if attr[0] is not None else attr[1]), value)
for attr, value in token["data"].items()])
yield (START, (QName(name), attrs), (None, -1, -1))
if type == "EmptyTag":
type = "EndTag"
if type == "EndTag":
if token["namespace"]:
name = "{%s}%s" % (token["namespace"], token["name"])
else:
name = token["name"]
yield END, QName(name), (None, -1, -1)
elif type == "Comment":
yield COMMENT, token["data"], (None, -1, -1)
elif type == "Doctype":
yield DOCTYPE, (token["name"], token["publicId"],
token["systemId"]), (None, -1, -1)
else:
pass # FIXME: What to do?
if text:
yield TEXT, "".join(text), (None, -1, -1)
while 1:
while not done and len(self._queue) == 0:
data = self.source.read(bufsize)
if data == '': # end of data
self.close()
done = True
else:
self.feed(data)
for kind, data, pos in self._queue:
yield kind, data, pos
self._queue = []
if done:
open_tags = self._open_tags
open_tags.reverse()
for tag in open_tags:
yield END, QName(tag), pos
break
except html.HTMLParseError, e:
msg = '%s: line %d, column %d' % (e.msg, e.lineno, e.offset)
raise ParseError(msg, self.filename, e.lineno, e.offset)
return Stream(_generate()).filter(_coalesce)
have_decl = have_doctype = False
in_cdata = False
for filter_ in self.filters:
stream = filter_(stream)
for kind, data, pos in stream:
if kind is START or kind is EMPTY:
tag, attrib = data
buf = ['<', tag]
for attr, value in attrib:
buf += [' ', attr, '="', escape(value), '"']
buf.append(kind is EMPTY and '/>' or '>')
yield Markup(u''.join(buf))
elif kind is END:
yield Markup('' % data)
elif kind is TEXT:
if in_cdata:
yield data
else:
yield escape(data, quotes=False)
elif kind is COMMENT:
yield Markup('' % data)
elif kind is XML_DECL and not have_decl:
version, encoding, standalone = data
buf = ['
self.delete(old_start + idx, old_end + idx)
break
# the best case. We're in both cases dealing with the same
# event type. This is the easiest because all routines we
# have can deal with that.
if old_event[0] == new_event[0]:
type = old_event[0]
# start tags are easy. handle them first.
if type == START:
_, (tag, attrs), pos = new_event
self.enter_mark_replaced(pos, tag, attrs)
# ends in replacements are a bit tricker, we try to
# leave the new one first, then the old one. One
# should succeed.
elif type == END:
_, tag, pos = new_event
if not self.leave(pos, tag):
self.leave(pos, old_event[1])
# replaced text is internally diffed again
elif type == TEXT:
_, new_text, pos = new_event
self.diff_text(pos, old_event[1], new_text)
# for all other stuff we ignore the old event
else:
self.append(*new_event)
# ob boy, now the ugly stuff starts. Let's handle the
# easy one first. If the old event was text and the
# new one is the start or end of a tag, we just process
# both of them. The text is deleted, the rest is handled.
elif old_event[0] == TEXT and new_event[0] in (START, END):