Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sessions_not_supported(self):
    """Check that starting a session on this client is rejected.

    ``self.client.start_session()`` must raise ``ConfigurationError`` with
    a message matching "Sessions are not supported".
    """
    expected_message = "Sessions are not supported"
    with self.assertRaisesRegex(ConfigurationError, expected_message):
        self.client.start_session()
"""Takes a string of the form host1[:port],host2[:port]... and
splits it into (host, port) tuples. If [:port] isn't present the
default_port is used.
Returns a list of 2-tuples containing the host name (or IP) followed by
port number.
:Parameters:
- `hosts`: A string of the form host1[:port],host2[:port],...
- `default_port`: The port number to use when one wasn't specified
for a host.
"""
nodes = []
for entity in hosts.split(','):
if not entity:
raise ConfigurationError("Empty host "
"(or extra comma in host list).")
port = default_port
# Unix socket entities don't have ports
if entity.endswith('.sock'):
port = None
nodes.append(parse_host(entity, port))
return nodes
def _delete(
self, sock_info, criteria, multi,
write_concern=None, op_id=None, ordered=True,
collation=None):
"""Internal delete helper."""
common.validate_is_mapping("filter", criteria)
concern = (write_concern or self.write_concern).document
acknowledged = concern.get("w") != 0
delete_doc = SON([('q', criteria),
('limit', int(not multi))])
collation = validate_collation_or_none(collation)
if collation is not None:
if sock_info.max_wire_version < 5:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use collations.')
elif not acknowledged:
raise ConfigurationError(
'Collation is unsupported for unacknowledged writes.')
else:
delete_doc['collation'] = collation
command = SON([('delete', self.name),
('ordered', ordered),
('deletes', [delete_doc])])
if concern:
command['writeConcern'] = concern
if sock_info.max_wire_version > 1 and acknowledged:
# Delete command.
result = sock_info.command(
self.__database.name,
write_concern=None, op_id=None, ordered=True,
bypass_doc_val=False, collation=None, array_filters=None):
"""Internal update / replace helper."""
common.validate_boolean("upsert", upsert)
if manipulate:
document = self.__database._fix_incoming(document, self)
collation = validate_collation_or_none(collation)
concern = (write_concern or self.write_concern).document
acknowledged = concern.get("w") != 0
update_doc = SON([('q', criteria),
('u', document),
('multi', multi),
('upsert', upsert)])
if collation is not None:
if sock_info.max_wire_version < 5:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use collations.')
elif not acknowledged:
raise ConfigurationError(
'Collation is unsupported for unacknowledged writes.')
else:
update_doc['collation'] = collation
if array_filters is not None:
if sock_info.max_wire_version < 6:
raise ConfigurationError(
'Must be connected to MongoDB 3.6+ to use array_filters.')
elif not acknowledged:
raise ConfigurationError(
'arrayFilters is unsupported for unacknowledged writes.')
else:
update_doc['arrayFilters'] = array_filters
command = SON([('update', self.name),
def _execute_command(self, generator, write_concern, session,
sock_info, op_id, retryable, full_result):
if sock_info.max_wire_version < 5 and self.uses_collation:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use a collation.')
if sock_info.max_wire_version < 6 and self.uses_array_filters:
raise ConfigurationError(
'Must be connected to MongoDB 3.6+ to use arrayFilters.')
db_name = self.collection.database.name
client = self.collection.database.client
listeners = client._event_listeners
if not self.current_run:
self.current_run = next(generator)
run = self.current_run
# sock_info.command validates the session, but we use
# sock_info.write_command.
sock_info.validate_session(client, session)
- `dbname`: name of the database on which to run the command
- `spec`: a command document as a dict, SON, or mapping object
- `slave_ok`: whether to set the SlaveOkay wire protocol bit
- `read_preference`: a read preference
- `codec_options`: a CodecOptions instance
- `check`: raise OperationFailure if there are errors
- `allowable_errors`: errors to ignore if `check` is True
- `check_keys`: if True, check `spec` for invalid keys
- `read_concern`: The read concern for this command.
- `write_concern`: The write concern for this command.
- `parse_write_concern_error`: Whether to parse the
``writeConcernError`` field in the command response.
- `collation`: The collation for this command.
"""
if self.max_wire_version < 4 and not read_concern.ok_for_legacy:
raise ConfigurationError(
'read concern level of %s is not valid '
'with a max wire version of %d.'
% (read_concern.level, self.max_wire_version))
if not (write_concern is None or write_concern.acknowledged or
collation is None):
raise ConfigurationError(
'Collation is unsupported for unacknowledged writes.')
if self.max_wire_version >= 5 and write_concern:
spec['writeConcern'] = write_concern.document
elif self.max_wire_version < 5 and collation is not None:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use a collation.')
try:
return command(self.sock, dbname, spec, slave_ok,
self.is_mongos, read_preference, codec_options,
check, allowable_errors, self.address,
def validate_uuid_subtype(dummy, value):
    """Validate the uuid subtype option.

    The option is a numerical value; the acceptable values are the ones
    stored in ``_UUID_SUBTYPES`` (defined in bson.binary).

    :Parameters:
      - `dummy`: unused by this validator (presumably the option name,
        kept so the signature matches the other validators -- confirm).
      - `value`: the candidate uuid subtype.

    Returns `value` unchanged when it is acceptable, otherwise raises
    ConfigurationError.
    """
    accepted_subtypes = _UUID_SUBTYPES.values()
    if value in accepted_subtypes:
        return value
    raise ConfigurationError("Not a valid setting for uuid_subtype.")
raise ConfigurationError("the replicaSet "
"keyword parameter is required.")
self.__net_timeout = self.__opts.get('sockettimeoutms')
self.__conn_timeout = self.__opts.get('connecttimeoutms')
self.__wait_queue_timeout = self.__opts.get('waitqueuetimeoutms')
self.__wait_queue_multiple = self.__opts.get('waitqueuemultiple')
self.__use_ssl = self.__opts.get('ssl', None)
self.__ssl_keyfile = self.__opts.get('ssl_keyfile', None)
self.__ssl_certfile = self.__opts.get('ssl_certfile', None)
self.__ssl_cert_reqs = self.__opts.get('ssl_cert_reqs', None)
self.__ssl_ca_certs = self.__opts.get('ssl_ca_certs', None)
ssl_kwarg_keys = [k for k in kwargs.keys() if k.startswith('ssl_')]
if self.__use_ssl is False and ssl_kwarg_keys:
raise ConfigurationError("ssl has not been enabled but the "
"following ssl parameters have been set: "
"%s. Please set `ssl=True` or remove."
% ', '.join(ssl_kwarg_keys))
if self.__ssl_cert_reqs and not self.__ssl_ca_certs:
raise ConfigurationError("If `ssl_cert_reqs` is not "
"`ssl.CERT_NONE` then you must "
"include `ssl_ca_certs` to be able "
"to validate the server.")
if ssl_kwarg_keys and self.__use_ssl is None:
# ssl options imply ssl = True
self.__use_ssl = True
if self.__use_ssl and not common.HAS_SSL:
raise ConfigurationError("The ssl module is not available. If you "
cmd = SON([("findAndModify", self.__name),
("query", filter),
("new", return_document)])
cmd.update(kwargs)
if projection is not None:
cmd["fields"] = helpers._fields_list_to_dict(projection,
"projection")
if sort is not None:
cmd["sort"] = helpers._index_document(sort)
if upsert is not None:
common.validate_boolean("upsert", upsert)
cmd["upsert"] = upsert
with self._socket_for_writes() as sock_info:
if array_filters is not None:
if sock_info.max_wire_version < 6:
raise ConfigurationError(
'Must be connected to MongoDB 3.6+ to use '
'arrayFilters.')
if not self.write_concern.acknowledged:
raise ConfigurationError(
'arrayFilters is unsupported for unacknowledged '
'writes.')
cmd["arrayFilters"] = array_filters
if sock_info.max_wire_version >= 4 and 'writeConcern' not in cmd:
wc_doc = self.write_concern.document
if wc_doc:
cmd['writeConcern'] = wc_doc
out = self._command(sock_info, cmd,
read_preference=ReadPreference.PRIMARY,
allowable_errors=[_NO_OBJ_ERROR],
collation=collation)
_check_write_command_response([(0, out)])
def validate_boolean(option, value):
    """Validate a boolean option and return it as a real ``bool``.

    :Parameters:
      - `option`: name of the option, used only in error messages.
      - `value`: either a ``bool`` or one of the strings 'true' / 'false'.

    A ``bool`` is returned as-is; the string 'true' yields True and
    'false' yields False.

    Raises ConfigurationError for any other string and TypeError for a
    value that is neither a bool nor a string.
    """
    # Genuine booleans need no conversion.
    if isinstance(value, bool):
        return value

    # Anything that is not a string at this point is the wrong type.
    if not isinstance(value, basestring):
        raise TypeError(
            "Wrong type for %s, value must be a boolean" % (option,))

    # Only the exact lowercase spellings are accepted for strings.
    if value in ('true', 'false'):
        return value == 'true'
    raise ConfigurationError("The value of %s must be "
                             "'true' or 'false'" % (option,))