Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_list_databases(self):
yield self.collection.insert_one({})
cursor = yield self.cx.list_databases()
self.assertIsInstance(cursor, motor.motor_tornado.MotorCommandCursor)
# Make sure the cursor works, by searching for "local" database.
while (yield cursor.fetch_next):
info = cursor.next_object()
if info['name'] == self.collection.database.name:
break
else:
self.fail("'%s' database not found" % self.collection.database.name)
def test_cursor(self):
cursor = self.collection.find()
self.assertTrue(isinstance(cursor, motor.motor_tornado.MotorCursor))
self.assertFalse(cursor.started, "Cursor shouldn't start immediately")
def _wrap_synchro(*args, **kwargs):
motor_obj = fn(*args, **kwargs)
# Not all Motor classes appear here, only those we need to return
# from methods like map_reduce() or create_collection()
if isinstance(motor_obj, motor.MotorCollection):
client = MongoClient(delegate=motor_obj.database.client)
database = Database(client, motor_obj.database.name)
return Collection(database, motor_obj.name, delegate=motor_obj)
if isinstance(motor_obj, motor.motor_tornado.MotorClientSession):
return ClientSession(delegate=motor_obj)
if isinstance(motor_obj, _MotorTransactionContext):
return _SynchroTransactionContext(motor_obj)
if isinstance(motor_obj, motor.MotorDatabase):
client = MongoClient(delegate=motor_obj.client)
return Database(client, motor_obj.name, delegate=motor_obj)
if isinstance(motor_obj, motor.motor_tornado.MotorChangeStream):
# Send the initial aggregate as PyMongo expects.
motor_obj._lazy_init()
return ChangeStream(motor_obj)
if isinstance(motor_obj, motor.motor_tornado.MotorLatentCommandCursor):
return CommandCursor(motor_obj)
if isinstance(motor_obj, motor.motor_tornado.MotorCommandCursor):
return CommandCursor(motor_obj)
if isinstance(motor_obj, _MotorRawBatchCommandCursor):
return CommandCursor(motor_obj)
def get_motor(**kwargs):
""" 获取Mongo Motor 连接 """
dbname = __conf__.DB_NAME
if 'dbname' in kwargs:
dbname = kwargs.pop('dbname')
host = kwargs.pop('host', __conf__.DB_HOST)
client = motor.motor_tornado.MotorClient(f'mongodb://{host}')
return client[dbname], kwargs
@coroutine
def connect(self):
"""建立连接"""
try:
self.recorder('INFO', '{obj} connect start'.format(obj=self))
self.set_idle()
self._client = motor_tornado.MotorClient(**self.setting)
if self.db:
self.select_db(self.db)
self.isConnect = True
self.recorder('INFO', '{obj} connect successful'.format(obj=self))
except ConnectionFailure as e:
self.recorder('ERROR', '{obj} connect failed [{msg}]'.format(obj=self, msg=e))
self.error()
raise MongoError
raise Return(self)
self.delegate = delegate
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
motor_session = self.delegate._session
if motor_session.in_transaction:
if exc_val is None:
self.synchronize(motor_session.commit_transaction)()
else:
self.synchronize(motor_session.abort_transaction)()
class ClientSession(Synchro):
__delegate_class__ = motor.motor_tornado.MotorClientSession
start_transaction = WrapOutgoing()
client = SynchroProperty()
def __init__(self, delegate):
self.delegate = delegate
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.synchronize(self.delegate.end_session)
# For PyMongo tests that access session internals.
_client = SynchroProperty()
_pinned_address = SynchroProperty()
def __init__(self, delegate):
if not isinstance(delegate, motor.motor_tornado.MotorGridOutCursor):
raise TypeError(
"Expected MotorGridOutCursor, got %r" % delegate)
super(GridOutCursor, self).__init__(delegate)
(r'/auth/login', AuthLoginHandler),
(r'/auth/logout', AuthLogoutHandler),
(r'/socket', GameHandler),
]
settings = dict(
title='Tornado Poker',
init_coin=4000,
xsrf_cookies=True,
cookie_secret='fiDSpuZ7QFelfm0XP9Jb7ZIPNsOegkHYtgKSd4I83Hs=',
template_path=os.path.join(BASE_DIR, 'templates'),
static_path=os.path.join(BASE_DIR, 'static'),
login_url='/auth/login',
debug=True,
)
super(Application, self).__init__(handlers, **settings)
self.db = motor.motor_tornado.MotorClient('mongodb://localhost:27017/').poker
self.executor = concurrent.futures.ThreadPoolExecutor()
self.session = {}
def make_app():
client = motor.motor_tornado.MotorClient(options.database_uri)
db = client.get_database()
return tornado.web.Application([
(r"/metric", handlers.metric_ws_handler.MetricWebSocket),
], db=db, websocket_max_message_size=5767168) # 5.5 MB
return False
# For PyMongo tests that access cursor internals.
_Cursor__data = SynchroProperty()
_Cursor__exhaust = SynchroProperty()
_Cursor__max_await_time_ms = SynchroProperty()
_Cursor__max_time_ms = SynchroProperty()
_Cursor__query_flags = SynchroProperty()
_Cursor__query_spec = SynchroProperty()
_Cursor__retrieved = SynchroProperty()
_Cursor__spec = SynchroProperty()
_read_preference = SynchroProperty()
class CommandCursor(Cursor):
__delegate_class__ = motor.motor_tornado.MotorCommandCursor
class GridOutCursor(Cursor):
__delegate_class__ = motor.motor_tornado.MotorGridOutCursor
def __init__(self, delegate):
if not isinstance(delegate, motor.motor_tornado.MotorGridOutCursor):
raise TypeError(
"Expected MotorGridOutCursor, got %r" % delegate)
super(GridOutCursor, self).__init__(delegate)
def next(self):
motor_grid_out = super(GridOutCursor, self).next()
if motor_grid_out:
return GridOut(self.collection, delegate=motor_grid_out)