Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_invalid_source_couchdb(self):
    """
    Verify that iterating an InfiniteFeed built from the test client
    raises a CloudantFeedException with the "not supported for CouchDB"
    message.
    """
    with self.assertRaises(CloudantFeedException) as raised:
        # Drain the feed inside the context manager so the expected
        # exception is captured by assertRaises.
        for _ in InfiniteFeed(self.client):
            pass
    message = str(raised.exception)
    self.assertEqual(
        message,
        'Infinite _db_updates feed not supported for CouchDB.')
def test_infinite_feed(self):
    """
    Test that an infinite feed will continue to issue multiple requests
    until stopped.

    Documents are created 3 separate times (once up front and again after
    100 and 200 changes have been received). The "_start" method on the
    InfiniteFeed object is wrapped in a call counter and is expected to
    have been called 3 times as well, once per batch of documents.
    """
    self.populate_db_with_documents()
    feed = InfiniteFeed(self.db, timeout=100)
    # Create a proxy for the feed._start method so that we can track how
    # many times it has been called.
    feed._start = MethodCallCount(feed._start)
    changes = []
    for change in feed:
        self.assertSetEqual(set(change.keys()), {'seq', 'changes', 'id'})
        changes.append(change)
        received = len(changes)
        if received in (100, 200):
            # Sleep past the feed timeout (1 second > .1 second) before
            # adding more documents.
            # NOTE(review): presumably this forces the feed to time out
            # and restart -- confirm against InfiniteFeed.
            sleep(1)
            self.populate_db_with_documents(off_set=received)
        elif received == 300:
            feed.stop()
    expected = {'julia{0:03d}'.format(i) for i in range(300)}
    self.assertSetEqual({x['id'] for x in changes}, expected)
    # Without this check the call-count proxy above is never consulted and
    # the restart behaviour described in the docstring goes unverified.
    self.assertEqual(feed._start.called_count, 3)
def test_constructor_no_feed_option(self):
    """
    Check the attributes of an infinite feed that was constructed
    without an explicit feed option.
    """
    feed = InfiniteFeed(self.db, chunk_size=1, timeout=100)
    # The feed is expected to target the database's _changes endpoint.
    changes_url = '/'.join((self.db.database_url, '_changes'))
    self.assertEqual(feed._url, changes_url)
    self.assertIsInstance(feed._r_session, Session)
    self.assertFalse(feed._raw_data)
    # The 'continuous' feed option is expected even though none was given.
    wanted_options = {'feed': 'continuous', 'timeout': 100}
    self.assertDictEqual(feed._options, wanted_options)
    self.assertEqual(feed._chunk_size, 1)
def test_infinite_db_updates_feed(self):
    """
    Test that an _db_updates infinite feed will continue to issue multiple
    requests until stopped. Since we do not have control over updates
    happening within the account as we do within a database, this test is
    stopped after 15 database creations regardless. Within that span of
    time we expect that the feed would have been restarted at least once.
    """
    feed = InfiniteFeed(self.client, since='now', timeout=100)
    # Create a proxy for the feed._start method so that we can track how
    # many times it has been called.
    feed._start = MethodCallCount(feed._start)
    new_dbs = []
    try:
        new_dbs.append(self.client.create_database(self.dbname()))
        for change in feed:
            self.assertTrue(all(x in change for x in ('seq', 'type')))
            # Each received change triggers another database creation so
            # that the feed keeps receiving updates.
            new_dbs.append(self.client.create_database(self.dbname()))
            if feed._start.called_count >= 3 and len(new_dbs) >= 3:
                feed.stop()
            if len(new_dbs) >= 15:
                # We stop regardless after 15 databases have been created
                feed.stop()
    finally:
        # Always remove the databases created by this test, even when an
        # assertion above fails, so they do not accumulate in the account.
        for db in new_dbs:
            db.delete()
    # The feed must have been started more than once, i.e. restarted at
    # least once. A failure here may also mean the 15 database cap was
    # reached before a timeout/restart could happen.
    self.assertGreater(feed._start.called_count, 1)
def test_db_updates_infinite_feed_call(self):
    """
    Check that calling infinite_db_updates() on the client builds and
    returns an InfiniteFeed object with the expected attributes.
    """
    try:
        self.client.connect()
        feed = self.client.infinite_db_updates()
        self.assertIsInstance(feed, InfiniteFeed)
        # The feed is expected to target the server's _db_updates endpoint.
        updates_url = '/'.join((self.client.server_url, '_db_updates'))
        self.assertEqual(feed._url, updates_url)
        self.assertIsInstance(feed._r_session, requests.Session)
        self.assertFalse(feed._raw_data)
        wanted_options = {'feed': 'continuous'}
        self.assertDictEqual(feed._options, wanted_options)
    finally:
        # Disconnect whether or not the assertions passed.
        self.client.disconnect()
def test_constructor_with_invalid_feed_option(self):
    """
    Check that an infinite feed constructed with an invalid feed option
    raises a CloudantArgumentError when it is iterated.
    """
    feed = InfiniteFeed(self.db, feed='longpoll')
    with self.assertRaises(CloudantArgumentError) as raised:
        # Drain the feed inside the context manager so the expected
        # exception is captured by assertRaises.
        for _ in feed:
            pass
    message = str(raised.exception)
    self.assertEqual(
        message,
        'Invalid infinite feed option: longpoll. Must be set to continuous.'
    )
def test_constructor_db_updates(self):
    """
    Check the attributes of an infinite feed constructed from the client,
    i.e. an infinite _db_updates feed.
    """
    feed = InfiniteFeed(
        self.client, chunk_size=1, timeout=100, feed='continuous')
    # The feed is expected to target the server's _db_updates endpoint.
    updates_url = '/'.join((self.client.server_url, '_db_updates'))
    self.assertEqual(feed._url, updates_url)
    self.assertIsInstance(feed._r_session, Session)
    self.assertFalse(feed._raw_data)
    wanted_options = {'feed': 'continuous', 'timeout': 100}
    self.assertDictEqual(feed._options, wanted_options)
    self.assertEqual(feed._chunk_size, 1)
def test_changes_inifinite_feed_call(self):
    """
    Check that calling infinite_changes() on the database builds and
    returns an InfiniteFeed object with the expected attributes.
    """
    feed = self.db.infinite_changes()
    self.assertIsInstance(feed, InfiniteFeed)
    # The feed is expected to target the database's _changes endpoint.
    changes_url = '/'.join((self.db.database_url, '_changes'))
    self.assertEqual(feed._url, changes_url)
    self.assertIsInstance(feed._r_session, requests.Session)
    self.assertFalse(feed._raw_data)
    wanted_options = {'feed': 'continuous'}
    self.assertDictEqual(feed._options, wanted_options)
def test_constructor_with_feed_option(self):
    """
    Check the attributes of an infinite feed constructed with the feed
    option explicitly set to continuous.
    """
    feed = InfiniteFeed(
        self.db, chunk_size=1, timeout=100, feed='continuous')
    # The feed is expected to target the database's _changes endpoint.
    changes_url = '/'.join((self.db.database_url, '_changes'))
    self.assertEqual(feed._url, changes_url)
    self.assertIsInstance(feed._r_session, Session)
    self.assertFalse(feed._raw_data)
    wanted_options = {'feed': 'continuous', 'timeout': 100}
    self.assertDictEqual(feed._options, wanted_options)
    self.assertEqual(feed._chunk_size, 1)