Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_with_valid_role_managed_policy():
role_props = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"RootRole": {
"Type": "AWS::IAM::Role",
"Properties": {"Path": "/", "ManagedPolicyArns": ["arn:aws:iam::aws:policy/YadaYadaYada"]},
}
},
}
result = Result()
rule = IAMRolesOverprivilegedRule(None, result)
rule.check_inline_policies = Mock()
resources = pycfmodel.parse(role_props).resources
rule.invoke(resources, [])
rule.check_inline_policies.assert_called()
assert result.valid
assert len(result.failed_rules) == 0
def test_with_invalid_role_managed_policy():
role_props = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"RootRole": {
"Type": "AWS::IAM::Role",
"Properties": {"Path": "/", "ManagedPolicyArns": ["arn:aws:iam::aws:policy/AdministratorAccess"]},
}
},
}
result = Result()
rule = IAMRolesOverprivilegedRule(None, result)
resources = pycfmodel.parse(role_props).resources
rule.invoke(resources, [])
assert not result.valid
assert (
result.failed_rules[0].reason
== "Role RootRole has forbidden Managed Policy arn:aws:iam::aws:policy/AdministratorAccess"
)
assert result.failed_rules[0].rule == "IAMRolesOverprivilegedRule"
def test_iam_role_with_wildcard_action_on_trust(iam_role_with_wildcard_action_on_trust):
result = Result()
rule = IAMRoleWildcardActionOnTrustPolicyRule(None, result)
rule.invoke(iam_role_with_wildcard_action_on_trust)
assert not result.valid
assert len(result.failed_rules) == 1
assert len(result.failed_monitored_rules) == 0
assert result.failed_rules[0].rule == "IAMRoleWildcardActionOnTrustPolicyRule"
assert (
result.failed_rules[0].reason == "IAM role WildcardActionRole should not allow any * action on its trust policy"
)
{
"PolicyName": "not_so_chill_policy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": ["ec2:DeleteInternetGateway"], "Resource": "*"}
],
},
}
],
},
}
},
}
result = Result()
rule = IAMRolesOverprivilegedRule(None, result)
rule.check_managed_policies = Mock()
resources = pycfmodel.parse(role_props).resources
rule.invoke(resources, [])
rule.check_managed_policies.assert_called()
assert not result.valid
assert (
result.failed_rules[0].reason
== 'Role "RootRole" contains an insecure permission "ec2:DeleteInternetGateway" in policy "not_so_chill_policy"'
)
assert result.failed_rules[0].rule == "IAMRolesOverprivilegedRule"
def test_sqs_queue_with_wildcards(sqs_queue_with_wildcards):
result = Result()
rule = SQSQueuePolicyWildcardActionRule(None, result)
rule.invoke(sqs_queue_with_wildcards)
assert not result.valid
assert len(result.failed_rules) == 4
assert len(result.failed_monitored_rules) == 0
assert result.failed_rules[0].rule == "SQSQueuePolicyWildcardActionRule"
assert result.failed_rules[0].reason == "SQS Queue policy mysqspolicy1 should not allow * action"
assert result.failed_rules[1].rule == "SQSQueuePolicyWildcardActionRule"
assert result.failed_rules[1].reason == "SQS Queue policy mysqspolicy1b should not allow * action"
assert result.failed_rules[2].rule == "SQSQueuePolicyWildcardActionRule"
assert result.failed_rules[2].reason == "SQS Queue policy mysqspolicy1c should not allow * action"
assert result.failed_rules[3].rule == "SQSQueuePolicyWildcardActionRule"
assert result.failed_rules[3].reason == "SQS Queue policy mysqspolicy1d should not allow * action"
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMPolicy):
policy_actions = set(action.lower() for action in resource.Properties.PolicyDocument.get_iam_actions())
for violation in policy_actions.intersection(self.IAM_BLACKLIST):
self.add_failure_to_result(
result, self.REASON.format(logical_id, violation), resource_ids={logical_id}
)
return result
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy):
for statement in resource.Properties.PolicyDocument._statement_as_list():
for principal in statement.get_principal_list():
account_id = get_account_id_from_principal(principal)
if not account_id:
continue
if account_id not in self.valid_principals:
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} "
f"because there are conditions: {statement.Condition}"
)
else:
self.add_failure_to_result(
result, self.REASON.format(logical_id, account_id), resource_ids={logical_id},
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, SecurityGroup) and not resource.Properties.SecurityGroupEgress:
self.add_failure_to_result(
result,
self.REASON.format(logical_id),
resource_ids={logical_id},
context={"config": self._config, "extras": extras, "logical_id": logical_id, "resource": resource},
)
return result
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole):
self.check_managed_policies(result, logical_id, resource)
self.check_inline_policies(result, logical_id, resource)
return result
"user_agent": String,
}
:param context:
:return:
"""
setup_logging()
if not event.get("stack_template_url") and not event.get("stack", {}).get("name"):
raise ValueError("Invalid event type: no parameter 'stack_template_url' or 'stack::name' in request.")
template = get_template(event)
extras = get_extras(event)
if not template:
# In case of an invalid script log a warning and return early
result = Result()
result.add_exception(TypeError(f"Malformed Event - could not parse!! Event: {str(event)}"))
logger.exception(f"Malformed Event - could not parse!! Event: {str(event)}")
return {"valid": True, "reason": "", "failed_rules": [], "exceptions": [x.args[0] for x in result.exceptions]}
# Process Rules
config = Config(
project_name=event.get("project"),
service_name=event.get("serviceName"),
stack_name=event.get("stack", {}).get("name"),
rules=DEFAULT_RULES.keys(),
event=event.get("event"),
template_url=event.get("stack_template_url", "N/A"),
aws_region=event.get("region", "N/A"),
aws_account_name=event.get("account", {}).get("name", "N/A"),
aws_account_id=event.get("account", {}).get("id", "N/A"),
aws_user_agent=event.get("user_agent", "N/A"),