Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
__all__ = ["IAMRolesOverprivilegedRule", "IAMRoleWildcardActionOnPolicyRule"]
from typing import Dict, Optional
from pycfmodel.model.cf_model import CFModel
from pycfmodel.model.resources.iam_managed_policy import IAMManagedPolicy
from pycfmodel.model.resources.iam_role import IAMRole
from cfripper.config.regex import REGEX_IS_STAR, REGEX_WILDCARD_POLICY_ACTION
from cfripper.model.enums import RuleGranularity
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule
class IAMRolesOverprivilegedRule(Rule):
"""
Rule that checks for wildcards in resources for a set of actions and restricts managed policies.
"""
GRANULARITY = RuleGranularity.RESOURCE
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole):
self.check_managed_policies(result, logical_id, resource)
self.check_inline_policies(result, logical_id, resource)
return result
def check_managed_policies(self, result: Result, logical_id: str, role: IAMRole):
"""Run the managed policies against a blacklist."""
from itertools import groupby
from operator import itemgetter
from typing import Dict, List, Optional, Tuple, Union
from pycfmodel.model.cf_model import CFModel
from pycfmodel.model.resources.properties.security_group_ingress_prop import SecurityGroupIngressProp
from pycfmodel.model.resources.security_group import SecurityGroup
from pycfmodel.model.resources.security_group_ingress import SecurityGroupIngress, SecurityGroupIngressProperties
from cfripper.model.enums import RuleGranularity
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule
class SecurityGroupOpenToWorldRule(Rule):
"""
Base class not intended to be instantiated, but inherited from.
This class provides common methods used to detect open ports.
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "Port(s) {} open to public IPs: ({}) in security group '{}'"
def analyse_ingress(
self,
result: Result,
logical_id: str,
ingress: Union[SecurityGroupIngressProp, SecurityGroupIngressProperties],
filters_available_context: Dict,
):
if self.non_compliant_ip_range(ingress=ingress):
"""
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.resources_filtered_by_type({SecurityGroupIngress}).items():
filters_available_context = {
"config": self._config,
"extras": extras,
"logical_id": logical_id,
"resource": resource,
}
self.analyse_ingress(result, logical_id, resource.Properties, filters_available_context)
return result
class EC2SecurityGroupMissingEgressRule(Rule):
"""
Checks that Security Groups are defined with an egress policy, even if this is still allowing all
outbound traffic.
Risk:
If no egress rule is specified, the default is to open all outbound traffic to the world. Whilst
some services may need this, it is usually the case that the security group can be locked down
more. A NAT instance for example may require a completely open egress policy.
Allowing unrestricted (`0.0.0.0/0` or `::/0`) outbound/egress access can increase opportunities for
malicious activity such as such as Denial of Service (DoS) attacks or Distributed Denial of Service (DDoS)
attacks.
Fix:
Explicitly defining the egress policy for the security group.
import logging
import re
from typing import Dict, Optional
from pycfmodel.model.cf_model import CFModel
from pycfmodel.model.resources.s3_bucket_policy import S3BucketPolicy
from cfripper.model.enums import RuleGranularity, RuleRisk
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule
logger = logging.getLogger(__file__)
class S3BucketPublicReadAclAndListStatementRule(Rule):
# TODO: refactor regex to regex file.
"""
Checks if any S3 bucket policy has a public read ACL and `List` permission in the bucket policy.
Fix:
Unless the bucket is hosting static content and needs to be accessed publicly,
these bucket policies should be locked down.
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "S3 Bucket {} should not have a public read acl and list bucket statement"
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy) and resource.Properties.PolicyDocument.allowed_actions_with(
__all__ = ["SNSTopicPolicyNotPrincipalRule"]
from typing import Dict, Optional
from pycfmodel.model.cf_model import CFModel
from pycfmodel.model.resources.sns_topic_policy import SNSTopicPolicy
from cfripper.model.enums import RuleGranularity
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule
class SNSTopicPolicyNotPrincipalRule(Rule):
"""
Checks if an SNS topic policy has an Allow + a NotPrincipal.
Risk:
AWS **strongly** recommends against using `NotPrincipal` in the same policy statement as `"Effect": "Allow"`.
Doing so grants the permissions specified in the policy statement to all principals except the one named
in the `NotPrincipal` element. By doing this, you might grant access to anonymous (unauthenticated) users.
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "SNS Topic {} policy should not allow Allow and NotPrincipal at the same time"
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, SNSTopicPolicy):
import logging
from typing import Dict, Optional
from pycfmodel.model.cf_model import CFModel
from pycfmodel.model.resources.sqs_queue_policy import SQSQueuePolicy
from cfripper.config.regex import REGEX_HAS_STAR_OR_STAR_AFTER_COLON
from cfripper.model.enums import RuleGranularity, RuleRisk
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule
logger = logging.getLogger(__file__)
class SQSQueuePolicyNotPrincipalRule(Rule):
"""
Checks if an SQS Queue policy has an Allow + a NotPrincipal.
Risk:
AWS **strongly** recommends against using `NotPrincipal` in the same policy statement as `"Effect": "Allow"`.
Doing so grants the permissions specified in the policy statement to all principals except the one named
in the `NotPrincipal` element. By doing this, you might grant access to anonymous (unauthenticated) users.
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "SQS Queue {} policy should not allow Allow and NotPrincipal at the same time"
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, SQSQueuePolicy):
__all__ = ["KMSKeyWildcardPrincipalRule"]
import logging
import re
from typing import Dict, Optional
from pycfmodel.model.cf_model import CFModel
from pycfmodel.model.resources.kms_key import KMSKey
from cfripper.model.enums import RuleGranularity
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule
logger = logging.getLogger(__file__)
class KMSKeyWildcardPrincipalRule(Rule):
"""
Check for wildcards in principals in KMS Policies.
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "KMS Key policy {} should not allow wildcard principals"
CONTAINS_WILDCARD_PATTERN = re.compile(r"^(\w*:)?\*$")
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, KMSKey):
for statement in resource.Properties.KeyPolicy._statement_as_list():
if statement.Effect == "Allow" and statement.principals_with(self.CONTAINS_WILDCARD_PATTERN):
for principal in statement.get_principal_list():
risk_value = fltr.risk_value or risk_value
rule_mode = fltr.rule_mode or rule_mode
if rule_mode not in (RuleMode.DISABLED, RuleMode.WHITELISTED):
warning = Failure(
rule=type(self).__name__,
reason=reason,
granularity=granularity or self.GRANULARITY,
resource_ids=resource_ids,
actions=actions,
risk_value=risk_value,
rule_mode=rule_mode,
)
result.add_warning(warning)
class PrincipalCheckingRule(Rule):
"""Abstract class for rules that check principals"""
_valid_principals = None
def _get_whitelist_from_config(self, services: List[str] = None) -> Set[str]:
if services is None:
services = self._config.aws_service_accounts.keys()
unique_list = set()
for service in services:
unique_list |= set(self._config.aws_service_accounts[service])
return unique_list
@property
def valid_principals(self) -> Set[str]:
if self._valid_principals is None:
__all__ = ["ManagedPolicyOnUserRule"]
from typing import Dict, Optional
from pycfmodel.model.cf_model import CFModel
from pycfmodel.model.resources.iam_managed_policy import IAMManagedPolicy
from cfripper.model.enums import RuleGranularity
from cfripper.model.result import Result
from cfripper.rules.base_rules import Rule
class ManagedPolicyOnUserRule(Rule):
"""
Checks if any IAM managed policy is applied to a group and not a user.
Risk:
Instead of defining permissions for individual IAM users, it's usually more convenient and secure
to create [IAM groups](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_groups.html) that relate
to different functions. IAM users can be assigned to these groups.
All the users in an IAM group inherit the permissions assigned to the group. That way, you can make
changes for everyone in a group in just one place. As people move around in your company, you can
simply change what IAM group their IAM user belongs to, without risking a user having too much
privilege.
Fix:
Use IAM Groups as opposed to users in IAM Managed Policies.
Code for fix:
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy) and resource.Properties.PolicyDocument.allowed_actions_with(
re.compile(r"^s3:L.*$")
):
bucket_name = resource.Properties.Bucket
if "UNDEFINED_PARAM_" in bucket_name:
bucket_name = bucket_name[len("UNDEFINED_PARAM_") :]
bucket = cfmodel.Resources.get(bucket_name)
if bucket and bucket.Properties.get("AccessControl") == "PublicRead":
self.add_failure_to_result(result, self.REASON.format(logical_id), resource_ids={logical_id})
return result
class S3BucketPublicReadWriteAclRule(Rule):
"""
Checks if any S3 bucket policy has access control set to `PublicReadWrite`.
Risk:
Unless required, S3 buckets should not have Public Write available on a bucket. This allows anyone
to write any objects to your S3 bucket.
Fix:
Remove any configuration that looks like `"AccessControl": "PublicReadWrite"` from your S3 bucket policy.
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "S3 Bucket {} should not have a public read-write acl"
RISK_VALUE = RuleRisk.HIGH
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result: