def test_securitygroupegress(self):
egress = ec2.SecurityGroupEgress(
'egress',
ToPort='80',
FromPort='80',
IpProtocol="tcp",
GroupId="id",
CidrIp="0.0.0.0/0",
)
egress.to_dict()
egress = ec2.SecurityGroupEgress(
'egress',
ToPort='80',
FromPort='80',
IpProtocol="tcp",
GroupId="id",
DestinationPrefixListId='id',
ec2.InternetGateway(
'InternetGateway',
Tags=self.get_tags()
)
)
# Attach the internet gateway to the VPC. `gateway` is created before this
# excerpt begins.
gateway_attachment = self.create_resource(
ec2.VPCGatewayAttachment(
'VPCGatewayAttachment',
VpcId=Ref(self.vpc),
InternetGatewayId=Ref(gateway)
)
)
# Route table in the same VPC that receives the internet-gateway route below.
public_route_table = self.create_resource(
ec2.RouteTable(
'PublicRouteTable',
VpcId=Ref(self.vpc))
)
# Route ALLOW_ALL_CIDR traffic through the internet gateway. DependsOn makes
# CloudFormation create the route only after the gateway attachment exists.
# NOTE(review): ALLOW_ALL_CIDR is defined outside this excerpt; if it is
# 0.0.0.0/0 this is a default route, so every subnet associated with this
# table gets a path to the internet gateway. Confirm only intentionally
# public subnets are associated with it.
self.create_resource(
ec2.Route(
'PublicRoute',
RouteTableId=Ref(public_route_table),
DestinationCidrBlock=ALLOW_ALL_CIDR,
DependsOn=gateway_attachment.title,
GatewayId=Ref(gateway)
)
)
# Return the table so the caller can use it (subnet associations are not
# shown in this excerpt).
return public_route_table
}
}
)
)
),
Tags=Tags(
Application=self._stack_id,
Name=Sub('Streamlit EC2 Instance (${AWS::StackName})'),
),
IamInstanceProfile=Ref(self._resources['Ec2IamInstanceProfile']),
DependsOn='StackDeletorRole',
),
})
# Register a VPC-scoped Elastic IP bound to the 'Ec2Instance' resource, which
# gives that instance a fixed public address. DependsOn orders its creation
# after 'StackDeletorRole'.
# NOTE(review): what can reach this address is decided by the instance's
# security groups, which are not visible in this excerpt. Confirm SSH ingress
# is restricted to known source ranges.
self._resources.update({
'IPAddress': ec2.EIP(
'IPAddress',
Domain='vpc',
InstanceId=Ref(self._resources['Ec2Instance']),
DependsOn='StackDeletorRole',
),
})
self._outputs.update({
'SshIp': Output(
'SshIp',
Description='SshIp',
Value=GetAtt('Ec2Instance', 'PublicIp'),
),
'SshCommand': Output(
'SshCommand',
Description='SshCommand',
def _dhcp_options_hosted_zones(self):
    """Add DHCP options and their VPC association for the hosted-zone case.

    Both resources carry the ``HasHostedZones`` condition, so CloudFormation
    creates them only when that condition holds. The DHCP domain name is the
    ``BaseDomain`` and ``InternalDomain`` references joined by a single space,
    and DNS resolution is delegated to ``AmazonProvidedDNS``.
    """
    template = self.template
    search_domains = Join(" ", [Ref("BaseDomain"), Ref("InternalDomain")])

    options_resource = ec2.DHCPOptions(
        'DHCPOptionsWithDNS',
        DomainName=search_domains,
        DomainNameServers=['AmazonProvidedDNS', ],
        Condition="HasHostedZones",
    )
    dhcp_options = template.add_resource(options_resource)

    # Bind the option set to the VPC under the same condition, so the
    # association never exists without the options it points at.
    association = ec2.VPCDHCPOptionsAssociation(
        'DHCPAssociationWithDNS',
        VpcId=VPC_ID,
        DhcpOptionsId=Ref(dhcp_options),
        Condition="HasHostedZones",
    )
    template.add_resource(association)
Action=[awacs.aws.Action("elasticache",
"DescribeCacheClusters")],
Resource=["*"],
Effect=awacs.aws.Allow
)]
),
Roles=[Ref(webserverrole)],
))
# Instance profile wrapping the web server role, so an EC2 instance launched
# with this profile receives the permissions of the policies attached to
# that role.
webserverinstanceprofile = template.add_resource(iam.InstanceProfile(
'WebServerInstanceProfile',
Path='/',
Roles=[Ref(webserverrole)],
))
webserversg = template.add_resource(ec2.SecurityGroup(
'WebServerSecurityGroup',
GroupDescription='Enable HTTP and SSH access',
SecurityGroupIngress=[
ec2.SecurityGroupRule(
IpProtocol='tcp',
FromPort='22',
ToPort='22',
CidrIp=Ref(sshlocation),
),
ec2.SecurityGroupRule(
IpProtocol='tcp',
FromPort='80',
ToPort='80',
CidrIp='0.0.0.0/0',
)
]
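# Read the cluster spec from disk.
str_yaml = ""
with open(specfile, "r") as fh:
    str_yaml = fh.read()

# Parse with safe_load, which builds only plain Python types (dicts, lists,
# scalars). yaml.load() without an explicit Loader is unsafe on older PyYAML,
# where the default loader can construct arbitrary Python objects from tags
# in the file, and it raises TypeError on PyYAML 6+, where Loader is required.
# NOTE(review): a spec that relies on custom YAML tags would need an explicit
# Loader instead. Confirm none do.
obj_yaml = yaml.safe_load(str_yaml)

# use troposphere to write out a cloud formation template
cfn_template = Template()
nodes = obj_yaml.get('nodes')
nodes = expand_clones(nodes)
obj_yaml['nodes'] = nodes

# One EC2 instance resource per node, titled and tagged with the node name.
for node in nodes:
    nodename = node.get('nodename')
    instance = ec2.Instance(nodename,
                            Tags=[ec2.Tag("name", nodename)])
    # KeyName is set only when the spec names a key pair for the node.
    keyname = node.get('key')
    if keyname:
        instance.KeyName = keyname
    instance.ImageId = node.get('image')
    instance.InstanceType = node.get('instance_type')
    cfn_template.add_resource(instance)

# save it to ./dustcluster/clusters/name_region.cfn
cfn_json = cfn_template.to_json()
cluster_spec = obj_yaml.get('cluster')
if not cluster_spec:
    raise Exception("No cluster section in template %s" % specfile)
cluster_name = cluster_spec.get('name')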
Tags=Tags(type=net_type)))
# Each subnet gets its own route table, tagged with the network type and
# associated with that subnet.
route_table_name = "%sRouteTable%s" % (name_prefix,
name_suffix)
t.add_resource(ec2.RouteTable(
route_table_name,
VpcId=vpc_id,
Tags=[ec2.Tag('type', net_type)]))
t.add_resource(ec2.SubnetRouteTableAssociation(
"%sRouteTableAssociation%s" % (name_prefix, name_suffix),
SubnetId=Ref(subnet_name),
RouteTableId=Ref(route_table_name)))
if net_type == 'public':
# Public subnets host the NAT instances, so their default route
# (0.0.0.0/0) points straight at the AWS Internet Gateway. Only the
# public tier has this direct path to the gateway.
t.add_resource(ec2.Route(
"%sRoute%s" % (name_prefix, name_suffix),
RouteTableId=Ref(route_table_name),
DestinationCidrBlock="0.0.0.0/0",
GatewayId=Ref(GATEWAY)))
self.create_nat_instance(i, subnet_name)
else:
# Private subnets hold the actual workload instances. Their default
# route targets the NAT instance with the matching suffix, not the
# Internet Gateway.
# NOTE(review): the NAT instance's security group and source/dest-check
# settings are defined elsewhere. Confirm they accept traffic only from
# the private subnets.
t.add_resource(ec2.Route(
'%sRoute%s' % (name_prefix, name_suffix),
RouteTableId=Ref(route_table_name),
DestinationCidrBlock='0.0.0.0/0',
InstanceId=Ref(NAT_INSTANCE_NAME % name_suffix)))
for net_type in net_types:
t.add_output(Output(
"%sSubnets" % net_type.capitalize(),
def create_dhcp_options(self):
t = self.template
search_path = NoValue
if self.zone:
search_path = self.zone.Name
self.dhcp_options = t.add_resource(
ec2.DHCPOptions(
"DHCPOptions",
DomainName=search_path,
DomainNameServers=["AmazonProvidedDNS", ],
)
)
t.add_output(
Output(
"DHCPOptionsId",
Value=self.dhcp_options.Ref(),
)
)
self.dhcp_association = t.add_resource(
ec2.VPCDHCPOptionsAssociation(
"VPCDHCPOptionsAssociation",