Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sub_with_vars_not_unpakaged(self):
    """Check ``Sub`` serialization when the values are passed as one dict.

    The substitution mapping is handed to ``Sub`` as a single positional
    argument, and ``to_dict()`` is expected to yield the two-element list
    form of ``Fn::Sub``: the untouched template string followed by that
    same mapping.
    """
    template_string = 'foo ${AWS::Region} ${sub1} ${sub2}'
    substitutions = {'sub1': 'uno', 'sub2': 'dos'}

    rendered = Sub(template_string, substitutions).to_dict()

    self.assertEqual(
        {'Fn::Sub': ['foo ${AWS::Region} ${sub1} ${sub2}', substitutions]},
        rendered,
    )
def create_policy(self, name):
    """Add an IAM policy resource, and an output for it, to the template.

    The statements come from ``self.generate_policy_statements()``. When
    that yields nothing truthy the method returns ``None`` without
    touching the template.

    Otherwise an ``iam.PolicyType`` titled ``"<name>Policy"`` is added to
    ``self.template``, attached to a ``Ref`` of every entry in
    ``self.roles``. An output titled ``"<name>PolicyName"`` referencing the
    new resource is added as well, and the resource is appended to
    ``self.policies``.

    Args:
        name: Prefix for the resource and output titles; also substituted
            for ``${Name}`` in the policy name.
    """
    policy_statements = self.generate_policy_statements()
    if not policy_statements:
        return

    template = self.template
    policy_resource = iam.PolicyType(
        "{}Policy".format(name),
        PolicyName=Sub("${AWS::StackName}-${Name}-policy", Name=name),
        PolicyDocument=Policy(Statement=policy_statements),
        Roles=[Ref(role) for role in self.roles],
    )
    policy = template.add_resource(policy_resource)

    policy_output = Output(name + "PolicyName", Value=Ref(policy))
    template.add_output(policy_output)

    self.policies.append(policy)
iam.Policy(
PolicyName='EksServiceRolePolicy',
PolicyDocument=PolicyDocument(
Statement=[
Statement(
Action=[awacs.iam.CreateServiceLinkedRole,
awacs.iam.PutRolePolicy],
Condition=Condition(
StringLike(
'iam:AWSServiceName',
'elasticloadbalancing.amazonaws.com' # noqa
)
),
Effect=Allow,
Resource=[
Sub('arn:aws:iam::${AWS::AccountId}:role/' # noqa
'aws-service-role/'
'elasticloadbalancing.amazonaws.com/' # noqa
'AWSServiceRoleForElasticLoadBalancing*') # noqa
]
)
]
)
)
]
)
)
ekscluster = template.add_resource(
eks.Cluster(
'EksCluster',
Name=variables['EksClusterName'].ref,
NoValue
),
SecurityGroups=[nodesecuritygroup.ref()],
SpotPrice=If('SetSpotPrice',
variables['SpotBidPrice'].ref,
NoValue),
BlockDeviceMappings=[autoscaling.BlockDeviceMapping(
DeviceName='/dev/xvda',
Ebs=autoscaling.EBSBlockDevice(
VolumeSize=variables['NodeVolumeSize'].ref,
VolumeType='gp2',
DeleteOnTermination=True
)
)],
UserData=Base64(
Sub('\n'.join([
'#!/bin/bash',
'set -o xtrace',
'/etc/eks/bootstrap.sh ${ClusterName} ${BootstrapArguments}', # noqa
'/opt/aws/bin/cfn-signal --exit-code $? \\',
'--stack ${AWS::StackName} \\',
'--resource NodeGroup \\',
'--region ${AWS::Region}'
]))
)
)
)
template.add_resource(
autoscaling.AutoScalingGroup(
'NodeGroup',
DesiredCapacity=If(
AliasTarget=route53.AliasTarget(
DNSName=GetAtt(example_distribution, 'DomainName'),
HostedZoneId=FindInMap(hosted_zone_map, Ref(AWS_REGION), 'CloudFront'),
),
Comment=Sub('DNS for ${AWS::StackName}'),
HostedZoneName=Join('', [Ref(param_hosted_zone_name), '.']),
Name=domain_name,
Type='A',
))
# AAAA alias record for the domain. The alias target takes its DNS name from
# the distribution's 'DomainName' attribute and its hosted zone id from the
# 'CloudFront' entry of the hosted-zone map for the current region.
domain_aaaa_alias = route53.AliasTarget(
    DNSName=GetAtt(example_distribution, 'DomainName'),
    HostedZoneId=FindInMap(hosted_zone_map, Ref(AWS_REGION), 'CloudFront'),
)
domain_aaaa_record = route53.RecordSetType(
    "DomainAAAA",
    AliasTarget=domain_aaaa_alias,
    Comment=Sub('DNS for ${AWS::StackName}'),
    # The hosted zone name parameter is suffixed with a trailing dot.
    HostedZoneName=Join('', [Ref(param_hosted_zone_name), '.']),
    Name=domain_name,
    Type='AAAA',
)
template.add_resource(domain_aaaa_record)

# Hand the finished template to the project's output helper.
cfnutils.output.write_template_to_file(template)
Version='2012-10-17',
Statement=[
Statement(
Effect=Allow,
Action=[Action('sts', 'AssumeRole')],
Principal=Principal('Service', 'lambda.amazonaws.com'),
)
],
),
ManagedPolicyArns=[
'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole',
'arn:aws:iam::aws:policy/service-role/AWSLambdaRole',
],
Policies=[
iam.Policy(
PolicyName=Sub('${AWS::StackName}CustomAcmCertificateLambdaExecutionPolicy'),
PolicyDocument=PolicyDocument(
Version='2012-10-17',
Statement=[
Statement(
Effect=Allow,
Action=[
Action('acm', 'AddTagsToCertificate'),
Action('acm', 'DeleteCertificate'),
Action('acm', 'DescribeCertificate'),
Action('acm', 'RemoveTagsFromCertificate'),
],
Resource=[Sub('arn:aws:acm:*:${AWS::AccountId}:certificate/*')],
),
Statement(
Effect=Allow,
KmsKeyArn=kms_key_arn,
Environment=awslambda.Environment(
Variables={
'api_key': GetAtt(api_key, "CiphertextBase64"),
'application_key': GetAtt(application_key, "CiphertextBase64"),
"LOG_LEVEL": Ref(log_level)
}
)
))
# Output the ARN of each Datadog lambda, exported under a name prefixed
# with the stack name. Description strings are kept verbatim.
monitor_lambda_arn_output = Output(
    "MonitorDatadogLambdaArn",
    Description="Monitor lambda arn",
    Value=GetAtt(datadog_monitor_lambda, "Arn"),
    Export=Export(Sub("${AWS::StackName}-MonitorDatadogLambdaArn")),
)
t.add_output(monitor_lambda_arn_output)

timeboard_lambda_arn_output = Output(
    "TimeboardDatadogLambdaArn",
    Description="Timeboard lamdba arn",
    Value=GetAtt(datadog_timeboard_lambda, "Arn"),
    Export=Export(Sub("${AWS::StackName}-TimeboardDatadogLambdaArn")),
)
t.add_output(timeboard_lambda_arn_output)
]
)
template.add_resource(iam_role_resource)
cfn_export_dict["Integration"]["Credentials"] = troposphere.GetAtt(iam_role_resource, "Arn")
elif method.integration.integration_type == 'AWS':
# Enable Lambda (custom) integration
# When send to a Lambda (Custom) the HTTP Method must always be POST regardless of
# the HttpMethod
cfn_export_dict["Integration"]["IntegrationHttpMethod"] = "POST"
lambda_permission_resource = troposphere.awslambda.Permission(
self.create_cfn_logical_id('LambdaPermissionApiGateway' + method.name),
Action = 'lambda:InvokeFunction',
FunctionName = method.parameter_arn_ref,
Principal = 'apigateway.amazonaws.com',
SourceArn = troposphere.Sub(
"arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${%s}/*/%s/" % (
restapi_logical_id, method.http_method
)
)
)
template.add_resource(lambda_permission_resource)
# look-up the method_names and assign a Ref to the model resource
# ToDo: validate model_names in the model
responses = []
for method_response in method.method_responses:
response_dict = {"StatusCode": method_response.status_code}
if method_response.response_models:
response_dict["ResponseModels"] = {}
for response_model in method_response.response_models:
for model in self.apigatewayrestapi.models.values():
),
)
else:
pipeline_policy_statement_list.append(
Statement(
Sid='S3Access',
Effect=Allow,
Action=[
Action('s3', 'PutObject'),
Action('s3', 'GetBucketPolicy'),
Action('s3', 'GetObject'),
Action('s3', 'ListBucket'),
],
Resource=[
troposphere.Sub('arn:aws:s3:::${ArtifactsBucketName}/*'),
troposphere.Sub('arn:aws:s3:::${ArtifactsBucketName}')
]
),
)
if self.lambda_invoke_enabled:
pipeline_policy_statement_list.append(
Statement(
Sid='LambdaInvoke',
Effect=Allow,
Action=[
Action('lambda', 'InvokeFunction'),
],
Resource=['*'],
)
)
if self.codebuild_access_enabled:
pipeline_policy_statement_list.append(
# Stack parameter through which the Lambda@Edge function ARN is supplied.
param_laearn = template.add_parameter(
    Parameter(
        "ParamLaeArn",
        Type=constants.STRING,
        Description="ARN of the Lambda@Edge function",
    )
)
template.set_parameter_label(param_laearn, "Lambda@Edge ARN")

cloudformation_tags = template.add_resource(
    custom_resources.cloudformation.Tags("CfnTags")
)

# The Lambda@Edge ARN deliberately arrives via a Parameter instead of being
# import-output directly. An imported output value would be locked and could
# not change, whereas a parameter lets us migrate to a new L@E function
# gradually.
lae_arn_parameter = custom_resources.ssm.Parameter(
    "LaeArn",
    Name=Sub('/${AWS::StackName}/lae-arn'),
    Type="String",
    Value=Ref(param_laearn),
    Tags=GetAtt(cloudformation_tags, 'TagList'),
)
lae_arn = template.add_resource(lae_arn_parameter)

# Export a reference to the SSM parameter under "<stack name>-lae-arn".
lae_arn_output = Output(
    "LaeArnParameter",
    Description='SSM Parameter containing the Lambda@Edge ARN',
    Value=Ref(lae_arn),
    Export=Export(Join('-', [Ref(AWS_STACK_NAME), 'lae-arn'])),
)
template.add_output(lae_arn_output)
template.add_output(Output(
"DomainTable",
Description='DynamoDB table containing the authorized domains',
Value=ImportValue(Join('-', [Ref(param_authorizer_stack), "DomainTable"])),
Export=Export(Join('-', [Ref(AWS_STACK_NAME), 'DomainTable'])),