Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _display_final_status(stacker: TaskcatStacker):
for final_stack in stacker.stacks:
LOG.info("{}stack {} {}".format("\u250f ", "\u24c2", final_stack.name))
if final_stack.descendants():
for nested_stack in final_stack.descendants():
LOG.info(
"{}stack {} {}".format("\u2523 ", "\u24c3", nested_stack.name)
)
LOG.info("{} region: {}".format("\u2523", final_stack.region_name))
LOG.info(
"{}status: {}{} {}".format(
"\u2517 ", PrintMsg.white, final_stack.status, PrintMsg.rst_color
)
def _print_upgrade_msg(new_version, version):
LOG.info("version %s\n" % version, extra={"nametag": ""})
LOG.warning("A newer version of %s is available (%s)", NAME, new_version)
LOG.info(
"To upgrade pip version %s[ pip install --upgrade %s]%s",
PrintMsg.highlight,
NAME,
PrintMsg.rst_color,
)
LOG.info(
"To upgrade docker version %s[ docker pull %s/%s ]%s\n",
PrintMsg.highlight,
NAME,
NAME,
PrintMsg.rst_color,
)
def filter(self, record):
if "nametag" in dir(record):
record.color_loglevel = record.nametag
else:
record.color_loglevel = getattr(PrintMsg, record.levelname)
return True
def _sync(
self, local_list, s3_list, bucket, prefix, acl, threads=16
): # pylint: disable=too-many-locals
# determine which files to remove from S3
remove_from_s3 = []
for s3_file in s3_list.keys():
if s3_file not in local_list.keys() and not self._exclude_remote(s3_file):
LOG.info(
f"s3://{bucket}/{prefix + prefix + s3_file}",
extra={"nametag": PrintMsg.S3DELETE},
)
remove_from_s3.append({"Key": prefix + s3_file})
# deleting objects, max 1k objects per s3 delete_objects call
for objects in [
remove_from_s3[i : i + 1000] for i in range(0, len(remove_from_s3), 1000)
]:
response = self.s3_client.delete_objects(
Bucket=bucket, Delete={"Objects": objects}
)
if "Errors" in response.keys():
for error in response["Errors"]:
LOG.error("S3 delete error: %s" % str(error))
raise TaskCatException("Failed to delete one or more files from S3")
# build list of files to upload
upload_to_s3 = []
for local_file in local_list:
config: Config, templates: Dict[str, Template], buckets: dict
) -> None:
# TODO: validation should happen concurrently, recursively and be run against all
# test regions
_validated_templates: List[Path] = []
for test_name, test in config.config.tests.items():
if test.template in _validated_templates:
continue
try:
region = test.regions[0] if test.regions else "us-east-1"
templates[test_name].validate(region, buckets[test_name][region].name)
_validated_templates.append(test.template)
LOG.info(
f"Validated template: {str(test.template)}",
extra={"nametag": PrintMsg.PASS},
)
except Exception as e:
LOG.debug(f"Exception: {str(e)}", exc_info=True)
raise TaskCatException(f"Unable to validate {test.template}")
def _s3_upload_file(paths, prefix, s3_client, acl):
local_filename, bucket, s3_path = paths
retry = 0
# backoff and retry
while retry < 5:
LOG.info(
f"s3://{bucket}/{prefix + s3_path}", extra={"nametag": PrintMsg.S3}
)
try:
s3_client.upload_file(
local_filename, bucket, prefix + s3_path, ExtraArgs={"ACL": acl}
)
break
except Exception as e: # pylint: disable=broad-except
retry += 1
LOG.error("S3 upload error: %s" % e)
# give up if we've exhausted retries, or if the error is not-retryable
# ie. AccessDenied
if retry == 5 or (
isinstance(e, S3UploadFailedError) and "(AccessDenied)" in str(e)
):
raise TaskCatException("Failed to upload to S3")
time.sleep(retry * 2)
def _print_stack_tree(stack, buffer):
padding_1 = " "
buffer.append(
"{}{}stack {} {}".format(padding_1, "\u250f ", "\u24c2", stack.name)
)
if stack.descendants():
for nested_stack in stack.descendants():
buffer.append(
"{}{}stack {} {}".format(
padding_1, "\u2523 ", "\u24c3", nested_stack.name
)
)
buffer.append("{}{} region: {}".format(padding_1, "\u2523", stack.region_name))
buffer.append(
"{}{}status: {}{}{}".format(
padding_1, "\u2517 ", PrintMsg.white, stack.status, PrintMsg.rst_color
)
def _print_upgrade_msg(new_version, version):
LOG.info("version %s\n" % version, extra={"nametag": ""})
LOG.warning("A newer version of %s is available (%s)", NAME, new_version)
LOG.info(
"To upgrade pip version %s[ pip install --upgrade %s]%s",
PrintMsg.highlight,
NAME,
PrintMsg.rst_color,
)
LOG.info(
"To upgrade docker version %s[ docker pull %s/%s ]%s\n",
PrintMsg.highlight,
NAME,
NAME,
PrintMsg.rst_color,
)