Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
e_msg = EmailMessage(policy=SMTP)
subject = chevron.render(subject, data=ctx)
e_msg['Subject'] = subject
e_msg['From'] = e_from
if reply_to:
e_msg['Reply-To'] = reply_to
e_msg['To'] = f'{full_name} <{user_email}>' if full_name else user_email
e_msg['List-Unsubscribe'] = '<{unsubscribe_link}>'.format(**ctx)
e_msg['X-SES-CONFIGURATION-SET'] = 'nosht'
e_msg['X-SES-MESSAGE-TAGS'] = ', '.join(f'{k}={v}' for k, v in tags.items())
if DEBUG_PRINT_REGEX.search(body):
ctx['__debug_context__'] = f'```{json.dumps(ctx, indent=2)}```'
body = apply_macros(body)
body = chevron.render(body, data=ctx)
raw_body = re.sub(r'\n{3,}', '\n\n', body).strip('\n')
e_msg.set_content(raw_body, cte='quoted-printable')
ctx.update(
styles=STYLES,
main_message=safe_markdown(raw_body),
message_preview=shorten(strip_markdown(raw_body), 60, placeholder='…'),
)
if markup_data:
ctx['markup_data'] = json.dumps(markup_data, separators=(',', ':'))
html_body = chevron.render(template, data=ctx, partials_dict={'title': title})
e_msg.add_alternative(html_body, subtype='html', cte='quoted-printable')
if attachment:
maintype, subtype = attachment.mime_type.split('/')
e_msg.add_attachment(
attachment.content.encode(), maintype=maintype, subtype=subtype, filename=attachment.filename,
# render the template
parameters = {
'SERVICE_ACCOUNT_EMAIL': service_account_email,
'ZONE': instance_config.zone,
'MACHINE_TYPE': instance_config.machine_type,
'SOURCE_IMAGE': image_link,
'STARTUP_SCRIPT': fix_indents_for_lines(startup_script, template, '{{{STARTUP_SCRIPT}}}'),
'MACHINE_NAME': machine_name,
'PREEMPTIBLE': 'false' if instance_config.on_demand else 'true',
'GPU_TYPE': instance_config.gpu['type'] if instance_config.gpu else '',
'GPU_COUNT': instance_config.gpu['count'] if instance_config.gpu else 0,
'DISK_ATTACHMENTS': disk_attachments,
'PUB_KEY_VALUE': public_key_value,
'PORTS': ', '.join([str(port) for port in set(container.config.ports + [22])]),
}
template = chevron.render(template, parameters)
# print some information about the deployment
output.write('- image URL: ' + '/'.join(image_link.split('/')[-5:]))
output.write('- zone: ' + instance_config.zone)
output.write('- on-demand VM' if instance_config.on_demand else '- preemptible VM')
output.write(('- GPUs: %d x %s' % (instance_config.gpu['count'], instance_config.gpu['type']))
if instance_config.gpu else '- no GPUs')
# print name of the volume where Docker data will be stored
if instance_config.docker_data_root:
docker_data_volume_name = [volume.name for volume in volumes
if is_subdir(instance_config.docker_data_root, volume.mount_dir)][0]
output.write('- Docker data will be stored on the "%s" volume' % docker_data_volume_name)
return template
def replace_macro(m):
arg_values = [a.strip(' ') for a in m.group(1).split('|') if a.strip(' ')]
if len(macro['args']) != len(arg_values):
raise RuntimeError(f'invalid macro call "{m.group()}"')
else:
return chevron.render(macro['body'], data=dict(zip(macro['args'], arg_values)))
def __call__(self, issue, format='long'):
template = self.templates[format]
return chevron.render(template, issue)
def prepare_image_template(instance_config: InstanceConfig, machine_name: str, scr_image_link: str, image_family: str,
service_account_email: str, stack_version: str, public_key_value: str = '',
debug_mode: bool = False):
"""Prepares deployment template to run an instance."""
# read and update the template
with open(os.path.join(os.path.dirname(__file__), 'image', 'template.yaml')) as f:
template = f.read()
# render startup script
startup_script = open(os.path.join(os.path.dirname(__file__), 'image', 'cloud_init.yaml'), 'r').read()
startup_script = chevron.render(startup_script, {
'MACHINE_NAME': machine_name,
'ZONE': instance_config.zone,
'IMAGE_NAME': instance_config.image_name,
'IMAGE_FAMILY': image_family if image_family else '',
'STACK_VERSION': stack_version,
'NVIDIA_DRIVER_VERSION': '410',
'DOCKER_CE_VERSION': '18.09.3',
'NVIDIA_DOCKER_VERSION': '2.0.3',
'DEBUG_MODE': debug_mode,
})
# render the template
parameters = {
'SERVICE_ACCOUNT_EMAIL': service_account_email,
'ZONE': instance_config.zone,
'MACHINE_TYPE': instance_config.machine_type,
def check_mustache(cls, v):
try:
chevron.render(v, data={})
except (ChevronError, IndexError):
raise ValueError('invalid mustache template')
return v
def prepare_instance_profile_template(managed_policy_arns: list):
with open(os.path.join(os.path.dirname(__file__), 'data', 'instance_profile.yaml')) as f:
content = f.read()
parameters = {
'HAS_MANAGED_POLICIES': len(managed_policy_arns),
'MANAGED_POLICY_ARNS': [{'MANAGED_POLICY_ARN': arn} for arn in managed_policy_arns]
}
template = chevron.render(content, parameters)
return template
exist in the template.
"""
tokens = list(tokenize(template))
template_keys = set()
for tag, key in tokens:
if tag not in ['literal', 'no escape', 'variable', 'set delimiter']:
raise ValueError('Script templates support only variables and delimiter changes.')
template_keys.add(key)
# check that the script contains keys for all provided parameters
for key in params:
if key not in template_keys:
raise ValueError('Parameter "%s" doesn\'t exist in the script.' % key)
return chevron.render(tokens, params)
async def index(request):
redis = await request.app['downloader'].get_redis()
data = await redis.lrange(R_OUTPUT, 0, -1)
results = [r.decode() for r in data]
session = await get_session(request)
html = chevron.render(html_template, {'message': session.get('message'), 'results': results})
session.invalidate()
return web.Response(text=html, content_type='text/html')