def test_non_decimal(self):
    result = config.Integer(base=8)("0600")
    self.assertEqual(result, 384)

def test_parse_integer_invalid(self):
    with self.assertRaises(ValueError):
        config.Integer("")
    with self.assertRaises(ValueError):
        config.Integer("illegal")

def test_tupleof_invalid(self):
    parser = config.TupleOf(config.Integer)
    with self.assertRaises(ValueError):
        parser("")
    with self.assertRaises(ValueError):
        parser("a, b")

def test_parse_integer_valid(self):
    result = config.Integer("337")
    self.assertEqual(result, 337)
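# The parsers exercised above are plain callables: each takes a string and
# returns a parsed value, raising ValueError on bad input. A quick sketch
# (assuming baseplate.lib.config is imported as `config`):
#
#   config.Integer("337")           -> 337
#   config.Integer(base=8)("0600")  -> 384 (octal)
#   config.TupleOf(config.Integer)("1, 2, 3")
#       -> splits on commas and parses each item with config.Integer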
cfg = config.parse_config(
    fetcher_config,
    {
        "vault": {
            "url": config.String,
            "role": config.String,
            "auth_type": config.Optional(
                config.OneOf(**VaultClientFactory.auth_types()),
                default=VaultClientFactory.auth_types()["aws"],
            ),
            "mount_point": config.Optional(config.String, default="aws-ec2"),
        },
        "output": {
            "path": config.Optional(config.String, default="/var/local/secrets.json"),
            "owner": config.Optional(config.UnixUser, default=0),
            "group": config.Optional(config.UnixGroup, default=0),
            "mode": config.Optional(config.Integer(base=8), default=0o400),  # type: ignore
        },
        "secrets": config.Optional(config.TupleOf(config.String), default=[]),
        "callback": config.Optional(config.String),
    },
)
# pylint: disable=maybe-no-member
client_factory = VaultClientFactory(
    cfg.vault.url, cfg.vault.role, cfg.vault.auth_type, cfg.vault.mount_point
)

if args.once:
    logger.info("Running secret fetcher once")
    fetch_secrets(cfg, client_factory)
    trigger_callback(cfg.callback, cfg.output.path)
else:
    # Daemon mode: the periodic fetch loop is not included in this snippet.
    ...
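# A minimal sketch of the flat key/value mapping this spec would accept. With
# config.parse_config, nested spec keys map to dotted names; the values below
# are illustrative assumptions, not a real deployment.
example_fetcher_config = {
    "vault.url": "http://localhost:8200",
    "vault.role": "my-server-role",
    "output.path": "/var/local/secrets.json",
    "output.mode": "0400",  # parsed with config.Integer(base=8)
    "secrets": "secret/one, secret/two",  # config.TupleOf splits on commas
    "callback": "/usr/bin/reload-app",
}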
:param prefix: prefix for configuration keys
:param serializer: function to serialize values to strings suitable
    for being stored in memcached. An example is
    :py:func:`~baseplate.clients.memcache.lib.make_dump_and_compress_fn`.
:param deserializer: function to convert strings returned from
    memcached to arbitrary objects, must be compatible with ``serializer``.
    An example is :py:func:`~baseplate.clients.memcache.lib.decompress_and_load`.

:returns: :py:class:`pymemcache.client.base.PooledClient`

"""
assert prefix.endswith(".")
parser = config.SpecParser(
    {
        "endpoint": config.Endpoint,
        "max_pool_size": config.Optional(config.Integer, default=None),
        "connect_timeout": config.Optional(config.TimespanWithLegacyFallback, default=None),
        "timeout": config.Optional(config.TimespanWithLegacyFallback, default=None),
        "no_delay": config.Optional(config.Boolean, default=True),
    }
)
options = parser.parse(prefix[:-1], app_config)

return PooledClient(
    server=options.endpoint.address,
    connect_timeout=options.connect_timeout and options.connect_timeout.total_seconds(),
    timeout=options.timeout and options.timeout.total_seconds(),
    serializer=serializer,
    deserializer=deserializer,
    no_delay=options.no_delay,
    max_pool_size=options.max_pool_size,
)
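# A hedged usage sketch: assuming this snippet is baseplate's pool_from_config
# (the function name is an assumption here), a flat app_config like the one
# below would yield a PooledClient. Endpoints use "host:port" form; timespans
# are strings like "1 second". All values are illustrative.
#
# app_config = {
#     "memcache.endpoint": "localhost:11211",
#     "memcache.max_pool_size": "25",
#     "memcache.timeout": "1 second",
# }
# pool = pool_from_config(app_config, prefix="memcache.")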
if args.debug:
    level = logging.DEBUG
else:
    level = logging.WARNING
logging.basicConfig(level=level)
config_parser = configparser.RawConfigParser()
config_parser.read_file(args.config_file)
publisher_raw_cfg = dict(config_parser.items("trace-publisher:" + args.queue_name))
publisher_cfg = config.parse_config(
    publisher_raw_cfg,
    {
        "zipkin_api_url": config.Endpoint,
        "post_timeout": config.Optional(config.Integer, POST_TIMEOUT_DEFAULT),
        "max_batch_size": config.Optional(config.Integer, MAX_BATCH_SIZE_DEFAULT),
        "retry_limit": config.Optional(config.Integer, RETRY_LIMIT_DEFAULT),
    },
)

trace_queue = MessageQueue(
    "/traces-" + args.queue_name, max_messages=MAX_QUEUE_SIZE, max_message_size=MAX_SPAN_SIZE
)

# pylint: disable=maybe-no-member
inner_batch = TraceBatch(max_size=publisher_cfg.max_batch_size)
batcher = TimeLimitedBatch(inner_batch, MAX_BATCH_AGE)
metrics_client = metrics_client_from_config(publisher_raw_cfg)
publisher = ZipkinPublisher(
    publisher_cfg.zipkin_api_url.address,
    metrics_client,
    post_timeout=publisher_cfg.post_timeout,
)
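# An illustrative INI section this publisher would read. The section name
# follows the "trace-publisher:<queue_name>" pattern above; "main" and the
# values are assumptions for the sketch, not recommended settings. Note that
# config.Endpoint expects "host:port" form.
#
# [trace-publisher:main]
# zipkin_api_url = localhost:9411
# post_timeout = 3
# max_batch_size = 500
# retry_limit = 10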
:return: A configured client.
"""
cfg = config.parse_config(
    raw_config,
    {
        "tracing": {
            "service_name": config.String,
            "endpoint": config.Optional(config.Endpoint),
            "queue_name": config.Optional(config.String),
            "max_span_queue_size": config.Optional(config.Integer, default=50000),
            "num_span_workers": config.Optional(config.Integer, default=5),
            "span_batch_interval": config.Optional(
                config.Timespan, default=config.Timespan("500 milliseconds")
            ),
            "num_conns": config.Optional(config.Integer, default=100),
            "sample_rate": config.Optional(
                config.Fallback(config.Percent, config.Float), default=0.1
            ),
        }
    },
)

# pylint: disable=maybe-no-member
return make_client(
    service_name=cfg.tracing.service_name,
    tracing_endpoint=cfg.tracing.endpoint,
    tracing_queue_name=cfg.tracing.queue_name,
    max_span_queue_size=cfg.tracing.max_span_queue_size,
    num_span_workers=cfg.tracing.num_span_workers,
    span_batch_interval=cfg.tracing.span_batch_interval.total_seconds(),
    num_conns=cfg.tracing.num_conns,
    sample_rate=cfg.tracing.sample_rate,
)
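# A minimal sketch of raw_config values matching the "tracing" spec above;
# only service_name is required, the rest fall back to the defaults shown.
# All values here are illustrative assumptions.
#
# raw_config = {
#     "tracing.service_name": "my-service",
#     "tracing.endpoint": "localhost:6831",
#     "tracing.sample_rate": "10%",  # config.Percent; "0.1" parses via config.Float
# }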
* ``contact_points`` (required): comma delimited list of contact points to
  try connecting for cluster discovery
* ``port``: The server-side port to open connections to.
* ``credentials_secret`` (optional): the key used to retrieve the database
  credentials from ``secrets`` as a
  :py:class:`~baseplate.lib.secrets.CredentialSecret`.

:param execution_profiles: Configured execution profiles to provide to the
    rest of the application.

"""
assert prefix.endswith(".")
parser = config.SpecParser(
    {
        "contact_points": config.TupleOf(config.String),
        "port": config.Optional(config.Integer, default=None),
        "credentials_secret": config.Optional(config.String),
    }
)
options = parser.parse(prefix[:-1], app_config)

if options.port:
    kwargs.setdefault("port", options.port)

if options.credentials_secret:
    if not secrets:
        raise TypeError("'secrets' is required if 'credentials_secret' is set")
    credentials = secrets.get_credentials(options.credentials_secret)
    kwargs.setdefault(
        "auth_provider",
        PlainTextAuthProvider(username=credentials.username, password=credentials.password),
    )
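# An illustrative app_config for the Cassandra spec above; the keys mirror
# the documented options, and the hosts, port, and secret path are
# assumptions for the sketch.
example_app_config = {
    "cassandra.contact_points": "10.0.0.1, 10.0.0.2",
    "cassandra.port": "9042",
    "cassandra.credentials_secret": "secret/cassandra/credentials",
}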
if args.debug:
    level = logging.DEBUG
else:
    level = logging.WARNING
logging.basicConfig(level=level)

config_parser = configparser.RawConfigParser()
config_parser.read_file(args.config_file)
raw_config = dict(config_parser.items("event-publisher:" + args.queue_name))
cfg = config.parse_config(
    raw_config,
    {
        "collector": {
            "hostname": config.String,
            "version": config.Optional(config.Integer, default=1),
        },
        "key": {"name": config.String, "secret": config.Base64},
    },
)

metrics_client = metrics_client_from_config(raw_config)

event_queue = MessageQueue(
    "/events-" + args.queue_name, max_messages=MAX_QUEUE_SIZE, max_message_size=MAX_EVENT_SIZE
)

# pylint: disable=maybe-no-member
serializer = SERIALIZER_BY_VERSION[cfg.collector.version]()
batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
publisher = BatchPublisher(metrics_client, cfg)
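# An illustrative INI section for this event publisher; the section name
# follows the "event-publisher:<queue_name>" pattern above, and key.secret
# must be base64 encoded (config.Base64). All values are assumptions.
#
# [event-publisher:production]
# collector.hostname = events.example.com
# collector.version = 1
# key.name = MyKeyName
# key.secret = aHVudGVyMg==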