def do_run(self):
    import time
    import feedparser
    # Tweak feedparser to accept XML as content
    feedparser._FeedParserMixin.unknown_starttag = feedparser_unknown_starttag
    feedparser._FeedParserMixin.unknown_endtag = feedparser_unknown_endtag
    feedparser._sanitizeHTML = lambda source, encoding: source
    self.logger.debug('Starting')
    # Don't accept events that pre-date startup
    self.last_event = time.gmtime()
    if self.url is None:
        raise ValueError('Attribute url must be set')
    while True:
        self.logger.debug("Reading feed")
        feed = feedparser.parse(self.url)
        last_update = self.last_event
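# The feedparser_unknown_starttag/endtag helpers assigned above are not shown
# in the snippet. A minimal sketch of what they could look like: re-emit
# unrecognised XML elements as text so they survive parsing. The bodies are
# assumptions, not the project's actual code.
def feedparser_unknown_starttag(self, tag, attrs):
    # attrs arrives as a list of (name, value) pairs
    attrs_str = ''.join(' %s="%s"' % (k, v) for k, v in attrs)
    self.handle_data('<%s%s>' % (tag, attrs_str))

def feedparser_unknown_endtag(self, tag):
    self.handle_data('</%s>' % tag)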
                                   general_ti,
                                   specific_abs,
                                   specific_ti)
query = 'search_query=%s&start=%i&max_results=%i' % (search_query,
                                                     start,
                                                     max_results)
if only_recent:
    query += "&sortBy=submittedDate&sortOrder=descending"
full_url = base_url + query
print(full_url)
feedparser._FeedParserMixin.namespaces['http://a9.com/-/spec/opensearch/1.1/'] = 'opensearch'
feedparser._FeedParserMixin.namespaces['http://arxiv.org/schemas/atom'] = 'arxiv'
with urllib.request.urlopen(full_url) as url:
    response = url.read()
feed = feedparser.parse(response)
print('Feed title: %s' % feed.feed.title)
print('Feed last updated: %s' % feed.feed.updated)
print('totalResults for this query: %s' % feed.feed.opensearch_totalresults)
print('itemsPerPage for this query: %s' % feed.feed.opensearch_itemsperpage)
print('startIndex for this query: %s' % feed.feed.opensearch_startindex)
rows = []
for entry in feed.entries:  # extract information & add to list
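    # The loop body was truncated in the snippet; a minimal sketch of the
    # fields an arXiv Atom entry exposes through feedparser (inspect
    # entry.keys() on a real response before relying on these):
    rows.append({'id': entry.id,
                 'title': entry.title,
                 'published': entry.published,
                 'authors': ', '.join(author.name for author in entry.authors)})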
# http://code.google.com/p/feedparser/issues/detail?id=80
# Added by Thomas Perl for gPodder 2007-12-29
def mapContentType2(self, contentType):
    contentType = contentType.lower()
    if contentType == 'text' or contentType == 'plain':
        contentType = 'text/plain'
    elif contentType == 'html':
        contentType = 'text/html'
    elif contentType == 'xhtml':
        contentType = 'application/xhtml+xml'
    return contentType

try:
    if feedparser._FeedParserMixin().mapContentType('plain') == 'plain':
        log('Patching feedparser module... (mapContentType bugfix)')
        feedparser._FeedParserMixin.mapContentType = mapContentType2
except Exception:
    log('Warning: feedparser unpatched - might be broken!')
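# Quick illustrative check that the patch took effect: a bare 'plain'
# content type should now map to a proper MIME type.
assert feedparser._FeedParserMixin().mapContentType('plain') == 'text/plain'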
def _end_media_description(self):
    # Reconstructed from the truncated snippet: pop the buffered element
    # text and store it on the current context.
    value = self.pop('media_description')
    context = self._getContext()
    context['media_description']['content'] = value

def _mapContentType(self, contentType):
    contentType = feedparser._FeedParserMixin.mapContentType(self, contentType)
    if contentType == 'plain':
        contentType = 'text/plain'
    return contentType
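# _start_media_description is referenced below but missing from the snippet;
# a plausible sketch (the setdefault/push details are assumptions):
def _start_media_description(self, attrsD):
    context = self._getContext()
    context.setdefault('media_description', {})
    self.push('media_description', 1)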
# Graft the handlers onto feedparser's classes as unbound methods. Note that
# the three-argument types.MethodType form is Python 2 only; on Python 3,
# assign the plain functions to the classes instead.
feedparser._FeedParserMixin._start_media_description = (
    types.MethodType(
        _start_media_description,
        None, feedparser._FeedParserMixin))
feedparser._FeedParserMixin._end_media_description = (
    types.MethodType(
        _end_media_description,
        None, feedparser._FeedParserMixin))
if hasattr(feedparser, '_StrictFeedParser'):
    feedparser._StrictFeedParser.mapContentType = (
        types.MethodType(
            _mapContentType,
            None, feedparser._StrictFeedParser))
feedparser._LooseFeedParser.mapContentType = (
    types.MethodType(
        _mapContentType,
        None, feedparser._LooseFeedParser))

# Change out feedparser's html sanitizer for our own based
# on BeautifulSoup and our own tag/attribute stripper.
feedparser._sanitizeHTML = sanitize_html
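# sanitize_html itself is not shown in the snippet. A minimal sketch of a
# BeautifulSoup-based replacement matching _sanitizeHTML's (source, encoding)
# signature; the ALLOWED_TAGS whitelist and the unwrap/strip policy are
# assumptions, not the project's actual rules.
from bs4 import BeautifulSoup

ALLOWED_TAGS = {'a', 'b', 'em', 'i', 'li', 'ol', 'p', 'strong', 'ul'}

def sanitize_html(source, encoding):
    soup = BeautifulSoup(source, 'html.parser')
    for tag in soup.find_all(True):
        if tag.name not in ALLOWED_TAGS:
            tag.unwrap()      # keep the text, drop the disallowed tag
        else:
            tag.attrs = {}    # strip every attribute from allowed tags
    return str(soup)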
        can.DELETE,
        can.TAG,
        can.GROUP,
        #can.THREAD,
        can.THREAD_REPLY,
        can.USER_MESSAGES,
    ],
}

import re

NICK_PARSE = re.compile(r"\B@([A-Za-z0-9_]+|@[A-Za-z0-9_]$)")
HASH_PARSE = re.compile(r"\B#([A-Za-z0-9_\-]+|@[A-Za-z0-9_\-]$)")
GROUP_PARSE = re.compile(r"\B!([A-Za-z0-9_\-]+|![A-Za-z0-9_\-]$)")

def _posticon(self, a): self._getContext()["laconica_posticon"] = a["rdf:resource"]
def _has_creator(self, a): self._getContext()["sioc_has_creator"] = a["rdf:resource"]

feedparser._FeedParserMixin._start_laconica_posticon = _posticon
feedparser._FeedParserMixin._start_sioc_has_creator = _has_creator

class Message:
    def __init__(self, client, data):
        self.id = data["id"]
        self.client = client
        self.account = client.account
        self.protocol = client.account["protocol"]
        self.username = client.account["username"]
        self.text = support.xml_escape(data["text"])
        if "user" in data:
            user = data["user"]
        # FIXME: bug in identi.ca 'twitter-compatible' API, no
        # in_reply_to_screen_name grr, so we have to extract ourselves
        # self.reply_nick = data["in_reply_to_screen_name"]
        screen_names = NICK_PARSE.match(self.text)
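# Illustrative use of NICK_PARSE (the sample text is made up): pull the
# leading @nick out of a reply, standing in for the missing
# in_reply_to_screen_name field.
m = NICK_PARSE.match("@alice thanks for the #feedparser tip!")
if m:
    reply_nick = m.group(1)   # 'alice'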
def update(self, config, feedurl=None):
    """Perform the update action: check feeds for new articles, and
    expire old ones."""
    config.log("Starting update")
    now = time.time()
    feedparser._FeedParserMixin.can_contain_relative_uris = ["url"]
    feedparser._FeedParserMixin.can_contain_dangerous_markup = []
    set_socket_timeout(config["timeout"])
    if feedurl is None:
        update_feeds = [url for url in self.feeds.keys()
                        if self.feeds[url].needs_update(now)]
    elif feedurl in self.feeds:
        update_feeds = [feedurl]
        self.feeds[feedurl].etag = None
        self.feeds[feedurl].modified = None
    else:
        print("No such feed: " + feedurl)
        update_feeds = []
    numfeeds = len(update_feeds)
    config.log("Will update ", numfeeds, " feeds")
def ib64_patched(self, attrsD, contentparams):
    # Reconstructed wrapper for the truncated fragment below: the signature
    # mirrors feedparser._FeedParserMixin._isBase64, which this function
    # replaces so XML/JSON payloads are never base64-decoded.
    if self.contentparams['type'].endswith('/xml'):
        return 0
    if self.contentparams['type'].endswith('/json'):
        return 0
    return 0
import uuid

def token():
    """ Return a unique 32-char write-token
    """
    return uuid.uuid4().hex

# Override feedparser's buggy isBase64 method until they fix it
feedparser._FeedParserMixin._isBase64 = ib64_patched

def cleanwrap(func):
    """ Wrapper for Zotero._cleanup
    """
    def enc(self, *args):
        """ Send each item to _cleanup() """
        return (func(self, item) for item in args)
    return enc
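# Sketch of how cleanwrap composes with a per-item cleanup method (the
# Cleaner class and its rule are illustrative, not Pyzotero's real code):
class Cleaner:
    @cleanwrap
    def _cleanup(self, item):
        # Hypothetical rule: drop keys whose values are empty.
        return {k: v for k, v in item.items() if v}

cleaned = list(Cleaner()._cleanup({'a': 1, 'b': ''}, {'c': 2}))
# -> [{'a': 1}, {'c': 2}]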
def retrieve(func):
    """
    Decorator for Zotero read API methods; calls _retrieve_data() and passes
    the result to the correct processor, based on a lookup
    """