Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class AgnosticGridOut(object):
"""Class to read data out of GridFS.
MotorGridOut supports the same attributes as PyMongo's
:class:`~gridfs.grid_file.GridOut`, such as ``_id``, ``content_type``,
etc.
You don't need to instantiate this class directly - use the
methods provided by :class:`~motor.MotorGridFSBucket`. If it **is**
instantiated directly, call :meth:`open`, :meth:`read`, or
:meth:`readline` before accessing its attributes.
"""
__motor_class_name__ = 'MotorGridOut'
__delegate_class__ = gridfs.GridOut
_ensure_file = AsyncCommand()
_id = MotorGridOutProperty()
aliases = MotorGridOutProperty()
chunk_size = MotorGridOutProperty()
close = MotorGridOutProperty()
content_type = MotorGridOutProperty()
filename = MotorGridOutProperty()
length = MotorGridOutProperty()
md5 = MotorGridOutProperty()
metadata = MotorGridOutProperty()
name = MotorGridOutProperty()
read = AsyncRead()
readable = DelegateMethod()
readchunk = AsyncRead()
readline = AsyncRead()
seek = DelegateMethod()
seekable = DelegateMethod()
return self.io_loop
class AgnosticGridFSBucket(object):
__motor_class_name__ = 'MotorGridFSBucket'
__delegate_class__ = gridfs.GridFSBucket
delete = AsyncCommand()
download_to_stream = AsyncCommand()
download_to_stream_by_name = AsyncCommand()
open_download_stream = AsyncCommand().wrap(gridfs.GridOut)
open_download_stream_by_name = AsyncCommand().wrap(gridfs.GridOut)
open_upload_stream = DelegateMethod().wrap(gridfs.GridIn)
open_upload_stream_with_id = DelegateMethod().wrap(gridfs.GridIn)
rename = AsyncCommand()
upload_from_stream = AsyncCommand()
upload_from_stream_with_id = AsyncCommand()
def __init__(self, database, bucket_name="fs", disable_md5=False,
chunk_size_bytes=DEFAULT_CHUNK_SIZE, write_concern=None,
read_preference=None, collection=None):
"""Create a handle to a GridFS bucket.
Raises :exc:`~pymongo.errors.ConfigurationError` if `write_concern`
is not acknowledged.
This class conforms to the `GridFS API Spec
<https://github.com/mongodb/specifications/blob/master/source/gridfs/gridfs-spec.rst>`_
for MongoDB drivers.
:Parameters:
- `database`: database to use.
self.__module__)
return klass(self, obj.name, _delegate=obj)
elif obj.__class__ is Database:
return self.__class__(self._client, obj.name, _delegate=obj)
else:
return obj
def get_io_loop(self):
    """Return the event loop reported by the owning client.

    This object keeps no loop of its own; the call is forwarded to
    ``self._client.get_io_loop()``.
    """
    return self._client.get_io_loop()
class AgnosticCollection(AgnosticBaseProperties):
__motor_class_name__ = 'MotorCollection'
__delegate_class__ = Collection
bulk_write = AsyncCommand(doc=bulk_write_doc)
count_documents = AsyncRead()
create_index = AsyncCommand()
create_indexes = AsyncCommand(doc=create_indexes_doc)
delete_many = AsyncCommand(doc=delete_many_doc)
delete_one = AsyncCommand(doc=delete_one_doc)
distinct = AsyncRead()
drop = AsyncCommand(doc=drop_doc)
drop_index = AsyncCommand()
drop_indexes = AsyncCommand()
estimated_document_count = AsyncCommand()
find_one = AsyncRead(doc=find_one_doc)
find_one_and_delete = AsyncCommand(doc=find_one_and_delete_doc)
find_one_and_replace = AsyncCommand(doc=find_one_and_replace_doc)
find_one_and_update = AsyncCommand(doc=find_one_and_update_doc)
full_name = ReadOnlyProperty()
index_information = AsyncRead(doc=index_information_doc)
return self.__class__(self._client, obj.name, _delegate=obj)
else:
return obj
def get_io_loop(self):
    """Return the event loop reported by the owning client.

    This object keeps no loop of its own; the call is forwarded to
    ``self._client.get_io_loop()``.
    """
    return self._client.get_io_loop()
class AgnosticCollection(AgnosticBaseProperties):
__motor_class_name__ = 'MotorCollection'
__delegate_class__ = Collection
bulk_write = AsyncCommand(doc=bulk_write_doc)
count_documents = AsyncRead()
create_index = AsyncCommand()
create_indexes = AsyncCommand(doc=create_indexes_doc)
delete_many = AsyncCommand(doc=delete_many_doc)
delete_one = AsyncCommand(doc=delete_one_doc)
distinct = AsyncRead()
drop = AsyncCommand(doc=drop_doc)
drop_index = AsyncCommand()
drop_indexes = AsyncCommand()
estimated_document_count = AsyncCommand()
find_one = AsyncRead(doc=find_one_doc)
find_one_and_delete = AsyncCommand(doc=find_one_and_delete_doc)
find_one_and_replace = AsyncCommand(doc=find_one_and_replace_doc)
find_one_and_update = AsyncCommand(doc=find_one_and_update_doc)
full_name = ReadOnlyProperty()
index_information = AsyncRead(doc=index_information_doc)
inline_map_reduce = AsyncRead()
insert_many = AsyncWrite(doc=insert_many_doc)
insert_one = AsyncCommand(doc=insert_one_doc)
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
"""), globals(), locals())
def get_io_loop(self):
    """Return the event loop stored on this object as ``io_loop``."""
    return self.io_loop
class AgnosticGridFSBucket(object):
__motor_class_name__ = 'MotorGridFSBucket'
__delegate_class__ = gridfs.GridFSBucket
delete = AsyncCommand()
download_to_stream = AsyncCommand()
download_to_stream_by_name = AsyncCommand()
open_download_stream = AsyncCommand().wrap(gridfs.GridOut)
open_download_stream_by_name = AsyncCommand().wrap(gridfs.GridOut)
open_upload_stream = DelegateMethod().wrap(gridfs.GridIn)
open_upload_stream_with_id = DelegateMethod().wrap(gridfs.GridIn)
rename = AsyncCommand()
upload_from_stream = AsyncCommand()
upload_from_stream_with_id = AsyncCommand()
def __init__(self, database, bucket_name="fs", disable_md5=False,
chunk_size_bytes=DEFAULT_CHUNK_SIZE, write_concern=None,
read_preference=None, collection=None):
"""Create a handle to a GridFS bucket.
Raises :exc:`~pymongo.errors.ConfigurationError` if `write_concern`
is not acknowledged.
collection = client.db.collection
async with await client.start_session() as s:
async with s.start_transaction():
await collection.delete_one({'x': 1}, session=s)
await collection.insert_one({'x': 2}, session=s)
.. versionadded:: 2.0
"""
__motor_class_name__ = 'MotorClientSession'
__delegate_class__ = ClientSession
commit_transaction = AsyncCommand()
abort_transaction = AsyncCommand()
end_session = AsyncCommand()
cluster_time = ReadOnlyProperty()
has_ended = ReadOnlyProperty()
in_transaction = ReadOnlyProperty()
options = ReadOnlyProperty()
operation_time = ReadOnlyProperty()
session_id = ReadOnlyProperty()
advance_cluster_time = DelegateMethod()
advance_operation_time = DelegateMethod()
def __init__(self, delegate, motor_client):
    """Wrap a delegate session object and remember the client that owns it.

    :Parameters:
      - `delegate`: object handed to ``AgnosticBase.__init__`` as the
        wrapped delegate (presumably a PyMongo ``ClientSession``, per the
        class's ``__delegate_class__`` -- confirm against callers).
      - `motor_client`: the client this session belongs to; stored as
        ``self._client``.
    """
    AgnosticBase.__init__(self, delegate=delegate)
    self._client = motor_client
def get_io_loop(self):
    """Return the event loop reported by the owning client.

    The call is forwarded to ``self._client.get_io_loop()``.
    """
    return self._client.get_io_loop()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
