Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'exc_cls', [feedparser.CharacterEncodingOverride, feedparser.NonXMLContentType]
)
def test_parse_survivable_feedparser_exceptions(
    monkeypatch, caplog, parse, data_dir, exc_cls
):
    """Acceptable feedparser bozo exceptions must not be re-raised by parse()."""
    original_parse = feedparser.parse

    def bozo_injecting_parse(*args, **kwargs):
        # Delegate to the real parser, then flag the result as "bozo" with
        # an instance of the exception class under test.
        result = original_parse(*args, **kwargs)
        result['bozo'] = 1
        result['bozo_exception'] = exc_cls("whatever")
        return result

    monkeypatch.setattr('feedparser.parse', bozo_injecting_parse)
if not data:
# fetch the data
data = feedparser.parse(str(self.url), etag, lastModified)
# set etag
SetAttribute(self, data, 'etag')
# set lastModified
modified = data.get('modified')
if modified:
self.lastModified = datetime.fromtimestamp(time.mktime(modified)).replace(tzinfo=None)
# if the feed is bad, raise the sax exception
try:
if data.bozo and not isinstance(data.bozo_exception, feedparser.CharacterEncodingOverride):
logger.error("For url '%s', feedparser exception: %s" % (self.url, data.bozo_exception))
raise data.bozo_exception
except KeyError:
print "Error"
return
self._DoChannel(data['channel'])
count = self._DoItems(data['items'])
if count:
logger.info("...added %d RSSItems" % count)
from DateTime import DateTime
from DateTime.interfaces import DateTimeError
from logging import getLogger
from plone.app.portlets import PloneMessageFactory as _
from plone.app.portlets.portlets import base
from plone.portlets.interfaces import IPortletDataProvider
from Products.Five.browser.pagetemplatefile import ZopeTwoPageTemplateFile
from zope import schema
from zope.interface import implementer, Interface
import feedparser
import time
# bozo_exception types reported by feedparser that are accepted (tolerated)
# when parsing the feed.
ACCEPTED_FEEDPARSER_EXCEPTIONS = (feedparser.CharacterEncodingOverride, )
# Module-level store for the feeds; the data therefore lives in RAM.
FEED_DATA = {} # url: ({date, title, url, itemlist})
# Logger named after this module.
logger = getLogger(__name__)
class IFeed(Interface):
    """Interface of a feed: construction from a url and an update timeout,
    plus a query for its loaded state.
    """

    def __init__(url, timeout):
        """Initialize the feed with the given url.

        The feed is not loaded automatically.
        timeout defines the time between updates, in minutes.
        """

    def loaded():
        """Return whether this feed is in a loaded state."""
# set a shorter timeout and restore the default at the end of the read
default_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(60.0)
try:
d = feedparser.parse(url, handlers=handlers)
finally:
socket.setdefaulttimeout(default_timeout)
if d.bozo and isinstance(d.bozo_exception, urllib2.URLError):
# we have an URL error
return {'status':-2}
elif d.bozo:
# some bozo exceptions can be ignored
if not isinstance(d.bozo_exception, (
feedparser.CharacterEncodingOverride,
)):
return {'status': -5}
if d.status == 401:
return {'status':-3}
elif d.status == 404:
return {'status':-4}
result['items'] = []
# some feeds may not provide logo
if d.feed.get('image', None) is not None:
result['logo'] = d.feed.image['href']
result['title'] = d.feed.title
result['link'] = d.feed.link
for entry in d.entries:
entry_dict = {}
entry_dict['title'] = entry['title']
ex = rss.get('bozo_exception', False)
if ex or rss.get('bozo'):
if rss.entries:
msg = (
'Bozo error %s while parsing feed, but entries were produced, ignoring the error.'
% type(ex)
)
if config.get('silent', False):
logger.debug(msg)
else:
logger.verbose(msg)
else:
if isinstance(ex, feedparser.NonXMLContentType):
# see: http://www.feedparser.org/docs/character-encoding.html#advanced.encoding.nonxml
logger.debug('ignoring feedparser.NonXMLContentType')
elif isinstance(ex, feedparser.CharacterEncodingOverride):
# see: ticket 88
logger.debug('ignoring feedparser.CharacterEncodingOverride')
elif isinstance(ex, UnicodeEncodeError):
raise plugin.PluginError('Feed has UnicodeEncodeError while parsing...')
elif isinstance(
ex, (xml.sax._exceptions.SAXParseException, xml.sax._exceptions.SAXException)
):
# save invalid data for review; this is a bit ugly, but users seem to be really confused when
# html pages (login pages) are received
self.process_invalid_content(task, content, config['url'])
if task.options.debug:
logger.error('bozo error parsing rss: {}', ex)
raise plugin.PluginError(
'Received invalid RSS content from task %s (%s)'
% (task.name, config['url'])
)
if not data:
# fetch the data
data = feedparser.parse(str(self.url), etag, lastModified)
# set etag
SetAttribute(self, data, 'etag')
# set lastModified
modified = data.get('modified')
if modified:
self.lastModified = datetime.datetime.fromtimestamp(time.mktime(modified)).replace(tzinfo=None)
# if the feed is bad, raise the sax exception
try:
if data.bozo and not isinstance(data.bozo_exception, feedparser.CharacterEncodingOverride):
logger.error("For url '%s', feedparser exception: %s" % (self.url, data.bozo_exception))
raise data.bozo_exception
except KeyError:
print "Error"
return
self._DoChannel(data['channel'])
count = self._DoItems(data['items'])
if count:
logger.info("...added %d RSSItems" % count)
def process_feed(url):
    """Download the feed at ``url`` and pass each entry to ``EntryProcessor``.

    The feed is abandoned (an error is logged and ``None`` is returned) when:

    * feedparser flags it as bozo with a ``NonXMLContentType`` or
      ``CharacterEncodingOverride`` exception, or
    * the parsed feed has no ``title``.

    Otherwise the entries are reversed in place and handed to
    ``EntryProcessor`` one by one, together with the parsed feed.

    :param url: Location of the feed, passed straight to ``feedparser.parse``.
    :return: ``None`` in every case.
    """
    l.info('Downloading feed: %s' % url)
    feed = feedparser.parse(url)

    # Not all bozo errors cause total failure; only these exception types
    # abort processing.
    # isinstance() must be given the exception classes themselves. The
    # previous ``type(SomeExceptionClass)`` evaluated to the metaclass
    # ``type``, which no exception instance matches, so this guard never fired.
    # NOTE(review): confirm that CharacterEncodingOverride should really be
    # rejected here; it is kept because the original condition listed it.
    rejected_bozo_errors = (
        feedparser.NonXMLContentType,
        feedparser.CharacterEncodingOverride,
    )
    if feed.bozo and isinstance(feed.bozo_exception, rejected_bozo_errors):
        l.error("Erroneous feed URL: %s (%s)"%(url, type(feed.bozo_exception)))
        return

    # When parsing a website or error message, title is missing.
    if 'title' not in feed.feed:
        l.error("Erroneous feed URL: %s" % url)
        return

    l.info("Parsing feed: %s"%feed.feed.title)

    # Reverse the entry list in place before processing.
    feed.entries.reverse()
    for entry in feed.entries:
        EntryProcessor(entry, feed)
def tick(feeds, opts, formatter, seen_id_hashes, iteration, stream=sys.stdout):
    """Parse every feed in ``feeds`` and select the entries to show.

    :param feeds: Mapping of feed url to an ``(etag, last_mtime, last_update)``
        triple; the etag and mtime are forwarded to ``feedparser.parse``.
    :param opts: Options object; ``nofail``, ``initial`` and ``newer`` are
        read here.
    :param formatter: Not used in the portion of the function shown here.
    :param seen_id_hashes: Not used in the portion of the function shown here.
    :param iteration: Iteration counter; on iteration 1 an integer
        ``opts.initial`` caps the number of entries taken from the feed.
    :param stream: Output stream, ``sys.stdout`` by default; not used in the
        portion of the function shown here.

    NOTE(review): the visible body ends once ``entries`` has been filtered;
    whatever consumes ``entries`` is not shown.
    """
    for url, last_element_info in feeds.items():
        etag, last_mtime, last_update = last_element_info

        log.debug('parsing: %r', url)
        log.debug('etag: %s', etag)
        log.debug('mtime: %s', date_fmt(last_mtime))

        feed = feedparser.parse(url, etag=etag, modified=last_mtime)

        if feed.bozo == 1:
            # Only an encoding override is tolerated; any other bozo
            # exception is reported through die().
            safeexc = (feedparser.CharacterEncodingOverride,)
            if not isinstance(feed.bozo_exception, safeexc):
                msg = 'feed error %r:\n%s'
                die(msg, opts.nofail, url, feed.bozo_exception)

        if iteration == 1 and isinstance(opts.initial, int):
            entries = feed.entries[:opts.initial]
        else:
            entries = feed.entries

        if opts.newer:
            # The filter keeps entries dated after opts.newer, so log that
            # cut-off (the old message said "older" and printed last_update).
            log.debug('showing entries newer than %s', date_fmt(opts.newer))
            entries = [entry for entry in entries if entry.date_parsed > opts.newer]

        if last_update:
            # Keep only entries updated after the last update already seen.
            log.debug('showing entries newer than %s', date_fmt(last_update))
            entries = [entry for entry in entries if entry.updated_parsed > last_update]
elif isinstance(exc, _SOCKET_ERRORS):
_LOG.error('{}: {}'.format(exc, self))
warned = True
elif isinstance(exc, _feedparser.zlib.error):
_LOG.error('broken compression: {}'.format(self))
warned = True
elif isinstance(exc, (IOError, AttributeError)):
_LOG.error('{}: {}'.format(exc, self))
warned = True
elif isinstance(exc, KeyboardInterrupt):
raise exc
elif isinstance(exc, _sax.SAXParseException):
_LOG.error('sax parsing error: {}: {}'.format(exc, self))
warned = True
elif (parsed.bozo and
isinstance(exc, _feedparser.CharacterEncodingOverride)):
_LOG.warning(
'incorrectly declared encoding: {}: {}'.format(exc, self))
warned = True
elif (parsed.bozo and isinstance(exc, _feedparser.NonXMLContentType)):
_LOG.warning('non XML Content-Type: {}: {}'.format(exc, self))
warned = True
elif parsed.bozo or exc:
if exc is None:
exc = "can't process"
_LOG.error('processing error: {}: {}'.format(exc, self))
warned = True
if (not warned and
status in [200, 302] and
not parsed.entries and
not version):
def bozo_checker(bozo_exception):
    """
    Tell whether a bozo exception can be ignored or must be treated as
    critical.

    Only whitelisted exception types count as ignorable; every other value
    yields False.

    :param bozo_exception: The bozo exception to test.
    :return: True if the exception can be ignored, False otherwise.
    """
    # CharacterEncodingOverride is raised when the feed was decoded and parsed
    # using a different encoding than what the server/feed itself claimed.
    return isinstance(bozo_exception, feedparser.CharacterEncodingOverride)