Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": ["ec2:DeleteInternetGateway"], "Resource": ["*"]}
],
},
}
],
},
}
},
}
result = Result()
rule = IAMRolesOverprivilegedRule(None, result)
rule.check_managed_policies = Mock()
resources = pycfmodel.parse(role_props).resources
rule.invoke(resources, [])
rule.check_managed_policies.assert_called()
assert not result.valid
assert (
result.failed_rules[0].reason
== 'Role "RootRole" contains an insecure permission "ec2:DeleteInternetGateway" in policy "not_so_chill_policy"'
)
assert result.failed_rules[0].rule == "IAMRolesOverprivilegedRule"
def test_flow():
cf_model = pycfmodel.parse(test_cf)
transformer = ManagedPolicyTransformer(cf_model)
iam_client = Mock()
iam_client.get_policy = Mock(return_value={"Policy": {"DefaultVersionId": "TestV"}})
iam_client.get_policy_version = Mock(return_value=test_policy)
transformer.iam_client = iam_client
transformer.transform_managed_policies()
test_iam_role = cf_model.resources["AWS::IAM::Role"][0]
assert len(test_iam_role.policies) == 1
assert test_iam_role.policies[0].policy_name == "AutoTransformedManagedPolicyTestV"
{
"PolicyName": "chill_policy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": ["ec2:DescribeInstances"], "Resource": "*"}
],
},
}
],
},
}
},
}
resource = pycfmodel.parse(role_props).resources
result = Result()
rule = IAMRolesOverprivilegedRule(None, result)
rule.check_managed_policies = Mock()
rule.invoke(resource, [])
rule.check_managed_policies.assert_called()
assert result.valid
assert len(result.failed_rules) == 0
"InstanceSecurityGroup": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "Allow http to client host",
"VpcId": "VPCID",
"SecurityGroupIngress": [
{"IpProtocol": "tcp", "FromPort": 80, "ToPort": 80, "CidrIp": {"Ref": "IPValueIngress"}}
],
"SecurityGroupEgress": [
{"IpProtocol": "tcp", "FromPort": 80, "ToPort": 80, "CidrIp": {"Ref": "IPValueEgress"}}
],
},
}
},
}
model = parse(template).resolve(extra_params={"IPValueIngress": "1.1.1.1/16", "IPValueEgress": "127.0.0.1"})
assert (
model.Resources["InstanceSecurityGroup"].Properties.SecurityGroupIngress[0].CidrIp.with_netmask
== "1.1.0.0/255.255.0.0"
)
assert not model.Resources["InstanceSecurityGroup"].Properties.SecurityGroupIngress[0].CidrIp.is_private
assert (
model.Resources["InstanceSecurityGroup"].Properties.SecurityGroupEgress[0].CidrIp.with_netmask
== "127.0.0.1/255.255.255.255"
)
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["IAM:CREATEPOLICY"],
"Resource": ["arn:aws:glue:eu-west-1:12345678:catalog"],
}
],
},
"Roles": "some_role",
},
}
},
}
resource = pycfmodel.parse(role_props).resources
result = Result()
rule = PrivilegeEscalationRule(None, result)
rule.invoke(resource, [])
assert not result.valid
assert len(result.failed_rules) == 1
"Version": "2012-10-17",
},
"PolicyName": "ProdCredentialStoreAccessPolicy",
},
]
}
],
},
}
},
}
result = Result()
rule = IAMRolesOverprivilegedRule(None, result)
rule.check_managed_policies = Mock()
resources = pycfmodel.parse(role_props).resources
rule.invoke(resources, [])
rule.check_managed_policies.assert_called()
assert not result.valid
assert (
result.failed_rules[0].reason
== 'Role "RootRole" contains an insecure permission "ec2:DeleteVpc" in policy "ProdCredentialStoreAccessPolicy"'
)
assert result.failed_rules[0].rule == "IAMRolesOverprivilegedRule"
def template(self):
dir_path = os.path.dirname(os.path.realpath(__file__))
with open(f"{dir_path}/test_templates/iam_role_with_wildcard_action.json") as cf_script:
cf_template = convert_json_or_yaml_to_dict(cf_script.read())
return pycfmodel.parse(cf_template)
def test_with_templates(cf_path):
with open(cf_path) as cf_script:
cf_template = convert_json_or_yaml_to_dict(cf_script.read())
config = Config(project_name=cf_path, service_name=cf_path, stack_name=cf_path, rules=DEFAULT_RULES.keys())
# Scan result
cfmodel = pycfmodel.parse(cf_template).resolve()
rules = [DEFAULT_RULES.get(rule)(config) for rule in config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(cfmodel, config)
# Use this to print the stack if there'IAMManagedPolicyWildcardActionRule an error
if len(result.exceptions):
print(cf_path)
traceback.print_tb(result.exceptions[0].__traceback__)
assert len(result.exceptions) == 0
def get_cfmodel(template: TextIOWrapper) -> CFModel:
template = convert_json_or_yaml_to_dict(template.read())
cfmodel = pycfmodel.parse(template)
return cfmodel
rules=DEFAULT_RULES.keys(),
event=event.get("event"),
template_url=event.get("stack_template_url", "N/A"),
aws_region=event.get("region", "N/A"),
aws_account_name=event.get("account", {}).get("name", "N/A"),
aws_account_id=event.get("account", {}).get("id", "N/A"),
aws_user_agent=event.get("user_agent", "N/A"),
)
logger.info("Scan started for: {}; {}; {};".format(config.project_name, config.service_name, config.stack_name))
rules = [DEFAULT_RULES.get(rule)(config) for rule in config.rules]
processor = RuleProcessor(*rules)
# TODO get AWS variables/parameters and pass them to resolve
cfmodel = pycfmodel.parse(template).resolve()
result = processor.process_cf_template(cfmodel=cfmodel, config=config, extras=extras)
perform_logging(result, config, event)
return {
"valid": result.valid,
"reason": ",".join(["{}-{}".format(r.rule, r.reason) for r in result.failed_rules]),
"failed_rules": [
failure.serializable() for failure in RuleProcessor.remove_debug_rules(rules=result.failed_rules)
],
"exceptions": [x.args[0] for x in result.exceptions],
"warnings": [
failure.serializable() for failure in RuleProcessor.remove_debug_rules(rules=result.failed_monitored_rules)
],