Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_remove_debug_rules():
original_failed_monitored_rules = [
Failure(
rule="a",
reason="something",
rule_mode=RuleMode.MONITOR,
granularity=RuleGranularity.STACK,
risk_value=RuleRisk.HIGH,
),
Failure(
rule="b",
reason="something",
rule_mode=RuleMode.DEBUG,
granularity=RuleGranularity.STACK,
risk_value=RuleRisk.MEDIUM,
),
Failure(
rule="c",
reason="something",
rule_mode=RuleMode.MONITOR,
granularity=RuleGranularity.STACK,
risk_value=RuleRisk.LOW,
),
]
def test_sqs_policy_public(sqs_policy_public):
    """Check that SQSQueuePolicyPublicRule reports all four public queue policies.

    The rule is invoked on the supplied model and is expected to produce an
    invalid result with exactly four blocking failures (none monitored), one
    per ``QueuePolicyPublicN`` resource, in order.

    Args:
        sqs_policy_public: model handed to ``rule.invoke``
            (presumably a pytest fixture defined elsewhere -- confirm).
    """
    outcome = SQSQueuePolicyPublicRule(None).invoke(sqs_policy_public)

    # Expected failure reasons, listed in the order the rule must emit them.
    expected_reasons = (
        "SQS Queue policy QueuePolicyPublic1 should not be public",
        "SQS Queue policy QueuePolicyPublic2 should not be public",
        "SQS Queue policy QueuePolicyPublic3 should not be public",
        "SQS Queue policy QueuePolicyPublic4 should not be public",
    )

    assert not outcome.valid
    assert len(outcome.failed_rules) == 4
    assert len(outcome.failed_monitored_rules) == 0
    # Only the first failure's risk level is pinned by this test.
    assert outcome.failed_rules[0].risk_value == RuleRisk.HIGH

    for position, expected_reason in enumerate(expected_reasons):
        failure = outcome.failed_rules[position]
        assert failure.rule == "SQSQueuePolicyPublicRule"
        assert failure.reason == expected_reason
rule_mode=RuleMode.BLOCKING,
risk_value=RuleRisk.HIGH,
resource_ids=set(),
actions=set(),
granularity=RuleGranularity.STACK,
),
]
result.failed_rules = failed_rules
RuleProcessor.remove_failures_of_whitelisted_actions(config=config, result=result)
assert result.failed_rules == [
Failure(
rule="WildcardResourceRule",
reason="rolething is using a wildcard resource in BucketAccessPolicy",
rule_mode=RuleMode.BLOCKING,
risk_value=RuleRisk.HIGH,
resource_ids=set(),
actions=set(),
granularity=RuleGranularity.STACK,
)
class FullWildcardPrincipalRule(GenericWildcardPrincipalRule):
    """
    Checks for any wildcard principals defined in any statements.

    Risk:
        It might allow other AWS identities to escalate privileges.

    Fix:
        Where possible, restrict the access to only the required resources.
        For example, instead of `Principal: "*"`, include a list of the roles that need access.
    """

    # This class only overrides configuration; it defines no methods of its
    # own, so all checking behaviour is inherited from the parent rule.

    # Severity and enforcement mode reported for failures of this rule.
    RISK_VALUE = RuleRisk.HIGH
    RULE_MODE = RuleMode.BLOCKING

    # Pattern used to recognise a wildcard principal
    # (presumably consumed by GenericWildcardPrincipalRule -- confirm).
    FULL_REGEX = REGEX_FULL_WILDCARD_PRINCIPAL

    # Failure message template with two positional placeholders; the second is
    # the offending principal. The attribute name keeps its historical spelling
    # ("WILCARD") because renaming it would change the class's interface.
    REASON_WILCARD_PRINCIPAL = "{} should not allow wildcards in principals (principal: '{}')"
for statement in resource.Properties.PolicyDocument._statement_as_list():
if statement.NotPrincipal:
self.add_failure_to_result(result, self.REASON.format(logical_id), resource_ids={logical_id})
return result
class SQSQueuePolicyPublicRule(Rule):
"""
Checks for wildcard principals in Allow statements in an SQS Queue Policy.
Risk:
This is deemed a potential security risk as anyone would be able to interact with your queue.
"""
REASON = "SQS Queue policy {} should not be public"
RISK_VALUE = RuleRisk.HIGH
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, SQSQueuePolicy) and resource.Properties.PolicyDocument.allowed_principals_with(
REGEX_HAS_STAR_OR_STAR_AFTER_COLON
):
for statement in resource.Properties.PolicyDocument._statement_as_list():
if statement.Effect == "Allow" and statement.principals_with(REGEX_HAS_STAR_OR_STAR_AFTER_COLON):
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} "
f"because there are conditions: {statement.Condition}"
)
else:
self.add_failure_to_result(
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from ..config.regex import REGEX_FULL_WILDCARD_PRINCIPAL
from ..model.enums import RuleMode, RuleRisk
from .GenericWildcardPrincipalRule import GenericWildcardPrincipalRule
class FullWildcardPrincipalRule(GenericWildcardPrincipalRule):
    """Rule configuration that flags wildcards in principals.

    Only class-level settings are overridden here; no methods are defined, so
    the checking logic is whatever GenericWildcardPrincipalRule provides.
    """

    # Severity and enforcement mode reported for failures of this rule.
    RISK_VALUE = RuleRisk.HIGH
    RULE_MODE = RuleMode.BLOCKING

    # Pattern used to recognise a wildcard principal
    # (presumably consumed by the parent rule -- confirm).
    FULL_REGEX = REGEX_FULL_WILDCARD_PRINCIPAL

    # Failure message template with two positional placeholders; the second is
    # the offending principal. The attribute name keeps its historical spelling
    # ("WILCARD") because renaming it would change the class's interface.
    REASON_WILCARD_PRINCIPAL = "{} should not allow wildcards in principals (principal: '{}')"
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from ..model.enums import RuleRisk
from ..model.rule import Rule
class S3BucketPublicReadWriteAclRule(Rule):
    """Flag S3 buckets whose ``AccessControl`` property is ``PublicReadWrite``."""

    # Message template; the placeholder receives the bucket's logical id.
    REASON = "S3 Bucket {} should not have a public read-write acl"
    RISK_VALUE = RuleRisk.HIGH

    def invoke(self, cfmodel):
        """Record a failure for every bucket declared with a public read-write ACL.

        Args:
            cfmodel: template model; its ``Resources`` mapping is iterated as
                ``(logical_id, resource)`` pairs.

        Failures are registered through ``self.add_failure`` with this class's
        name and the formatted ``REASON``; nothing is returned.
        """
        for logical_id, resource in cfmodel.Resources.items():
            if resource.Type != "AWS::S3::Bucket":
                continue

            # A bucket may carry no properties at all. ``hasattr`` alone would
            # still let an existing-but-None ``Properties`` reach ``.get`` and
            # raise AttributeError, so both cases are treated as "nothing set".
            properties = getattr(resource, "Properties", None)
            if properties is None:
                continue

            if properties.get("AccessControl") == "PublicReadWrite":
                self.add_failure(type(self).__name__, self.REASON.format(logical_id))
def cfripper_rules():
severity_map = {RuleRisk.HIGH: "**High**", RuleRisk.MEDIUM: "Medium", RuleRisk.LOW: "Low"}
rules_inspection = inspect.getmembers(rules, inspect.isclass)
results = []
for _, klass in rules_inspection:
doc = inspect.getdoc(klass)
parsed_doc = parse_doc_string(doc)
content = ""
for paragraph_title, paragraph_text in parsed_doc.items():
if paragraph_title == "Description":
# Remove ABCMeta default docstring
if not paragraph_text.startswith("Helper class that"):
content += paragraph_text
content += f"\n\n>Severity: {severity_map[klass.RISK_VALUE]}\n"
if klass.RULE_MODE == RuleMode.MONITOR:
content += "\n>Defaults to monitor mode (rule not enforced)\n"
if klass.RULE_MODE == RuleMode.DEBUG:
class S3BucketPolicyPrincipalRule(PrincipalCheckingRule):
"""
Checks for non-whitelisted principals in S3 bucket policies.
Risk:
This is designed to block unintended access from third party accounts to your buckets.
Fix:
All principals connected to S3 Bucket Policies should be known. CFRipper checks that **all** principals meet
the requirements expected. The list of valid accounts is defined in `valid_principals`, which is set in the config.
"""
GRANULARITY = RuleGranularity.RESOURCE
REASON = "S3 Bucket {} policy has non-whitelisted principals {}"
RISK_VALUE = RuleRisk.HIGH
def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
result = Result()
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, S3BucketPolicy):
for statement in resource.Properties.PolicyDocument._statement_as_list():
for principal in statement.get_principal_list():
account_id = get_account_id_from_principal(principal)
if not account_id:
continue
if account_id not in self.valid_principals:
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} "
f"because there are conditions: {statement.Condition}"
)
class S3BucketPublicReadWriteAclRule(Rule):
    """
    Checks if any S3 bucket has its access control set to `PublicReadWrite`.

    Risk:
        Unless required, S3 buckets should not have Public Write available on a bucket. This allows anyone
        to write any objects to your S3 bucket.

    Fix:
        Remove any configuration that looks like `"AccessControl": "PublicReadWrite"` from your S3 bucket.
    """

    GRANULARITY = RuleGranularity.RESOURCE
    # Message template; the placeholder receives the bucket's logical id.
    REASON = "S3 Bucket {} should not have a public read-write acl"
    RISK_VALUE = RuleRisk.HIGH

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """Collect one failure per bucket declared with a public read-write ACL.

        Args:
            cfmodel: template model; its ``Resources`` mapping is iterated as
                ``(logical_id, resource)`` pairs.
            extras: accepted as part of the ``invoke`` signature; not read by this rule.

        Returns:
            A ``Result`` holding a failure (tagged with the bucket's logical id)
            for each ``AWS::S3::Bucket`` whose ``AccessControl`` equals
            ``"PublicReadWrite"``.
        """
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if resource.Type != "AWS::S3::Bucket":
                continue

            # A bucket may carry no properties at all. ``hasattr`` alone would
            # still let an existing-but-None ``Properties`` reach ``.get`` and
            # raise AttributeError, so both cases are treated as "nothing set".
            properties = getattr(resource, "Properties", None)
            if properties is None:
                continue

            if properties.get("AccessControl") == "PublicReadWrite":
                self.add_failure_to_result(result, self.REASON.format(logical_id), resource_ids={logical_id})
        return result