Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def iam_role():
return IAMRole(
**{
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {"Service": ["ec2.amazonaws.com"], "AWS": "arn:aws:iam::111111111111:root"},
"Action": ["sts:AssumeRole"],
},
},
"Path": "/",
"Policies": [
{
"PolicyName": "root",
"PolicyDocument": {
def invoke(self, cfmodel):
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole):
for policy in resource.Properties.Policies:
if policy.PolicyDocument.allowed_actions_with(REGEX_WILDCARD_POLICY_ACTION):
self.add_failure(type(self).__name__, self.REASON.format(logical_id, policy.PolicyName))
def invoke(self, cfmodel):
not_has_account_id = re.compile(rf"^((?!{self._config.aws_account_id}).)*$")
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole):
for principal in resource.Properties.AssumeRolePolicyDocument.allowed_principals_with(
self.ROOT_PATTERN
):
self.add_failure(
type(self).__name__, self.REASON.format(logical_id, principal), resource_ids={logical_id}
)
if self._config.aws_account_id:
for principal in resource.Properties.AssumeRolePolicyDocument.allowed_principals_with(
not_has_account_id
):
if principal not in self.valid_principals and not principal.endswith(
".amazonaws.com"
): # Checks if principal is an AWS service
if "GETATT" in principal or "UNDEFINED_" in principal:
self.add_failure(
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, (IAMManagedPolicy, IAMPolicy, S3BucketPolicy, SNSTopicPolicy, SQSQueuePolicy)):
self.check_for_wildcards(result, logical_id, resource.Properties.PolicyDocument)
elif isinstance(resource, (IAMRole, IAMUser)):
if isinstance(resource, IAMRole):
self.check_for_wildcards(result, logical_id, resource.Properties.AssumeRolePolicyDocument)
if resource.Properties and resource.Properties.Policies:
for policy in resource.Properties.Policies:
self.check_for_wildcards(result, logical_id, policy.PolicyDocument)
return result
from .resources.security_group import SecurityGroup
from .resources.security_group_egress import SecurityGroupEgress
from .resources.security_group_ingress import SecurityGroupIngress
from .resources.sqs_queue_policy import SQSQueuePolicy
from .resources.sns_topic_policy import SNSTopicPolicy
from .resources.kms_key import KMSKey
_RESOURCE_MAP = {
"AWS::EC2::SecurityGroup": SecurityGroup,
"AWS::EC2::SecurityGroupEgress": SecurityGroupEgress,
"AWS::EC2::SecurityGroupIngress": SecurityGroupIngress,
"AWS::IAM::Group": IAMGroup,
"AWS::IAM::ManagedPolicy": IAMManagedPolicy,
"AWS::IAM::Policy": IAMPolicy,
"AWS::IAM::Role": IAMRole,
"AWS::IAM::User": IAMUser,
"AWS::KMS::Key": KMSKey,
"AWS::S3::BucketPolicy": S3BucketPolicy,
"AWS::SNS::TopicPolicy": SNSTopicPolicy,
"AWS::SQS::QueuePolicy": SQSQueuePolicy,
}
_DEFAULT_RESOURCE = Resource
def create_resource(logical_id: str, value: Dict[str, Any]) -> Resource:
resource = _RESOURCE_MAP.get(value.get("Type"), _DEFAULT_RESOURCE)
return resource(logical_id, value)
def invoke(self, cfmodel):
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole) and resource.Properties.AssumeRolePolicyDocument.allowed_actions_with(
REGEX_WILDCARD_POLICY_ACTION
):
self.add_failure(type(self).__name__, self.REASON.format(logical_id))
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, (IAMManagedPolicy, IAMPolicy, S3BucketPolicy, SNSTopicPolicy, SQSQueuePolicy)):
self.check_for_wildcards(result, logical_id, resource.Properties.PolicyDocument)
elif isinstance(resource, (IAMRole, IAMUser)):
if isinstance(resource, IAMRole):
self.check_for_wildcards(result, logical_id, resource.Properties.AssumeRolePolicyDocument)
if resource.Properties and resource.Properties.Policies:
for policy in resource.Properties.Policies:
self.check_for_wildcards(result, logical_id, policy.PolicyDocument)
return result
from pycfmodel.model.resources.iam_policy import IAMPolicy
from pycfmodel.model.resources.iam_role import IAMRole
from pycfmodel.model.resources.iam_user import IAMUser
from pycfmodel.model.resources.kms_key import KMSKey
from pycfmodel.model.resources.s3_bucket_policy import S3BucketPolicy
from pycfmodel.model.resources.security_group import SecurityGroup
from pycfmodel.model.resources.security_group_egress import SecurityGroupEgress
from pycfmodel.model.resources.security_group_ingress import SecurityGroupIngress
from pycfmodel.model.resources.sns_topic_policy import SNSTopicPolicy
from pycfmodel.model.resources.sqs_queue_policy import SQSQueuePolicy
ResourceModels = Union[
IAMGroup,
IAMManagedPolicy,
IAMPolicy,
IAMRole,
IAMUser,
KMSKey,
S3BucketPolicy,
SecurityGroup,
SecurityGroupEgress,
SecurityGroupIngress,
SNSTopicPolicy,
SQSQueuePolicy,
]
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole):
self.check_managed_policies(result, logical_id, resource)
self.check_inline_policies(result, logical_id, resource)
return result