Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_validate_multiple_principals(test_input):
BlessSchema().validate_remote_usernames(test_input)
schema = BlessSchema()
schema.context[USERNAME_VALIDATION_OPTION] = USERNAME_VALIDATION_OPTIONS.principal.name
schema.context[REMOTE_USERNAMES_VALIDATION_OPTION] = USERNAME_VALIDATION_OPTIONS.principal.name
schema.context[REMOTE_USERNAMES_BLACKLIST_OPTION] = 'balrog'
schema.validate_remote_usernames(test_input)
def validate_remote_usernames(self, remote_usernames):
if REMOTE_USERNAMES_VALIDATION_OPTION in self.context:
username_validation = USERNAME_VALIDATION_OPTIONS[self.context[REMOTE_USERNAMES_VALIDATION_OPTION]]
else:
username_validation = USERNAME_VALIDATION_OPTIONS[REMOTE_USERNAMES_VALIDATION_DEFAULT]
if REMOTE_USERNAMES_BLACKLIST_OPTION in self.context:
username_blacklist = self.context[REMOTE_USERNAMES_BLACKLIST_OPTION]
else:
username_blacklist = REMOTE_USERNAMES_BLACKLIST_DEFAULT
for remote_username in remote_usernames.split(','):
validate_user(remote_username, username_validation, username_blacklist)
certificate_validity_before_seconds = config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_BEFORE_SEC_OPTION)
certificate_validity_after_seconds = config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_AFTER_SEC_OPTION)
entropy_minimum_bits = config.getint(BLESS_OPTIONS_SECTION, ENTROPY_MINIMUM_BITS_OPTION)
random_seed_bytes = config.getint(BLESS_OPTIONS_SECTION, RANDOM_SEED_BYTES_OPTION)
ca_private_key = config.getprivatekey()
certificate_extensions = config.get(BLESS_OPTIONS_SECTION, CERTIFICATE_EXTENSIONS_OPTION)
# Process cert request
schema = BlessSchema(strict=True)
schema.context[USERNAME_VALIDATION_OPTION] = config.get(BLESS_OPTIONS_SECTION, USERNAME_VALIDATION_OPTION)
schema.context[REMOTE_USERNAMES_VALIDATION_OPTION] = config.get(BLESS_OPTIONS_SECTION,
REMOTE_USERNAMES_VALIDATION_OPTION)
schema.context[REMOTE_USERNAMES_BLACKLIST_OPTION] = config.get(BLESS_OPTIONS_SECTION,
REMOTE_USERNAMES_BLACKLIST_OPTION)
try:
request = schema.load(event).data
except ValidationError as e:
return error_response('InputValidationError', str(e))
logger.info('Bless lambda invoked by [user: {0}, bastion_ips:{1}, public_key: {2}, kmsauth_token:{3}]'.format(
request.bastion_user,
request.bastion_user_ip,
request.public_key_to_sign,
request.kmsauth_token))
# Make sure we have the ca private key password
if bless_cache.ca_private_key_password is None:
return error_response('ClientError', bless_cache.ca_private_key_password_error)
else:
certificate_validity_before_seconds = config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_BEFORE_SEC_OPTION)
certificate_validity_after_seconds = config.getint(BLESS_OPTIONS_SECTION,
CERTIFICATE_VALIDITY_AFTER_SEC_OPTION)
entropy_minimum_bits = config.getint(BLESS_OPTIONS_SECTION, ENTROPY_MINIMUM_BITS_OPTION)
random_seed_bytes = config.getint(BLESS_OPTIONS_SECTION, RANDOM_SEED_BYTES_OPTION)
ca_private_key = config.getprivatekey()
certificate_extensions = config.get(BLESS_OPTIONS_SECTION, CERTIFICATE_EXTENSIONS_OPTION)
# Process cert request
schema = BlessSchema(strict=True)
schema.context[USERNAME_VALIDATION_OPTION] = config.get(BLESS_OPTIONS_SECTION, USERNAME_VALIDATION_OPTION)
schema.context[REMOTE_USERNAMES_VALIDATION_OPTION] = config.get(BLESS_OPTIONS_SECTION,
REMOTE_USERNAMES_VALIDATION_OPTION)
schema.context[REMOTE_USERNAMES_BLACKLIST_OPTION] = config.get(BLESS_OPTIONS_SECTION,
REMOTE_USERNAMES_BLACKLIST_OPTION)
try:
request = schema.load(event).data
except ValidationError as e:
return error_response('InputValidationError', str(e))
logger.info('Bless lambda invoked by [user: {0}, bastion_ips:{1}, public_key: {2}, kmsauth_token:{3}]'.format(
request.bastion_user,
request.bastion_user_ip,
request.public_key_to_sign,
request.kmsauth_token))
# Make sure we have the ca private key password
if bless_cache.ca_private_key_password is None:
return error_response('ClientError', bless_cache.ca_private_key_password_error)