Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_configs(config, region, expected_cert_valid, expected_entropy_min, expected_rand_seed,
expected_log_level, expected_password, expected_username_validation, expected_key_compression):
config = BlessConfig(region, config_file=config)
assert expected_cert_valid == config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_BEFORE_SEC_OPTION)
assert expected_cert_valid == config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_AFTER_SEC_OPTION)
assert expected_entropy_min == config.getint(BLESS_OPTIONS_SECTION,
ENTROPY_MINIMUM_BITS_OPTION)
assert expected_rand_seed == config.getint(BLESS_OPTIONS_SECTION,
RANDOM_SEED_BYTES_OPTION)
assert expected_log_level == config.get(BLESS_OPTIONS_SECTION, LOGGING_LEVEL_OPTION)
assert expected_password == config.getpassword()
assert expected_username_validation == config.get(BLESS_OPTIONS_SECTION,
USERNAME_VALIDATION_OPTION)
assert expected_key_compression == config.get(BLESS_CA_SECTION,
CA_PRIVATE_KEY_COMPRESSION_OPTION)
'bless_ca_default_password': '',
'bless_ca_ca_private_key_file': '',
'bless_ca_ca_private_key': str(base64.b64encode(b''), encoding='ascii'),
'kms_auth_use_kmsauth': 'True',
'kms_auth_kmsauth_key_id': '',
'kms_auth_kmsauth_serviceid': 'bless-test',
}
for k, v in extra_environment_variables.items():
monkeypatch.setenv(k, v)
# Create an empty config, everything is set in the environment
config = BlessConfig('us-east-1', config_file='')
assert 1 == config.getint(BLESS_OPTIONS_SECTION, CERTIFICATE_VALIDITY_AFTER_SEC_OPTION)
assert 1 == config.getint(BLESS_OPTIONS_SECTION, CERTIFICATE_VALIDITY_BEFORE_SEC_OPTION)
assert 2 == config.getint(BLESS_OPTIONS_SECTION, ENTROPY_MINIMUM_BITS_OPTION)
assert 3 == config.getint(BLESS_OPTIONS_SECTION, RANDOM_SEED_BYTES_OPTION)
assert 'DEBUG' == config.get(BLESS_OPTIONS_SECTION, LOGGING_LEVEL_OPTION)
assert 'permit-X11-forwarding' == config.get(BLESS_OPTIONS_SECTION, CERTIFICATE_EXTENSIONS_OPTION)
assert 'debian' == config.get(BLESS_OPTIONS_SECTION, USERNAME_VALIDATION_OPTION)
assert 'useradd' == config.get(BLESS_OPTIONS_SECTION, REMOTE_USERNAMES_VALIDATION_OPTION)
assert '' == config.getpassword()
assert '' == config.get(BLESS_CA_SECTION, CA_PRIVATE_KEY_FILE_OPTION)
assert b'' == config.getprivatekey()
assert config.getboolean(KMSAUTH_SECTION, KMSAUTH_USEKMSAUTH_OPTION)
assert '' == config.get(KMSAUTH_SECTION, KMSAUTH_KEY_ID_OPTION)
assert 'bless-test' == config.get(KMSAUTH_SECTION, KMSAUTH_SERVICE_ID_OPTION)
global global_bless_cache
if ca_private_key_password is not None or config_file is not None:
bless_cache = BlessLambdaCache(ca_private_key_password, config_file)
elif global_bless_cache is None:
global_bless_cache = BlessLambdaCache(config_file=os.path.join(os.path.dirname(__file__), 'bless_deploy.cfg'))
bless_cache = global_bless_cache
else:
bless_cache = global_bless_cache
# AWS Region determines configs related to KMS
region = bless_cache.region
# Load the deployment config values
config = bless_cache.config
logging_level = config.get(BLESS_OPTIONS_SECTION, LOGGING_LEVEL_OPTION)
numeric_level = getattr(logging, logging_level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: {}'.format(logging_level))
logger = logging.getLogger()
logger.setLevel(numeric_level)
certificate_validity_before_seconds = config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_BEFORE_SEC_OPTION)
certificate_validity_after_seconds = config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_AFTER_SEC_OPTION)
entropy_minimum_bits = config.getint(BLESS_OPTIONS_SECTION, ENTROPY_MINIMUM_BITS_OPTION)
random_seed_bytes = config.getint(BLESS_OPTIONS_SECTION, RANDOM_SEED_BYTES_OPTION)
ca_private_key = config.getprivatekey()
certificate_extensions = config.get(BLESS_OPTIONS_SECTION, CERTIFICATE_EXTENSIONS_OPTION)
# Load the deployment config values
config = bless_cache.config
logging_level = config.get(BLESS_OPTIONS_SECTION, LOGGING_LEVEL_OPTION)
numeric_level = getattr(logging, logging_level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: {}'.format(logging_level))
logger = logging.getLogger()
logger.setLevel(numeric_level)
certificate_validity_before_seconds = config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_BEFORE_SEC_OPTION)
certificate_validity_after_seconds = config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_AFTER_SEC_OPTION)
entropy_minimum_bits = config.getint(BLESS_OPTIONS_SECTION, ENTROPY_MINIMUM_BITS_OPTION)
random_seed_bytes = config.getint(BLESS_OPTIONS_SECTION, RANDOM_SEED_BYTES_OPTION)
ca_private_key = config.getprivatekey()
certificate_extensions = config.get(BLESS_OPTIONS_SECTION, CERTIFICATE_EXTENSIONS_OPTION)
# Process cert request
schema = BlessSchema(strict=True)
schema.context[USERNAME_VALIDATION_OPTION] = config.get(BLESS_OPTIONS_SECTION, USERNAME_VALIDATION_OPTION)
schema.context[REMOTE_USERNAMES_VALIDATION_OPTION] = config.get(BLESS_OPTIONS_SECTION,
REMOTE_USERNAMES_VALIDATION_OPTION)
schema.context[REMOTE_USERNAMES_BLACKLIST_OPTION] = config.get(BLESS_OPTIONS_SECTION,
REMOTE_USERNAMES_BLACKLIST_OPTION)
try:
request = schema.load(event).data
except ValidationError as e:
return error_response('InputValidationError', str(e))