Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
create_class_with_framework,
DelegateMethod,
motor_coroutine,
MotorCursorChainingMethod,
ReadOnlyProperty)
# Framework-agnostic cursor definition: names the Motor class to generate
# ('MotorGridOutCursor') and the PyMongo class it delegates to
# (gridfs.GridOutCursor), then lists the delegate attributes it exposes.
# NOTE(review): indentation was lost in this excerpt; the assignments below
# are presumably the class body -- confirm against the upstream file.
class AgnosticGridOutCursor(AgnosticBaseCursor):
__motor_class_name__ = 'MotorGridOutCursor'
__delegate_class__ = gridfs.GridOutCursor
# One descriptor per exposed delegate attribute. The descriptor classes are
# defined outside this excerpt; presumably MotorCursorChainingMethod marks a
# chainable cursor modifier, ReadOnlyProperty a plain attribute passthrough,
# and AsyncRead/AsyncCommand asynchronous wrappers -- TODO confirm.
add_option = MotorCursorChainingMethod()
address = ReadOnlyProperty()
collation = ReadOnlyProperty()
comment = MotorCursorChainingMethod()
distinct = AsyncRead()
explain = AsyncRead()
hint = MotorCursorChainingMethod()
limit = MotorCursorChainingMethod()
max = MotorCursorChainingMethod()
max_await_time_ms = MotorCursorChainingMethod()
max_scan = MotorCursorChainingMethod()
max_time_ms = MotorCursorChainingMethod()
min = MotorCursorChainingMethod()
remove_option = MotorCursorChainingMethod()
skip = MotorCursorChainingMethod()
# The only chaining method here that is given an explicit docstring.
sort = MotorCursorChainingMethod(doc=cursor_sort_doc)
where = MotorCursorChainingMethod()
# PyMongo's GridOutCursor inherits __die from Cursor.
# '_Cursor__die' is the name-mangled spelling of Cursor's private '__die',
# which is why the attribute carries the base class name as a prefix.
_Cursor__die = AsyncCommand()
# Context-manager exit hook that does nothing: the exception details are
# ignored and the implicit None return means an in-flight exception is not
# suppressed.
# NOTE(review): the owning class is not identifiable in this excerpt because
# indentation was lost -- confirm which class this method belongs to.
def __exit__(self, exc_type, exc_val, exc_tb):
pass
# Framework-agnostic database definition: names the Motor class to generate
# ('MotorDatabase') and the PyMongo class it delegates to (Database).
# NOTE(review): indentation was lost in this excerpt; the assignments below
# are presumably the class body -- confirm against the upstream file.
class AgnosticDatabase(AgnosticBaseProperties):
__motor_class_name__ = 'MotorDatabase'
__delegate_class__ = Database
# One descriptor per exposed delegate attribute. wrap(X) / unwrap('Name')
# presumably convert between PyMongo objects and their Motor counterparts
# in results / arguments -- TODO confirm in the descriptor definitions.
command = AsyncCommand(doc=cmd_doc)
create_collection = AsyncCommand().wrap(Collection)
current_op = AsyncRead(doc=current_op_doc)
dereference = AsyncRead()
drop_collection = AsyncCommand().unwrap('MotorCollection')
get_collection = DelegateMethod().wrap(Collection)
list_collection_names = AsyncRead(doc=list_collection_names_doc)
list_collections = AsyncRead()
name = ReadOnlyProperty()
profiling_info = AsyncRead()
profiling_level = AsyncRead()
set_profiling_level = AsyncCommand()
validate_collection = AsyncRead().unwrap('MotorCollection')
with_options = DelegateMethod().wrap(Database)
incoming_manipulators = ReadOnlyProperty()
incoming_copying_manipulators = ReadOnlyProperty()
outgoing_manipulators = ReadOnlyProperty()
outgoing_copying_manipulators = ReadOnlyProperty()
# Bound under a private name; attr_name='aggregate' presumably selects the
# delegate method being wrapped -- TODO confirm.
_async_aggregate = AsyncRead(attr_name='aggregate')
# NOTE(review): the body of this constructor is missing from the excerpt.
def __init__(self, client, name, **kwargs):
# Run of descriptor assignments whose class header is outside this excerpt.
# NOTE(review): presumably part of the Motor collection class, since
# several entries wrap Collection -- confirm against the upstream file.
index_information = AsyncRead(doc=index_information_doc)
inline_map_reduce = AsyncRead()
# insert_many is the only entry in this run declared with AsyncWrite.
insert_many = AsyncWrite(doc=insert_many_doc)
insert_one = AsyncCommand(doc=insert_one_doc)
map_reduce = AsyncCommand(doc=mr_doc).wrap(Collection)
name = ReadOnlyProperty()
options = AsyncRead()
reindex = AsyncCommand()
rename = AsyncCommand()
replace_one = AsyncCommand(doc=replace_one_doc)
update_many = AsyncCommand(doc=update_many_doc)
update_one = AsyncCommand(doc=update_one_doc)
with_options = DelegateMethod().wrap(Collection)
# Private aliases: each is bound under an '_async_' name while attr_name
# names the delegate attribute -- presumably so that a hand-written public
# method can call through to them; TODO confirm.
_async_aggregate = AsyncRead(attr_name='aggregate')
_async_aggregate_raw_batches = AsyncRead(attr_name='aggregate_raw_batches')
_async_list_indexes = AsyncRead(attr_name='list_indexes')
# Constructor for the collection wrapper (visible portion only).
#   database: must be an instance of the framework-specific class built from
#             AgnosticDatabase; otherwise TypeError is raised.
#   name: collection name, forwarded to Collection(...).
#   codec_options / read_preference / write_concern / read_concern:
#             forwarded unchanged to Collection(...).
#   _delegate: if truthy, used as-is instead of constructing a Collection.
# NOTE(review): the excerpt ends once `delegate` is computed; whatever is
# done with it afterwards is not visible here.
def __init__(self, database, name, codec_options=None,
read_preference=None, write_concern=None, read_concern=None,
_delegate=None):
# Resolve the concrete database class for this framework/module so the
# isinstance check below compares against the right generated class.
db_class = create_class_with_framework(
AgnosticDatabase, self._framework, self.__module__)
if not isinstance(database, db_class):
raise TypeError("First argument to MotorCollection must be "
"MotorDatabase, not %r" % database)
# `or` means any falsy _delegate (not just None) triggers construction.
delegate = _delegate or Collection(
database.delegate, name, codec_options=codec_options,
read_preference=read_preference, write_concern=write_concern,
read_concern=read_concern)
# Run of descriptor assignments whose class header is outside this excerpt.
# NOTE(review): presumably part of the Motor collection class, since
# several entries wrap Collection -- confirm against the upstream file.
create_index = AsyncCommand()
create_indexes = AsyncCommand(doc=create_indexes_doc)
delete_many = AsyncCommand(doc=delete_many_doc)
delete_one = AsyncCommand(doc=delete_one_doc)
distinct = AsyncRead()
drop = AsyncCommand(doc=drop_doc)
drop_index = AsyncCommand()
drop_indexes = AsyncCommand()
# NOTE(review): declared with AsyncCommand although the name suggests a
# read -- verify this is intentional.
estimated_document_count = AsyncCommand()
find_one = AsyncRead(doc=find_one_doc)
find_one_and_delete = AsyncCommand(doc=find_one_and_delete_doc)
find_one_and_replace = AsyncCommand(doc=find_one_and_replace_doc)
find_one_and_update = AsyncCommand(doc=find_one_and_update_doc)
full_name = ReadOnlyProperty()
index_information = AsyncRead(doc=index_information_doc)
inline_map_reduce = AsyncRead()
# insert_many is the only entry in this run declared with AsyncWrite.
insert_many = AsyncWrite(doc=insert_many_doc)
insert_one = AsyncCommand(doc=insert_one_doc)
map_reduce = AsyncCommand(doc=mr_doc).wrap(Collection)
name = ReadOnlyProperty()
options = AsyncRead()
reindex = AsyncCommand()
rename = AsyncCommand()
replace_one = AsyncCommand(doc=replace_one_doc)
update_many = AsyncCommand(doc=update_many_doc)
update_one = AsyncCommand(doc=update_one_doc)
with_options = DelegateMethod().wrap(Collection)
# Private aliases bound under '_async_' names; attr_name names the delegate
# attribute each one targets.
_async_aggregate = AsyncRead(attr_name='aggregate')
_async_aggregate_raw_batches = AsyncRead(attr_name='aggregate_raw_batches')
_async_list_indexes = AsyncRead(attr_name='list_indexes')
# Rejects use in a plain `with` statement: always raises AttributeError whose
# message directs the caller to 'async with await client.start_session()'.
# NOTE(review): the owning class is outside this excerpt; the message
# indicates it is a Motor session type -- confirm.
def __enter__(self):
raise AttributeError("Use Motor sessions like 'async with await"
" client.start_session()', not 'with'")
# No-op exit hook: ignores the exception details and implicitly returns
# None, so an in-flight exception is not suppressed.
def __exit__(self, exc_type, exc_val, exc_tb):
pass
# Framework-agnostic database definition: names the Motor class to generate
# ('MotorDatabase') and the PyMongo class it delegates to (Database).
# NOTE(review): indentation was lost in this excerpt; the assignments below
# are presumably the class body -- confirm against the upstream file.
class AgnosticDatabase(AgnosticBaseProperties):
__motor_class_name__ = 'MotorDatabase'
__delegate_class__ = Database
# One descriptor per exposed delegate attribute. wrap(X) / unwrap('Name')
# presumably convert between PyMongo objects and their Motor counterparts
# in results / arguments -- TODO confirm in the descriptor definitions.
command = AsyncCommand(doc=cmd_doc)
create_collection = AsyncCommand().wrap(Collection)
current_op = AsyncRead(doc=current_op_doc)
dereference = AsyncRead()
drop_collection = AsyncCommand().unwrap('MotorCollection')
get_collection = DelegateMethod().wrap(Collection)
list_collection_names = AsyncRead(doc=list_collection_names_doc)
list_collections = AsyncRead()
name = ReadOnlyProperty()
profiling_info = AsyncRead()
profiling_level = AsyncRead()
set_profiling_level = AsyncCommand()
validate_collection = AsyncRead().unwrap('MotorCollection')
with_options = DelegateMethod().wrap(Database)
# The four manipulator lists are exposed as read-only passthroughs.
incoming_manipulators = ReadOnlyProperty()
incoming_copying_manipulators = ReadOnlyProperty()
outgoing_manipulators = ReadOnlyProperty()
outgoing_copying_manipulators = ReadOnlyProperty()
# Descriptor table for a wrapper that delegates to gridfs.GridOut. The class
# header is outside this excerpt.
__delegate_class__ = gridfs.GridOut
_ensure_file = AsyncCommand()
# File-metadata attributes use MotorGridOutProperty, which is defined
# outside this excerpt. NOTE(review): presumably these are only readable
# once the file document has been loaded -- confirm.
_id = MotorGridOutProperty()
aliases = MotorGridOutProperty()
chunk_size = MotorGridOutProperty()
# NOTE(review): `close` is declared as a property descriptor rather than a
# DelegateMethod like the other method-like entries below; verify intent.
close = MotorGridOutProperty()
content_type = MotorGridOutProperty()
filename = MotorGridOutProperty()
length = MotorGridOutProperty()
md5 = MotorGridOutProperty()
metadata = MotorGridOutProperty()
name = MotorGridOutProperty()
# Data reads are declared AsyncRead; readable/seek/seekable/tell/write are
# declared with DelegateMethod instead.
read = AsyncRead()
readable = DelegateMethod()
readchunk = AsyncRead()
readline = AsyncRead()
seek = DelegateMethod()
seekable = DelegateMethod()
tell = DelegateMethod()
upload_date = MotorGridOutProperty()
write = DelegateMethod()
# Constructor for the GridOut wrapper (visible portion only).
#   root_collection: must be an instance of the framework-specific class
#                    built from AgnosticCollection; otherwise TypeError.
#   file_id, file_document, delegate, session: not used in the visible
#                    portion of the body.
# NOTE(review): the excerpt ends right after the type check; the rest of the
# constructor is not visible here.
def __init__(self, root_collection, file_id=None, file_document=None,
delegate=None, session=None):
# Resolve the concrete collection class for this framework/module so the
# isinstance check compares against the right generated class.
collection_class = create_class_with_framework(
AgnosticCollection, self._framework, self.__module__)
if not isinstance(root_collection, collection_class):
raise TypeError(
"First argument to MotorGridOut must be "
"MotorCollection, not %r" % root_collection)
# Run of descriptor assignments whose class header is outside this excerpt.
# NOTE(review): presumably part of the Motor collection class, since
# several entries wrap Collection -- confirm against the upstream file.
inline_map_reduce = AsyncRead()
# insert_many is the only entry in this run declared with AsyncWrite.
insert_many = AsyncWrite(doc=insert_many_doc)
insert_one = AsyncCommand(doc=insert_one_doc)
map_reduce = AsyncCommand(doc=mr_doc).wrap(Collection)
name = ReadOnlyProperty()
options = AsyncRead()
reindex = AsyncCommand()
rename = AsyncCommand()
replace_one = AsyncCommand(doc=replace_one_doc)
update_many = AsyncCommand(doc=update_many_doc)
update_one = AsyncCommand(doc=update_one_doc)
with_options = DelegateMethod().wrap(Collection)
# Private aliases bound under '_async_' names; attr_name names the delegate
# attribute each one targets.
_async_aggregate = AsyncRead(attr_name='aggregate')
_async_aggregate_raw_batches = AsyncRead(attr_name='aggregate_raw_batches')
_async_list_indexes = AsyncRead(attr_name='list_indexes')
# Constructor for the collection wrapper (visible portion only).
#   database: must be an instance of the framework-specific class built from
#             AgnosticDatabase; otherwise TypeError is raised.
#   name: collection name, forwarded to Collection(...).
#   codec_options / read_preference / write_concern / read_concern:
#             forwarded unchanged to Collection(...).
#   _delegate: if truthy, used as-is instead of constructing a Collection.
# NOTE(review): the excerpt ends once `delegate` is computed; whatever is
# done with it afterwards is not visible here.
def __init__(self, database, name, codec_options=None,
read_preference=None, write_concern=None, read_concern=None,
_delegate=None):
# Resolve the concrete database class for this framework/module so the
# isinstance check below compares against the right generated class.
db_class = create_class_with_framework(
AgnosticDatabase, self._framework, self.__module__)
if not isinstance(database, db_class):
raise TypeError("First argument to MotorCollection must be "
"MotorDatabase, not %r" % database)
# `or` means any falsy _delegate (not just None) triggers construction.
delegate = _delegate or Collection(
database.delegate, name, codec_options=codec_options,
read_preference=read_preference, write_concern=write_concern,
read_concern=read_concern)
# Framework-agnostic collection definition: names the Motor class to generate
# ('MotorCollection') and the PyMongo class it delegates to (Collection).
# NOTE(review): indentation was lost in this excerpt; the assignments below
# are presumably the class body -- confirm against the upstream file.
class AgnosticCollection(AgnosticBaseProperties):
__motor_class_name__ = 'MotorCollection'
__delegate_class__ = Collection
# One descriptor per exposed delegate attribute. The descriptor classes
# (AsyncCommand, AsyncRead, AsyncWrite, ReadOnlyProperty) are defined outside
# this excerpt; entries passing doc=... supply an explicit docstring.
bulk_write = AsyncCommand(doc=bulk_write_doc)
count_documents = AsyncRead()
create_index = AsyncCommand()
create_indexes = AsyncCommand(doc=create_indexes_doc)
delete_many = AsyncCommand(doc=delete_many_doc)
delete_one = AsyncCommand(doc=delete_one_doc)
distinct = AsyncRead()
drop = AsyncCommand(doc=drop_doc)
drop_index = AsyncCommand()
drop_indexes = AsyncCommand()
# NOTE(review): declared with AsyncCommand although the name suggests a
# read -- verify this is intentional.
estimated_document_count = AsyncCommand()
find_one = AsyncRead(doc=find_one_doc)
find_one_and_delete = AsyncCommand(doc=find_one_and_delete_doc)
find_one_and_replace = AsyncCommand(doc=find_one_and_replace_doc)
find_one_and_update = AsyncCommand(doc=find_one_and_update_doc)
full_name = ReadOnlyProperty()
index_information = AsyncRead(doc=index_information_doc)
inline_map_reduce = AsyncRead()
# insert_many is the only entry in this table declared with AsyncWrite.
insert_many = AsyncWrite(doc=insert_many_doc)
insert_one = AsyncCommand(doc=insert_one_doc)
# map_reduce's result is passed through wrap(Collection); presumably this
# turns a returned PyMongo Collection into its Motor wrapper -- TODO confirm.
map_reduce = AsyncCommand(doc=mr_doc).wrap(Collection)
name = ReadOnlyProperty()
options = AsyncRead()
reindex = AsyncCommand()
rename = AsyncCommand()
replace_one = AsyncCommand(doc=replace_one_doc)
update_many = AsyncCommand(doc=update_many_doc)
update_one = AsyncCommand(doc=update_one_doc)
# Framework-agnostic database definition: names the Motor class to generate
# ('MotorDatabase') and the PyMongo class it delegates to (Database).
# NOTE(review): indentation was lost in this excerpt; the assignments and the
# constructor below are presumably the class body -- confirm upstream.
class AgnosticDatabase(AgnosticBaseProperties):
__motor_class_name__ = 'MotorDatabase'
__delegate_class__ = Database
# One descriptor per exposed delegate attribute. wrap(X) / unwrap('Name')
# presumably convert between PyMongo objects and their Motor counterparts
# in results / arguments -- TODO confirm in the descriptor definitions.
command = AsyncCommand(doc=cmd_doc)
create_collection = AsyncCommand().wrap(Collection)
current_op = AsyncRead(doc=current_op_doc)
dereference = AsyncRead()
drop_collection = AsyncCommand().unwrap('MotorCollection')
get_collection = DelegateMethod().wrap(Collection)
list_collection_names = AsyncRead(doc=list_collection_names_doc)
list_collections = AsyncRead()
name = ReadOnlyProperty()
profiling_info = AsyncRead()
profiling_level = AsyncRead()
set_profiling_level = AsyncCommand()
validate_collection = AsyncRead().unwrap('MotorCollection')
with_options = DelegateMethod().wrap(Database)
incoming_manipulators = ReadOnlyProperty()
incoming_copying_manipulators = ReadOnlyProperty()
outgoing_manipulators = ReadOnlyProperty()
outgoing_copying_manipulators = ReadOnlyProperty()
# Bound under a private name; attr_name='aggregate' presumably selects the
# delegate method being wrapped -- TODO confirm.
_async_aggregate = AsyncRead(attr_name='aggregate')
# Constructor (visible portion only): remembers the owning client and picks
# the PyMongo Database to delegate to.
# NOTE(review): the excerpt ends once `delegate` is computed; whatever is
# done with it afterwards is not visible here.
def __init__(self, client, name, **kwargs):
self._client = client
# kwargs.get() does not remove '_delegate', so when that key is present but
# falsy it is forwarded to Database(...) together with the other kwargs.
delegate = kwargs.get('_delegate') or Database(
client.delegate, name, **kwargs)
# Descriptor table for the client wrapper: the generated class is named
# 'MotorClient' and delegates to pymongo.mongo_client.MongoClient. The class
# header is outside this excerpt, and the table may continue past its end.
__motor_class_name__ = 'MotorClient'
__delegate_class__ = pymongo.mongo_client.MongoClient
address = ReadOnlyProperty()
arbiters = ReadOnlyProperty()
# close is declared with DelegateMethod rather than an Async* descriptor.
close = DelegateMethod()
drop_database = AsyncCommand().unwrap('MotorDatabase')
event_listeners = ReadOnlyProperty()
fsync = AsyncCommand(doc=fsync_doc)
# Both database getters pass their result through wrap(Database); presumably
# this yields the Motor database wrapper -- TODO confirm.
get_database = DelegateMethod(doc=get_database_doc).wrap(Database)
get_default_database = DelegateMethod(doc=get_default_database_doc).wrap(Database)
HOST = ReadOnlyProperty()
is_mongos = ReadOnlyProperty()
is_primary = ReadOnlyProperty()
list_databases = AsyncRead().wrap(CommandCursor)
list_database_names = AsyncRead()
local_threshold_ms = ReadOnlyProperty()
max_bson_size = ReadOnlyProperty()
max_idle_time_ms = ReadOnlyProperty()
max_message_size = ReadOnlyProperty()
max_pool_size = ReadOnlyProperty()
max_write_batch_size = ReadOnlyProperty()
min_pool_size = ReadOnlyProperty()
nodes = ReadOnlyProperty()
PORT = ReadOnlyProperty()
primary = ReadOnlyProperty()
read_concern = ReadOnlyProperty()
retry_reads = ReadOnlyProperty()
retry_writes = ReadOnlyProperty()
secondaries = ReadOnlyProperty()
server_info = AsyncRead()
server_selection_timeout = ReadOnlyProperty()