Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def security_group_ingress_ipv4_1():
    """Return a SecurityGroupIngress for group sg-12341234 with CidrIp "1.1.1.1/0".

    The rule covers TCP port 46 only (FromPort == ToPort == 46).
    NOTE(review): presumably a test fixture; no decorator is visible here.
    """
    return SecurityGroupIngress(
        **{
            "Type": "AWS::EC2::SecurityGroupIngress",
            "Properties": {
                "GroupId": "sg-12341234",
                "CidrIp": "1.1.1.1/0",
                "FromPort": 46,
                "ToPort": 46,
                "IpProtocol": "tcp",
            },
        }
    )
def security_group_ingress_ipv4_2():
    """Return a SecurityGroupIngress for group sg-2345 with CidrIp "127.0.0.0".

    The CidrIp value carries no prefix length; it is passed through as written.
    The rule covers TCP port 46 only (FromPort == ToPort == 46).
    NOTE(review): presumably a test fixture; no decorator is visible here.
    """
    return SecurityGroupIngress(
        **{
            "Type": "AWS::EC2::SecurityGroupIngress",
            "Properties": {
                "GroupId": "sg-2345",
                "CidrIp": "127.0.0.0",
                "FromPort": 46,
                "ToPort": 46,
                "IpProtocol": "tcp",
            },
        }
    )
def security_group_ingress_ipv6():
    """Return a SecurityGroupIngress for group sg-12341234 using CidrIpv6.

    The IPv6 address is given in full form without a prefix length, and the
    rule covers TCP port 46 only (FromPort == ToPort == 46).
    NOTE(review): presumably a test fixture; no decorator is visible here.
    """
    return SecurityGroupIngress(
        **{
            "Type": "AWS::EC2::SecurityGroupIngress",
            "Properties": {
                "GroupId": "sg-12341234",
                "CidrIpv6": "2001:0db8:0000:0000:0000:ff00:0042:8329",
                "FromPort": 46,
                "ToPort": 46,
                "IpProtocol": "tcp",
            },
        }
    )
from .resources.iam_managed_policy import IAMManagedPolicy
from .resources.iam_policy import IAMPolicy
from .resources.iam_role import IAMRole
from .resources.s3_bucket_policy import S3BucketPolicy
from .resources.security_group import SecurityGroup
from .resources.security_group_egress import SecurityGroupEgress
from .resources.security_group_ingress import SecurityGroupIngress
from .resources.sqs_queue_policy import SQSQueuePolicy
from .resources.sns_topic_policy import SNSTopicPolicy
from .resources.kms_key import KMSKey
# Maps a CloudFormation resource "Type" string to the model class used for it.
# create_resource looks types up here; a type with no entry falls back to
# _DEFAULT_RESOURCE.
_RESOURCE_MAP = {
"AWS::EC2::SecurityGroup": SecurityGroup,
"AWS::EC2::SecurityGroupEgress": SecurityGroupEgress,
"AWS::EC2::SecurityGroupIngress": SecurityGroupIngress,
"AWS::IAM::Group": IAMGroup,
"AWS::IAM::ManagedPolicy": IAMManagedPolicy,
"AWS::IAM::Policy": IAMPolicy,
"AWS::IAM::Role": IAMRole,
"AWS::IAM::User": IAMUser,
"AWS::KMS::Key": KMSKey,
"AWS::S3::BucketPolicy": S3BucketPolicy,
"AWS::SNS::TopicPolicy": SNSTopicPolicy,
"AWS::SQS::QueuePolicy": SQSQueuePolicy,
}
# Class used when a resource's "Type" has no entry in _RESOURCE_MAP.
_DEFAULT_RESOURCE = Resource
def create_resource(logical_id: str, value: Dict[str, Any]) -> Resource:
    """Instantiate the model class registered for ``value["Type"]``.

    The class is looked up in ``_RESOURCE_MAP``; when the type is missing
    from ``value`` or has no entry in the map, ``_DEFAULT_RESOURCE`` is used.

    Args:
        logical_id: Passed as the first positional argument to the class.
        value: Resource definition; passed as the second positional argument.

    Returns:
        The object produced by calling the selected class.
    """
    resource_type = value.get("Type")
    model_cls = _RESOURCE_MAP.get(resource_type, _DEFAULT_RESOURCE)
    return model_cls(logical_id, value)
def invoke(self, cfmodel):
    """Record a failure for each non-allowed port on slash-zero ingress rules.

    Only ``SecurityGroupIngress`` resources whose ``ipv4_slash_zero()`` or
    ``ipv6_slash_zero()`` call is truthy are inspected. Every port in the
    inclusive ``FromPort``..``ToPort`` range whose string form is not in
    ``self._config.allowed_world_open_ports`` is reported via ``add_failure``.
    """
    for logical_id, resource in cfmodel.Resources.items():
        if not isinstance(resource, SecurityGroupIngress):
            continue
        if not (resource.ipv4_slash_zero() or resource.ipv6_slash_zero()):
            continue

        # ToPort is inclusive, hence the +1 on the range's upper bound.
        port_range = range(resource.Properties.FromPort, resource.Properties.ToPort + 1)
        for port in port_range:
            if str(port) in self._config.allowed_world_open_ports:
                continue
            message = self.REASON.format(port, logical_id)
            self.add_failure(type(self).__name__, message)
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
    """Analyse every ``SecurityGroupIngress`` resource in the model.

    Args:
        cfmodel: Model whose resources are filtered by type.
        extras: Optional extra data, forwarded unchanged in the filter context.

    Returns:
        The ``Result`` instance that was handed to ``analyse_ingress`` for
        each matching resource.
    """
    result = Result()
    ingress_resources = cfmodel.resources_filtered_by_type({SecurityGroupIngress})
    for logical_id, resource in ingress_resources.items():
        # Context passed along with each resource for filter evaluation.
        context = dict(
            config=self._config,
            extras=extras,
            logical_id=logical_id,
            resource=resource,
        )
        self.analyse_ingress(result, logical_id, resource.Properties, context)
    return result
from pycfmodel.model.resources.security_group_egress import SecurityGroupEgress
from pycfmodel.model.resources.security_group_ingress import SecurityGroupIngress
from pycfmodel.model.resources.sns_topic_policy import SNSTopicPolicy
from pycfmodel.model.resources.sqs_queue_policy import SQSQueuePolicy
# Type alias: the union of the resource model classes listed below.
ResourceModels = Union[
IAMGroup,
IAMManagedPolicy,
IAMPolicy,
IAMRole,
IAMUser,
KMSKey,
S3BucketPolicy,
SecurityGroup,
SecurityGroupEgress,
SecurityGroupIngress,
SNSTopicPolicy,
SQSQueuePolicy,
]