Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": ["ec2:DeleteInternetGateway"], "Resource": ["*"]}
],
},
}
],
},
}
},
}
result = Result()
rule = IAMRolesOverprivilegedRule(None, result)
rule.check_managed_policies = Mock()
resources = pycfmodel.parse(role_props).resources
rule.invoke(resources, [])
rule.check_managed_policies.assert_called()
assert not result.valid
assert (
result.failed_rules[0].reason
== 'Role "RootRole" contains an insecure permission "ec2:DeleteInternetGateway" in policy "not_so_chill_policy"'
)
assert result.failed_rules[0].rule == "IAMRolesOverprivilegedRule"
def test_flow():
    """Check that transforming managed policies adds one inline policy to the role.

    The transformer's IAM client is swapped for a mock: ``get_policy`` reports
    ``"TestV"`` as the default version id and ``get_policy_version`` returns
    ``test_policy``. After ``transform_managed_policies()`` runs, the first
    ``AWS::IAM::Role`` resource must hold exactly one policy, named
    ``"AutoTransformedManagedPolicyTestV"``.
    """
    model = pycfmodel.parse(test_cf)
    managed_policy_transformer = ManagedPolicyTransformer(model)

    # Keyword arguments to Mock() are set as attributes on the new mock, so
    # this stubs both IAM calls without reaching a real client.
    managed_policy_transformer.iam_client = Mock(
        get_policy=Mock(return_value={"Policy": {"DefaultVersionId": "TestV"}}),
        get_policy_version=Mock(return_value=test_policy),
    )

    managed_policy_transformer.transform_managed_policies()

    role = model.resources["AWS::IAM::Role"][0]
    assert len(role.policies) == 1
    assert role.policies[0].policy_name == "AutoTransformedManagedPolicyTestV"
{
"PolicyName": "chill_policy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": ["ec2:DescribeInstances"], "Resource": "*"}
],
},
}
],
},
}
},
}
resource = pycfmodel.parse(role_props).resources
result = Result()
rule = IAMRolesOverprivilegedRule(None, result)
rule.check_managed_policies = Mock()
rule.invoke(resource, [])
rule.check_managed_policies.assert_called()
assert result.valid
assert len(result.failed_rules) == 0
"InstanceSecurityGroup": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "Allow http to client host",
"VpcId": "VPCID",
"SecurityGroupIngress": [
{"IpProtocol": "tcp", "FromPort": 80, "ToPort": 80, "CidrIp": {"Ref": "IPValueIngress"}}
],
"SecurityGroupEgress": [
{"IpProtocol": "tcp", "FromPort": 80, "ToPort": 80, "CidrIp": {"Ref": "IPValueEgress"}}
],
},
}
},
}
model = parse(template).resolve(extra_params={"IPValueIngress": "1.1.1.1/16", "IPValueEgress": "127.0.0.1"})
assert (
model.Resources["InstanceSecurityGroup"].Properties.SecurityGroupIngress[0].CidrIp.with_netmask
== "1.1.0.0/255.255.0.0"
)
assert not model.Resources["InstanceSecurityGroup"].Properties.SecurityGroupIngress[0].CidrIp.is_private
assert (
model.Resources["InstanceSecurityGroup"].Properties.SecurityGroupEgress[0].CidrIp.with_netmask
== "127.0.0.1/255.255.255.255"
)
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["IAM:CREATEPOLICY"],
"Resource": ["arn:aws:glue:eu-west-1:12345678:catalog"],
}
],
},
"Roles": "some_role",
},
}
},
}
resource = pycfmodel.parse(role_props).resources
result = Result()
rule = PrivilegeEscalationRule(None, result)
rule.invoke(resource, [])
assert not result.valid
assert len(result.failed_rules) == 1
"Version": "2012-10-17",
},
"PolicyName": "ProdCredentialStoreAccessPolicy",
},
]
}
],
},
}
},
}
result = Result()
rule = IAMRolesOverprivilegedRule(None, result)
rule.check_managed_policies = Mock()
resources = pycfmodel.parse(role_props).resources
rule.invoke(resources, [])
rule.check_managed_policies.assert_called()
assert not result.valid
assert (
result.failed_rules[0].reason
== 'Role "RootRole" contains an insecure permission "ec2:DeleteVpc" in policy "ProdCredentialStoreAccessPolicy"'
)
assert result.failed_rules[0].rule == "IAMRolesOverprivilegedRule"
def template(self):
    """Load the wildcard-action IAM role template and return it parsed.

    Reads ``test_templates/iam_role_with_wildcard_action.json`` from the
    directory that contains this file, converts the text to a dict and hands
    it to ``pycfmodel.parse``.

    NOTE(review): presumably registered as a pytest fixture; the decorator is
    not visible here, so confirm against the enclosing class.
    """
    # Resolve against this file's real location so the lookup does not depend
    # on the current working directory.
    dir_path = os.path.dirname(os.path.realpath(__file__))
    with open(f"{dir_path}/test_templates/iam_role_with_wildcard_action.json") as template_file:
        raw_template = template_file.read()
    return pycfmodel.parse(convert_json_or_yaml_to_dict(raw_template))
def test_with_templates(cf_path):
    """Process the template at ``cf_path`` with all default rules; expect no exceptions.

    The template file is read, converted to a dict, parsed and resolved. One
    rule instance is built for each name in ``config.rules`` (populated from
    ``DEFAULT_RULES.keys()``), and the resulting processor output must carry
    an empty ``exceptions`` collection.

    NOTE(review): ``cf_path`` is presumably supplied by test parametrisation;
    the decorator is not visible here.
    """
    with open(cf_path) as template_file:
        template_dict = convert_json_or_yaml_to_dict(template_file.read())

    # The path doubles as project, service and stack name.
    config = Config(project_name=cf_path, service_name=cf_path, stack_name=cf_path, rules=DEFAULT_RULES.keys())

    # Scan result
    resolved_model = pycfmodel.parse(template_dict).resolve()
    rule_instances = [DEFAULT_RULES.get(rule_name)(config) for rule_name in config.rules]
    scan_result = RuleProcessor(*rule_instances).process_cf_template(resolved_model, config)

    # When there's an error, print the template path and the stack of the
    # first captured exception before the assertion fails.
    exception_count = len(scan_result.exceptions)
    if exception_count:
        print(cf_path)
        traceback.print_tb(scan_result.exceptions[0].__traceback__)

    assert exception_count == 0
def statement_2():
    """Return an ``Allow`` Statement that sets both ``Action`` and ``NotAction``.

    ``Action`` is the single string ``"action1"``, ``NotAction`` is the list
    ``["action2"]`` and ``Resource`` is the list ``["arn"]``.
    """
    statement_fields = {
        "Effect": "Allow",
        "Action": "action1",
        "NotAction": ["action2"],
        "Resource": ["arn"],
    }
    return Statement(**statement_fields)
def statement_principal_1():
    """Return a Statement whose only field is an ``AWS`` principal.

    The principal value is the ARN string ``"arn:aws:iam::123456789012:root"``.
    """
    principal = {"AWS": "arn:aws:iam::123456789012:root"}
    return Statement(**{"Principal": principal})