How to use the cfripper.model.result.Result function in cfripper

To help you get started, we’ve selected a few cfripper examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Skyscanner / cfripper / tests / test_rules_iam_roles_overprivileged.py View on Github external
def test_with_valid_role_managed_policy():
    role_props = {
        "AWSTemplateFormatVersion": "2010-09-09",
        "Resources": {
            "RootRole": {
                "Type": "AWS::IAM::Role",
                "Properties": {"Path": "/", "ManagedPolicyArns": ["arn:aws:iam::aws:policy/YadaYadaYada"]},
            }
        },
    }

    result = Result()
    rule = IAMRolesOverprivilegedRule(None, result)
    rule.check_inline_policies = Mock()
    resources = pycfmodel.parse(role_props).resources
    rule.invoke(resources, [])
    rule.check_inline_policies.assert_called()

    assert result.valid
    assert len(result.failed_rules) == 0
github Skyscanner / cfripper / tests / test_rules_iam_roles_overprivileged.py View on Github external
def test_with_invalid_role_managed_policy():
    role_props = {
        "AWSTemplateFormatVersion": "2010-09-09",
        "Resources": {
            "RootRole": {
                "Type": "AWS::IAM::Role",
                "Properties": {"Path": "/", "ManagedPolicyArns": ["arn:aws:iam::aws:policy/AdministratorAccess"]},
            }
        },
    }

    result = Result()
    rule = IAMRolesOverprivilegedRule(None, result)
    resources = pycfmodel.parse(role_props).resources
    rule.invoke(resources, [])

    assert not result.valid
    assert (
        result.failed_rules[0].reason
        == "Role RootRole has forbidden Managed Policy arn:aws:iam::aws:policy/AdministratorAccess"
    )
    assert result.failed_rules[0].rule == "IAMRolesOverprivilegedRule"
github Skyscanner / cfripper / tests / rules / test_IAMRoleWildcardActionOnTrustPolicyRule.py View on Github external
def test_iam_role_with_wildcard_action_on_trust(iam_role_with_wildcard_action_on_trust):
    result = Result()
    rule = IAMRoleWildcardActionOnTrustPolicyRule(None, result)
    rule.invoke(iam_role_with_wildcard_action_on_trust)

    assert not result.valid
    assert len(result.failed_rules) == 1
    assert len(result.failed_monitored_rules) == 0
    assert result.failed_rules[0].rule == "IAMRoleWildcardActionOnTrustPolicyRule"
    assert (
        result.failed_rules[0].reason == "IAM role WildcardActionRole should not allow any * action on its trust policy"
    )
github Skyscanner / cfripper / tests / test_rules_iam_roles_overprivileged.py View on Github external
{
                            "PolicyName": "not_so_chill_policy",
                            "PolicyDocument": {
                                "Version": "2012-10-17",
                                "Statement": [
                                    {"Effect": "Allow", "Action": ["ec2:DeleteInternetGateway"], "Resource": "*"}
                                ],
                            },
                        }
                    ],
                },
            }
        },
    }

    result = Result()
    rule = IAMRolesOverprivilegedRule(None, result)
    rule.check_managed_policies = Mock()
    resources = pycfmodel.parse(role_props).resources
    rule.invoke(resources, [])
    rule.check_managed_policies.assert_called()

    assert not result.valid
    assert (
        result.failed_rules[0].reason
        == 'Role "RootRole" contains an insecure permission "ec2:DeleteInternetGateway" in policy "not_so_chill_policy"'
    )
    assert result.failed_rules[0].rule == "IAMRolesOverprivilegedRule"
github Skyscanner / cfripper / tests / rules / test_SQSQueuePolicyWildcardActionRule.py View on Github external
def test_sqs_queue_with_wildcards(sqs_queue_with_wildcards):
    result = Result()
    rule = SQSQueuePolicyWildcardActionRule(None, result)
    rule.invoke(sqs_queue_with_wildcards)

    assert not result.valid
    assert len(result.failed_rules) == 4
    assert len(result.failed_monitored_rules) == 0
    assert result.failed_rules[0].rule == "SQSQueuePolicyWildcardActionRule"
    assert result.failed_rules[0].reason == "SQS Queue policy mysqspolicy1 should not allow * action"
    assert result.failed_rules[1].rule == "SQSQueuePolicyWildcardActionRule"
    assert result.failed_rules[1].reason == "SQS Queue policy mysqspolicy1b should not allow * action"
    assert result.failed_rules[2].rule == "SQSQueuePolicyWildcardActionRule"
    assert result.failed_rules[2].reason == "SQS Queue policy mysqspolicy1c should not allow * action"
    assert result.failed_rules[3].rule == "SQSQueuePolicyWildcardActionRule"
    assert result.failed_rules[3].reason == "SQS Queue policy mysqspolicy1d should not allow * action"
github Skyscanner / cfripper / cfripper / rules / privilege_escalation.py View on Github external
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, IAMPolicy):
                policy_actions = set(action.lower() for action in resource.Properties.PolicyDocument.get_iam_actions())
                for violation in policy_actions.intersection(self.IAM_BLACKLIST):
                    self.add_failure_to_result(
                        result, self.REASON.format(logical_id, violation), resource_ids={logical_id}
                    )
        return result
github Skyscanner / cfripper / cfripper / rules / s3_bucket_policy.py View on Github external
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, S3BucketPolicy):
                for statement in resource.Properties.PolicyDocument._statement_as_list():
                    for principal in statement.get_principal_list():
                        account_id = get_account_id_from_principal(principal)
                        if not account_id:
                            continue
                        if account_id not in self.valid_principals:
                            if statement.Condition and statement.Condition.dict():
                                logger.warning(
                                    f"Not adding {type(self).__name__} failure in {logical_id} "
                                    f"because there are conditions: {statement.Condition}"
                                )
                            else:
                                self.add_failure_to_result(
                                    result, self.REASON.format(logical_id, account_id), resource_ids={logical_id},
github Skyscanner / cfripper / cfripper / rules / ec2_security_group.py View on Github external
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, SecurityGroup) and not resource.Properties.SecurityGroupEgress:
                self.add_failure_to_result(
                    result,
                    self.REASON.format(logical_id),
                    resource_ids={logical_id},
                    context={"config": self._config, "extras": extras, "logical_id": logical_id, "resource": resource},
                )
        return result
github Skyscanner / cfripper / cfripper / rules / iam_roles.py View on Github external
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, IAMRole):
                self.check_managed_policies(result, logical_id, resource)
                self.check_inline_policies(result, logical_id, resource)
        return result
github Skyscanner / cfripper / cfripper / main.py View on Github external
"user_agent": String,
    }
    :param context:
    :return:
    """

    setup_logging()
    if not event.get("stack_template_url") and not event.get("stack", {}).get("name"):
        raise ValueError("Invalid event type: no parameter 'stack_template_url' or 'stack::name' in request.")

    template = get_template(event)
    extras = get_extras(event)

    if not template:
        # In case of an invalid script log a warning and return early
        result = Result()
        result.add_exception(TypeError(f"Malformed Event - could not parse!! Event: {str(event)}"))
        logger.exception(f"Malformed Event - could not parse!! Event: {str(event)}")
        return {"valid": True, "reason": "", "failed_rules": [], "exceptions": [x.args[0] for x in result.exceptions]}

    # Process Rules
    config = Config(
        project_name=event.get("project"),
        service_name=event.get("serviceName"),
        stack_name=event.get("stack", {}).get("name"),
        rules=DEFAULT_RULES.keys(),
        event=event.get("event"),
        template_url=event.get("stack_template_url", "N/A"),
        aws_region=event.get("region", "N/A"),
        aws_account_name=event.get("account", {}).get("name", "N/A"),
        aws_account_id=event.get("account", {}).get("id", "N/A"),
        aws_user_agent=event.get("user_agent", "N/A"),