Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
specifies the prefix used to filter keys. Each key is mapped to a
corresponding keyword argument on the :py:class:`redis.ConnectionPool`
constructor.
Supported keys:
* ``url`` (required): a URL like ``redis://localhost/0``.
* ``max_connections``: an integer maximum number of connections in the pool
* ``socket_connect_timeout``: how long to wait for sockets to connect. e.g.
``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`)
* ``socket_timeout``: how long to wait for socket operations, e.g.
``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`)
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"url": config.String,
"max_connections": config.Optional(config.Integer, default=None),
"socket_connect_timeout": config.Optional(config.Timespan, default=None),
"socket_timeout": config.Optional(config.Timespan, default=None),
}
)
options = parser.parse(prefix[:-1], app_config)
if options.max_connections is not None:
kwargs.setdefault("max_connections", options.max_connections)
if options.socket_connect_timeout is not None:
kwargs.setdefault("socket_connect_timeout", options.socket_connect_timeout.total_seconds())
if options.socket_timeout is not None:
kwargs.setdefault("socket_timeout", options.socket_timeout.total_seconds())
precedence over the configuration file.
Supported keys:
* ``contact_points`` (required): comma delimited list of contact points to
try connecting for cluster discovery
* ``port``: The server-side port to open connections to.
* ``credentials_secret`` (optional): the key used to retrieve the database
credentials from ``secrets`` as a :py:class:`~baseplate.lib.secrets.CredentialSecret`.
:param execution_profiles: Configured execution profiles to provide to the
rest of the application.
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"contact_points": config.TupleOf(config.String),
"port": config.Optional(config.Integer, default=None),
"credentials_secret": config.Optional(config.String),
}
)
options = parser.parse(prefix[:-1], app_config)
if options.port:
kwargs.setdefault("port", options.port)
if options.credentials_secret:
if not secrets:
raise TypeError("'secrets' is required if 'credentials_secret' is set")
credentials = secrets.get_credentials(options.credentials_secret)
kwargs.setdefault(
The keys useful to :py:func:`exchange_from_config` should be prefixed,
e.g. ``amqp.exchange_name`` etc. The ``prefix`` argument specifies the
prefix used to filter keys. Each key is mapped to a corresponding keyword
argument on the :py:class:`~kombu.Exchange` constructor. Any keyword
arguments given to this function will be passed through to the
:py:class:`~kombu.Exchange` constructor. Keyword arguments take precedence
over the configuration file.
Supported keys:
* ``exchange_name``
* ``exchange_type``
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{"exchange_name": config.Optional(config.String), "exchange_type": config.String}
)
options = parser.parse(prefix[:-1], app_config)
return Exchange(name=options.exchange_name or "", type=options.exchange_type, **kwargs)
underlying socket default timeout.
:param app_config: the raw application configuration
:param prefix: prefix for configuration keys
:param serializer: function to serialize values to strings suitable
for being stored in memcached. An example is
:py:func:`~baseplate.clients.memcache.lib.make_dump_and_compress_fn`.
:param deserializer: function to convert strings returned from
memcached to arbitrary objects, must be compatible with ``serializer``.
An example is :py:func:`~baseplate.clients.memcache.lib.decompress_and_load`.
:returns: :py:class:`pymemcache.client.base.PooledClient`
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"endpoint": config.Endpoint,
"max_pool_size": config.Optional(config.Integer, default=None),
"connect_timeout": config.Optional(config.TimespanWithLegacyFallback, default=None),
"timeout": config.Optional(config.TimespanWithLegacyFallback, default=None),
"no_delay": config.Optional(config.Boolean, default=True),
}
)
options = parser.parse(prefix[:-1], app_config)
return PooledClient(
server=options.endpoint.address,
connect_timeout=options.connect_timeout and options.connect_timeout.total_seconds(),
timeout=options.timeout and options.timeout.total_seconds(),
serializer=serializer,
deserializer=deserializer,
Supported keys:
* ``url``: the connection URL to the database, passed to
:py:func:`~sqlalchemy.engine.url.make_url` to create the
:py:class:`~sqlalchemy.engine.url.URL` used to connect to the database.
* ``credentials_secret`` (optional): the key used to retrieve the database
credentials from ``secrets`` as a :py:class:`~baseplate.lib.secrets.CredentialSecret`.
If this is supplied, any credentials given in ``url`` we be replaced by
these.
* ``pool_recycle`` (optional): this setting causes the pool to recycle connections after
the given number of seconds has passed. It defaults to -1, or no timeout.
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"url": config.String,
"credentials_secret": config.Optional(config.String),
"pool_recycle": config.Optional(config.Integer),
}
)
options = parser.parse(prefix[:-1], app_config)
url = make_url(options.url)
if options.pool_recycle is not None:
kwargs.setdefault("pool_recycle", options.pool_recycle)
if options.credentials_secret:
if not secrets:
raise TypeError("'secrets' is required if 'credentials_secret' is set")
credentials = secrets.get_credentials(options.credentials_secret)
def from_spec(spec: ConfigSpecItem) -> "Parser":
"""Return a parser for the given spec object."""
if isinstance(spec, Parser):
return spec
if isinstance(spec, dict):
return SpecParser(spec)
if callable(spec):
return CallableParser(spec)
raise AssertionError(f"invalid specification: {spec!r}")
* ``size``: The size of the connection pool.
* ``max_age``: The oldest a connection can be before it's recycled and
replaced with a new one. Written as a
:py:func:`~baseplate.lib.config.Timespan` e.g. ``1 minute``.
* ``timeout``: The maximum amount of time a connection attempt or RPC call
can take before a TimeoutError is raised.
(:py:func:`~baseplate.lib.config.Timespan`)
* ``max_connection_attempts``: The maximum number of times the pool will attempt to
open a connection.
.. versionchanged:: 1.2
``max_retries`` was renamed ``max_connection_attempts``.
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"endpoint": config.Endpoint,
"size": config.Optional(config.Integer, default=10),
"max_age": config.Optional(config.Timespan, default=config.Timespan("1 minute")),
"timeout": config.Optional(config.Timespan, default=config.Timespan("1 second")),
"max_connection_attempts": config.Optional(config.Integer),
"max_retries": config.Optional(config.Integer),
}
)
options = parser.parse(prefix[:-1], app_config)
if options.size is not None:
kwargs.setdefault("size", options.size)
if options.max_age is not None:
kwargs.setdefault("max_age", options.max_age.total_seconds())
if options.timeout is not None: