Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_connect(self):
    """Ensure that the connect() method works properly.

    Opens a default connection to ``mongoenginetest`` and a second one to
    ``mongoenginetest2`` under the ``testdb`` alias, then checks the types
    returned by get_connection() / get_db() and the default database name.
    """
    connect('mongoenginetest')

    conn = get_connection()
    # assertIsInstance reports the actual object and the expected type when
    # it fails, whereas assertTrue(isinstance(...)) only says "False is not
    # true".
    self.assertIsInstance(conn, pymongo.mongo_client.MongoClient)

    db = get_db()
    self.assertIsInstance(db, pymongo.database.Database)
    self.assertEqual(db.name, 'mongoenginetest')

    # A second connection registered under an explicit alias.
    connect('mongoenginetest2', alias='testdb')
    conn = get_connection('testdb')
    self.assertIsInstance(conn, pymongo.mongo_client.MongoClient)
def test_connect(self):
# NOTE(review): indentation was lost in this copy, and the body appears to stop
# at the trailing "Refuse to connect" comment; `slave` is assigned below but
# is not used in the visible lines.
# Presumably marks the test as skipped before anything else runs - confirm
# what self.skip() does in the test base class.
self.skip()
# Use a timeout so the test is reasonably fast
# Building a client from two copies of self.bad is expected to raise
# ConnectionFailure.
self.assertRaises(ConnectionFailure, MongoClient,
[self.bad, self.bad], connectTimeoutMS=500)
# Connect with the seeds in left/right order and remember the host and port
# the client reports.
client = MongoClient([self.left, self.right])
self.assertTrue(client)
host = client.host
port = client.port
# With the seed order reversed the client is expected to report the same
# host and port.
client = MongoClient([self.right, self.left], connectTimeoutMS=500)
self.assertTrue(client)
self.assertEqual(host, client.host)
self.assertEqual(port, client.port)
# Whichever of left/right does not equal the reported "host:port" string is
# kept as `slave`.
if self.left == '%s:%s' % (host, port):
slave = self.right
else:
slave = self.left
# Refuse to connect if master absent
def test_default(self):
    """A client built without a retry_reads option reports it as True."""
    # connect=False is passed so the client is only constructed here.
    default_client = MongoClient(connect=False)
    retry_reads = default_client.retry_reads
    self.assertEqual(retry_reads, True)
# NOTE(review): this run of statements has no visible enclosing `def`; kwargs,
# db_user, db_pwd, host and port are all defined outside the lines shown here.
kwargs['username'] = db_user
kwargs['password'] = db_pwd
# Set bad defaults.
# HOST and PORT are assigned on the MongoClient class itself, not on an
# instance.
MongoClient.HOST = "somedomainthatdoesntexist.org"
MongoClient.PORT = 123456789
# No host/port is passed here, and AutoReconnect is the expected outcome.
# The 10 ms server selection timeout presumably keeps the failure fast.
with self.assertRaises(AutoReconnect):
connected(MongoClient(serverSelectionTimeoutMS=10,
**kwargs))
# Override the defaults. No error.
connected(MongoClient(host, port, **kwargs))
# Set good defaults.
MongoClient.HOST = host
MongoClient.PORT = port
# No error.
connected(MongoClient(**kwargs))
def _test_recover_from_initial(self, initial_callback):
    """Check that the node list recovers after a bad first resolver response.

    ``initial_callback`` is installed as the DNS resolver response first,
    and the node list is expected to stay at ``BASE_SRV_RESPONSE``. A valid
    response that differs from the base is then installed, and the node
    list is expected to change to it.
    """
    # Valid final response, distinct from the base: a copy of the base list
    # with its last entry removed.
    recovered_nodes = self.BASE_SRV_RESPONSE[:]
    recovered_nodes.pop()

    def final_callback():
        return recovered_nodes

    # Both phases patch the same two timing knobs.
    timing = dict(ttl_time=WAIT_TIME, min_srv_rescan_interval=WAIT_TIME)

    with SrvPollingKnobs(
            dns_resolver_nodelist_response=initial_callback,
            count_resolver_calls=True,
            **timing):
        # Client uses unpatched method to get initial nodelist
        client = MongoClient(self.CONNECTION_STRING)
        # Invalid DNS resolver response should not change nodelist.
        self.assert_nodelist_nochange(self.BASE_SRV_RESPONSE, client)

    with SrvPollingKnobs(
            dns_resolver_nodelist_response=final_callback,
            **timing):
        # Nodelist should reflect new valid DNS resolver response.
        self.assert_nodelist_change(recovered_nodes, client)
