Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if self.template['Parameters'][util[3]]['Default'] == 'Auto':
with open(os.path.dirname(os.path.abspath(__file__)) + "/functions/%s/template.snippet" % util[1], 'r') as stream:
snippet = CFNYAMLHandler.ordered_safe_load(stream)
if not os.path.isfile(os.path.dirname(os.path.abspath(__file__)) + "/functions/%s/requirements.txt" % util[1]):
with open(os.path.dirname(os.path.abspath(__file__)) + "/functions/%s/lambda_function.py" % util[1], 'r') as stream:
function_code = stream.read()
snippet['Resources']['AWSSBInjected%sLambda' % util[2]]['Properties']['Code']['ZipFile'] = function_code
else:
self._inject_copy_zips()
bucket, key = self._publish_lambda_zip(os.path.dirname(os.path.abspath(__file__)) + "/functions/%s/" % util[1], util[1])
snippet['Resources']['AWSSBInjected%sLambda' % util[2]]['Properties']['Code']['S3Bucket'] = '!Ref AWSSBInjectedLambdaZipsBucket'
snippet['Resources']['AWSSBInjected%sLambda' % util[2]]['Properties']['Code']['S3Key'] = key
snippet['Resources']['AWSSBInjected%sLambda' % util[2]]['Properties']['Handler'] = 'lambda_function.handler'
snippet['Resources']['AWSSBInjected%sLambda' % util[2]]['Properties']['Code'].pop('ZipFile')
self.template['Resources']['AWSSBInjectedCopyZips']['Properties']['Objects'].append(util[1] + '/lambda_function.zip')
temp_template = CFNYAMLHandler.ordered_safe_dump(self.template, default_flow_style=False).replace(
"!Ref %s" % util[3],
"!If [ %s, !GetAtt AWSSBInjected%s.%s, !Ref %s ]" % (util[4], util[2], util[3], util[3])
)
self.template = CFNYAMLHandler.ordered_safe_load(temp_template)
self.template['Resources'] = OrderedDict({**self.template['Resources'], **snippet['Resources']})
self.template['Conditions'] = OrderedDict({**self.template['Conditions'], **snippet['Conditions']})
def _upload_template(self):
    """Serialize ``self.template`` to YAML and upload it to S3.

    Top-level entries of ``self.template`` whose value is falsy (for
    example an empty mapping) are removed in place before the template is
    dumped, so they do not appear in the uploaded document.

    The object is written to ``self.bucket_name`` under
    ``<self.key_prefix>/templates/<self.service_name>/template.yaml`` using
    the ACL in ``self.s3acl``.

    Returns:
        tuple: ``(bucket_name, key)`` identifying the uploaded object.
    """
    # Local import: S3 object keys are '/'-delimited strings, not filesystem
    # paths, so they must be joined with POSIX rules on every platform.
    import posixpath

    # Collect the names first; popping while iterating the live dict view
    # would raise "dictionary changed size during iteration".
    empty_sections = [name for name, section in self.template.items() if not section]
    for name in empty_sections:
        self.template.pop(name)

    body = CFNYAMLHandler.ordered_safe_dump(self.template, default_flow_style=False)
    # os.path.join would insert '\\' on Windows and produce a key with a
    # backslash in it; posixpath.join is identical on POSIX hosts.
    key = posixpath.join(self.key_prefix, 'templates/%s/template.yaml' % self.service_name)
    self.s3_client.put_object(Body=body, Bucket=self.bucket_name, Key=key, ACL=self.s3acl)
    return self.bucket_name, key
t['asb_encode_binding']['fields'][camel_convert(b).upper()] = "{{ cfn.stack_outputs.%s }}" % b
description = ""
if "Description" in template['Outputs'][b].keys():
description = template['Outputs'][b]["Description"]
full_bindings.append({"name": camel_convert(b).upper(), "description": description})
elif 'block' in t.keys():
for it in t['block']:
if it['name'] == 'Create Resources':
if 'Parameters' in template.keys():
for p in template['Parameters'].keys():
default = ""
if 'Default' in template['Parameters'][p].keys():
default = template['Parameters'][p]['Default']
it['cloudformation']['template_parameters'][p] = '{{ %s | default("%s") | string }}' % (p, default)
with open(tmpname + '/apb/roles/aws-provision-apb/tasks/main.yml', 'w') as f:
f.write(CFNYAMLHandler.ordered_safe_dump(main_provision_task, default_flow_style=False))
with open(tmpname + '/template.yaml', 'w') as f:
f.write(CFNYAMLHandler.ordered_safe_dump(template, default_flow_style=False))
render_documentation(apb_spec, template, prescribed_parameters, tmpname, full_bindings, add_iam)
return tmpname
print("build path: %s" % tmpname)
shutil.copytree(os.path.dirname(os.path.abspath(__file__)) + '/data/apb_template/', tmpname + '/apb')
for dname, dirs, files in os.walk(tmpname):
for fname in files:
fpath = os.path.join(dname, fname)
if not fname.endswith('.zip'):
with open(fpath) as f:
s = f.read()
s = s.replace("${SERVICE_NAME}", service_name).replace("${SERVICE_NAME_UPPER}", service_name.upper()).replace('${CREATE_IAM_USER}', str(bindings['IAMUser']))
with open(fpath, "w") as f:
f.write(s)
for plan in prescribed_parameters.keys():
prescribed_parameters[plan]['params_string'] = "{{ namespace }}::{{ _apb_plan_id }}::{{ _apb_service_class_id }}::{{ _apb_service_instance_id }}"
prescribed_parameters[plan]['params_hash'] = "{{ params_string | checksum }}"
with open(tmpname + '/apb/roles/aws-provision-apb/vars/%s.yml' % plan, "w") as f:
f.write(CFNYAMLHandler.ordered_safe_dump(prescribed_parameters[plan], default_flow_style=False))
shutil.copy(tmpname + '/apb/roles/aws-provision-apb/vars/%s.yml' % plan, tmpname + '/apb/roles/aws-deprovision-apb/vars/%s.yml' % plan)
with open(tmpname + '/apb/apb.yml', "w") as f:
f.write(CFNYAMLHandler.ordered_safe_dump(apb_spec, default_flow_style=False))
with open(tmpname + '/apb/roles/aws-provision-apb/tasks/main.yml') as f:
main_provision_task = yaml.load(f)
create_user = False
try:
create_user = template['Metadata']['AWS::ServiceBroker::Specification']['Bindings']['IAM']['AddKeypair']
except KeyError as e:
pass
full_bindings = []
try:
add_iam = bool(template['Metadata']['AWS::ServiceBroker::Specification']['Bindings']['IAM']['AddKeypair'])
except KeyError:
add_iam = False
if add_iam:
if template_raw_data[0] in ['{', '['] and template_raw_data[-1] in ['}', ']']:
logger.info('Detected JSON. Loading file.')
FILE_FORMAT = 'JSON'
template_data = json.load(open(current_file, 'r', newline=None), object_pairs_hook=OrderedDict)
else:
logger.info('Detected YAML. Loading file.')
FILE_FORMAT = 'YAML'
template_data = utils.CFNYAMLHandler.ordered_safe_load(open(current_file, 'r', newline=None), object_pairs_hook=OrderedDict)
with open(current_file, 'w') as updated_template:
logger.info("Writing file [{}]".format(current_file))
if FILE_FORMAT == 'JSON':
updated_template.write(json.dumps(template_data, indent=4, separators=(',', ': ')))
elif FILE_FORMAT == 'YAML':
updated_template.write(utils.CFNYAMLHandler.ordered_safe_dump(template_data, indent=2, allow_unicode=True, default_flow_style=False, explicit_start=True, explicit_end=True))
updated_template.close()
else:
logger.warning("File type not supported. Please use .template file.")
continue
if "Description" in template['Outputs'][b].keys():
description = template['Outputs'][b]["Description"]
full_bindings.append({"name": camel_convert(b).upper(), "description": description})
elif 'block' in t.keys():
for it in t['block']:
if it['name'] == 'Create Resources':
if 'Parameters' in template.keys():
for p in template['Parameters'].keys():
default = ""
if 'Default' in template['Parameters'][p].keys():
default = template['Parameters'][p]['Default']
it['cloudformation']['template_parameters'][p] = '{{ %s | default("%s") | string }}' % (p, default)
with open(tmpname + '/apb/roles/aws-provision-apb/tasks/main.yml', 'w') as f:
f.write(CFNYAMLHandler.ordered_safe_dump(main_provision_task, default_flow_style=False))
with open(tmpname + '/template.yaml', 'w') as f:
f.write(CFNYAMLHandler.ordered_safe_dump(template, default_flow_style=False))
render_documentation(apb_spec, template, prescribed_parameters, tmpname, full_bindings, add_iam)
return tmpname
for fname in files:
fpath = os.path.join(dname, fname)
if not fname.endswith('.zip'):
with open(fpath) as f:
s = f.read()
s = s.replace("${SERVICE_NAME}", service_name).replace("${SERVICE_NAME_UPPER}", service_name.upper()).replace('${CREATE_IAM_USER}', str(bindings['IAMUser']))
with open(fpath, "w") as f:
f.write(s)
for plan in prescribed_parameters.keys():
prescribed_parameters[plan]['params_string'] = "{{ namespace }}::{{ _apb_plan_id }}::{{ _apb_service_class_id }}::{{ _apb_service_instance_id }}"
prescribed_parameters[plan]['params_hash'] = "{{ params_string | checksum }}"
with open(tmpname + '/apb/roles/aws-provision-apb/vars/%s.yml' % plan, "w") as f:
f.write(CFNYAMLHandler.ordered_safe_dump(prescribed_parameters[plan], default_flow_style=False))
shutil.copy(tmpname + '/apb/roles/aws-provision-apb/vars/%s.yml' % plan, tmpname + '/apb/roles/aws-deprovision-apb/vars/%s.yml' % plan)
with open(tmpname + '/apb/apb.yml', "w") as f:
f.write(CFNYAMLHandler.ordered_safe_dump(apb_spec, default_flow_style=False))
with open(tmpname + '/apb/roles/aws-provision-apb/tasks/main.yml') as f:
main_provision_task = yaml.load(f)
create_user = False
try:
create_user = template['Metadata']['AWS::ServiceBroker::Specification']['Bindings']['IAM']['AddKeypair']
except KeyError as e:
pass
full_bindings = []
try:
add_iam = bool(template['Metadata']['AWS::ServiceBroker::Specification']['Bindings']['IAM']['AddKeypair'])
except KeyError:
add_iam = False
if add_iam:
full_bindings.append({
"name": apb_spec['name'].upper() + '_AWS_ACCESS_KEY_ID',
"description": 'AWS IAM Access Key ID, your application must use this for authenticating runtime calls to the %s service' % apb_spec['metadata']['displayName']