Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_result_addition():
    """Adding two ``Result`` objects must concatenate both failure lists, left operand first."""
    # Two failures destined for ``failed_rules`` (stack granularity).
    first_failure, second_failure = (
        Failure(
            granularity=RuleGranularity.STACK, reason="reason1", risk_value="risk1", rule="rule1", rule_mode="mode1",
        ),
        Failure(
            granularity=RuleGranularity.STACK, reason="reason2", risk_value="risk2", rule="rule2", rule_mode="mode2",
        ),
    )
    # Two failures destined for ``failed_monitored_rules`` (resource granularity).
    first_monitored, second_monitored = (
        Failure(
            granularity=RuleGranularity.RESOURCE,
            reason="reason1",
            risk_value="risk1",
            rule="rule1",
            rule_mode="mode1",
        ),
        Failure(
            granularity=RuleGranularity.RESOURCE,
            reason="reason2",
            risk_value="risk2",
            rule="rule2",
            rule_mode="mode2",
        ),
    )

    left = Result(failed_rules=[first_failure], failed_monitored_rules=[first_monitored])
    right = Result(failed_rules=[second_failure], failed_monitored_rules=[second_monitored])
    combined = left + right

    expected = Result(
        failed_rules=[first_failure, second_failure],
        failed_monitored_rules=[first_monitored, second_monitored],
    )
    assert combined == expected
actions=None,
granularity=RuleGranularity.RESOURCE,
)
]
result.failed_rules = failed_rules
RuleProcessor.remove_failures_of_whitelisted_resources(config=config, result=result)
assert result.failed_rules == [
Failure(
rule="S3CrossAccountTrustRule",
reason="Forbidden cross-account policy allow with 123456789 for an S3 bucket.",
rule_mode=RuleMode.BLOCKING,
risk_value=RuleRisk.HIGH,
resource_ids={"thenotwhitelistedthing", "anotherone"},
actions=None,
granularity=RuleGranularity.RESOURCE,
)
risk_value=RuleRisk.MEDIUM,
resource_ids={"RootRoleOne"},
actions=set(),
granularity=RuleGranularity.RESOURCE,
),
Failure(
rule="CrossAccountTrustRule",
reason=(
"RootRoleTwo has forbidden cross-account trust relationship with "
"arn:aws:iam::999999999:role/someuser@bla.com"
),
rule_mode=RuleMode.BLOCKING,
risk_value=RuleRisk.MEDIUM,
resource_ids={"RootRoleTwo"},
actions=set(),
granularity=RuleGranularity.RESOURCE,
),
from cfripper.model.enums import RuleGranularity, RuleMode
from cfripper.model.result import Result
from cfripper.model.utils import get_account_id_from_principal
from cfripper.rules.base_rules import PrincipalCheckingRule
logger = logging.getLogger(__file__)
class CrossAccountCheckingRule(PrincipalCheckingRule):
"""
Base class not intended to be instantiated, but inherited from.
This class provides common methods used to detect access permissions from other accounts.
"""
GRANULARITY = RuleGranularity.RESOURCE
RESOURCE_TYPE: Resource
PROPERTY_WITH_POLICYDOCUMENT: str
@property
def valid_principals(self) -> Set[str]:
    """Return the set of principals this rule treats as valid, building it on first access.

    The set starts from ``self._get_whitelist_from_config()`` and, when the
    config carries a truthy ``aws_account_id``, that account id is added to
    it. The result is stored on ``self._valid_principals`` and reused by
    later accesses.
    """
    # Already computed: hand back the cached set untouched.
    if self._valid_principals is not None:
        return self._valid_principals

    self._valid_principals = self._get_whitelist_from_config()
    own_account_id = self._config.aws_account_id
    if own_account_id:
        self._valid_principals.add(own_account_id)
    return self._valid_principals
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, self.RESOURCE_TYPE):
properties = resource.Properties
...
Resources:
RDSCluster:
Type: AWS::RDS::DBCluster
DeletionPolicy: "Snapshot"
Properties:
...
MasterUserPassword: !Ref 'MasterUserPassword'
...
````
"""
REASON_DEFAULT = "Default RDS {} password parameter (readable in plain-text) for {}."
REASON_MISSING_NOECHO = "RDS {} password parameter missing NoEcho for {}."
GRANULARITY = RuleGranularity.RESOURCE
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
# Walk every resource in the template, let self._failure_added record failures for
# RDS clusters on `result`, and collect the RDS instances seen along the way.
# Returns the populated Result.
# NOTE(review): password_protected_cluster_ids and instances_to_check are filled but
# never read in the code visible here; the comment further down refers to a follow-up
# check "below" that is not part of this excerpt — confirm against the full file.
result = Result()
# Logical ids of clusters for which _failure_added returned a falsy value.
password_protected_cluster_ids = []
# (logical_id, resource) pairs, one per AWS::RDS::DBInstance in the template.
instances_to_check = []
for logical_id, resource in cfmodel.Resources.items():
# flag insecure RDS Clusters.
if resource.Type == "AWS::RDS::DBCluster":
failure_added = self._failure_added(result, logical_id, resource)
if not failure_added:
password_protected_cluster_ids.append(logical_id)
# keep track of RDS instances so they can be examined in the code below.
elif resource.Type == "AWS::RDS::DBInstance":
instances_to_check.append((logical_id, resource))
return result
class S3BucketPublicReadWriteAclRule(Rule):
    """
    Flags S3 buckets whose `AccessControl` property is set to `PublicReadWrite`.

    Risk:
        A bucket with a public read-write ACL lets anyone write objects into it. Unless that is
        explicitly required, buckets should not expose public write access.

    Fix:
        Remove any configuration that looks like `"AccessControl": "PublicReadWrite"` from the
        S3 bucket definition.
    """

    GRANULARITY = RuleGranularity.RESOURCE
    REASON = "S3 Bucket {} should not have a public read-write acl"
    RISK_VALUE = RuleRisk.HIGH

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """Return a Result holding one failure per bucket that declares a `PublicReadWrite` ACL."""
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            # Only S3 buckets are of interest.
            if not resource.Type == "AWS::S3::Bucket":
                continue
            # Resources without a Properties attribute have no ACL to inspect.
            if not hasattr(resource, "Properties"):
                continue
            access_control = resource.Properties.get("AccessControl")
            if access_control == "PublicReadWrite":
                failure_reason = self.REASON.format(logical_id)
                self.add_failure_to_result(result, failure_reason, resource_ids={logical_id})
        return result
if action.startswith(prefix):
self.add_failure_to_result(
result,
f"Role '{logical_id}' contains an insecure permission '{action}' in policy "
f"'{policy.PolicyName}'",
resource_ids={logical_id},
)
class IAMRoleWildcardActionOnPolicyRule(Rule):
"""
Checks for use of wildcard characters in all IAM Role policies (including `AssumeRolePolicyDocument`)
and [AWS Managed Policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html).
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "IAM role {} should not allow a `*` action on its {}"
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole):
# check AssumeRolePolicyDocument.
if resource.Properties.AssumeRolePolicyDocument.allowed_actions_with(REGEX_WILDCARD_POLICY_ACTION):
self.add_failure_to_result(
result, self.REASON.format(logical_id, "AssumeRolePolicy"), resource_ids={logical_id},
)
# check other policies of the IAM role.
if resource.Properties.Policies:
for policy in resource.Properties.Policies:
if policy.PolicyDocument.allowed_actions_with(REGEX_WILDCARD_POLICY_ACTION):
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule
logger = logging.getLogger(__file__)
class GenericWildcardPolicyRule(Rule):
"""
Abstract rule that checks for use of the wildcard `*` character in Actions of Policy Documents of AWS Resources.
This rule must be inherited by another class to be used, with `AWS_RESOURCE` set to the resource to be checked.
See `S3BucketPolicyWildcardActionRule` and `SQSQueuePolicyWildcardActionRule` for examples.
"""
REASON = "The {} {} should not allow a `*` action"
GRANULARITY = RuleGranularity.RESOURCE
AWS_RESOURCE: Type[Resource] = None
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
    """Report every resource of type ``AWS_RESOURCE`` whose policy document allows a wildcard action.

    Args:
        cfmodel: The parsed CloudFormation model whose ``Resources`` are inspected.
        extras: Accepted for interface compatibility; not used by this rule.

    Returns:
        A ``Result`` with one failure per matching resource. The result is empty when
        ``AWS_RESOURCE`` has not been set by a subclass; a warning is logged in that case.
    """
    result = Result()

    # The base rule has no resource type to look for; it only makes sense in subclasses.
    if self.AWS_RESOURCE is None:
        logger.warning(f"Not running {type(self).__name__} rule as AWS_RESOURCE is not defined.")
        return result

    for logical_id, resource in cfmodel.Resources.items():
        if not isinstance(resource, self.AWS_RESOURCE):
            continue
        # No debug print of the policy document here: rule evaluation must not write
        # template contents to stdout.
        if resource.Properties.PolicyDocument.allowed_actions_with(REGEX_HAS_STAR_OR_STAR_AFTER_COLON):
            self.add_failure_to_result(
                result, self.REASON.format(self.AWS_RESOURCE.__name__, logical_id), resource_ids={logical_id},
            )
    return result
