Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Lookup table from the CloudFormation "Type" string of a resource to the
# model class that `create_resource` instantiates for it.
_RESOURCE_MAP = {
    "AWS::EC2::SecurityGroup": SecurityGroup,
    "AWS::EC2::SecurityGroupEgress": SecurityGroupEgress,
    "AWS::EC2::SecurityGroupIngress": SecurityGroupIngress,
    "AWS::IAM::Group": IAMGroup,
    "AWS::IAM::ManagedPolicy": IAMManagedPolicy,
    "AWS::IAM::Policy": IAMPolicy,
    "AWS::IAM::Role": IAMRole,
    "AWS::IAM::User": IAMUser,
    "AWS::KMS::Key": KMSKey,
    "AWS::S3::BucketPolicy": S3BucketPolicy,
    "AWS::SNS::TopicPolicy": SNSTopicPolicy,
    "AWS::SQS::QueuePolicy": SQSQueuePolicy,
}
# Class `create_resource` falls back to when the "Type" of a resource is not a
# key of `_RESOURCE_MAP` (this includes a resource with no "Type" key at all).
_DEFAULT_RESOURCE = Resource
def create_resource(logical_id: str, value: Dict[str, Any]) -> Resource:
    """
    Instantiate the model class that matches a resource definition.

    The class is looked up in `_RESOURCE_MAP` by the `"Type"` entry of `value`;
    when there is no match (or no `"Type"` entry) `_DEFAULT_RESOURCE` is used.

    - logical_id: Identifier passed through as the first constructor argument.
    - value: Resource definition passed through as the second constructor argument.

    Returns the object built by calling the selected class with `(logical_id, value)`.
    """
    resource_type = value.get("Type")
    model_class = _RESOURCE_MAP.get(resource_type, _DEFAULT_RESOURCE)
    return model_class(logical_id, value)
class SNSTopicPolicyProperties(CustomModel):
    """
    Properties of an SNS topic policy.

    Properties:

    - PolicyDocument: A [policy document][pycfmodel.model.resources.properties.policy_document.PolicyDocument] object.
    - Topics: ARNs of the topics the policy is added to.

    More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sns-policy.html)
    """

    # Neither field declares a default value.
    PolicyDocument: Resolvable[PolicyDocument]
    Topics: List[ResolvableStr]
class SNSTopicPolicy(Resource):
    """
    Resource model for the `AWS::SNS::TopicPolicy` type.

    Properties:

    - Properties: An [SNS Topic Policy Properties][pycfmodel.model.resources.sns_topic_policy.SNSTopicPolicyProperties] object.

    More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sns-policy.html)
    """

    # Type string of this resource; also used as the default of `Type`.
    TYPE_VALUE: ClassVar = "AWS::SNS::TopicPolicy"
    Type: str = TYPE_VALUE
    # No default value is declared for `Properties`.
    Properties: Resolvable[SNSTopicPolicyProperties]
- Policies: A list of [policy][pycfmodel.model.resources.properties.policy.Policy] objects.
- UserName: Name of the user.
More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-iam-user.html)
"""
Groups: Optional[Resolvable[List[ResolvableStr]]] = None
LoginProfile: Optional[Dict] = None
ManagedPolicyArns: Optional[Resolvable[List[ResolvableStr]]] = None
Path: Optional[ResolvableStr] = None
PermissionsBoundary: Optional[ResolvableStr] = None
Policies: Optional[Resolvable[List[Resolvable[Policy]]]] = None
UserName: Optional[ResolvableStr] = None
class IAMUser(Resource):
"""
Properties:
- Properties: A [IAM User properties][pycfmodel.model.resources.iam_user.IAMUserProperties] object.
More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-iam-user.html)
"""
TYPE_VALUE: ClassVar = "AWS::IAM::User"
Type: str = TYPE_VALUE
Properties: Optional[Resolvable[IAMUserProperties]]
def has_hardcoded_credentials(self) -> bool:
""" Returns True if login profile password contains a hardcoded string, otherwise False. """
if self.Properties:
login_profile = self.Properties.LoginProfile
import logging
from typing import ClassVar
from pydantic import Extra, validator
from pycfmodel.model.resources.resource import Resource
from pycfmodel.model.resources.types import ResourceModels
# Module-level logger named after the module's dotted import path, so it takes
# part in the normal logger hierarchy (a `__file__` path is not a usable name).
logger = logging.getLogger(__name__)
# Set of the `TYPE_VALUE` strings of every class listed in `ResourceModels.__args__`.
_EXISTING_RESOURCE_TYPES = {model.TYPE_VALUE for model in ResourceModels.__args__}
class GenericResource(Resource):
    """
    Catch-all model for resource types that have not been implemented yet.

    Extra fields are allowed (`Extra.allow`). `ALLOW_EXISTING_TYPES` controls
    what happens when `Type` is one of `_EXISTING_RESOURCE_TYPES`: a warning is
    logged when it is true, a `ValueError` is raised when it is false.
    """

    ALLOW_EXISTING_TYPES: ClassVar[bool] = True
    Type: str

    class Config(Resource.Config):
        extra = Extra.allow

    @validator("Type", pre=True)
    def check_type(cls, value, values, **kwargs):
        """Return `value` unchanged, warning or raising when it is an already-modelled type."""
        # Types without a dedicated model need no further checks.
        if value not in _EXISTING_RESOURCE_TYPES:
            return value
        if not cls.ALLOW_EXISTING_TYPES:
            raise ValueError(f"Instantiation of GenericResource from {value} in {values} not allowed")
        logger.warning(f"Instantiating a GenericResource from a {value} in {values}")
        return value
- CidrIpv6: IPv6 address range.
- Description: Description for the security group rule.
- DestinationPrefixListId: The prefix list IDs for an AWS service.
- DestinationSecurityGroupId: ID of the destination VPC security group.
- FromPort: Start of port range for the TCP and UDP protocols, or an ICMP/ICMPv6 type number. A value of -1 indicates all ICMP/ICMPv6 types.
- GroupId: ID of the security group.
- IpProtocol: IP protocol name.
- ToPort: End of port range for the TCP and UDP protocols, or an ICMP/ICMPv6 code. A value of -1 indicates all ICMP/ICMPv6 codes.
More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-security-group-egress.html)
"""
GroupId: Optional[ResolvableStr] = None
class SecurityGroupEgress(Resource):
"""
Properties:
- Properties: A [Security Group Egress Properties][pycfmodel.model.resources.security_group_egress.SecurityGroupEgressProperties] object.
More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-security-group-egress.html)
"""
TYPE_VALUE: ClassVar = "AWS::EC2::SecurityGroupEgress"
Type: str = TYPE_VALUE
Properties: SecurityGroupEgressProperties
def ipv4_slash_zero(self) -> bool:
return self.Properties.ipv4_slash_zero()
def ipv6_slash_zero(self) -> bool:
# Module-level logger named after the module's dotted import path, so it takes
# part in the normal logger hierarchy (a `__file__` path is not a usable name).
logger = logging.getLogger(__name__)
class GenericWildcardPolicyRule(Rule):
    """
    Abstract rule that checks for use of the wildcard `*` character in Actions of Policy Documents of AWS Resources.

    This rule must be inherited by another class to be used, with `AWS_RESOURCE` set to the resource to be checked.
    See `S3BucketPolicyWildcardActionRule` and `SQSQueuePolicyWildcardActionRule` for examples.
    """

    # Failure message template; filled with the resource class name and the logical id.
    REASON = "The {} {} should not allow a `*` action"
    GRANULARITY = RuleGranularity.RESOURCE
    # Resource class to inspect. Left as None here, which makes `invoke` a no-op.
    AWS_RESOURCE: Optional[Type[Resource]] = None

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """
        Add a failure for every `AWS_RESOURCE` instance whose policy document allows a `*` action.

        - cfmodel: Model whose `Resources` mapping is scanned.
        - extras: Not used by this method.

        Returns a `Result`; it has no failures added when `AWS_RESOURCE` is not
        defined (a warning is logged in that case).
        """
        result = Result()
        if self.AWS_RESOURCE is None:
            logger.warning(f"Not running {type(self).__name__} rule as AWS_RESOURCE is not defined.")
            return result

        for logical_id, resource in cfmodel.Resources.items():
            # Only resources of the configured class are inspected.
            if not isinstance(resource, self.AWS_RESOURCE):
                continue
            if resource.Properties.PolicyDocument.allowed_actions_with(REGEX_HAS_STAR_OR_STAR_AFTER_COLON):
                self.add_failure_to_result(
                    result, self.REASON.format(self.AWS_RESOURCE.__name__, logical_id), resource_ids={logical_id},
                )
        return result
class SQSQueuePolicyProperties(CustomModel):
    """
    Properties of an SQS queue policy.

    Properties:

    - PolicyDocument: A [policy document][pycfmodel.model.resources.properties.policy_document.PolicyDocument] object.
    - Queues: URLs of the queues the policy is added to.

    More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-policy.html)
    """

    # Neither field declares a default value.
    PolicyDocument: Resolvable[PolicyDocument]
    Queues: Resolvable[List[ResolvableStr]]
class SQSQueuePolicy(Resource):
    """
    Resource model for the `AWS::SQS::QueuePolicy` type.

    Properties:

    - Properties: An [SQS Queue Policy Properties][pycfmodel.model.resources.sqs_queue_policy.SQSQueuePolicyProperties] object.

    More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-policy.html)
    """

    # Type string of this resource; also used as the default of `Type`.
    TYPE_VALUE: ClassVar = "AWS::SQS::QueuePolicy"
    Type: str = TYPE_VALUE
    # No default value is declared for `Properties`.
    Properties: Resolvable[SQSQueuePolicyProperties]
class S3BucketPolicyProperties(CustomModel):
    """
    Properties of an S3 bucket policy.

    Properties:

    - Bucket: Name of the Amazon S3 bucket to which the policy applies.
    - PolicyDocument: A [policy document][pycfmodel.model.resources.properties.policy_document.PolicyDocument] object.

    More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-s3-policy.html)
    """

    # Neither field declares a default value.
    Bucket: ResolvableStr
    PolicyDocument: Resolvable[PolicyDocument]
class S3BucketPolicy(Resource):
    """
    Resource model for the `AWS::S3::BucketPolicy` type.

    Properties:

    - Properties: An [S3 Bucket Policy Properties][pycfmodel.model.resources.s3_bucket_policy.S3BucketPolicyProperties] object.

    More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-s3-policy.html)
    """

    # Type string of this resource; also used as the default of `Type`.
    TYPE_VALUE: ClassVar = "AWS::S3::BucketPolicy"
    Type: str = TYPE_VALUE
    # No default value is declared for `Properties`.
    Properties: Resolvable[S3BucketPolicyProperties]
- GroupName: Name of the security group.
- IpProtocol: IP protocol name.
- SourcePrefixListId: The prefix list IDs for an AWS service.
- SourceSecurityGroupId: ID of the security group.
- SourceSecurityGroupName: Name of the source security group.
- SourceSecurityGroupOwnerId: AWS account ID for the source security group.
- ToPort: End of port range for the TCP and UDP protocols, or an ICMP/ICMPv6 code. A value of -1 indicates all ICMP/ICMPv6 codes.
More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ec2-security-group-ingress.html)
"""
GroupId: Optional[ResolvableStr] = None
GroupName: Optional[ResolvableStr] = None
class SecurityGroupIngress(Resource):
"""
Properties:
- Properties: A [Security Group Ingress Properties][pycfmodel.model.resources.security_group_ingress.SecurityGroupIngressProperties] object.
More info at [AWS Docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ec2-security-group-ingress.html)
"""
TYPE_VALUE: ClassVar = "AWS::EC2::SecurityGroupIngress"
Type: str = TYPE_VALUE
Properties: SecurityGroupIngressProperties
def ipv4_slash_zero(self) -> bool:
return self.Properties.ipv4_slash_zero()
def ipv6_slash_zero(self) -> bool:
from pycfmodel.model.resources.resource import Resource
from pycfmodel.model.resources.types import ResourceModels
# Module-level logger named after the module's dotted import path, so it takes
# part in the normal logger hierarchy (a `__file__` path is not a usable name).
logger = logging.getLogger(__name__)
# Set of the `TYPE_VALUE` strings of every class listed in `ResourceModels.__args__`.
_EXISTING_RESOURCE_TYPES = {model.TYPE_VALUE for model in ResourceModels.__args__}
class GenericResource(Resource):
    """
    Catch-all model for resource types that have not been implemented yet.

    Extra fields are allowed (`Extra.allow`). `ALLOW_EXISTING_TYPES` controls
    what happens when `Type` is one of `_EXISTING_RESOURCE_TYPES`: a warning is
    logged when it is true, a `ValueError` is raised when it is false.
    """

    ALLOW_EXISTING_TYPES: ClassVar[bool] = True
    Type: str

    class Config(Resource.Config):
        extra = Extra.allow

    @validator("Type", pre=True)
    def check_type(cls, value, values, **kwargs):
        """Return `value` unchanged, warning or raising when it is an already-modelled type."""
        # Types without a dedicated model need no further checks.
        if value not in _EXISTING_RESOURCE_TYPES:
            return value
        if not cls.ALLOW_EXISTING_TYPES:
            raise ValueError(f"Instantiation of GenericResource from {value} in {values} not allowed")
        logger.warning(f"Instantiating a GenericResource from a {value} in {values}")
        return value