Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def s3_bucket_policy():
return S3BucketPolicy(
**{
"Type": "AWS::S3::BucketPolicy",
"Properties": {
"Bucket": {"Ref": "S3Bucket"},
"PolicyDocument": {
"Statement": [
{
"Action": ["*"],
"Effect": "Allow",
"Resource": "arn:aws:s3:::fakebucketfakebucket/*",
"Principal": {"AWS": ["156460612806"]},
}
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy):
for statement in resource.Properties.PolicyDocument._statement_as_list():
for principal in statement.get_principal_list():
account_id = get_account_id_from_principal(principal)
if not account_id:
continue
if account_id not in self.valid_principals:
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} "
f"because there are conditions: {statement.Condition}"
)
else:
self.add_failure_to_result(
result, self.REASON.format(logical_id, account_id), resource_ids={logical_id},
)
return result
def invoke(self, cfmodel):
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy) and resource.Properties.PolicyDocument.allowed_actions_with(
re.compile(r"^(\w*:){0,1}\*$")
):
self.add_failure(type(self).__name__, self.REASON.format(logical_id))
from .resources.sqs_queue_policy import SQSQueuePolicy
from .resources.sns_topic_policy import SNSTopicPolicy
from .resources.kms_key import KMSKey
_RESOURCE_MAP = {
"AWS::EC2::SecurityGroup": SecurityGroup,
"AWS::EC2::SecurityGroupEgress": SecurityGroupEgress,
"AWS::EC2::SecurityGroupIngress": SecurityGroupIngress,
"AWS::IAM::Group": IAMGroup,
"AWS::IAM::ManagedPolicy": IAMManagedPolicy,
"AWS::IAM::Policy": IAMPolicy,
"AWS::IAM::Role": IAMRole,
"AWS::IAM::User": IAMUser,
"AWS::KMS::Key": KMSKey,
"AWS::S3::BucketPolicy": S3BucketPolicy,
"AWS::SNS::TopicPolicy": SNSTopicPolicy,
"AWS::SQS::QueuePolicy": SQSQueuePolicy,
}
_DEFAULT_RESOURCE = Resource
def create_resource(logical_id: str, value: Dict[str, Any]) -> Resource:
resource = _RESOURCE_MAP.get(value.get("Type"), _DEFAULT_RESOURCE)
return resource(logical_id, value)
def invoke(self, cfmodel):
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy):
for statement in resource.Properties.PolicyDocument._statement_as_list():
if statement.Effect == "Allow":
for principal in statement.get_principal_list():
account_id = get_account_id_from_principal(principal)
if account_id not in self.valid_principals:
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} "
f"because there are conditions: {statement.Condition}"
)
elif "GETATT" in principal or "UNDEFINED_" in principal:
self.add_failure(
type(self).__name__,
self.REASON.format(logical_id, principal),
rule_mode=RuleMode.DEBUG,
)
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy) and resource.Properties.PolicyDocument.allowed_actions_with(
re.compile(r"^s3:L.*$")
):
bucket_name = resource.Properties.Bucket
if "UNDEFINED_PARAM_" in bucket_name:
bucket_name = bucket_name[len("UNDEFINED_PARAM_") :]
bucket = cfmodel.Resources.get(bucket_name)
if bucket and bucket.Properties.get("AccessControl") == "PublicRead":
self.add_failure_to_result(result, self.REASON.format(logical_id), resource_ids={logical_id})
return result
from pycfmodel.model.resources.kms_key import KMSKey
from pycfmodel.model.resources.s3_bucket_policy import S3BucketPolicy
from pycfmodel.model.resources.security_group import SecurityGroup
from pycfmodel.model.resources.security_group_egress import SecurityGroupEgress
from pycfmodel.model.resources.security_group_ingress import SecurityGroupIngress
from pycfmodel.model.resources.sns_topic_policy import SNSTopicPolicy
from pycfmodel.model.resources.sqs_queue_policy import SQSQueuePolicy
ResourceModels = Union[
IAMGroup,
IAMManagedPolicy,
IAMPolicy,
IAMRole,
IAMUser,
KMSKey,
S3BucketPolicy,
SecurityGroup,
SecurityGroupEgress,
SecurityGroupIngress,
SNSTopicPolicy,
SQSQueuePolicy,
]