Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def exit0(msg=''):
if msg:
print(PrintMsg.INFO + msg)
sys.exit(0)
def exit1(msg=''):
if msg:
print(PrintMsg.ERROR + msg)
sys.exit(1)
if self._strict_syntax_json:
self.check_json(cfntemplate, quiet=True, strict=True)
self.template_data = json.loads(cfntemplate)
else:
self.set_template_type(None)
self.check_cfnyaml(cfntemplate, quiet=True, strict=False)
self.set_template_type('yaml')
m_constructor = cfnlint.decode.cfn_yaml.multi_constructor
loader = cfnlint.decode.cfn_yaml.MarkedLoader(cfntemplate, None)
loader.add_multi_constructor('!', m_constructor)
self.template_data = loader.get_single_data()
if self.verbose:
print(PrintMsg.INFO + "|Acquiring tests assets for .......[%s]" % test)
print(PrintMsg.DEBUG + "|S3 Bucket => [%s]" % self.get_s3bucket())
print(PrintMsg.DEBUG + "|Project => [%s]" % self.get_project_name())
print(PrintMsg.DEBUG + "|Template => [%s]" % self.get_template_path())
print(PrintMsg.DEBUG + "|Parameter => [%s]" % self.get_parameter_path())
print(PrintMsg.DEBUG + "|TemplateType => [%s]" % self.get_template_type())
if 'regions' in yamlc['tests'][test]:
if yamlc['tests'][test]['regions'] is not None:
r = yamlc['tests'][test]['regions']
self.set_test_region(list(r))
if self.verbose:
print(PrintMsg.DEBUG + "|Defined Regions:")
for list_o in self.get_test_region():
print("\t\t\t - [%s]" % list_o)
else:
global_regions = self.get_global_region(yamlc)
self.set_test_region(list(global_regions))
def __init__(self, nametag='[taskcat]'):
self.nametag = '{1}{0}{2}'.format(nametag, PrintMsg.name_color, PrintMsg.rst_color)
self._project_name = None
self._project_path = None
self.owner = None
self.banner = None
self.capabilities = []
self.verbose = False
self.config = 'taskcat.yml'
self.test_region = []
self.s3bucket = None
self.s3bucket_type = None
self.template_path = None
self.parameter_path = None
self.default_region = None
self._template_file = None
self._template_type = None
self._parameter_file = None
"""
Returns the content of an object, given the bucket name and the key of the object
:param bucket: Bucket name
:param object_key: Key of the object
:return: Content of the object
"""
s3_client = self._boto_client.get('s3', region=self.get_default_region(), s3v4=True)
try:
dict_object = s3_client.get_object(Bucket=bucket, Key=object_key)
except TaskCatException:
raise
except Exception:
print("{} Attempted to fetch Bucket: {}, Key: {}".format(PrintMsg.ERROR, bucket, object_key))
raise
content = dict_object['Body'].read().decode('utf-8').strip()
return content
if 'regions' in yamlc['tests'][test]:
if yamlc['tests'][test]['regions'] is not None:
r = yamlc['tests'][test]['regions']
self.set_test_region(list(r))
if self.verbose:
print(PrintMsg.DEBUG + "|Defined Regions:")
for list_o in self.get_test_region():
print("\t\t\t - [%s]" % list_o)
else:
global_regions = self.get_global_region(yamlc)
self.set_test_region(list(global_regions))
if self.verbose:
print(PrintMsg.DEBUG + "|Global Regions:")
for list_o in self.get_test_region():
print("\t\t\t - [%s]" % list_o)
print(PrintMsg.PASS + "(Completed) acquisition of [%s]" % test)
print('\n')
def get_stackstatus(self, testdata_list, speed):
"""
Given a list of TestData objects, this function checks the stack status
of each CloudFormation stack and updates the corresponding TestData object
with the status.
:param testdata_list: List of TestData object
:param speed: Interval (in seconds) in which the status has to be checked in loop
"""
active_tests = 1
print('\n')
while active_tests > 0:
current_active_tests = 0
print(PrintMsg.INFO + "{}{} {} [{}]{}".format(
PrintMsg.header,
'AWS REGION'.ljust(15),
'CLOUDFORMATION STACK STATUS'.ljust(25),
'CLOUDFORMATION STACK NAME',
PrintMsg.rst_color))
time_stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for test in testdata_list:
for stack in test.get_test_stacks():
stackquery = self.stackcheck(str(stack['StackId']))
current_active_tests = stackquery[
3] + current_active_tests
logs = (PrintMsg.INFO + "{3}{0} {1} [{2}]{4}".format(
stackquery[1].ljust(15),
stackquery[2].ljust(25),
stackquery[0],
PrintMsg.highlight,
def _print_upgrade_msg(newversion, nine=False):
if nine:
print("{} !!!!!".format(PrintMsg.INFO))
print("{} Breaking changes have been introduced to the taskcat CLI arguments in the v0.9.x branch.".format(PrintMsg.INFO))
print("{} Please review the migration document *BEFORE* upgrading to v0.9.x!".format(PrintMsg.INFO))
print("{} - https://github.com/aws-quickstart/taskcat/blob/master/README_v0.9_MIGRATION.md".format(PrintMsg.INFO))
print("{} !!!!!".format(PrintMsg.INFO))
print('\n')
print("version %s" % version)
print('\n')
print("{} A newer version of {} is available ({})".format(
PrintMsg.INFO, 'taskcat', newversion))
print('{} To upgrade pip version {}[ pip install --upgrade taskcat]{}'.format(
PrintMsg.INFO, PrintMsg.highlight, PrintMsg.rst_color))
print('{} To upgrade docker version {}[ docker pull taskcat/taskcat ]{}'.format(
PrintMsg.INFO, PrintMsg.highlight, PrintMsg.rst_color))
print('\n')
print(PrintMsg.DEBUG + "Capabilities=%s" % self.get_capabilities())
print(PrintMsg.DEBUG + "Parameters:")
print(PrintMsg.DEBUG + "Tags:%s" % str(self.tags))
if self.get_template_type() == 'json':
print(json.dumps(j_params, sort_keys=True, indent=11, separators=(',', ': ')))
try:
stackdata = cfn.create_stack(
StackName=stackname,
DisableRollback=True,
TemplateURL=self.get_template_path(),
Parameters=j_params,
Capabilities=self.get_capabilities(),
Tags=self.tags
)
print(PrintMsg.INFO + "|CFN Execution mode [create_stack]")
except cfn.exceptions.ClientError as e:
if not str(e).endswith('cannot be used with templates containing Transforms.'):
raise
print(PrintMsg.INFO + "|CFN Execution mode [change_set]")
stack_cs_data = cfn.create_change_set(
StackName=stackname,
TemplateURL=self.get_template_path(),
Parameters=j_params,
Capabilities=self.get_capabilities(),
ChangeSetType="CREATE",
ChangeSetName=stackname + "-cs"
)
change_set_name = stack_cs_data['Id']
# wait for change set
waiter = cfn.get_waiter('change_set_create_complete')
def _print_upgrade_msg(newversion, nine=False):
if nine:
print("{} !!!!!".format(PrintMsg.INFO))
print("{} Breaking changes have been introduced to the taskcat CLI arguments in the v0.9.x branch.".format(PrintMsg.INFO))
print("{} Please review the migration document *BEFORE* upgrading to v0.9.x!".format(PrintMsg.INFO))
print("{} - https://github.com/aws-quickstart/taskcat/blob/master/README_v0.9_MIGRATION.md".format(PrintMsg.INFO))
print("{} !!!!!".format(PrintMsg.INFO))
print('\n')
print("version %s" % version)
print('\n')
print("{} A newer version of {} is available ({})".format(
PrintMsg.INFO, 'taskcat', newversion))
print('{} To upgrade pip version {}[ pip install --upgrade taskcat]{}'.format(
PrintMsg.INFO, PrintMsg.highlight, PrintMsg.rst_color))
print('{} To upgrade docker version {}[ docker pull taskcat/taskcat ]{}'.format(
PrintMsg.INFO, PrintMsg.highlight, PrintMsg.rst_color))
print('\n')