SessionManager.refresh_materialized_views(self._db)
source = DataSource.lookup(self._db, self.datasource)
metadata = Metadata(source)
mock_api = MockNoveListAPI(self._db)
metadata.recommendations = [same_author.license_pools[0].identifier]
mock_api.setup(metadata)

# A grouped feed is returned with all of the related books.
with self.request_context_with_library('/'):
    response = self.manager.work_controller.related(
        self.identifier.type, self.identifier.identifier,
        novelist_api=mock_api
    )
eq_(200, response.status_code)
feed = feedparser.parse(response.data)
eq_(5, len(feed['entries']))

def collection_link(entry):
    [link] = [l for l in entry['links'] if l['rel'] == 'collection']
    return link['title'], link['href']

# This feed contains five books: one recommended, one in the same series,
# two by the same author, and the original book.
recommendations = []
same_series = []
same_contributor = []
feeds_with_original_book = []
for e in feed['entries']:
    for link in e['links']:
        if link['rel'] != 'collection':
            continue
def add_feed(sender):
    url = console.input_alert('', "Enter RSS feed URL:", 'http://www.macstories.net/feed/')
    result = urlparse.urlparse(url)
    if result.netloc == '':
        url = 'http://www.macstories.net/feed/'
    indicator = ui.ActivityIndicator()
    indicator.center = navigation_view.center
    navigation_view.add_subview(indicator)
    indicator.bring_to_front()
    indicator.start()
    feed = feedparser.parse(url)
    title = feed['feed']['title']
    conn = sqlite3.connect('feeds.db')
    conn.execute('INSERT INTO feeds VALUES (?, ?)', (title, url))
    conn.commit()
    feeds = []
    for title, url in conn.execute('SELECT * FROM feeds ORDER BY title'):
        feeds.append({'title': title, 'url': url})
    conn.close()
    feed_list_controller.feeds = feeds
    table_view.reload()
    indicator.stop()
    navigation_view.remove_subview(indicator)
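# The INSERT in add_feed assumes that feeds.db already contains a "feeds"
# table. A minimal sketch of creating it; the two-column TEXT schema is an
# assumption based on the (title, url) tuples used above:
import sqlite3

conn = sqlite3.connect('feeds.db')
conn.execute('CREATE TABLE IF NOT EXISTS feeds (title TEXT, url TEXT)')
conn.commit()
conn.close()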
def tick(feeds, opts, formatter, seen_id_hashes, iteration, stream=sys.stdout):
    for url, last_element_info in feeds.items():
        etag, last_mtime, last_update = last_element_info

        log.debug('parsing: %r', url)
        log.debug('etag: %s', etag)
        log.debug('mtime: %s', date_fmt(last_mtime))

        feed = feedparser.parse(url, etag=etag, modified=last_mtime)

        if feed.bozo == 1:
            safeexc = (feedparser.CharacterEncodingOverride,)
            if not isinstance(feed.bozo_exception, safeexc):
                msg = 'feed error %r:\n%s'
                die(msg, opts.nofail, url, feed.bozo_exception)

        if iteration == 1 and isinstance(opts.initial, int):
            entries = feed.entries[:opts.initial]
        else:
            entries = feed.entries

        if opts.newer:
            # Keep only entries newer than the opts.newer threshold.
            log.debug('skipping entries older than %s', date_fmt(opts.newer))
            entries = [entry for entry in entries if entry.date_parsed > opts.newer]
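# feed.bozo is feedparser's flag for a feed that was not well formed, and
# feed.bozo_exception carries the reason. A minimal standalone sketch of the
# tolerant check used above (the URL is a placeholder):
import feedparser

feed = feedparser.parse('https://example.com/feed.xml')
if feed.bozo and not isinstance(feed.bozo_exception,
                                feedparser.CharacterEncodingOverride):
    print('feed error:', feed.bozo_exception)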
"""
from feedjack import models

ret_values = {
    ENTRY_NEW: 0,
    ENTRY_UPDATED: 0,
    ENTRY_SAME: 0,
    ENTRY_ERR: 0}

prints(u'[%d] Processing feed %s' % (self.feed.id,
                                     self.feed.feed_url))

# We check the etag and the modified time to save bandwidth and avoid bans.
try:
    self.fpf = feedparser.parse(self.feed.feed_url,
                                agent=USER_AGENT,
                                etag=self.feed.etag)
except:
    prints('! ERROR: feed cannot be parsed')
    return FEED_ERRPARSE, ret_values

if hasattr(self.fpf, 'status'):
    if self.options.verbose:
        prints(u'[%d] HTTP status %d: %s' % (self.feed.id,
                                             self.fpf.status,
                                             self.feed.feed_url))
    if self.fpf.status == 304:
        # This means the feed has not changed.
        if self.options.verbose:
            prints('[%d] Feed has not changed since '
                   'last check: %s' % (self.feed.id, self.feed.feed_url))
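# The etag handling above relies on feedparser's conditional-GET support.
# A minimal standalone sketch of the same idea (the feed URL is a
# placeholder):
import feedparser

url = 'https://example.com/feed.xml'
first = feedparser.parse(url)

# Pass the previously seen ETag / Last-Modified values back on the next poll;
# an unchanged feed comes back with HTTP status 304 and no entries.
second = feedparser.parse(url,
                          etag=first.get('etag'),
                          modified=first.get('modified'))
if getattr(second, 'status', None) == 304:
    print('feed has not changed since the last check')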
def object_from_feed_entry(feed_url, item_url):
    try:
        feed = feedparser.parse(feed_url)
    except IndexError as exc:
        log.debug("Got a %s parsing feed %s: %s", type(exc).__name__, feed_url, str(exc))
        return None

    matching_entries = [entry for entry in feed.entries if getattr(entry, 'link', None) == item_url]
    if len(matching_entries) > 0:
        entry = matching_entries[0]
    else:
        return None

    obj = Object(
        service='',
        foreign_id=item_url,
        title=entry.title,
        permalink_url=item_url,
    )
#!/usr/bin/env python
import subprocess
import feedparser
import string
rss = feedparser.parse('http://www.reddit.com/r/technology/.rss')
print(rss['feed']['title'])
subprocess.call("echo reading technology news... | $VOICE", shell=True)
for post in rss.entries:
    headline = post.title
    exclude = set(string.punctuation)
    headline = ''.join(ch for ch in headline if ch not in exclude)
    subprocess.call("echo \"" + headline + "\n\" | tee /dev/tty | $VOICE", shell=True)
def _refresh(self):
    d = feedparser.parse(self.feed_url)
    if "rss" in d.version:
        return self._parse_rss(d)
    else:
        return self._parse_atom(d)
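# feedparser reports the detected format in d.version as a short string such
# as 'rss20', 'rss10', or 'atom10' (empty if it could not be determined),
# which is what the dispatch above keys on. A quick check with a placeholder
# URL:
import feedparser

d = feedparser.parse('https://example.com/feed.xml')
print(d.version)  # e.g. 'rss20' or 'atom10'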
def suck(save_item, handle_error, source):
    feeds = [
        {
            'url': 'http://alerts.weather.gov/cap/us.php?x=0',
            'tags': ['severe-weather']
        }
    ]

    for feed in feeds:
        d = feedparser.parse(feed['url'])
        for entry in d.entries:
            if 'lastRetrieved' not in source or parse(entry.published) > source['lastRetrieved']:
                item = transform(entry, feed['tags'])
                save_item(item)

    return datetime.now()
# Tweak feedparser to accept XML as content.
feedparser._FeedParserMixin.unknown_starttag = feedparser_unknown_starttag
feedparser._FeedParserMixin.unknown_endtag = feedparser_unknown_endtag
feedparser._sanitizeHTML = lambda source, encoding: source

self.logger.debug('Starting')
# Does not accept events pre-dating the startup.
self.last_event = time.gmtime()
if self.url is None:
    raise Exception('Attribute url must be set')

while True:
    self.logger.debug("Reading feed")
    feed = feedparser.parse(self.url)
    last_update = self.last_event
    new_events = 0
    old_events = 0
    # Offset between local time and UTC, used to convert the feed's UTC
    # timestamps to local time.
    off = datetime.datetime.now() - datetime.datetime.utcnow()
    for entry in feed.entries:
        if entry.updated_parsed > self.last_event:
            new_events = new_events + 1
            event = entry.content[0]['value']
            timestamp = (datetime.datetime(*(entry.updated_parsed[0:6])) + off).replace(microsecond=0)
            self.process_message(event, timestamp)
        else:
            old_events = old_events + 1
        if entry.updated_parsed > last_update:
            last_update = entry.updated_parsed
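# The loop above derives a local-time offset by subtracting utcnow() from
# now(). An alternative sketch that does the same UTC-to-local conversion via
# the epoch (calendar.timegm treats the struct_time as UTC):
import calendar
import datetime
import time

def to_local_datetime(utc_struct_time):
    # Convert a UTC time.struct_time (e.g. entry.updated_parsed) into a
    # naive local datetime, dropping microseconds.
    epoch = calendar.timegm(utc_struct_time)
    return datetime.datetime.fromtimestamp(epoch).replace(microsecond=0)

print(to_local_datetime(time.gmtime()))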
def process_all_rss(reprocess=False):
    """Gather all RSS feeds and articles, then process."""
    sources = list()
    logger.debug("Collecting sources")
    monitors = mongo.db[app.config['MONITORS_COLLECTION']]
    for item in monitors.find({'active': True}):
        sources.append(item['metadata'].get('rss_link'))

    contents = [feedparser.parse(x) for x in sources]
    logger.debug("Processing sources")
    for source in contents:
        for idx, item in enumerate(source.get('entries')):
            response = get_article(item, source['href'], reprocess)
            if response['from_store'] or reprocess:
                continue
            clean_link = response['article']['feed_source']
            monitors.update({'metadata.rss_link': clean_link},
                            {'$set': {'checked': now_time()}})
    correct_counts()
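# monitors.update() above is the legacy pymongo write API. A self-contained
# sketch of an equivalent call with the current API (update_many); the
# connection string, database/collection names, and filter value are
# placeholders:
from datetime import datetime
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')
monitors = client['mydb']['monitors']
monitors.update_many({'metadata.rss_link': 'http://example.com/feed'},
                     {'$set': {'checked': datetime.utcnow()}})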