Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_collection(self):
    """Check that creating a collection shows up as a 'create' command.

    Starts the oplog thread, creates the collection ``test`` in the
    ``test`` database of the primary connection, waits until the doc
    manager has recorded at least one command, and verifies that the
    first recorded command is the creation of ``test``.
    """
    self.initOplogThread()
    # create=True asks pymongo to create the collection explicitly
    # instead of waiting for the first write.
    pymongo.collection.Collection(
        self.primary_conn['test'],
        'test',
        create=True,
    )
    # Block until the doc manager has seen at least one command.
    assert_soon(lambda: self.docman.commands)
    first_command = self.docman.commands[0]
    self.assertEqual(first_command['create'], 'test')
def write_concern_collection(self):
    """Yield a collection suited to the write-concern behaviour of the server.

    On a replica set running at least version 3.3.9 this yields a
    ``test`` collection configured with a write concern that cannot be
    satisfied, and expects the code that runs while the generator is
    suspended to raise ``WriteConcernError``. In every other deployment
    it simply yields ``self.db.test``.
    """
    expects_wc_error = (
        client_context.version.at_least(3, 3, 9) and client_context.is_rs)
    if not expects_wc_error:
        yield self.db.test
        return
    with self.assertRaises(WriteConcernError):
        # Unsatisfiable write concern: one more acknowledgement than
        # there are nodes.
        too_many_nodes = len(client_context.nodes) + 1
        yield Collection(
            self.db, 'test',
            write_concern=WriteConcern(w=too_many_nodes))
def create(database, collection_name):
    """
    Explicitly create a collection rather than relying on lazy creation.
    :param database: a MongoDB database object that will hold the collection
    :param collection_name: the name to give the new collection
    :return collection: the newly created MongoDB collection
    """
    options = {
        'database': database,
        'name': collection_name,
        'create': True,
    }
    return pymongo.collection.Collection(**options)
def __new__(cls, name, bases, attrs):
    """Create the class, validating its ``col`` attribute when required.

    A class whose only base is ``StructuredDict`` is created without any
    check. Every other class must define ``col`` in its class body as a
    ``Collection`` instance.

    :param name: name of the class being created
    :param bases: tuple of base classes
    :param attrs: namespace dict of the class body
    :raises errors.ConnectionError: if ``col`` is missing or is not a
        ``Collection`` instance
    """
    # judge if the target class is Document
    only_base_is_structured_dict = (
        len(bases) == 1 and bases[0] is StructuredDict)
    if not only_base_is_structured_dict:
        # Use .get() so that a missing 'col' is reported through the
        # intended ConnectionError instead of a KeyError raised while
        # building the error message.
        col = attrs.get('col')
        if not isinstance(col, Collection):
            raise errors.ConnectionError(
                'col of a Document is not set properly, passing: %s %s' %
                (col, type(col)))
    return type.__new__(cls, name, bases, attrs)
default) the :attr:`codec_options` of this :class:`Database` is
used.
- `read_preference` (optional): The read preference to use. If
``None`` (the default) the :attr:`read_preference` of this
:class:`Database` is used. See :mod:`~pymongo.read_preferences`
for options.
- `write_concern` (optional): An instance of
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
default) the :attr:`write_concern` of this :class:`Database` is
used.
- `read_concern` (optional): An instance of
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
default) the :attr:`read_concern` of this :class:`Database` is
used.
"""
return Collection(
self, name, False, codec_options, read_preference,
write_concern, read_concern)
Parameters
---------
uri: obj
Argument to be distinguished
Returns
-------
bool: maybe_mongo
"""
try:
import pymongo
except ImportError:
return False
if isinstance(uri, (pymongo.mongo_client.MongoClient,
pymongo.collection.Collection)):
msg = ('To initialize MongoBackend, pymongo Database '
'instance must be provided, given: {}{}')
raise ValueError(msg.format(uri, type(uri)))
elif isinstance(uri, pymongo.database.Database):
return True
elif not isinstance(uri, str):
return False
return uri.startswith('mongodb://')
kwargs['filter'] = {'datetime': {'$lte': end}}
elif end:
kwargs['filter'] = {'datetime': {'$lte': end}}
if isinstance(collection, (str, unicode)):
db = self.client[db] if db else self.db
return self._read(db[collection], **kwargs)
elif isinstance(collection, pymongo.collection.Collection):
return self._read(collection, **kwargs)
elif isinstance(collection, (list, tuple)):
db = self.client[db] if db else self.db
panel = {}
for col in collection:
if isinstance(col, (str, unicode)):
panel[col] = self._read(db[col], **kwargs)
elif isinstance(col, pymongo.collection.Collection):
panel[col.name] = self._read(col, **kwargs)
return pd.Panel.from_dict(panel)
def getFromDatabase(self):
    """Load this object's record from MongoDB and copy its fields to ``self``.

    Looks up the document whose ``_id`` equals ``self.id`` in the
    ``LemurellMaassHighDegree`` collection of the ``Lfunction`` database,
    stores it as ``self.dbEntry``, parses the lcalc file it contains and
    then mirrors the remaining descriptive fields as attributes.
    """
    import base

    conn = base.getDBConnection()
    database = pymongo.database.Database(conn, 'Lfunction')
    # Collection name is fixed for now; probably later a choice.
    entries = pymongo.collection.Collection(
        database, 'LemurellMaassHighDegree')
    self.dbEntry = entries.find_one({'_id': self.id})

    # The lcalc file must be set before it is parsed.
    self.lcalcfile = self.dbEntry['lcalcfile']
    self.parseLcalcfile()

    # Remaining fields are copied verbatim, in the same order as stored.
    copied_fields = (
        'family',
        'group',
        'field',
        'objectName',
        'texnamecompleted1ms',
        'texname',
        'texnamecompleteds',
        'title',
        'citation',
        'credit',
    )
    for field_name in copied_fields:
        setattr(self, field_name, self.dbEntry[field_name])
"""Returns the collection with the required indexes.
Args:
collection_name (pymongo.collection.Collection)
Returns:
pymongo.collection.Collection
"""
collection = self._client_master[
self._database_master['database']][collection_name]
if collection_name in self._indexed:
return collection
try:
indexes = collection.index_information()
# When the table does not exist it needs to be created.
except OperationFailure:
collection = Collection(
database=self._client_master[self._database_master],
name=collection_name,
create=True)
indexes = collection.index_information()
for index_name in indexes:
if str(index_name).startswith('time'):
self._indexed.append(collection_name)
return collection
# If no index was found, create one and update the indexed list.
collection.create_index([('time', ASCENDING)], unique=True)
self._indexed.append(collection_name)
return collection
def __init__(self, collection):
    """Store the backing collection and create its ``wechat_id`` index.

    :param collection: a ``pymongo.collection.Collection`` instance;
        anything else fails the assertion below.
    """
    import pymongo

    expected_type = pymongo.collection.Collection
    assert isinstance(collection, expected_type)
    self.collection = collection
    # Request an index on the "wechat_id" field of the collection.
    self.collection.create_index("wechat_id")