Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_document_context_manager_update_failure_on_error(self):
"""
Test that the document context manager skips document update if there
is an error.
"""
# Create the document.
doc = Document(self.db, 'julia006')
doc['name'] = 'julia'
doc['age'] = 6
doc.save()
# Make a document update and then raise an error.
with self.assertRaises(ZeroDivisionError), Document(self.db, 'julia006') as doc:
doc['age'] = 7
raise ZeroDivisionError()
# Assert the change persists locally.
self.assertEqual(doc['age'], 7)
# Assert the document has not been saved to remote server.
self.assertTrue(doc['_rev'].startswith('1-'))
self.assertEqual(self.db['julia006']['age'], 6)
def test_fetch_existing_document_with_docid(self):
"""
Test fetching document content from an existing document
"""
doc = Document(self.db, 'julia006')
doc['name'] = 'julia'
doc['age'] = 6
doc.create()
new_doc = Document(self.db, 'julia006')
new_doc.fetch()
self.assertEqual(new_doc, doc)
def test_list_field_remove_failure(self):
"""
Test the static helper method to remove from a list
field errors as expected.
"""
doc = Document(self.db)
doc.field_set(doc, 'name', 'julia')
try:
doc.list_field_remove(doc, 'name', 'julia')
self.fail('Above statement should raise an Exception')
except CloudantDocumentException as err:
self.assertEqual(str(err), 'The field name is not a list.')
self.assertEqual(doc, {'name': 'julia'})
self.db,
self.target_db,
repl_id
)
self.replication_ids.append(repl_id)
# Test that the replication document was created
expected_keys = ['_id', '_rev', 'source', 'target', 'user_ctx']
# If Admin Party mode then user_ctx will not be in the key list
if self.client.admin_party or self.client.is_iam_authenticated:
expected_keys.pop()
self.assertTrue(all(x in list(repl_doc.keys()) for x in expected_keys))
self.assertEqual(repl_doc['_id'], repl_id)
self.assertTrue(repl_doc['_rev'].startswith('1-'))
# Now that we know that the replication document was created,
# check that the replication timed out.
repl_doc = Document(self.replicator.database, repl_id)
repl_doc.fetch()
if repl_doc.get('_replication_state') not in ('completed', 'error'):
# assert that a connection error is thrown because the read timed out
with self.assertRaises(ConnectionError) as cm:
changes = self.replicator.database.changes(
feed='continuous')
for change in changes:
continue
self.assertTrue(str(cm.exception).endswith('Read timed out.'))
def test_list_field_append_failure(self):
"""
Test the static helper method to append to a list
field errors as expected.
"""
doc = Document(self.db)
doc.field_set(doc, 'name', 'julia')
try:
doc.list_field_append(doc, 'name', 'isabel')
self.fail('Above statement should raise an Exception')
except CloudantDocumentException as err:
self.assertEqual(str(err), 'The field name is not a list.')
self.assertEqual(doc, {'name': 'julia'})
def test_document_context_manager_doc_create(self):
"""
Test that the document context manager will create a doc if it does
not yet exist.
"""
with Document(self.db, 'julia006') as doc:
doc['name'] = 'julia'
doc['age'] = 6
self.assertTrue(doc['_rev'].startswith('1-'))
self.assertEqual(self.db['julia006'], doc)
def test_create_document_with_docid_encoded_url(self):
"""
Test creating a document providing an id that has an encoded url
"""
doc = Document(self.db, 'http://example.com')
doc['name'] = 'julia'
doc['age'] = 6
self.assertFalse(doc.exists())
self.assertIsNone(doc.get('_rev'))
doc.create()
self.assertTrue(doc.exists())
self.assertTrue(doc.get('_rev').startswith('1-'))
if obj['_type'] == 'datetime':
return datetime.strptime(obj['value'], dt_format)
return obj
doc = Document(self.db, encoder=DTEncoder)
doc['name'] = 'julia'
doc['dt'] = datetime(2018, 7, 9, 15, 11, 10, 0)
doc.save()
raw_doc = self.db.all_docs(include_docs=True)['rows'][0]['doc']
self.assertEquals(raw_doc['name'], 'julia')
self.assertEquals(raw_doc['dt']['_type'], 'datetime')
self.assertEquals(raw_doc['dt']['value'], '2018-07-09T15:11:10')
doc2 = Document(self.db, doc['_id'], decoder=DTDecoder)
doc2.fetch()
self.assertEquals(doc2['dt'], doc['dt'])
def test_fetch_existing_document_with_docid_encoded_url(self):
"""
Test fetching document content from an existing document where the
document id requires an encoded url
"""
doc = Document(self.db, 'http://example.com')
doc['name'] = 'julia'
doc['age'] = 6
doc.create()
new_doc = Document(self.db, 'http://example.com')
new_doc.fetch()
self.assertEqual(new_doc, doc)
def delete(self, rev, **kwargs):
"""
Delete the given revision of the current document. For example:
rev = doc.get().result().json()['_rev']
doc.delete(rev)
"""
return super(Document, self).delete(params={'rev': rev}, **kwargs)