Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def expected_result_two_roles():
return [
Failure(
rule="CrossAccountTrustRule",
reason=(
"RootRoleOne has forbidden cross-account trust relationship with "
"arn:aws:iam::999999999:role/someuser@bla.com"
),
rule_mode=RuleMode.BLOCKING,
risk_value=RuleRisk.MEDIUM,
resource_ids={"RootRoleOne"},
actions=set(),
granularity=RuleGranularity.RESOURCE,
),
Failure(
rule="CrossAccountTrustRule",
reason=(
"RootRoleTwo has forbidden cross-account trust relationship with "
"arn:aws:iam::999999999:role/someuser@bla.com"
),
rule_mode=RuleMode.BLOCKING,
risk_value=RuleRisk.MEDIUM,
resource_ids={"RootRoleTwo"},
actions=set(),
granularity=RuleGranularity.RESOURCE,
),
def test_result_valid_after_removing_failures():
    """A Result turns invalid on a blocking failure and valid again once failures are cleared."""
    outcome = Result()
    blocking_failure = {
        "rule": "mock_rule",
        "reason": "mock_reason",
        "rule_mode": RuleMode.BLOCKING,
        "risk_value": RuleRisk.HIGH,
        "granularity": RuleGranularity.STACK,
    }
    outcome.add_failure(**blocking_failure)

    # One blocking failure recorded -> the result must report itself as invalid.
    assert outcome.valid is False

    # Drop every recorded failure by resetting the list in place on the result.
    outcome.failed_rules = []

    # Nothing left to fail on -> the result must report itself as valid again.
    assert outcome.valid is True
from cfripper.rules.base_rules import Rule
# Name the logger after the module's dotted import path rather than its file path.
# A file path contains dots (".py", versioned directories) that the logging module
# interprets as hierarchy separators, so a `__file__`-named logger never becomes a
# child of the package logger and cannot be configured through it.
logger = logging.getLogger(__name__)
class SQSQueuePolicyNotPrincipalRule(Rule):
    """
    Checks SQS Queue policies for statements that use `NotPrincipal`.

    Every statement of every `SQSQueuePolicy` resource is inspected, and one failure is
    added per statement whose `NotPrincipal` is truthy.

    NOTE(review): only `NotPrincipal` is tested; the statement's `Effect` is not looked at
    by this rule, even though the risk below is specifically about `"Effect": "Allow"`.

    Risk:
        AWS **strongly** recommends against using `NotPrincipal` in the same policy statement as `"Effect": "Allow"`.
        Doing so grants the permissions specified in the policy statement to all principals except the one named
        in the `NotPrincipal` element. By doing this, you might grant access to anonymous (unauthenticated) users.
    """

    GRANULARITY = RuleGranularity.RESOURCE
    REASON = "SQS Queue {} policy should not allow Allow and NotPrincipal at the same time"

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """
        Scan the template's resources and report queue policies using `NotPrincipal`.

        :param cfmodel: Parsed template whose `Resources` mapping is iterated.
        :param extras: Accepted for interface compatibility; not read by this rule.
        :return: A new `Result` holding one failure per offending statement.
        """
        result = Result()
        queue_policies = (
            (name, candidate)
            for name, candidate in cfmodel.Resources.items()
            if isinstance(candidate, SQSQueuePolicy)
        )
        for name, queue_policy in queue_policies:
            for stmt in queue_policy.Properties.PolicyDocument._statement_as_list():
                if not stmt.NotPrincipal:
                    continue
                self.add_failure_to_result(result, self.REASON.format(name), resource_ids={name})
        return result
class SQSQueuePolicyPublicRule(Rule):
"""
Checks for wildcard principals in Allow statements in an SQS Queue Policy.
"GoodPolicy": {
"Type": "AWS::IAM::Policy",
"Properties": {
"Description": "Policy for something.",
"Path": "/",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [...]
},
"Groups": ["user_group"]
}
}
```
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "IAM policy {} should not apply directly to users. Should be on group"
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMPolicy) and resource.Properties.Users:
self.add_failure_to_result(result, self.REASON.format(logical_id), resource_ids={logical_id})
return result
# Name the logger after the module's dotted import path rather than its file path.
# A file path contains dots (".py", versioned directories) that the logging module
# interprets as hierarchy separators, so a `__file__`-named logger never becomes a
# child of the package logger and cannot be configured through it.
logger = logging.getLogger(__name__)
class S3BucketPolicyPrincipalRule(PrincipalCheckingRule):
"""
Checks for non-whitelisted principals in S3 bucket policies.
Risk:
This is designed to block unintended access from third party accounts to your buckets.
Fix:
All principals connected to S3 Bucket Policies should be known. CFRipper checks that **all** principals meet
the requirements expected. The list of valid accounts is defined in `valid_principals`, which is set in the config.
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "S3 Bucket {} policy has non-whitelisted principals {}"
RISK_VALUE = RuleRisk.HIGH
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy):
for statement in resource.Properties.PolicyDocument._statement_as_list():
for principal in statement.get_principal_list():
account_id = get_account_id_from_principal(principal)
if not account_id:
continue
if account_id not in self.valid_principals:
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} "
specific language governing permissions and limitations under the License.
"""
from abc import ABC, abstractmethod
from typing import Optional, Set
from pycfmodel.model.cf_model import CFModel
from ..config.config import Config
from .enums import RuleGranularity, RuleMode, RuleRisk
from .result import Result
class Rule(ABC):
RULE_MODE = RuleMode.BLOCKING
RISK_VALUE = RuleRisk.MEDIUM
GRANULARITY = RuleGranularity.STACK
def __init__(self, config: Optional[Config], result: Result):
self._config = config if config else Config()
self._result = result
@abstractmethod
def invoke(self, cfmodel: CFModel):
pass
def add_failure(
self,
rule: str,
reason: str,
granularity: Optional[RuleGranularity] = None,
resource_ids: Optional[Set] = None,
actions: Optional[Set] = None,
def remove_failures_of_whitelisted_resources(config: Config, result: Result):
if not result.failed_rules:
return
clean_failures = []
for failure in result.failed_rules:
if failure.granularity != RuleGranularity.RESOURCE:
clean_failures.append(failure)
continue
if not failure.resource_ids:
logger.warning(f"Failure with resource granularity doesn't have resources: {failure}")
continue
whitelisted_resources = {
resource
for resource in failure.resource_ids
if any(
[
re.match(whitelisted_resource_regex, resource)
for whitelisted_resource_regex in config.get_whitelisted_resources(failure.rule)
]
)
from cfripper.model.enums import RuleGranularity, RuleMode, RuleRisk
from cfripper.model.result import Result
from cfripper.rules.base_rules import PrincipalCheckingRule
# Name the logger after the module's dotted import path rather than its file path.
# A file path contains dots (".py", versioned directories) that the logging module
# interprets as hierarchy separators, so a `__file__`-named logger never becomes a
# child of the package logger and cannot be configured through it.
logger = logging.getLogger(__name__)
class GenericWildcardPrincipalRule(PrincipalCheckingRule):
    """
    Checks for wildcard principals in resources.

    The policy documents handed to `check_for_wildcards` are:

    - `Properties.PolicyDocument` of IAM managed policies, IAM policies, S3 bucket policies,
      SNS topic policies and SQS queue policies;
    - `Properties.AssumeRolePolicyDocument` of IAM roles;
    - the `PolicyDocument` of every inline policy listed under `Properties.Policies` of
      IAM roles and IAM users.
    """

    # Failure message templates.
    REASON_WILCARD_PRINCIPAL = "{} should not allow wildcard in principals or account-wide principals (principal: '{}')"
    REASON_NOT_ALLOWED_PRINCIPAL = "{} contains an unknown principal: {}"

    RULE_MODE = RuleMode.MONITOR
    GRANULARITY = RuleGranularity.RESOURCE

    # Matches IAM ARNs whose account field is digits (possibly empty) or a literal "*".
    IAM_PATTERN = re.compile(r"arn:aws:iam::(\d*|\*):.*")
    # Matches a bare "*", an optionally prefixed "<word>:*", or an IAM ARN as in IAM_PATTERN.
    FULL_REGEX = re.compile(r"^((\w*:){0,1}\*|arn:aws:iam::(\d*|\*):.*)$")

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """
        Run the wildcard check over every supported policy document in the template.

        :param cfmodel: Parsed template whose `Resources` mapping is iterated.
        :param extras: Accepted for interface compatibility; not read by this method.
        :return: The `Result` instance that was passed to each `check_for_wildcards` call.
        """
        result = Result()
        standalone_policy_types = (IAMManagedPolicy, IAMPolicy, S3BucketPolicy, SNSTopicPolicy, SQSQueuePolicy)
        identity_types = (IAMRole, IAMUser)

        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, standalone_policy_types):
                self.check_for_wildcards(result, logical_id, resource.Properties.PolicyDocument)
                continue

            if not isinstance(resource, identity_types):
                continue

            # Only roles carry a trust (assume-role) policy; users do not.
            if isinstance(resource, IAMRole):
                self.check_for_wildcards(result, logical_id, resource.Properties.AssumeRolePolicyDocument)

            # Roles and users may both embed inline policies.
            props = resource.Properties
            if not (props and props.Policies):
                continue
            for inline_policy in props.Policies:
                self.check_for_wildcards(result, logical_id, inline_policy.PolicyDocument)

        return result