import re
from pycfmodel.model.resources.iam_role import IAMRole
from ..config.regex import REGEX_CROSS_ACCOUNT_ROOT
from ..model.enums import RuleGranularity, RuleMode
from ..model.principal_checking_rule import PrincipalCheckingRule
logger = logging.getLogger(__file__)
class CrossAccountTrustRule(PrincipalCheckingRule):
REASON = "{} has forbidden cross-account trust relationship with {}"
ROOT_PATTERN = re.compile(REGEX_CROSS_ACCOUNT_ROOT)
GRANULARITY = RuleGranularity.RESOURCE
def invoke(self, cfmodel):
not_has_account_id = re.compile(rf"^((?!{self._config.aws_account_id}).)*$")
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole):
for principal in resource.Properties.AssumeRolePolicyDocument.allowed_principals_with(
self.ROOT_PATTERN
):
self.add_failure(
type(self).__name__, self.REASON.format(logical_id, principal), resource_ids={logical_id}
)
if self._config.aws_account_id:
for principal in resource.Properties.AssumeRolePolicyDocument.allowed_principals_with(
not_has_account_id
):
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule
class PrivilegeEscalationRule(Rule):
"""
Checks for any dangerous IAM actions that could allow privilege escalation and potentially
represent a large security risk.
See [current blacklisted IAM actions](https://github.com/Skyscanner/cfripper/blob/master/cfripper/rules/privilege_escalation.py#L29).
Fix:
Unless strictly necessary, do not use actions in the IAM action blacklist. CloudFormation files that do require these
actions should be added to the whitelist.
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "{} has blacklisted IAM action {}"
IAM_BLACKLIST = set(
action.lower()
for action in [
"iam:CreateAccessKey",
"iam:CreateLoginProfile",
"iam:UpdateLoginProfile",
"iam:AttachUserPolicy",
"iam:AttachGroupPolicy",
"iam:AttachRolePolicy",
"iam:PutUserPolicy",
"iam:PutGroupPolicy",
"iam:PutRolePolicy",
"iam:CreatePolicy",
"iam:AddUserToGroup",
"iam:UpdateAssumeRolePolicy",