"""), globals(), locals())
def get_io_loop(self):
    """Return the event loop stored on this object as ``io_loop``."""
    return self.io_loop
class AgnosticGridFSBucket(object):
__motor_class_name__ = 'MotorGridFSBucket'
__delegate_class__ = gridfs.GridFSBucket
delete = AsyncCommand()
download_to_stream = AsyncCommand()
download_to_stream_by_name = AsyncCommand()
open_download_stream = AsyncCommand().wrap(gridfs.GridOut)
open_download_stream_by_name = AsyncCommand().wrap(gridfs.GridOut)
open_upload_stream = DelegateMethod().wrap(gridfs.GridIn)
open_upload_stream_with_id = DelegateMethod().wrap(gridfs.GridIn)
rename = AsyncCommand()
upload_from_stream = AsyncCommand()
upload_from_stream_with_id = AsyncCommand()
def __init__(self, database, bucket_name="fs", disable_md5=False,
chunk_size_bytes=DEFAULT_CHUNK_SIZE, write_concern=None,
read_preference=None, collection=None):
"""Create a handle to a GridFS bucket.
Raises :exc:`~pymongo.errors.ConfigurationError` if `write_concern`
self.delegate.__exit__(exc_type, exc_val, exc_tb)
"""), globals(), locals())
def __enter__(self):
    """Reject use in a plain ``with`` statement.

    Always raises :exc:`AttributeError`; the message points the caller at
    the ``async with await client.start_session()`` form instead.
    """
    raise AttributeError("Use Motor sessions like 'async with await"
                         " client.start_session()', not 'with'")
def __exit__(self, exc_type, exc_val, exc_tb):
    """Do nothing and return ``None``.

    The three exception arguments are accepted and ignored.
    """
    pass
class AgnosticDatabase(AgnosticBaseProperties):
__motor_class_name__ = 'MotorDatabase'
__delegate_class__ = Database
command = AsyncCommand(doc=cmd_doc)
create_collection = AsyncCommand().wrap(Collection)
current_op = AsyncRead(doc=current_op_doc)
dereference = AsyncRead()
drop_collection = AsyncCommand().unwrap('MotorCollection')
get_collection = DelegateMethod().wrap(Collection)
list_collection_names = AsyncRead(doc=list_collection_names_doc)
list_collections = AsyncRead()
name = ReadOnlyProperty()
profiling_info = AsyncRead()
profiling_level = AsyncRead()
set_profiling_level = AsyncCommand()
validate_collection = AsyncRead().unwrap('MotorCollection')
with_options = DelegateMethod().wrap(Database)
incoming_manipulators = ReadOnlyProperty()
incoming_copying_manipulators = ReadOnlyProperty()
find_one_and_delete = AsyncCommand(doc=find_one_and_delete_doc)
find_one_and_replace = AsyncCommand(doc=find_one_and_replace_doc)
find_one_and_update = AsyncCommand(doc=find_one_and_update_doc)
full_name = ReadOnlyProperty()
index_information = AsyncRead(doc=index_information_doc)
inline_map_reduce = AsyncRead()
insert_many = AsyncWrite(doc=insert_many_doc)
insert_one = AsyncCommand(doc=insert_one_doc)
map_reduce = AsyncCommand(doc=mr_doc).wrap(Collection)
name = ReadOnlyProperty()
options = AsyncRead()
reindex = AsyncCommand()
rename = AsyncCommand()
replace_one = AsyncCommand(doc=replace_one_doc)
update_many = AsyncCommand(doc=update_many_doc)
update_one = AsyncCommand(doc=update_one_doc)
with_options = DelegateMethod().wrap(Collection)
_async_aggregate = AsyncRead(attr_name='aggregate')
_async_aggregate_raw_batches = AsyncRead(attr_name='aggregate_raw_batches')
_async_list_indexes = AsyncRead(attr_name='list_indexes')
def __init__(self, database, name, codec_options=None,
read_preference=None, write_concern=None, read_concern=None,
_delegate=None):
db_class = create_class_with_framework(
AgnosticDatabase, self._framework, self.__module__)
if not isinstance(database, db_class):
raise TypeError("First argument to MotorCollection must be "
"MotorDatabase, not %r" % database)
max_bson_size = ReadOnlyProperty()
max_idle_time_ms = ReadOnlyProperty()
max_message_size = ReadOnlyProperty()
max_pool_size = ReadOnlyProperty()
max_write_batch_size = ReadOnlyProperty()
min_pool_size = ReadOnlyProperty()
nodes = ReadOnlyProperty()
PORT = ReadOnlyProperty()
primary = ReadOnlyProperty()
read_concern = ReadOnlyProperty()
retry_reads = ReadOnlyProperty()
retry_writes = ReadOnlyProperty()
secondaries = ReadOnlyProperty()
server_info = AsyncRead()
server_selection_timeout = ReadOnlyProperty()
start_session = AsyncCommand(doc=start_session_doc).wrap(ClientSession)
unlock = AsyncCommand()
def __init__(self, *args, **kwargs):
"""Create a new connection to a single MongoDB instance at *host:port*.
Takes the same constructor arguments as
:class:`~pymongo.mongo_client.MongoClient`, as well as:
:Parameters:
- `io_loop` (optional): Special event loop
instance to use instead of default
"""
if 'io_loop' in kwargs:
io_loop = kwargs.pop('io_loop')
self._framework.check_event_loop(io_loop)
else: