Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_yaml_spec(self):
httpretty.register_uri(httpretty.POST, "https://sts.amazonaws.com/",
body=open(os.path.join(os.getcwd(), 'test', 'fixtures', 'sts-get-caller-identity-response.xml')).read(),
content_type="application/xml")
httpretty.register_uri(httpretty.POST, "https://api.ecr.us-east-1.amazonaws.com/",
body=open(os.path.join(os.getcwd(), 'test', 'fixtures', 'ecr-repositories-response.json')).read(),
content_type="application/x-amz-json-1.1")
yaml_spec = YamlSpec(self.kubernetes_yml).to_list()
self.assertEqual(yaml_spec[0]['metadata']['name'], 'hello')
self.assertEqual(yaml_spec[0]['spec']['template']['spec']['containers'][0]['name'], 'web')
self.assertEqual(yaml_spec[0]['spec']['template']['spec']['containers'][0]['image'], 'eggs')
self.assertEqual(yaml_spec[0]['spec']['template']['spec']['containers'][1]['name'], 'worker')
self.assertEqual(yaml_spec[0]['spec']['template']['spec']['containers'][1]['image'], 'eggs')
# Run the pre-deploy hook for the canonical app or a review app
if config.pre_deploy and (filename is None or (filename and self.namespace)):
print_green("Running pre-deploy hook '%s'..." % config.pre_deploy, newline_after=True)
return_code = CommandRunner(self.context, namespace=self.namespace).run(tag, config.pre_deploy, constraint=constraint, tty=False)
if return_code:
raise HokusaiError("Pre-deploy hook failed with return code %s" % return_code, return_code=return_code)
# Patch the deployments
deployment_timestamp = datetime.datetime.utcnow().strftime("%s%f")
if filename is None:
yaml_template = TemplateSelector().get(os.path.join(CWD, HOKUSAI_CONFIG_DIR, self.context))
else:
yaml_template = TemplateSelector().get(filename)
yaml_spec = YamlSpec(yaml_template).to_list()
# If a review app, a canary app or the canonical app while updating config,
# bust the deployment cache and populate deployments from the yaml file
if filename or update_config:
self.cache = []
for item in yaml_spec:
if item['kind'] == 'Deployment':
self.cache.append(item)
# If updating config, patch the spec and apply
if update_config:
print_green("Patching Deployments in spec %s with image digest %s" % (yaml_template, digest), newline_after=True)
payload = []
for item in yaml_spec:
if item['kind'] == 'Deployment':
item['spec']['template']['metadata']['labels']['deploymentTimestamp'] = deployment_timestamp
def dev_logs(follow, tail, filename):
if filename is None:
yaml_template = TemplateSelector().get(os.path.join(CWD, HOKUSAI_CONFIG_DIR, DEVELOPMENT_YML_FILE))
else:
yaml_template = TemplateSelector().get(filename)
docker_compose_yml = YamlSpec(yaml_template).to_file()
follow_extends(docker_compose_yml)
opts = ''
if follow:
opts += ' --follow'
if tail:
opts += " --tail=%i" % tail
shout("docker-compose -f %s -p hokusai logs%s" % (docker_compose_yml, opts), print_output=True)
def dev_run(command, service_name, stop, filename):
if filename is None:
yaml_template = TemplateSelector().get(os.path.join(CWD, HOKUSAI_CONFIG_DIR, DEVELOPMENT_YML_FILE))
else:
yaml_template = TemplateSelector().get(filename)
docker_compose_yml = YamlSpec(yaml_template).to_file()
follow_extends(docker_compose_yml)
if service_name is None:
service_name = config.project_name
shout("docker-compose -f %s -p hokusai run %s %s" % (docker_compose_yml, service_name, command), print_output=True)
if stop:
shout("docker-compose -f %s -p hokusai stop" % docker_compose_yml, print_output=True)
def dev_start(build, detach, filename):
if filename is None:
yaml_template = TemplateSelector().get(os.path.join(CWD, HOKUSAI_CONFIG_DIR, DEVELOPMENT_YML_FILE))
else:
yaml_template = TemplateSelector().get(filename)
docker_compose_yml = YamlSpec(yaml_template).to_file()
follow_extends(docker_compose_yml)
def cleanup(*args):
shout("docker-compose -f %s -p hokusai stop" % docker_compose_yml, print_output=True)
for sig in EXIT_SIGNALS:
signal.signal(sig, cleanup)
opts = ''
if build:
Docker().build()
if detach:
opts += ' -d'
if not detach:
print_green("Starting development environment... Press Ctrl+C to stop.")
def dev_status(filename):
if filename is None:
yaml_template = TemplateSelector().get(os.path.join(CWD, HOKUSAI_CONFIG_DIR, DEVELOPMENT_YML_FILE))
else:
yaml_template = TemplateSelector().get(filename)
docker_compose_yml = YamlSpec(yaml_template).to_file()
follow_extends(docker_compose_yml)
shout("docker-compose -f %s -p hokusai ps" % docker_compose_yml, print_output=True)
def dev_stop(filename):
if filename is None:
yaml_template = TemplateSelector().get(os.path.join(CWD, HOKUSAI_CONFIG_DIR, DEVELOPMENT_YML_FILE))
else:
yaml_template = TemplateSelector().get(filename)
docker_compose_yml = YamlSpec(yaml_template).to_file()
follow_extends(docker_compose_yml)
shout("docker-compose -f %s -p hokusai stop" % docker_compose_yml, print_output=True)
def follow_extends(docker_compose_yml):
with open(docker_compose_yml, 'r') as f:
rendered_templates = []
struct = yaml.safe_load(f.read())
for service_name, service_spec in struct['services'].iteritems():
if 'extends' not in service_spec or 'file' not in service_spec['extends']:
continue
extended_filename = service_spec['extends']['file']
extended_template_path = os.path.join(CWD, HOKUSAI_CONFIG_DIR, extended_filename)
if not os.path.isfile(extended_template_path):
extended_template_path = os.path.join(CWD, HOKUSAI_CONFIG_DIR, extended_filename + '.j2')
extended_template = TemplateSelector().get(extended_template_path)
rendered_templates.append(YamlSpec(extended_template).to_file())
return rendered_templates
def dev_clean(filename):
if filename is None:
yaml_template = TemplateSelector().get(os.path.join(CWD, HOKUSAI_CONFIG_DIR, DEVELOPMENT_YML_FILE))
else:
yaml_template = TemplateSelector().get(filename)
docker_compose_yml = YamlSpec(yaml_template).to_file()
follow_extends(docker_compose_yml)
shout("docker-compose -f %s -p hokusai stop" % docker_compose_yml, print_output=True)
shout("docker-compose -f %s -p hokusai rm --force" % docker_compose_yml, print_output=True)