# NOTE(review): the enclosing `def` is not visible in this copy; dns_response
# and expect_change are defined outside the lines shown here.
# Callback handed to SrvPollingKnobs below; it returns dns_response from the
# enclosing scope.
def dns_resolver_response():
return dns_response
# Choose the assertion, the expected node list and the count_resolver_calls
# flag according to whether the scenario expects the node list to change.
if expect_change:
assertion_method = self.assert_nodelist_change
count_resolver_calls = False
expected_response = dns_response
else:
assertion_method = self.assert_nodelist_nochange
count_resolver_calls = True
expected_response = self.BASE_SRV_RESPONSE
# Patch timeouts to ensure short test running times.
with SrvPollingKnobs(
ttl_time=WAIT_TIME, min_srv_rescan_interval=WAIT_TIME):
client = MongoClient(self.CONNECTION_STRING)
# Before the resolver is patched, the client is expected to reach the base
# node list.
self.assert_nodelist_change(self.BASE_SRV_RESPONSE, client)
# Patch list of hosts returned by DNS query.
with SrvPollingKnobs(
dns_resolver_nodelist_response=dns_resolver_response,
count_resolver_calls=count_resolver_calls):
assertion_method(expected_response, client)
def create(client, opts):
    """Create the :class:`_Encrypter` for an encrypted client.

    :Parameters:
      - `client`: The encrypted MongoClient.
      - `opts`: The encrypted client's :class:`AutoEncryptionOpts`.

    :Returns:
      An :class:`_Encrypter` built from `opts` and an :class:`_EncryptionIO`
      that wraps the client, the key vault collection and a mongocryptd
      client.
    """
    # Use the key vault client from the options when one is set, otherwise
    # fall back to the encrypted client itself.
    vault_client = opts._key_vault_client or client

    # The namespace is "<db>.<collection>"; split on the first dot only so
    # any further dots stay in the collection name.
    db_name, coll_name = opts._key_vault_namespace.split('.', 1)
    vault_collection = vault_client[db_name][coll_name]

    cryptd_client = MongoClient(
        opts._mongocryptd_uri,
        connect=False,
        serverSelectionTimeoutMS=_MONGOCRYPTD_TIMEOUT_MS)

    return _Encrypter(
        _EncryptionIO(client, vault_collection, cryptd_client, opts),
        opts)
# NOTE(review): this is the middle of a larger function whose header is not
# visible; instance, admindb, server, timeout, ssl_params, do_auth,
# auth_source, username, password, use_x509 and service_check_tags come from
# outside these lines, and the `try:` below has no visible handler.
# Handle replica data, if any
# See
# http://www.mongodb.org/display/DOCS/Replica+Set+Commands#ReplicaSetCommands-replSetGetStatus # noqa
# The replica check runs unless the instance config turns it off: a missing
# 'replica_check' key defaults to True.
if _is_affirmative(instance.get('replica_check', True)):
try:
# data, dbnames, primary and current are initialised here but not used in
# the visible lines.
data = {}
dbnames = []
replSet = admindb.command('replSetGetStatus')
if replSet:
primary = None
current = None
# need a new connection to deal with replica sets
setname = replSet.get('set')
# The same `timeout` value is applied to socket, connect and server
# selection; the set name comes from the status reply fetched above.
cli_rs = pymongo.mongo_client.MongoClient(
server,
socketTimeoutMS=timeout,
connectTimeoutMS=timeout,
serverSelectionTimeoutMS=timeout,
replicaset=setname,
read_preference=pymongo.ReadPreference.NEAREST,
**ssl_params)
# When auth is enabled and an auth source is configured, authenticate the
# new connection against that database.
if do_auth:
if auth_source:
self._authenticate(cli_rs[auth_source],
username,
password,
use_x509,
server,
service_check_tags)
def open_spider(self, spider):
    """Open the MongoDB connection and select the target collection.

    A MongoReplicaSetClient is used when ``self.rep`` is set, otherwise a
    plain MongoClient. The database and collection handles are stored on
    the instance, the connection is logged, and a unique index is created
    when ``self._get_uniq_key()`` returns a key.

    :param spider: the spider being opened; not used by this method.
    """
    if self.rep is not None:
        self.connection = MongoReplicaSetClient(
            self.uri,
            replicaSet=self.rep,
            w=self.wcc,
            fsync=self.fsc,
            read_preference=ReadPreference.PRIMARY_PREFERRED)
    else:
        self.connection = MongoClient(
            self.uri,
            fsync=self.fsc,
            read_preference=ReadPreference.PRIMARY)

    self.database = self.connection[self.db]
    self.collection = self.database[self.col]

    # The three values must be supplied as a single tuple. Without the
    # parentheses `%` binds to self.uri alone and the three-placeholder
    # format raises "TypeError: not enough arguments for format string".
    log.msg('Connected to MongoDB "%s", using "%s/%s"' %
            (self.uri, self.db, self.col))

    # create index if not existing
    if self._get_uniq_key() is not None:
        self.collection.create_index(self._get_uniq_key(), unique=True)