UNSAFE_THEME_TAGS = ("implementation", "browser", "xul:browser", "xul:script")
SELF_CLOSING_TAGS = ("area", "base", "basefont", "br", "col", "frame", "hr",
"img", "input", "li", "link", "meta", "p", "param", )
SAFE_IFRAME_TYPES = ("content", "content-primary", "content-targetable", )
TAG_NOT_OPENED = "Tag (%s) being closed before it is opened."
DOM_MUTATION_HANDLERS = (
"ondomattrmodified", "ondomattributenamechanged",
"ondomcharacterdatamodified", "ondomelementnamechanged",
"ondomnodeinserted", "ondomnodeinsertedintodocument", "ondomnoderemoved",
"ondomnoderemovedfromdocument", "ondomsubtreemodified", )
UNSAFE_THEME_XBL = ("constructor", "destructor", "field", "getter",
"implementation", "setter", )
class MarkupParser(htmlparser.HTMLParser):
    """Parse and analyze the various components of markup files."""

    def __init__(self, err, strict=True, debug=False):
        htmlparser.HTMLParser.__init__(self)
        self.err = err
        self.line = 0
        self.strict = strict
        self.debug = debug
        self.context = None
        self.xml_state = []
        self.xml_line_stack = []
        self.xml_buffer = []
        self.xbl = False
async def async_setup_platform(
        hass, config, async_add_entities, discovery_info=None):  # pylint: disable=unused-argument
    """Setup sensor platform."""
    channel_id = config['channel_id']
    session = async_create_clientsession(hass)
    try:
        url = BASE_URL.format(channel_id)
        async with async_timeout.timeout(10, loop=hass.loop):
            response = await session.get(url)
            info = await response.text()
        name = html.parser.HTMLParser().unescape(
            info.split('<title>')[1].split('</title>')[0])
    except Exception:  # pylint: disable=broad-except
        name = None
    if name is not None:
        async_add_entities([YoutubeSensor(channel_id, name, session)], True)
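# A minimal, self-contained sketch of the title extraction used above; the page
# content here is a hypothetical example, not real data. Note that
# HTMLParser().unescape() is deprecated and was removed in Python 3.9;
# html.unescape() is the supported replacement.
import html

page = "<html><head><title>Some Channel &amp; Friends - YouTube</title></head></html>"
title = page.split('<title>')[1].split('</title>')[0]
name = html.unescape(title)  # -> "Some Channel & Friends - YouTube"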
__author__ = "Zoey Young (ydingmiao@gmail.com)"
__about__ = """
应用中的一些功能函数
"""
from tornado import httpclient
from functools import lru_cache
from readability import htmls
from readability.readability import Document, Summary, get_clean_html
import re
import jieba
import jieba.analyse
import logging
import json
import html.parser
from .models import Webpage
html_parser = html.parser.HTMLParser()
from .config import LOG, DB
# import urllib
log = logging.getLogger(LOG)
predefine_sites_collection = DB.predefine_sites_col
jieba.initialize()
# Format tags
def format_tags(str=None):
    if str is None:
        str = ''
    tags = re.split('[,，|]', str)
    tags = [tag.strip() for tag in tags]
    tags = [tag for tag in tags if len(tag) > 0]
    tags = list(set([tag.upper() for tag in tags]))
    return tags
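# A minimal usage sketch with a hypothetical tag string: the input is split on
# ",", "，" or "|", whitespace is trimmed, empty entries are dropped, and the
# result is upper-cased and de-duplicated (order is not deterministic because
# of set()).
assert sorted(format_tags("python, web | Web,,")) == ["PYTHON", "WEB"]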
def trim_tweet_data(tweet_data, screen_name, alt_rt_style, is_py3):
    # Because of the huge amount of data, we need to cut down on most of it because we only really want
    # a small subset of it. This also prevents the output buffer from overflowing when fetching many tweets
    # at once.
    h = html.parser.HTMLParser()
    output = []
    for message in tweet_data:
        if message.get('retweeted_status'):
            if alt_rt_style:
                if message['user']['screen_name'] == screen_name:
                    # escape highlighting
                    message['user']['screen_name'] = ""
                message['text'] = message['retweeted_status']['text'] + " (retweeted by " + message['user']['screen_name'] + ")"
                message['user'] = message['retweeted_status']['user']
            else:
                message['text'] = "RT @{}: {}".format(message['retweeted_status']['user']['screen_name'],
                                                      message['retweeted_status']['text'])
        mes_list = [calendar.timegm(time.strptime(message['created_at'], '%a %b %d %H:%M:%S +0000 %Y')),
                    message['user']['screen_name'],
                    message['id_str']]
USER_EXPANDABLE = ["friendStatus", "wishlistStatus", "blockedStatus"]
LOCALE_CODES = ["de-DE", "en-US", "fr-FR", "pt-BR", "pl-PL", "ru-RU", "zh-Hans"]
CURRENCY_CODES = [
"USD", "EUR", "GBP", "AUD", "RUB", "PLN", "CAD", "CHF", "NOK", "SEK", "DKK"
]
import html.parser


def find_scripts(site):
    parser = ScriptParser()
    parser.feed(site)
    return parser.scripts


class ScriptParser(html.parser.HTMLParser):
    def __init__(self):
        super().__init__()
        self.last_tag = None
        self.scripts = []

    def handle_starttag(self, tag, attrs):
        self.last_tag = tag

    def handle_data(self, data):
        if self.last_tag == "script":
            self.scripts.append(data)
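# A minimal usage sketch with hypothetical markup. Note that handle_endtag() is
# not overridden, so text appearing after </script> but before the next start
# tag would also be collected; for this simple input the result is just the
# script body.
scripts = find_scripts("<html><body><script>var a = 1;</script></body></html>")
assert scripts == ["var a = 1;"]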
class GogApi:
    def __init__(self, token=None):
        url = urlparse.urljoin(response.url, pair[0])
        response = urllib2.urlopen(url)
        if not isinstance(response.read(0), bytes):
            subscriptions = read(
                SubscriptionList,
                [bytes(response.read(), 'utf-8')]  # FIXME
            )
            complete(subscriptions)
        else:
            subscriptions = read(SubscriptionList, response)
            complete(subscriptions)
        response.close()
        return subscriptions
class BlogrollLinkParser(HTMLParser.HTMLParser):
    """HTML parser that finds all blogroll links."""

    SUPPORTED_TYPES = {
        'application/xml+opml': 15,
        'text/opml': 10, 'text/x-opml': 10,
        'application/xml': 5, 'text/xml': 5,
    }

    def __init__(self):
        HTMLParser.HTMLParser.__init__(self)
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag not in ('a', 'link'):
            return
        attrs = dict(attrs)
def __init__(self):
    HTMLParser.HTMLParser.__init__(self)
    self.users = []
    self.currentUser = {}
    self.currentUser['appNames'] = []
    self.currentAppname = ""
    # in relevant area?
    self.in_creator_paragraph = False
    self.in_approvedApplications = False
    self.in_approvedApplicationsSubDivCount = 0
init_deezer_session()
class Deezer404Exception(Exception):
    pass


class Deezer403Exception(Exception):
    pass


class DeezerApiException(Exception):
    pass
class ScriptExtractor(html.parser.HTMLParser):
    """ extract
        f.close()
    except IOError:  # as msg:
        # g.es("error reading %s: %s" % (name, msg))
        # g.es("...not found: " + name)
        c.setBodyString(p, "")  # Clear the body text.
        return True  # Mark the node as changed.
    else:
        ext = os.path.splitext(parse[2])[1]
        if ext.lower() in ['.htm', '.html']:
            #@+<< convert HTML to text >>
            #@+node:edream.110203113231.895: *3* << convert HTML to text >>
            fh = StringIO()
            fmt = AbstractFormatter(DumbWriter(fh))
            # the parser stores parsed data into fh (file-like handle)
            # Note: HTMLParser(fmt) takes a formatter and exposes anchorlist below,
            # which matches the legacy Python 2 htmllib.HTMLParser API rather than
            # html.parser.HTMLParser.
            ### pylint: disable=too-many-function-args
            parser = HTMLParser(fmt)
            # send the HTML text to the parser
            parser.feed(new)
            parser.close()
            # now replace the old string with the parsed text
            new = fh.getvalue()
            fh.close()
            # finally, get the list of hyperlinks and append to the end of the text
            ### pylint: disable=no-member
            hyperlinks = parser.anchorlist
            numlinks = len(hyperlinks)
            if numlinks > 0:
                hyperlist = ['\n\n--Hyperlink list follows--']
                for i in range(numlinks):
def trim_tweet_data(tweet_data, screen_name, alt_rt_style, is_py3):
    # Because of the huge amount of data, we need to cut down on most of it
    # because we only really want a small subset of it. This also prevents the
    # output buffer from overflowing when fetching many tweets at once.
    h = html.parser.HTMLParser()
    output = []
    for message in tweet_data:
        if message.get('retweeted_status'):
            if alt_rt_style:
                if message['user']['screen_name'] == screen_name:
                    # escape highlighting
                    message['user']['screen_name'] = ""
                if message.get('full_text'):
                    message['full_text'] = message['retweeted_status']['full_text'] + \
                        " (retweeted by " + message['user']['screen_name'] + ")"
                else:
                    message['text'] = message['retweeted_status']['text'] + \
                        " (retweeted by " + message['user']['screen_name'] + ")"
                message['user'] = message['retweeted_status']['user']
            else: