How to use the cfripper.model.enums.RuleGranularity.RESOURCE enum member in cfripper

To help you get started, we’ve selected a few cfripper examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Skyscanner / cfripper / tests / model / test_result.py View on Github external
def test_result_addition():
    """Adding two `Result` objects yields a `Result` holding the failures of both operands."""
    # Blocking failures reported at stack granularity.
    stack_failure_a = Failure(
        rule="rule1", rule_mode="mode1", risk_value="risk1", reason="reason1", granularity=RuleGranularity.STACK,
    )
    stack_failure_b = Failure(
        rule="rule2", rule_mode="mode2", risk_value="risk2", reason="reason2", granularity=RuleGranularity.STACK,
    )
    # Monitored failures reported at resource granularity.
    resource_failure_a = Failure(
        rule="rule1", rule_mode="mode1", risk_value="risk1", reason="reason1", granularity=RuleGranularity.RESOURCE,
    )
    resource_failure_b = Failure(
        rule="rule2", rule_mode="mode2", risk_value="risk2", reason="reason2", granularity=RuleGranularity.RESOURCE,
    )

    left = Result(failed_rules=[stack_failure_a], failed_monitored_rules=[resource_failure_a])
    right = Result(failed_rules=[stack_failure_b], failed_monitored_rules=[resource_failure_b])

    # The sum is expected to list the left operand's failures before the right operand's.
    expected = Result(
        failed_rules=[stack_failure_a, stack_failure_b],
        failed_monitored_rules=[resource_failure_a, resource_failure_b],
    )
    assert left + right == expected
github Skyscanner / cfripper / tests / model / test_rule_processor.py View on Github external
actions=None,
            granularity=RuleGranularity.RESOURCE,
        )
    ]
    result.failed_rules = failed_rules

    RuleProcessor.remove_failures_of_whitelisted_resources(config=config, result=result)
    assert result.failed_rules == [
        Failure(
            rule="S3CrossAccountTrustRule",
            reason="Forbidden cross-account policy allow with 123456789 for an S3 bucket.",
            rule_mode=RuleMode.BLOCKING,
            risk_value=RuleRisk.HIGH,
            resource_ids={"thenotwhitelistedthing", "anotherone"},
            actions=None,
            granularity=RuleGranularity.RESOURCE,
        )
github Skyscanner / cfripper / tests / rules / test_CrossAccountTrustRule.py View on Github external
risk_value=RuleRisk.MEDIUM,
            resource_ids={"RootRoleOne"},
            actions=set(),
            granularity=RuleGranularity.RESOURCE,
        ),
        Failure(
            rule="CrossAccountTrustRule",
            reason=(
                "RootRoleTwo has forbidden cross-account trust relationship with "
                "arn:aws:iam::999999999:role/someuser@bla.com"
            ),
            rule_mode=RuleMode.BLOCKING,
            risk_value=RuleRisk.MEDIUM,
            resource_ids={"RootRoleTwo"},
            actions=set(),
            granularity=RuleGranularity.RESOURCE,
        ),
github Skyscanner / cfripper / cfripper / rules / cross_account_trust.py View on Github external
from cfripper.model.enums import RuleGranularity, RuleMode
from cfripper.model.result import Result
from cfripper.model.utils import get_account_id_from_principal
from cfripper.rules.base_rules import PrincipalCheckingRule

logger = logging.getLogger(__file__)


class CrossAccountCheckingRule(PrincipalCheckingRule):
    """
    Base class not intended to be instantiated, but inherited from.
    This class provides common methods used to detect access permissions from other accounts.
    """

    GRANULARITY = RuleGranularity.RESOURCE
    RESOURCE_TYPE: Resource
    PROPERTY_WITH_POLICYDOCUMENT: str

    @property
    def valid_principals(self) -> Set[str]:
        """
        Return the set of principals this rule treats as valid, building it on first access.

        The set is taken from `_get_whitelist_from_config()` and stored in `_valid_principals`;
        when the config carries a truthy `aws_account_id`, that id is added to the set as well.
        Later accesses return the stored set without rebuilding it.
        """
        # Already built on a previous access: reuse it.
        if self._valid_principals is not None:
            return self._valid_principals

        self._valid_principals = self._get_whitelist_from_config()
        if self._config.aws_account_id:
            own_account_id = self._config.aws_account_id
            self._valid_principals.add(own_account_id)
        return self._valid_principals

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, self.RESOURCE_TYPE):
                properties = resource.Properties
github Skyscanner / cfripper / cfripper / rules / hardcoded_RDS_password.py View on Github external
...

        Resources:
          RDSCluster:
            Type: AWS::RDS::DBCluster
            DeletionPolicy: "Snapshot"
            Properties:
              ...
              MasterUserPassword: !Ref 'MasterUserPassword'
              ...
        ````
    """

    REASON_DEFAULT = "Default RDS {} password parameter (readable in plain-text) for {}."
    REASON_MISSING_NOECHO = "RDS {} password parameter missing NoEcho for {}."
    GRANULARITY = RuleGranularity.RESOURCE

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        result = Result()
        password_protected_cluster_ids = []
        instances_to_check = []

        for logical_id, resource in cfmodel.Resources.items():
            # flag insecure RDS Clusters.
            if resource.Type == "AWS::RDS::DBCluster":
                failure_added = self._failure_added(result, logical_id, resource)
                if not failure_added:
                    password_protected_cluster_ids.append(logical_id)

            # keep track of RDS instances so they can be examined in the code below.
            elif resource.Type == "AWS::RDS::DBInstance":
                instances_to_check.append((logical_id, resource))
github Skyscanner / cfripper / cfripper / rules / s3_public_access.py View on Github external
return result


class S3BucketPublicReadWriteAclRule(Rule):
    """
    Checks if any S3 bucket has its `AccessControl` property set to `PublicReadWrite`.

    Risk:
        Unless required, S3 buckets should not have Public Write available on a bucket. This allows anyone
        to write any objects to your S3 bucket.

    Fix:
        Remove any configuration that looks like `"AccessControl": "PublicReadWrite"` from your S3 bucket.
    """

    GRANULARITY = RuleGranularity.RESOURCE
    REASON = "S3 Bucket {} should not have a public read-write acl"
    RISK_VALUE = RuleRisk.HIGH

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """
        Scan every resource of the template and report buckets with a public read-write ACL.

        Args:
            cfmodel: Parsed template whose `Resources` mapping is inspected.
            extras: Accepted for interface compatibility; not used by this rule.

        Returns:
            A `Result` with one failure added per offending bucket, keyed by its logical id.
        """
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            # Only S3 buckets are of interest.
            if resource.Type != "AWS::S3::Bucket":
                continue
            # Skip resources that carry no `Properties` attribute at all.
            if not hasattr(resource, "Properties"):
                continue
            if resource.Properties.get("AccessControl") != "PublicReadWrite":
                continue
            reason = self.REASON.format(logical_id)
            self.add_failure_to_result(result, reason, resource_ids={logical_id})
        return result
github Skyscanner / cfripper / cfripper / rules / iam_roles.py View on Github external
if action.startswith(prefix):
                                self.add_failure_to_result(
                                    result,
                                    f"Role '{logical_id}' contains an insecure permission '{action}' in policy "
                                    f"'{policy.PolicyName}'",
                                    resource_ids={logical_id},
                                )


class IAMRoleWildcardActionOnPolicyRule(Rule):
    """
    Checks for use of wildcard characters in all IAM Role policies (including `AssumeRolePolicyDocument`)
    and [AWS Managed Policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html).
    """

    GRANULARITY = RuleGranularity.RESOURCE
    REASON = "IAM role {} should not allow a `*` action on its {}"

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, IAMRole):
                # check AssumeRolePolicyDocument.
                if resource.Properties.AssumeRolePolicyDocument.allowed_actions_with(REGEX_WILDCARD_POLICY_ACTION):
                    self.add_failure_to_result(
                        result, self.REASON.format(logical_id, "AssumeRolePolicy"), resource_ids={logical_id},
                    )

                # check other policies of the IAM role.
                if resource.Properties.Policies:
                    for policy in resource.Properties.Policies:
                        if policy.PolicyDocument.allowed_actions_with(REGEX_WILDCARD_POLICY_ACTION):
github Skyscanner / cfripper / cfripper / rules / wildcard_policies.py View on Github external
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule

logger = logging.getLogger(__file__)


class GenericWildcardPolicyRule(Rule):
    """
    Abstract rule that checks for use of the wildcard `*` character in Actions of Policy Documents of AWS Resources.
    This rule must be inherited by another class to be used, with `AWS_RESOURCE` set to the resource to be checked.
    See `S3BucketPolicyWildcardActionRule` and `SQSQueuePolicyWildcardActionRule` for examples.
    """

    REASON = "The {} {} should not allow a `*` action"

    GRANULARITY = RuleGranularity.RESOURCE

    AWS_RESOURCE: Type[Resource] = None

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        result = Result()
        if self.AWS_RESOURCE is None:
            logger.warning(f"Not running {type(self).__name__} rule as AWS_RESOURCE is not defined.")
        else:
            for logical_id, resource in cfmodel.Resources.items():
                if isinstance(resource, self.AWS_RESOURCE):
                    print(resource.Properties.PolicyDocument)
                if isinstance(resource, self.AWS_RESOURCE) and resource.Properties.PolicyDocument.allowed_actions_with(
                    REGEX_HAS_STAR_OR_STAR_AFTER_COLON
                ):
                    self.add_failure_to_result(
                        result, self.REASON.format(self.AWS_RESOURCE.__name__, logical_id), resource_ids={logical_id},
github Skyscanner / cfripper / cfripper / rules / CrossAccountTrustRule.py View on Github external
import re

from pycfmodel.model.resources.iam_role import IAMRole

from ..config.regex import REGEX_CROSS_ACCOUNT_ROOT
from ..model.enums import RuleGranularity, RuleMode
from ..model.principal_checking_rule import PrincipalCheckingRule

logger = logging.getLogger(__file__)


class CrossAccountTrustRule(PrincipalCheckingRule):

    REASON = "{} has forbidden cross-account trust relationship with {}"
    ROOT_PATTERN = re.compile(REGEX_CROSS_ACCOUNT_ROOT)
    GRANULARITY = RuleGranularity.RESOURCE

    def invoke(self, cfmodel):
        not_has_account_id = re.compile(rf"^((?!{self._config.aws_account_id}).)*$")
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, IAMRole):
                for principal in resource.Properties.AssumeRolePolicyDocument.allowed_principals_with(
                    self.ROOT_PATTERN
                ):
                    self.add_failure(
                        type(self).__name__, self.REASON.format(logical_id, principal), resource_ids={logical_id}
                    )

                if self._config.aws_account_id:
                    for principal in resource.Properties.AssumeRolePolicyDocument.allowed_principals_with(
                        not_has_account_id
                    ):
github Skyscanner / cfripper / cfripper / rules / privilege_escalation.py View on Github external
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule


class PrivilegeEscalationRule(Rule):
    """
    Checks for any dangerous IAM actions that could allow privilege escalation and potentially
    represent a large security risk.
    See [current blacklisted IAM actions](https://github.com/Skyscanner/cfripper/blob/master/cfripper/rules/privilege_escalation.py#L29).

    Fix:
        Unless strictly necessary, do not use actions in the IAM action blacklist. CloudFormation files that do require these
        actions should be added to the whitelist.
    """

    GRANULARITY = RuleGranularity.RESOURCE
    REASON = "{} has blacklisted IAM action {}"
    IAM_BLACKLIST = set(
        action.lower()
        for action in [
            "iam:CreateAccessKey",
            "iam:CreateLoginProfile",
            "iam:UpdateLoginProfile",
            "iam:AttachUserPolicy",
            "iam:AttachGroupPolicy",
            "iam:AttachRolePolicy",
            "iam:PutUserPolicy",
            "iam:PutGroupPolicy",
            "iam:PutRolePolicy",
            "iam:CreatePolicy",
            "iam:AddUserToGroup",
            "iam:UpdateAssumeRolePolicy",