def get_autoscaling_group_parameters(self, launch_config_name, elb_name):
    return {
        'AvailabilityZones': Ref("AvailabilityZones"),
        'LaunchConfigurationName': Ref(launch_config_name),
        'MinSize': Ref("MinSize"),
        'MaxSize': Ref("MaxSize"),
        'VPCZoneIdentifier': Ref("PrivateSubnets"),
        'LoadBalancerNames': If("CreateELB", [Ref(elb_name)], []),
        'Tags': [ASTag('Name', self.name, True)],
    }
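A minimal usage sketch for the parameter dict above, assuming a troposphere Template is available on self.template (the titles 'WebLaunchConfig', 'WebELB', and 'WebAutoScalingGroup' are illustrative, not from the original):

from troposphere import autoscaling

params = self.get_autoscaling_group_parameters('WebLaunchConfig', 'WebELB')
self.template.add_resource(
    autoscaling.AutoScalingGroup('WebAutoScalingGroup', **params)
)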
for az_idx in range(1, availability_zones + 1):
    subnet_id_ref = env_ctx.env_ref_prefix(
        segment_id=efs_config.segment,
        attribute='az{}.subnet_id'.format(az_idx)
    )
    subnet_param = self.create_cfn_parameter(
        name='SubnetIdAZ{}'.format(az_idx),
        param_type='String',
        description='The SubnetId for AZ{}.'.format(az_idx),
        value=subnet_id_ref,
    )
    efs_mount_logical_id = self.create_cfn_logical_id('EFSMountTargetAZ{}'.format(az_idx))
    # Passing template= attaches the mount target to self.template directly.
    troposphere.efs.MountTarget(
        title=efs_mount_logical_id,
        template=self.template,
        FileSystemId=troposphere.Ref(efs_res),
        SecurityGroups=troposphere.Ref(sg_list_param),
        SubnetId=troposphere.Ref(subnet_param)
    )

# Generate the Template
self.set_template()
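The loop above references an EFS file system (efs_res) and a security-group list parameter (sg_list_param) created earlier; a sketch of those prerequisites (everything other than those two names is an assumption):

efs_res = troposphere.efs.FileSystem(
    title=self.create_cfn_logical_id('EFSFileSystem'),
    template=self.template,
    Encrypted=True,  # assumed setting
)
sg_list_param = self.create_cfn_parameter(
    name='TargetSecurityGroupList',
    param_type='List<AWS::EC2::SecurityGroup::Id>',
    description='Security group ids for the EFS mount targets.',
    value=efs_config.security_groups,  # assumed attribute
)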
Parameter("ComputeSecurityGroup", Type="String", Description="Security Group for Mount Target")
)
master_subnet_id = t.add_parameter(
    Parameter("MasterSubnetId", Type="String", Description="Master subnet id for master mount target")
)
compute_subnet_id = t.add_parameter(
    Parameter(
        "ComputeSubnetId",
        Type="String",
        Description="User provided compute subnet id. Will be used to create compute mount target if needed.",
    )
)
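The conditions below index into a single list-typed options parameter; a sketch of the efs_options parameter they assume (type and description are assumptions):

efs_options = t.add_parameter(
    Parameter("EFSOptions", Type="CommaDelimitedList", Description="Comma separated list of EFS options")
)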
create_efs = t.add_condition(
    "CreateEFS",
    And(Not(Equals(Select(str(0), Ref(efs_options)), "NONE")), Equals(Select(str(1), Ref(efs_options)), "NONE")),
)
create_master_mt = t.add_condition(
    "CreateMasterMT",
    And(Not(Equals(Select(str(0), Ref(efs_options)), "NONE")), Equals(Select(str(7), Ref(efs_options)), "NONE")),
)
no_mt_in_compute_az = t.add_condition("NoMTInComputeAZ", Equals(Select(str(8), Ref(efs_options)), "NONE"))
use_user_provided_compute_subnet = t.add_condition(
    "UseUserProvidedComputeSubnet", Not(Equals(Ref(compute_subnet_id), "NONE"))
)
# Need to create a compute mount target if:
#   - the user is providing a compute subnet, and
#   - there is no existing MT in the compute subnet's AZ (this includes the
#     case where master AZ == compute AZ).
#
# If the user is not providing a compute subnet, either we are using the master
# subnet as the compute subnet, or we will be creating a compute subnet in the
# same AZ as the master subnet; see the ComputeSubnet resource in the main
# stack. The combined condition is sketched below.
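A minimal sketch of that combined condition, assuming troposphere's Condition reference helper (the logical ID "CreateComputeMT" is illustrative):

create_compute_mt = t.add_condition(
    "CreateComputeMT",
    And(Condition(use_user_provided_compute_subnet), Condition(no_mt_in_compute_az)),
)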
cerberus_route_table = template.add_resource(RouteTable(
    "CerberusRouteTable",
    VpcId=Ref(cerberus_vpc),
    Tags=cerberus_tags.get_tags_as_list()
))
cerberus_internet_gateway = template.add_resource(InternetGateway(
    "CerberusInternetGateway",
    Tags=cerberus_tags.get_tags_as_list()
))
cerberus_route_internet_gateway = template.add_resource(Route(
    "CerberusRouteInternetGateway",
    DestinationCidrBlock=Ref(cerberus_network.gateway_cidr_block_param),
    GatewayId=Ref(cerberus_internet_gateway),
    RouteTableId=Ref(cerberus_route_table),
    # A route that references an internet gateway must declare a dependency
    # on the gateway attachment, or stack creation can fail.
    DependsOn="CerberusVpcGatewayAttachment"
))
cerberus_vpc_gateway_attachment = template.add_resource(VPCGatewayAttachment(
    "CerberusVpcGatewayAttachment",
    InternetGatewayId=Ref(cerberus_internet_gateway),
    VpcId=Ref(cerberus_vpc)
))
# Associate the three subnets to the route table
vpc_subnet_resources_by_zone_identifier = {}
for zone_identifier, subnet_cidr_block in cerberus_network.subnet_cidr_block_param_by_az_map.items():
    subnet = template.add_resource(Subnet(
        "CerberusSubnetForAz{0}".format(zone_identifier),
        AvailabilityZone=Ref(az_by_identifier_map[zone_identifier]),
        CidrBlock=Ref(subnet_cidr_block),
        MapPublicIpOnLaunch=True,
        VpcId=Ref(cerberus_vpc),  # VpcId is a required Subnet property
        Tags=cerberus_tags.get_tags_as_list()
    ))
    vpc_subnet_resources_by_zone_identifier[zone_identifier] = subnet
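    # The association promised by the comment above; a minimal sketch
    # (the logical ID is assumed, not from the original):
    template.add_resource(SubnetRouteTableAssociation(
        "CerberusSubnetRouteTableAssociationForAz{0}".format(zone_identifier),
        RouteTableId=Ref(cerberus_route_table),
        SubnetId=Ref(subnet)
    ))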
website_role = t.add_resource(Role(
    "WebsiteRole",
    # The head of this role was truncated; the principal and policy name are
    # assumptions, only the Resource list survived in the source.
    AssumeRolePolicyDocument={"Statement": [{
        "Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"},
        "Action": "sts:AssumeRole"}]},
    Policies=[Policy(
        PolicyName="WebsiteSnsPublish",
        PolicyDocument={"Statement": [{
            "Effect": "Allow", "Action": ["sns:Publish"],
            "Resource": [Ref(website_sns_topic)]}]},
    )],
))
t.add_output(Output(
    "WebsiteRole",
    Description="website_iam_role_arn",
    Value=GetAtt(website_role, "Arn"),
))
website_bucket = t.add_resource(s3.Bucket(
    'WebsiteS3Bucket',
    BucketName=Ref(website_s3_bucket_name),
    WebsiteConfiguration=s3.WebsiteConfiguration(
        ErrorDocument="error.html",
        IndexDocument="index.html"
    )
))
t.add_output(Output(
    "S3Bucket",
    Description="s3_bucket",
    Value=Ref(website_bucket),
))
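A companion output that often pairs with a website bucket; a sketch (not in the original, though WebsiteURL is the standard S3 bucket attribute for the website endpoint):

t.add_output(Output(
    "WebsiteURL",
    Description="s3_website_url",
    Value=GetAtt(website_bucket, "WebsiteURL"),
))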
t.add_resource(s3.BucketPolicy(
    'WebsiteS3BucketPolicy',
    Bucket=Ref(website_bucket),
    PolicyDocument={
        "Version": "2012-10-17",
        # The statement list was truncated in the source; a typical
        # public-read statement for a website bucket is assumed here.
        "Statement": [{"Effect": "Allow", "Principal": "*",
                       "Action": "s3:GetObject",
                       "Resource": Join("", ["arn:aws:s3:::", Ref(website_bucket), "/*"])}],
    },
))
# Only the tail of this autoscaling snippet survived; the head of the
# AutoScalingGroup is reconstructed minimally (launch config, sizes, and
# topic are assumptions).
tile_server_asg = self.add_resource(
    asg.AutoScalingGroup(
        'asgTileServer',
        LaunchConfigurationName=Ref(tile_server_launch_config),  # assumed
        MinSize=1,
        MaxSize=1,
        NotificationConfigurations=[
            asg.NotificationConfigurations(
                TopicARN=Ref(self.notification_topic_arn),  # assumed
                NotificationTypes=[
                    asg.EC2_INSTANCE_LAUNCH,
                    asg.EC2_INSTANCE_LAUNCH_ERROR,
                    asg.EC2_INSTANCE_TERMINATE,
                    asg.EC2_INSTANCE_TERMINATE_ERROR
                ]
            )
        ],
        VPCZoneIdentifier=Ref(self.private_subnets),
        Tags=[asg.Tag('Name', 'TileServer', True)]
    )
)
self.add_resource(
    asg.ScheduledAction(
        'schedTileServerAutoScalingStart',
        AutoScalingGroupName=Ref(tile_server_asg),
        DesiredCapacity=Ref(
            self.tile_server_auto_scaling_schedule_start_capacity),
        Recurrence=Ref(
            self.tile_server_auto_scaling_schedule_start_recurrence)
    )
)
self.add_resource(
    asg.ScheduledAction(
        'schedTileServerAutoScalingEnd',
        AutoScalingGroupName=Ref(tile_server_asg),
        DesiredCapacity=Ref(
            self.tile_server_auto_scaling_schedule_end_capacity),
        Recurrence=Ref(
            self.tile_server_auto_scaling_schedule_end_recurrence)
    )
)
if net_type == 'public':
    # Public subnets route straight out via the internet gateway
    # (branch head assumed; it mirrors the private branch below).
    t.add_resource(ec2.Route(
        '%sRoute%s' % (name_prefix, name_suffix),
        RouteTableId=Ref(route_table_name),
        DestinationCidrBlock="0.0.0.0/0",
        GatewayId=Ref(GATEWAY)))
    self.create_nat_instance(i, subnet_name)
else:
    # Private subnets are where actual instances will live,
    # so their gateway needs to be through the NAT instances.
    t.add_resource(ec2.Route(
        '%sRoute%s' % (name_prefix, name_suffix),
        RouteTableId=Ref(route_table_name),
        DestinationCidrBlock='0.0.0.0/0',
        InstanceId=Ref(NAT_INSTANCE_NAME % name_suffix)))
for net_type in net_types:
    t.add_output(Output(
        "%sSubnets" % net_type.capitalize(),
        Value=Join(",", [Ref(sn) for sn in subnets[net_type]])))
self.template.add_output(Output(
    "AvailabilityZones",
    Value=Join(",", zones)))
def define_ec2_instances(t, args):
    t.add_resource(EIP(
        "CmElasticIp",
        Domain="vpc"
    ))
    t.add_resource(EIP(
        "DcdElasticIp",
        Domain="vpc"
    ))
    t.add_resource(NetworkInterface(
        "BigIqCmEth0",
        Description="BIG-IQ CM Instance Management IP",
        GroupSet=[Ref(t.resources["SecurityGroup"])],
        SubnetId=Ref("Subnet1")
    ))
    t.add_resource(NetworkInterface(
        "BigIqDcdEth0",
        Description="BIG-IQ DCD Instance Management IP",
        GroupSet=[Ref(t.resources["SecurityGroup"])],
        SubnetId=Ref("Subnet1")
    ))
    bd_mappings = [
        BlockDeviceMapping(
            DeviceName="/dev/xvda",
            Ebs=EBSBlockDevice(
                DeleteOnTermination=True,
                VolumeType="gp2"
            )
        )
    ]
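    # A sketch of how these pieces are typically consumed; not in the original,
    # and it assumes Instance and NetworkInterfaceProperty are imported from
    # troposphere.ec2, plus AMI/instance-type parameters with these names:
    t.add_resource(Instance(
        "BigIqCm",
        ImageId=Ref("BigIqAmiId"),
        InstanceType=Ref("CmInstanceType"),
        BlockDeviceMappings=bd_mappings,
        NetworkInterfaces=[NetworkInterfaceProperty(
            DeviceIndex="0",
            NetworkInterfaceId=Ref(t.resources["BigIqCmEth0"])
        )]
    ))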
# The head of this parameter was truncated; reconstructed with the
# create_cfn_parameter helper used above (the parameter name is assumed).
s3_deploy_delegate_role_arn_param = self.create_cfn_parameter(
    param_type='String',
    name='S3DeployDelegateRoleArn',
    description='The Arn of the IAM Role CodePipeline will assume to gain access to the deployment bucket.',
    value=action._delegate_role_arn,
)
# S3 Deploy Action
s3_deploy_action = troposphere.codepipeline.Actions(
    Name='S3',
    ActionTypeId=troposphere.codepipeline.ActionTypeId(
        Category='Deploy',
        Owner='AWS',
        Version='1',
        Provider='S3'
    ),
    Configuration={
        'BucketName': troposphere.Ref(s3_deploy_bucket_name_param),
        'Extract': troposphere.Ref(s3_deploy_extract_param),
        'ObjectKey': troposphere.Ref(s3_deploy_object_key_param),
    },
    InputArtifacts=[
        troposphere.codepipeline.InputArtifacts(
            Name='CodeBuildArtifact'
        )
    ],
    RoleArn=troposphere.Ref(s3_deploy_delegate_role_arn_param),
    RunOrder=troposphere.If('ManualApprovalIsEnabled', 2, 1)
)
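A sketch of the stage wiring this action typically slots into (the stage name and action list are assumptions, not from the original):

s3_deploy_stage = troposphere.codepipeline.Stages(
    Name='Deploy',
    Actions=[s3_deploy_action]
)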
s3_deploy_assume_role_statement = Statement(
    Sid='S3AssumeRole',
    Effect=Allow,
    Action=[
        Action('sts', 'AssumeRole'),
    ],
    Resource=[troposphere.Ref(s3_deploy_delegate_role_arn_param)],
)
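A sketch showing the statement folded into a policy document, assuming awacs' PolicyDocument is imported alongside Statement, Allow, and Action:

pipeline_policy_doc = PolicyDocument(
    Version='2012-10-17',
    Statement=[s3_deploy_assume_role_statement],
)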