Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _validate_aws_credentials(hass, credential):
"""Validate AWS credential config."""
aws_config = credential.copy()
del aws_config[CONF_NAME]
del aws_config[CONF_VALIDATE]
profile = aws_config.get(CONF_PROFILE_NAME)
if profile is not None:
session = aiobotocore.AioSession(profile=profile)
del aws_config[CONF_PROFILE_NAME]
if CONF_ACCESS_KEY_ID in aws_config:
del aws_config[CONF_ACCESS_KEY_ID]
if CONF_SECRET_ACCESS_KEY in aws_config:
del aws_config[CONF_SECRET_ACCESS_KEY]
else:
session = aiobotocore.AioSession()
if credential[CONF_VALIDATE]:
async with session.create_client("iam", **aws_config) as client:
await client.get_user()
return session
else:
_LOGGER.error("Missing aws credential for %s", config[CONF_NAME])
return None
if session is None:
credential_name = aws_config.get(CONF_CREDENTIAL_NAME)
if credential_name is not None:
session = hass.data[DATA_SESSIONS].get(credential_name)
if session is None:
_LOGGER.warning("No available aws session for %s", credential_name)
del aws_config[CONF_CREDENTIAL_NAME]
if session is None:
profile = aws_config.get(CONF_PROFILE_NAME)
if profile is not None:
session = aiobotocore.AioSession(profile=profile)
del aws_config[CONF_PROFILE_NAME]
else:
session = aiobotocore.AioSession()
aws_config[CONF_REGION] = region_name
if service == "lambda":
context_str = json.dumps(
{"custom": conf.get(CONF_CONTEXT, {})}, cls=JSONEncoder
)
context_b64 = base64.b64encode(context_str.encode("utf-8"))
context = context_b64.decode("utf-8")
return AWSLambda(session, aws_config, context)
if service == "sns":
return AWSSNS(session, aws_config)
aws_config = credential.copy()
del aws_config[CONF_NAME]
del aws_config[CONF_VALIDATE]
profile = aws_config.get(CONF_PROFILE_NAME)
if profile is not None:
session = aiobotocore.AioSession(profile=profile)
del aws_config[CONF_PROFILE_NAME]
if CONF_ACCESS_KEY_ID in aws_config:
del aws_config[CONF_ACCESS_KEY_ID]
if CONF_SECRET_ACCESS_KEY in aws_config:
del aws_config[CONF_SECRET_ACCESS_KEY]
else:
session = aiobotocore.AioSession()
if credential[CONF_VALIDATE]:
async with session.create_client("iam", **aws_config) as client:
await client.get_user()
return session
if session is None:
credential_name = aws_config.get(CONF_CREDENTIAL_NAME)
if credential_name is not None:
session = hass.data[DATA_SESSIONS].get(credential_name)
if session is None:
_LOGGER.warning("No available aws session for %s", credential_name)
del aws_config[CONF_CREDENTIAL_NAME]
if session is None:
profile = aws_config.get(CONF_PROFILE_NAME)
if profile is not None:
session = aiobotocore.AioSession(profile=profile)
del aws_config[CONF_PROFILE_NAME]
else:
session = aiobotocore.AioSession()
aws_config[CONF_REGION] = region_name
if service == "lambda":
context_str = json.dumps(
{"custom": conf.get(CONF_CONTEXT, {})}, cls=JSONEncoder
)
context_b64 = base64.b64encode(context_str.encode("utf-8"))
context = context_b64.decode("utf-8")
return AWSLambda(session, aws_config, context)
if service == "sns":
return AWSSNS(session, aws_config)
if service == "sqs":
return AWSSQS(session, aws_config)