Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Update our service definition, task definition and tasks from the live
versions in AWS.
"""
self.__aws_service = self.__get_service()
if not self.scaling:
# This only gets executed if we don't have an "application_scaling"
# section in our service YAML definition.
#
# But we're looking here for an autoscaling setup that we previously
# had created but which we no longer want
self.scaling = ApplicationAutoscaling(self.serviceName, self.clusterName)
if not self.scaling.exists():
self.scaling = None
if self.__aws_service:
self.active_task_definition = TaskDefinition(self.taskDefinition)
# If we have helper tasks, update them from AWS now
helpers = self.active_task_definition.get_helper_tasks()
if helpers:
for t in self.tasks.values():
t.from_aws(helpers[t.family])
if self.__aws_service['serviceRegistries']:
self.serviceDiscovery = ServiceDiscovery(self.service_discovery[0]['registryArn'])
else:
self.serviceDiscovery = None
else:
self.active_task_definition = None
def __setattr__(self, attr, value):
    """
    Intercept attribute assignment so that a fixed set of names is stored
    under an underscore-prefixed attribute instead of under the name itself.

    :param attr: the name of the attribute being assigned
    :type attr: string

    :param value: the value to store
    """
    # Names that get redirected to "_<name>".
    # NOTE(review): presumably these are backed by properties defined
    # elsewhere on the class -- confirm.
    redirected = (
        'family',
        'networkMode',
        'taskRoleArn',
        'requiresCompatibilities',
        'executionRoleArn',
        'cpu',
        'memory',
    )
    if attr not in redirected:
        # Every other attribute is stored the normal way.
        super(TaskDefinition, self).__setattr__(attr, value)
        return
    # This re-enters __setattr__ with the prefixed name, which is not in the
    # redirected set and therefore takes the normal path above.
    setattr(self, "_" + attr, value)
def from_aws(self):
    """
    Refresh ``self.active_task_definition`` from the latest revision
    reported by ``self.desired_task_definition``.

    When no revision is reported, ``self.active_task_definition`` is reset
    to ``None``.
    """
    latest_revision = self.desired_task_definition.get_latest_revision()
    # A falsy revision id means there is nothing to load.
    self.active_task_definition = (
        TaskDefinition(latest_revision) if latest_revision else None
    )
def from_aws(self, taskDefinition):
    """
    Build a ``TaskDefinition`` from ``taskDefinition`` and make it our
    active task definition.

    :param taskDefinition: the value handed straight to ``TaskDefinition``
                           (presumably a task definition identifier --
                           confirm against callers)
    """
    loaded = TaskDefinition(taskDefinition)
    self.active_task_definition = loaded
else:
self.clusterName = 'default'
if 'vpc_configuration' in yml:
self.set_vpc_configuration(
yml['vpc_configuration']
)
if 'placement_constraints' in yml:
self.placementConstraints = yml['placement_constraints']
if 'placement_strategy' in yml:
self.placementStrategy = yml['placement_strategy']
if 'count' in yml:
self.desired_count = yml['count']
if 'platform_version' in yml:
self.platform_version = yml['platform_version']
self.desired_task_definition = TaskDefinition(yml=yml)
deployfish_environment = {
"DEPLOYFISH_TASK_NAME": yml['name'],
"DEPLOYFISH_ENVIRONMENT": yml.get('environment', 'undefined'),
"DEPLOYFISH_CLUSTER_NAME": self.clusterName
}
self.desired_task_definition.inject_environment(deployfish_environment)
parameters = []
if 'config' in yml:
parameters = yml['config']
self.parameter_store = ParameterStore("task-{}".format(self.taskName), self.clusterName, yml=parameters)
if 'schedule' in yml:
self.schedule_expression = yml['schedule']
if 'schedule_role' in yml:
self.schedule_role = yml['schedule_role']
if 'group' in yml:
self.group = yml['group']
def __init__(self, clusterName, yml=None):
    """
    :param clusterName: the name of the cluster in which we'll run our
                        helper tasks
    :type clusterName: string

    :param yml: the task definition information for the task from our
                deployfish.yml file.  Defaults to an empty dict.
    :type yml: dict
    """
    # Build a fresh dict per call.  A ``yml={}`` default would be a single
    # dict object shared by every instance created without ``yml``, so any
    # mutation made while processing it would leak between instances.
    if yml is None:
        yml = {}
    self.clusterName = clusterName
    self.ecs = get_boto3_session().client('ecs')
    self.commands = {}
    self.from_yaml(yml)
    # Set after from_yaml() so these two values are the ones that stick,
    # whatever from_yaml() assigned.
    self.desired_task_definition = TaskDefinition(yml=yml)
    self.active_task_definition = None