Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __call__(self, parser, namespace, values, option_string=None):
    """Parse one ``TagKey=TagValue`` argument and append it to ``namespace.tags``.

    :param parser: parser invoking this action (unused)
    :param namespace: object collecting parsed values; its ``tags`` list is
        created on first use
    :param values: raw string, expected in the form ``TagKey=TagValue``
    :param option_string: option flag that triggered the action (unused)
    :raises TaskCatException: if ``values`` contains no ``=`` separator
    """
    # Split on the first "=" only, so a tag value may itself contain "=".
    key, separator, value = values.partition("=")
    if not separator:
        raise TaskCatException("tags must be in the format TagKey=TagValue")
    # A missing attribute and a pre-set ``None`` both mean "no tags collected
    # yet"; probing with getattr alone would leave ``None`` in place and the
    # append below would fail.
    if getattr(namespace, "tags", None) is None:
        namespace.tags = []
    namespace.tags.append({"Key": key, "Value": value})
def validate_output_dir(directory):
    """Ensure the output directory exists and is writable.

    If ``directory`` names an existing file, the directory containing that
    file is used instead. A missing directory is created; an existing
    directory without write access is rejected.

    :param directory: path to the output directory, or to a file inside it
    :raises TaskCatException: if the directory exists but is not writable
    """
    if os.path.isfile(directory):
        # A bare file name has an empty dirname; fall back to the current
        # directory so the checks below never operate on "".
        directory = os.path.dirname(directory) or os.curdir
    if not os.path.isdir(directory):
        LOG.info("Directory [%s] does not exist. Trying to create it.", directory)
        # exist_ok tolerates the directory appearing between check and create.
        os.makedirs(directory, exist_ok=True)
    elif not os.access(directory, os.W_OK):
        raise TaskCatException(
            f"No write access allowed to output directory "
            f"[{directory}]. Aborting."
        )
def _get_file_list(self, input_path):
    """Return the files found at ``input_path``, caching the result.

    A file path yields a one-element list. A directory is walked
    recursively, leaving out files whose names end with an entry of
    ``self._GIT_EXT`` and not descending into directories named in
    ``self._EXCLUDED_DIRS``. Once a non-empty list is stored on
    ``self._file_list`` it is returned unchanged on later calls.

    :param input_path: file or directory to collect from
    :raises TaskCatException: if ``input_path`` is neither a file nor a
        directory
    """
    if self._file_list:
        return self._file_list

    if os.path.isfile(input_path):
        collected = [input_path]
    elif os.path.isdir(input_path):
        collected = []
        for root, dirs, files in os.walk(input_path):
            collected.extend(
                os.path.join(root, name)
                for name in files
                if not name.endswith(tuple(self._GIT_EXT))
            )
            # Prune in place so os.walk does not descend into excluded dirs.
            dirs[:] = [name for name in dirs if name not in self._EXCLUDED_DIRS]
    else:
        raise TaskCatException("Directory/File is non-existent. Aborting.")

    self._file_list = collected
    return self._file_list
:param project_root: base path for project
:param strict: fail on lint warnings as well as errors
"""
# Resolve the project root to an absolute path (expanding "~"), then join the config file path onto it.
project_root_path: Path = Path(project_root).expanduser().resolve()
input_file_path: Path = project_root_path / input_file
config = Config.create(
project_root=project_root_path, project_config_path=input_file_path
)
templates = config.get_templates(project_root_path)
lint = TaskCatLint(config, templates, strict)
# Element 1 of lint.lints is treated as the error collection — presumably a (results, errors) pair; TODO confirm.
errors = lint.lints[1]
# output_results() is called before the failure check, so it runs even when the lint run fails.
lint.output_results()
# Fail when errors were collected or the linter did not report a pass.
if errors or not lint.passed:
raise TaskCatException("Lint failed with errors")
:param quiet: Optional value, if set True suppress verbose output
:param strict: Optional value, Display errors and exit
:return: True if given yaml is valid, False otherwise.
"""
try:
# Build a cfn-lint loader over the input and register its constructor for "!"-prefixed tags.
loader = cfnlint.decode.cfn_yaml.MarkedLoader(yamlin, None)
loader.add_multi_constructor('!', cfnlint.decode.cfn_yaml.multi_constructor)
# NOTE(review): get_single_data() is only reached when self.verbose is set and quiet is falsy; it looks like the document is never loaded on the non-verbose path, which then returns True unchecked — confirm.
if self.verbose:
if not quiet:
print(loader.get_single_data())
except TaskCatException:
# Project exceptions pass through unchanged.
raise
except Exception as e:
# Any other failure: raise as TaskCatException when strict, otherwise report False.
if strict:
raise TaskCatException(str(e))
return False
return True
# A boto profile and explicit access/secret keys are mutually exclusive.
if args.boto_profile is not None:
if args.aws_access_key is not None or args.aws_secret_key is not None:
# NOTE(review): the two concatenated literals have no space between "(--boto_profile)" and "with", so the message reads "(--boto_profile)with ...".
# NOTE(review): if parser is an argparse.ArgumentParser, error() exits the process and the next two statements never run — confirm.
parser.error("Cannot use boto profile -P (--boto_profile)" +
"with --aws_access_key or --aws_secret_key")
# NOTE(review): print() receives the return value of print_help(); with argparse that is None, so a stray "None" would be printed — confirm.
print(parser.print_help())
raise TaskCatException("Cannot use boto profile -P (--boto_profile)" +
"with --aws_access_key or --aws_secret_key")
if args.public_s3_bucket:
self.public_s3_bucket = True
# -n (--no_cleanup) and -N (--no_cleanup_failed) are mutually exclusive.
if args.no_cleanup_failed:
if args.no_cleanup:
parser.error("Cannot use -n (--no_cleanup) with -N (--no_cleanup_failed)")
print(parser.print_help())
raise TaskCatException("Cannot use -n (--no_cleanup) with -N (--no_cleanup_failed)")
self.retain_if_failed = True
return args
# NOTE: the physical id is missing when stack creation has failed, so only resources that carry one are recorded.
elif (
resource.get("ResourceType") != "AWS::CloudFormation::Stack"
and "PhysicalResourceId" in resource
):
# Record the identifying fields of a resource that is not itself a nested stack.
d = {
"logicalId": resource.get("LogicalResourceId"),
"physicalId": resource.get("PhysicalResourceId"),
"resourceType": resource.get("ResourceType"),
}
l_resources.append(d)
except TaskCatException:
# Project exceptions pass through unchanged.
raise
except Exception as e:
# Log the underlying error, then raise a TaskCatException that names the stack.
log.error(str(e))
raise TaskCatException(
"Unable to get resources for stack %s" % stackname
)
This function validates the given YAML.
:param yamlin: Yaml object to be validated
:param quiet: Optional value, if set True suppress verbose output
:param strict: Optional value, Display errors and exit
:return: True if given yaml is valid, False otherwise.
"""
try:
parms = yaml.safe_load(yaml)
if self.verbose:
if not quiet:
print(yaml.safe_dump(parms))
except yaml.YAMLError as e:
if strict:
raise TaskCatException(str(e))
return False
return True
def _find_children(self) -> None: # noqa: C901
# Collect the children referenced by nested-stack resources in self.template, then look each one up among self.descendents.
# Raises TaskCatException when the template has no "Resources" section.
# NOTE(review): the visible listing stops inside the final loop; what is done with child_template_instance afterwards is not shown here.
children = set()
if "Resources" not in self.template:
raise TaskCatException(
f"did not receive a valid template: {self.template_path} does not "
f"have a Resources section"
)
for resource in self.template["Resources"].keys():
# The loop variable is rebound from the resource name to the resource definition.
resource = self.template["Resources"][resource]
if resource["Type"] == "AWS::CloudFormation::Stack":
child_name = self._template_url_to_path(
resource["Properties"]["TemplateURL"]
)
# A falsy result from _template_url_to_path is skipped; the set removes duplicates.
if child_name:
children.add(child_name)
for child in children:
child_template_instance = None
# Match by string form of the path; the scan does not stop at the first match, so the last matching descendent is kept.
for descendent in self.descendents:
if str(descendent.template_path) == str(child):
child_template_instance = descendent
# NOTE(review): print() receives the return value of print_help(); with argparse that is None, so a stray "None" would be printed — confirm.
print(parser.print_help())
raise TaskCatException(
"Cannot use boto profile -P (--boto_profile) with --aws_access_key or --aws_secret_"
"key"
)
# A config file is mandatory.
if not args.config_yml:
# NOTE(review): if parser is an argparse.ArgumentParser, error() exits the process and the two statements after it never run — confirm.
parser.error("-c (--config_yml) not passed (Config File Required!)")
print(parser.print_help())
raise TaskCatException("-c (--config_yml) not passed (Config File Required!)")
# -n (--no_cleanup) and -N (--no_cleanup_failed) are mutually exclusive.
if args.no_cleanup_failed:
if args.no_cleanup:
parser.error("Cannot use -n (--no_cleanup) with -N (--no_cleanup_failed)")
print(parser.print_help())
raise TaskCatException(
"Cannot use -n (--no_cleanup) with -N (--no_cleanup_failed)"
)
return args