Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_accessing_objects_with_indexes_error(self):
insert_result = self.db.company.insert_many([{'name': 'Foo'},
{'name': 'Foo'}]) # Force 2 doc with same name
REF_OID = insert_result.inserted_ids[0]
self.db.user.insert_one({'company': REF_OID}) # Force 2 doc with same name
class Company(Document):
name = StringField(unique=True)
class User(Document):
company = ReferenceField(Company)
# Ensure index creation exception aren't swallowed (#1688)
with self.assertRaises(DuplicateKeyError):
User.objects().select_related()
def test_update_bad(self):
# Violate a unique index, make sure we handle error well
coll = self.db.unique_collection
yield from coll.create_index('s', unique=True)
try:
yield from coll.insert_many([{'s': 1}, {'s': 2}])
with self.assertRaises(DuplicateKeyError):
yield from coll.update_one({'s': 2}, {'$set': {'s': 1}})
finally:
yield from coll.drop()
await Post.q(db).create_indexes()
await Post(title='xxx', author='totti').save(db, do_insert=True)
yyy = await Post(title='yyy', author='totti').save(db, do_insert=True)
data = Post(title='xxx', author='totti').to_mongo()
with pytest.raises(pymongo.errors.DuplicateKeyError):
await Post.q(db).insert_one(data)
with pytest.raises(DuplicateKeyError) as excinfo:
await Post.q(db).insert_one(data)
assert excinfo.value.index_name == 'title_index'
yyy_data = yyy.to_mongo()
yyy_data['title'] = 'xxx'
with pytest.raises(pymongo.errors.DuplicateKeyError):
await Post.q(db).replace_one({Post.title.s: 'yyy'}, yyy_data)
with pytest.raises(DuplicateKeyError) as excinfo:
await Post.q(db).replace_one({Post.title.s: 'yyy'}, yyy_data)
assert excinfo.value.index_name == 'title_index'
with pytest.raises(pymongo.errors.DuplicateKeyError):
await Post.q(db).update_one({Post.title.s: 'yyy'},
{'$set': {Post.title.s: 'xxx'}})
with pytest.raises(DuplicateKeyError) as excinfo:
await Post.q(db).update_one({Post.title.s: 'yyy'},
{'$set': {Post.title.s: 'xxx'}})
assert excinfo.value.index_name == 'title_index'
with pytest.raises(pymongo.errors.DuplicateKeyError):
await Post.q(db).update_many({}, {'$set': {Post.title.s: 'xxx'}})
with pytest.raises(DuplicateKeyError) as excinfo:
overlapping content.
:param unit: the unit to be updated
:type unit: pulp.plugins.model.Unit
:param pulp_unit: the unit to be updated, as a dict
:type pulp_unit: dict
:return: id of the updated unit
:rtype: basestring
"""
content_manager = manager_factory.content_manager()
try:
unit_id = content_manager.add_content_unit(unit.type_id, None, pulp_unit)
self._added_count += 1
return unit_id
except DuplicateKeyError:
logger.debug(_('cannot add unit; already exists. updating instead.'))
return self._update_unit(unit, pulp_unit)
def try_replace_one(db, coll_name, query, update, upsert=False):
"""
Mongo does not see replace w/ upsert as an atomic action:
https://jira.mongodb.org/browse/SERVER-14322
This function will try a replace_one operation, returning the result and if the operation succeeded.
"""
try:
result = db[coll_name].replace_one(query, update, upsert=upsert)
except DuplicateKeyError:
return result, False
else:
return result, True
async def archive(self, payload: Payload, request: Request) -> ArchiverResponse:
"""
Archive a payload to MongoDB
"""
self._connect_gridfs()
sha1 = get_sha1(payload.content)
meta = payload.payload_meta.extra_data
meta['_id'] = sha1
try:
with self.gridfs_db.new_file(**meta) as fp:
fp.write(payload.content)
except (DuplicateKeyError, FileExists):
pass
return ArchiverResponse(meta)
def insert(self, doc_or_docs, safe=False, manipulate=True):
if not isinstance(doc_or_docs, list):
doc_or_docs = [ doc_or_docs ]
for doc in doc_or_docs:
doc = bcopy(doc)
bson_safe(doc)
_id = doc.get('_id', ())
if _id == () and manipulate:
_id = doc['_id'] = bson.ObjectId()
if _id in self._data:
if safe: raise DuplicateKeyError('duplicate ID on insert')
continue
self._index(doc)
self._data[_id] = bcopy(doc)
return _id
def invite_action(self, *args, **kwargs):
action = self.get_body_argument("action")
if action == "create":
code = random_str()
try:
yield self.db.invite.insert({
"code": code,
"used": False,
"user": "",
"time": time.time()
})
except pymongo.errors.DuplicateKeyError:
pass
self.redirect("/manage/invite")
elif action == "delete":
code = self.get_body_argument("code")
yield self.db.invite.remove({
"code": code,
"used": False
})
self.redirect("/manage/invite")
elif action == "expire":
yield self.db.invite.remove({
"time": {"$lt": (time.time() - self.settings["invite_expire"])},
"used": {"$eq": False}
})
self.redirect("/manage/invite")
self.custom_error("方法错误,请重试")
except StopIteration:
new_id = 1
while True:
if obj['_type'] == 'bill':
obj['_id'] = '%s%08d' % (id_prefix, new_id)
else:
obj['_id'] = '%s%06d' % (id_prefix, new_id)
obj['_all_ids'] = [obj['_id']]
if obj['_type'] in ['person', 'legislator']:
obj['leg_id'] = obj['_id']
try:
return collection.insert(obj, safe=True)
except pymongo.errors.DuplicateKeyError:
new_id += 1
project_id=project_id,
app_config_id=app_config_id,
artifact_index_id=artifact_index_id)
if artifact_already_subscribed:
return
d = dict(user_id=user_id, project_id=project_id, app_config_id=app_config_id,
artifact_index_id=artifact_index_id, topic=topic)
sess = session(cls)
try:
mbox = cls(
type=type, frequency=dict(n=n, unit=unit),
artifact_title=artifact_title,
artifact_url=artifact_url,
**d)
sess.flush(mbox)
except pymongo.errors.DuplicateKeyError:
sess.expunge(mbox)
mbox = cls.query.get(**d)
mbox.artifact_title = artifact_title
mbox.artifact_url = artifact_url
mbox.type = type
mbox.frequency.n = n
mbox.frequency.unit = unit
sess.flush(mbox)
if not artifact_index_id:
# Unsubscribe from individual artifacts when subscribing to the tool
for other_mbox in cls.query.find(dict(
user_id=user_id, project_id=project_id, app_config_id=app_config_id)):
if other_mbox is not mbox:
other_mbox.delete()