Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_timespan_invalid(self):
with self.assertRaises(ValueError):
config.Timespan("")
with self.assertRaises(ValueError):
config.Timespan("a b")
with self.assertRaises(ValueError):
config.Timespan("10 florgles")
with self.assertRaises(ValueError):
config.Timespan("a b c")
with self.assertRaises(ValueError):
config.Timespan("3.2 hours")
def test_timespan(self):
result = config.Timespan("30 milliseconds")
self.assertAlmostEqual(result.total_seconds(), 0.03)
result = config.Timespan("1 second")
self.assertEqual(result.total_seconds(), 1)
result = config.Timespan("2 seconds")
self.assertEqual(result.total_seconds(), 2)
result = config.Timespan("30 minutes")
self.assertEqual(result.total_seconds(), 1800)
result = config.Timespan("2 hours")
self.assertEqual(result.total_seconds(), 7200)
result = config.Timespan("1 day")
self.assertEqual(result.total_seconds(), 86400)
def test_timespan_invalid(self):
with self.assertRaises(ValueError):
config.Timespan("")
with self.assertRaises(ValueError):
config.Timespan("a b")
with self.assertRaises(ValueError):
config.Timespan("10 florgles")
with self.assertRaises(ValueError):
config.Timespan("a b c")
with self.assertRaises(ValueError):
config.Timespan("3.2 hours")
def test_timespan(self):
result = config.Timespan("30 milliseconds")
self.assertAlmostEqual(result.total_seconds(), 0.03)
result = config.Timespan("1 second")
self.assertEqual(result.total_seconds(), 1)
result = config.Timespan("2 seconds")
self.assertEqual(result.total_seconds(), 2)
result = config.Timespan("30 minutes")
self.assertEqual(result.total_seconds(), 1800)
result = config.Timespan("2 hours")
self.assertEqual(result.total_seconds(), 7200)
result = config.Timespan("1 day")
self.assertEqual(result.total_seconds(), 86400)
def test_timespan(self):
result = config.Timespan("30 milliseconds")
self.assertAlmostEqual(result.total_seconds(), 0.03)
result = config.Timespan("1 second")
self.assertEqual(result.total_seconds(), 1)
result = config.Timespan("2 seconds")
self.assertEqual(result.total_seconds(), 2)
result = config.Timespan("30 minutes")
self.assertEqual(result.total_seconds(), 1800)
result = config.Timespan("2 hours")
self.assertEqual(result.total_seconds(), 7200)
result = config.Timespan("1 day")
self.assertEqual(result.total_seconds(), 86400)
Supported keys:
* ``url`` (required): a URL like ``redis://localhost/0``.
* ``max_connections``: an integer maximum number of connections in the pool
* ``socket_connect_timeout``: how long to wait for sockets to connect. e.g.
``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`)
* ``socket_timeout``: how long to wait for socket operations, e.g.
``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`)
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"url": config.String,
"max_connections": config.Optional(config.Integer, default=None),
"socket_connect_timeout": config.Optional(config.Timespan, default=None),
"socket_timeout": config.Optional(config.Timespan, default=None),
}
)
options = parser.parse(prefix[:-1], app_config)
if options.max_connections is not None:
kwargs.setdefault("max_connections", options.max_connections)
if options.socket_connect_timeout is not None:
kwargs.setdefault("socket_connect_timeout", options.socket_connect_timeout.total_seconds())
if options.socket_timeout is not None:
kwargs.setdefault("socket_timeout", options.socket_timeout.total_seconds())
return redis.BlockingConnectionPool.from_url(options.url, **kwargs)
* ``timeout``: The maximum amount of time a connection attempt or RPC call
can take before a TimeoutError is raised.
(:py:func:`~baseplate.lib.config.Timespan`)
* ``max_connection_attempts``: The maximum number of times the pool will attempt to
open a connection.
.. versionchanged:: 1.2
``max_retries`` was renamed ``max_connection_attempts``.
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{
"endpoint": config.Endpoint,
"size": config.Optional(config.Integer, default=10),
"max_age": config.Optional(config.Timespan, default=config.Timespan("1 minute")),
"timeout": config.Optional(config.Timespan, default=config.Timespan("1 second")),
"max_connection_attempts": config.Optional(config.Integer),
"max_retries": config.Optional(config.Integer),
}
)
options = parser.parse(prefix[:-1], app_config)
if options.size is not None:
kwargs.setdefault("size", options.size)
if options.max_age is not None:
kwargs.setdefault("max_age", options.max_age.total_seconds())
if options.timeout is not None:
kwargs.setdefault("timeout", options.timeout.total_seconds())
if options.max_connection_attempts is not None:
kwargs.setdefault("max_connection_attempts", options.max_connection_attempts)
if options.max_retries is not None:
def start(server_config: Dict[str, str], application: Any, pool: Pool) -> None:
baseplate: Baseplate = getattr(application, "baseplate", None)
if not baseplate or not baseplate._metrics_client:
logger.info("No metrics client configured. Server metrics will not be sent.")
return
cfg = config.parse_config(
server_config,
{
"monitoring": {
"blocked_hub": config.Optional(config.Timespan, default=None),
"concurrency": config.Optional(config.Boolean, default=True),
"connection_pool": config.Optional(config.Boolean, default=False),
"gc": {
"stats": config.Optional(config.Boolean, default=True),
"timing": config.Optional(config.Boolean, default=False),
"refcycle": config.Optional(config.String, default=None),
},
}
},
)
reporters: List[_Reporter] = []
if cfg.monitoring.concurrency:
reporters.append(_OpenConnectionsReporter(pool))
observer = _ActiveRequestsObserver()
:param log_if_unconfigured: When the client is not configured, should
trace spans be logged or discarded silently?
:return: A configured client.
"""
cfg = config.parse_config(
raw_config,
{
"tracing": {
"service_name": config.String,
"endpoint": config.Optional(config.Endpoint),
"queue_name": config.Optional(config.String),
"max_span_queue_size": config.Optional(config.Integer, default=50000),
"num_span_workers": config.Optional(config.Integer, default=5),
"span_batch_interval": config.Optional(
config.Timespan, default=config.Timespan("500 milliseconds")
),
"num_conns": config.Optional(config.Integer, default=100),
"sample_rate": config.Optional(
config.Fallback(config.Percent, config.Float), default=0.1
),
}
},
)
# pylint: disable=maybe-no-member
return make_client(
service_name=cfg.tracing.service_name,
tracing_endpoint=cfg.tracing.endpoint,
tracing_queue_name=cfg.tracing.queue_name,
max_span_queue_size=cfg.tracing.max_span_queue_size,
num_span_workers=cfg.tracing.num_span_workers,