Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_parse_string(self):
result = config.String("whatever")
self.assertEqual(result, "whatever")
def test_empty_string_not_ok(self):
with self.assertRaises(ValueError):
config.String("")
* ``url``: the connection URL to the database, passed to
:py:func:`~sqlalchemy.engine.url.make_url` to create the
:py:class:`~sqlalchemy.engine.url.URL` used to connect to the database.
* ``credentials_secret`` (optional): the key used to retrieve the database
credentials from ``secrets`` as a :py:class:`~baseplate.lib.secrets.CredentialSecret`.
If this is supplied, any credentials given in ``url`` we be replaced by
these.
* ``pool_recycle`` (optional): this setting causes the pool to recycle connections after
the given number of seconds has passed. It defaults to -1, or no timeout.
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"url": config.String,
"credentials_secret": config.Optional(config.String),
"pool_recycle": config.Optional(config.Integer),
}
)
options = parser.parse(prefix[:-1], app_config)
url = make_url(options.url)
if options.pool_recycle is not None:
kwargs.setdefault("pool_recycle", options.pool_recycle)
if options.credentials_secret:
if not secrets:
raise TypeError("'secrets' is required if 'credentials_secret' is set")
credentials = secrets.get_credentials(options.credentials_secret)
url.username = credentials.username
url.password = credentials.password
baseplate: Baseplate = getattr(application, "baseplate", None)
if not baseplate or not baseplate._metrics_client:
logger.info("No metrics client configured. Server metrics will not be sent.")
return
cfg = config.parse_config(
server_config,
{
"monitoring": {
"blocked_hub": config.Optional(config.Timespan, default=None),
"concurrency": config.Optional(config.Boolean, default=True),
"connection_pool": config.Optional(config.Boolean, default=False),
"gc": {
"stats": config.Optional(config.Boolean, default=True),
"timing": config.Optional(config.Boolean, default=False),
"refcycle": config.Optional(config.String, default=None),
},
}
},
)
reporters: List[_Reporter] = []
if cfg.monitoring.concurrency:
reporters.append(_OpenConnectionsReporter(pool))
observer = _ActiveRequestsObserver()
reporters.append(observer)
baseplate.register(observer)
if cfg.monitoring.connection_pool:
reporters.append(_BaseplateReporter(baseplate.get_runtime_metric_reporters()))
:py:class:`~kombu.connection.Connection` constructor. Keyword arguments
take precedence over the configuration file.
Supported keys:
* ``credentials_secret``
* ``hostname``
* ``virtual_host``
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"credentials_secret": config.Optional(config.String),
"hostname": config.String,
"virtual_host": config.Optional(config.String),
}
)
options = parser.parse(prefix[:-1], app_config)
if options.credentials_secret:
if not secrets:
raise ValueError("'secrets' is required if 'credentials_secret' is set")
credentials = secrets.get_credentials(options.credentials_secret)
kwargs.setdefault("userid", credentials.username)
kwargs.setdefault("password", credentials.password)
return Connection(hostname=options.hostname, virtual_host=options.virtual_host, **kwargs)
constructor.
Supported keys:
* ``url`` (required): a URL like ``redis://localhost/0``.
* ``max_connections``: an integer maximum number of connections in the pool
* ``socket_connect_timeout``: how long to wait for sockets to connect. e.g.
``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`)
* ``socket_timeout``: how long to wait for socket operations, e.g.
``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`)
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"url": config.String,
"max_connections": config.Optional(config.Integer, default=None),
"socket_connect_timeout": config.Optional(config.Timespan, default=None),
"socket_timeout": config.Optional(config.Timespan, default=None),
}
)
options = parser.parse(prefix[:-1], app_config)
if options.max_connections is not None:
kwargs.setdefault("max_connections", options.max_connections)
if options.socket_connect_timeout is not None:
kwargs.setdefault("socket_connect_timeout", options.socket_connect_timeout.total_seconds())
if options.socket_timeout is not None:
kwargs.setdefault("socket_timeout", options.socket_timeout.total_seconds())
return redis.BlockingConnectionPool.from_url(options.url, **kwargs)
e.g. ``amqp.exchange_name`` etc. The ``prefix`` argument specifies the
prefix used to filter keys. Each key is mapped to a corresponding keyword
argument on the :py:class:`~kombu.Exchange` constructor. Any keyword
arguments given to this function will be passed through to the
:py:class:`~kombu.Exchange` constructor. Keyword arguments take precedence
over the configuration file.
Supported keys:
* ``exchange_name``
* ``exchange_type``
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{"exchange_name": config.Optional(config.String), "exchange_type": config.String}
)
options = parser.parse(prefix[:-1], app_config)
return Exchange(name=options.exchange_name or "", type=options.exchange_type, **kwargs)
logging.basicConfig(format="%(asctime)s:%(levelname)s:%(message)s", level=level)
parser = configparser.RawConfigParser()
parser.read_file(args.config_file)
fetcher_config = dict(parser.items("secret-fetcher"))
cfg = config.parse_config(
fetcher_config,
{
"vault": {
"url": config.String,
"role": config.String,
"auth_type": config.Optional(
config.OneOf(**VaultClientFactory.auth_types()),
default=VaultClientFactory.auth_types()["aws"],
),
"mount_point": config.Optional(config.String, default="aws-ec2"),
},
"output": {
"path": config.Optional(config.String, default="/var/local/secrets.json"),
"owner": config.Optional(config.UnixUser, default=0),
"group": config.Optional(config.UnixGroup, default=0),
"mode": config.Optional(config.Integer(base=8), default=0o400), # type: ignore
},
"secrets": config.Optional(config.TupleOf(config.String), default=[]),
"callback": config.Optional(config.String),
},
)
# pylint: disable=maybe-no-member
client_factory = VaultClientFactory(
cfg.vault.url, cfg.vault.role, cfg.vault.auth_type, cfg.vault.mount_point
)
Percentage of unsampled requests to record traces for (e.g. "37%")
:param raw_config: The application configuration which should have settings
for the tracing client.
:param log_if_unconfigured: When the client is not configured, should
trace spans be logged or discarded silently?
:return: A configured client.
"""
cfg = config.parse_config(
raw_config,
{
"tracing": {
"service_name": config.String,
"endpoint": config.Optional(config.Endpoint),
"queue_name": config.Optional(config.String),
"max_span_queue_size": config.Optional(config.Integer, default=50000),
"num_span_workers": config.Optional(config.Integer, default=5),
"span_batch_interval": config.Optional(
config.Timespan, default=config.Timespan("500 milliseconds")
),
"num_conns": config.Optional(config.Integer, default=100),
"sample_rate": config.Optional(
config.Fallback(config.Percent, config.Float), default=0.1
),
}
},
)
# pylint: disable=maybe-no-member
return make_client(
service_name=cfg.tracing.service_name,
Pool size for remote recorder connection pool.
``tracing.sample_rate`` (optional)
Percentage of unsampled requests to record traces for (e.g. "37%")
:param raw_config: The application configuration which should have settings
for the tracing client.
:param log_if_unconfigured: When the client is not configured, should
trace spans be logged or discarded silently?
:return: A configured client.
"""
cfg = config.parse_config(
raw_config,
{
"tracing": {
"service_name": config.String,
"endpoint": config.Optional(config.Endpoint),
"queue_name": config.Optional(config.String),
"max_span_queue_size": config.Optional(config.Integer, default=50000),
"num_span_workers": config.Optional(config.Integer, default=5),
"span_batch_interval": config.Optional(
config.Timespan, default=config.Timespan("500 milliseconds")
),
"num_conns": config.Optional(config.Integer, default=100),
"sample_rate": config.Optional(
config.Fallback(config.Percent, config.Float), default=0.1
),
}
},
)
# pylint: disable=maybe-no-member