How to use the cfripper.model.enums.RuleGranularity enum in cfripper

To help you get started, we’ve selected a few cfripper examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Skyscanner / cfripper / tests / rules / test_CrossAccountTrustRule.py View on Github external
def expected_result_two_roles():
    return [
        Failure(
            rule="CrossAccountTrustRule",
            reason=(
                "RootRoleOne has forbidden cross-account trust relationship with "
                "arn:aws:iam::999999999:role/someuser@bla.com"
            ),
            rule_mode=RuleMode.BLOCKING,
            risk_value=RuleRisk.MEDIUM,
            resource_ids={"RootRoleOne"},
            actions=set(),
            granularity=RuleGranularity.RESOURCE,
        ),
        Failure(
            rule="CrossAccountTrustRule",
            reason=(
                "RootRoleTwo has forbidden cross-account trust relationship with "
                "arn:aws:iam::999999999:role/someuser@bla.com"
            ),
            rule_mode=RuleMode.BLOCKING,
            risk_value=RuleRisk.MEDIUM,
            resource_ids={"RootRoleTwo"},
            actions=set(),
            granularity=RuleGranularity.RESOURCE,
        ),
github Skyscanner / cfripper / tests / model / test_result.py View on Github external
def test_result_valid_after_removing_failures():
    """Check that ``Result.valid`` flips back to True once its failures are cleared."""
    outcome = Result()
    blocking_failure = {
        "rule": "mock_rule",
        "reason": "mock_reason",
        "rule_mode": RuleMode.BLOCKING,
        "risk_value": RuleRisk.HIGH,
        "granularity": RuleGranularity.STACK,
    }
    outcome.add_failure(**blocking_failure)

    # A blocking failure has been recorded, so the result must be invalid.
    assert outcome.valid is False

    # Wipe the recorded failures; with none left the result must be valid again.
    outcome.failed_rules = []
    assert outcome.valid is True
github Skyscanner / cfripper / cfripper / rules / sqs_queue_policy.py View on Github external
from cfripper.rules.base_rules import Rule

logger = logging.getLogger(__file__)


class SQSQueuePolicyNotPrincipalRule(Rule):
    """
    Checks if an SQS Queue policy has an Allow + a NotPrincipal.

    Risk:
        AWS **strongly** recommends against using `NotPrincipal` in the same policy statement as `"Effect": "Allow"`.
        Doing so grants the permissions specified in the policy statement to all principals except the one named
        in the `NotPrincipal` element. By doing this, you might grant access to anonymous (unauthenticated) users.
    """

    GRANULARITY = RuleGranularity.RESOURCE
    REASON = "SQS Queue {} policy should not allow Allow and NotPrincipal at the same time"

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """
        Scan every SQS queue policy in the template for statements that set `NotPrincipal`.

        One failure is recorded per offending statement, tagged with the logical id of the
        resource that owns the policy. `extras` is accepted but not used by this rule.
        """
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if not isinstance(resource, SQSQueuePolicy):
                continue
            for statement in resource.Properties.PolicyDocument._statement_as_list():
                # NOTE(review): only the truthiness of NotPrincipal is tested here; the
                # statement's Effect is not inspected by this method.
                if not statement.NotPrincipal:
                    continue
                reason = self.REASON.format(logical_id)
                self.add_failure_to_result(result, reason, resource_ids={logical_id})
        return result


class SQSQueuePolicyPublicRule(Rule):
    """
    Checks for wildcard principals in Allow statements in an SQS Queue Policy.
github Skyscanner / cfripper / cfripper / rules / policy_on_user.py View on Github external
"GoodPolicy": {
          "Type": "AWS::IAM::Policy",
          "Properties": {
            "Description": "Policy for something.",
            "Path": "/",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": [...]
            },
            "Groups": ["user_group"]
          }
        }
        ```
    """

    GRANULARITY = RuleGranularity.RESOURCE
    REASON = "IAM policy {} should not apply directly to users. Should be on group"

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """
        Record a failure for each IAM policy resource that has a non-empty `Users` property.

        `extras` is accepted but not used by this rule.
        """
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if not isinstance(resource, IAMPolicy):
                continue
            if not resource.Properties.Users:
                continue
            reason = self.REASON.format(logical_id)
            self.add_failure_to_result(result, reason, resource_ids={logical_id})
        return result
github Skyscanner / cfripper / cfripper / rules / s3_bucket_policy.py View on Github external
logger = logging.getLogger(__file__)


class S3BucketPolicyPrincipalRule(PrincipalCheckingRule):
    """
    Checks for non-whitelisted principals in S3 bucket policies.

    Risk:
        This is designed to block unintended access from third party accounts to your buckets.

    Fix:
        All principals connected to S3 Bucket Policies should be known. CFRipper checks that **all** principals meet
        the requirements expected. The list of valid accounts is defined in `valid_principals`, which is set in the config.
    """

    GRANULARITY = RuleGranularity.RESOURCE
    REASON = "S3 Bucket {} policy has non-whitelisted principals {}"
    RISK_VALUE = RuleRisk.HIGH

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, S3BucketPolicy):
                for statement in resource.Properties.PolicyDocument._statement_as_list():
                    for principal in statement.get_principal_list():
                        account_id = get_account_id_from_principal(principal)
                        if not account_id:
                            continue
                        if account_id not in self.valid_principals:
                            if statement.Condition and statement.Condition.dict():
                                logger.warning(
                                    f"Not adding {type(self).__name__} failure in {logical_id} "
github Skyscanner / cfripper / cfripper / model / rule.py View on Github external
specific language governing permissions and limitations under the License.
"""
from abc import ABC, abstractmethod
from typing import Optional, Set

from pycfmodel.model.cf_model import CFModel

from ..config.config import Config
from .enums import RuleGranularity, RuleMode, RuleRisk
from .result import Result


class Rule(ABC):
    RULE_MODE = RuleMode.BLOCKING
    RISK_VALUE = RuleRisk.MEDIUM
    GRANULARITY = RuleGranularity.STACK

    def __init__(self, config: Optional[Config], result: Result):
        """Store the result object and the given config, falling back to a default ``Config()`` when it is falsy."""
        self._config = config or Config()
        self._result = result

    @abstractmethod
    def invoke(self, cfmodel: CFModel):
        """Run the rule against ``cfmodel``; concrete subclasses must override this method."""

    def add_failure(
        self,
        rule: str,
        reason: str,
        granularity: Optional[RuleGranularity] = None,
        resource_ids: Optional[Set] = None,
        actions: Optional[Set] = None,
github Skyscanner / cfripper / cfripper / rule_processor.py View on Github external
def remove_failures_of_whitelisted_resources(config: Config, result: Result):

        if not result.failed_rules:
            return

        clean_failures = []

        for failure in result.failed_rules:
            if failure.granularity != RuleGranularity.RESOURCE:
                clean_failures.append(failure)
                continue

            if not failure.resource_ids:
                logger.warning(f"Failure with resource granularity doesn't have resources: {failure}")
                continue

            whitelisted_resources = {
                resource
                for resource in failure.resource_ids
                if any(
                    [
                        re.match(whitelisted_resource_regex, resource)
                        for whitelisted_resource_regex in config.get_whitelisted_resources(failure.rule)
                    ]
                )
github Skyscanner / cfripper / cfripper / rules / wildcard_principals.py View on Github external
from cfripper.model.enums import RuleGranularity, RuleMode, RuleRisk
from cfripper.model.result import Result
from cfripper.rules.base_rules import PrincipalCheckingRule

logger = logging.getLogger(__file__)


class GenericWildcardPrincipalRule(PrincipalCheckingRule):
    """
    Checks for wildcard principals in resources.
    """

    REASON_WILCARD_PRINCIPAL = "{} should not allow wildcard in principals or account-wide principals (principal: '{}')"
    REASON_NOT_ALLOWED_PRINCIPAL = "{} contains an unknown principal: {}"
    RULE_MODE = RuleMode.MONITOR
    GRANULARITY = RuleGranularity.RESOURCE

    IAM_PATTERN = re.compile(r"arn:aws:iam::(\d*|\*):.*")
    FULL_REGEX = re.compile(r"^((\w*:){0,1}\*|arn:aws:iam::(\d*|\*):.*)$")

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """
        Pass every policy document found in the template to `check_for_wildcards`.

        Standalone policy resources contribute their `PolicyDocument`. IAM roles contribute
        their `AssumeRolePolicyDocument`, and both IAM roles and IAM users contribute the
        `PolicyDocument` of each inline policy. `extras` is accepted but not used here.
        """
        standalone_policy_types = (IAMManagedPolicy, IAMPolicy, S3BucketPolicy, SNSTopicPolicy, SQSQueuePolicy)
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, standalone_policy_types):
                self.check_for_wildcards(result, logical_id, resource.Properties.PolicyDocument)
                continue
            if not isinstance(resource, (IAMRole, IAMUser)):
                continue

            # Only roles carry a trust (assume-role) policy document.
            if isinstance(resource, IAMRole):
                self.check_for_wildcards(result, logical_id, resource.Properties.AssumeRolePolicyDocument)

            # Inline policies are optional on both roles and users.
            properties = resource.Properties
            if properties and properties.Policies:
                for inline_policy in properties.Policies:
                    self.check_for_wildcards(result, logical_id, inline_policy.PolicyDocument)
        return result