Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_remove_debug_rules():
    """Check that ``RuleProcessor.remove_debug_rules`` filters out the DEBUG-mode failure.

    The input holds three failures (MONITOR, DEBUG, MONITOR); the expected
    output is the first and third entries, in their original order.
    """
    original_failed_monitored_rules = [
        Failure(
            rule="a",
            reason="something",
            rule_mode=RuleMode.MONITOR,
            granularity=RuleGranularity.STACK,
            risk_value=RuleRisk.HIGH,
        ),
        Failure(
            rule="b",
            reason="something",
            rule_mode=RuleMode.DEBUG,
            granularity=RuleGranularity.STACK,
            risk_value=RuleRisk.MEDIUM,
        ),
        Failure(
            rule="c",
            reason="something",
            rule_mode=RuleMode.MONITOR,
            granularity=RuleGranularity.STACK,
            risk_value=RuleRisk.LOW,
        ),
    ]

    # Indices 0 and 2 are the only non-DEBUG entries of the list above.
    list_with_no_debug_rules = [original_failed_monitored_rules[0], original_failed_monitored_rules[2]]
    processed_list = RuleProcessor.remove_debug_rules(rules=original_failed_monitored_rules)
    assert list_with_no_debug_rules == processed_list
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from pycfmodel.model.resources.security_group import SecurityGroup
from ..model.enums import RuleMode
from ..model.rule import Rule
# Rule that reports every SecurityGroup resource whose Properties carry no
# (or an empty) SecurityGroupEgress value. Runs in MONITOR mode.
class SecurityGroupMissingEgressRule(Rule):

    REASON = (
        "Missing egress rule in {} means all traffic is allowed outbound. Make this explicit if it is desired "
        "configuration"
    )
    RULE_MODE = RuleMode.MONITOR

    def invoke(self, cfmodel):
        """Add one failure per SecurityGroup in ``cfmodel.Resources`` lacking egress rules."""
        rule_name = type(self).__name__
        for logical_id, resource in cfmodel.Resources.items():
            # Only security groups are of interest here.
            if not isinstance(resource, SecurityGroup):
                continue
            # A truthy SecurityGroupEgress means egress is declared explicitly.
            if resource.Properties.SecurityGroupEgress:
                continue
            self.add_failure(rule_name, self.REASON.format(logical_id))
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from pycfmodel.model.resources.sqs_queue_policy import SQSQueuePolicy
from ..model.enums import RuleMode
from ..model.rule import Rule
# Rule that reports SQSQueuePolicy resources whose policy document contains a
# statement with a truthy NotPrincipal. Runs in MONITOR mode.
class SQSQueuePolicyNotPrincipalRule(Rule):

    REASON = "SQS Queue {} policy should not allow Allow and NotPrincipal at the same time"
    RULE_MODE = RuleMode.MONITOR

    def invoke(self, cfmodel):
        """Add one failure for every policy statement that sets ``NotPrincipal``.

        A queue policy with several such statements produces several failures.
        """
        rule_name = type(self).__name__
        for logical_id, resource in cfmodel.Resources.items():
            if not isinstance(resource, SQSQueuePolicy):
                continue
            statements = resource.Properties.PolicyDocument._statement_as_list()
            for statement in statements:
                # NOTE(review): REASON mentions "Allow", but the statement's
                # Effect is not inspected here - confirm this is intended.
                if not statement.NotPrincipal:
                    continue
                self.add_failure(rule_name, self.REASON.format(logical_id))
from cfripper.config.regex import REGEX_FULL_WILDCARD_PRINCIPAL
from cfripper.model.enums import RuleGranularity, RuleMode, RuleRisk
from cfripper.model.result import Result
from cfripper.rules.base_rules import PrincipalCheckingRule
# Module-level logger; it is keyed on this file's path (`__file__`) rather than a module name.
logger = logging.getLogger(__file__)
class GenericWildcardPrincipalRule(PrincipalCheckingRule):
    """
    Checks for wildcard principals in resources.
    """

    REASON_WILCARD_PRINCIPAL = "{} should not allow wildcard in principals or account-wide principals (principal: '{}')"
    REASON_NOT_ALLOWED_PRINCIPAL = "{} contains an unknown principal: {}"
    RULE_MODE = RuleMode.MONITOR
    GRANULARITY = RuleGranularity.RESOURCE

    # Pre-compiled patterns: IAM_PATTERN matches IAM ARNs whose account part is
    # digits or "*"; FULL_REGEX additionally matches a bare "*" with an
    # optional "word:" prefix.
    IAM_PATTERN = re.compile(r"arn:aws:iam::(\d*|\*):.*")
    FULL_REGEX = re.compile(r"^((\w*:){0,1}\*|arn:aws:iam::(\d*|\*):.*)$")

    def invoke(self, cfmodel: CFModel, extras: Optional[Dict] = None) -> Result:
        """Run the wildcard-principal check over every resource in ``cfmodel``.

        Policy-type resources have their ``PolicyDocument`` checked; IAM roles
        have their ``AssumeRolePolicyDocument`` checked; IAM roles and users
        additionally have each inline policy document checked.

        Args:
            cfmodel: Model whose ``Resources`` mapping is scanned.
            extras: Accepted for interface compatibility; not used by the
                code visible here.

        Returns:
            The ``Result`` object that was passed to every
            ``check_for_wildcards`` call.
        """
        result = Result()
        for logical_id, resource in cfmodel.Resources.items():
            if isinstance(resource, (IAMManagedPolicy, IAMPolicy, S3BucketPolicy, SNSTopicPolicy, SQSQueuePolicy)):
                self.check_for_wildcards(result, logical_id, resource.Properties.PolicyDocument)
            elif isinstance(resource, (IAMRole, IAMUser)):
                if isinstance(resource, IAMRole):
                    self.check_for_wildcards(result, logical_id, resource.Properties.AssumeRolePolicyDocument)
                if resource.Properties and resource.Properties.Policies:
                    for policy in resource.Properties.Policies:
                        self.check_for_wildcards(result, logical_id, policy.PolicyDocument)
        # The signature promises a Result; hand the accumulated one back to the caller.
        return result
def cfripper_rules():
    """Build documentation text for every class found in the ``rules`` module.

    Each class docstring is split into titled paragraphs by
    ``parse_doc_string``. The "Description" paragraph is emitted as-is (unless
    it is the ABCMeta default text) followed by severity and default-mode
    notes; every other paragraph is emitted under a ``####`` heading.

    Returns:
        A sorted list of ``(class name, rendered content)`` tuples.
    """
    severity_labels = {RuleRisk.HIGH: "**High**", RuleRisk.MEDIUM: "Medium", RuleRisk.LOW: "Low"}
    documented = []
    for _, rule_class in inspect.getmembers(rules, inspect.isclass):
        sections = parse_doc_string(inspect.getdoc(rule_class))

        parts = []
        for title, text in sections.items():
            if title != "Description":
                parts.append(f"\n#### {title}\n")
                parts.append(f"{text}\n")
                continue

            # Remove ABCMeta default docstring
            if not text.startswith("Helper class that"):
                parts.append(text)
            parts.append(f"\n\n>Severity: {severity_labels[rule_class.RISK_VALUE]}\n")
            if rule_class.RULE_MODE == RuleMode.MONITOR:
                parts.append("\n>Defaults to monitor mode (rule not enforced)\n")
            if rule_class.RULE_MODE == RuleMode.DEBUG:
                parts.append("\n>Defaults to debug mode (rule not enforced)\n")

        documented.append((rule_class.__name__, "".join(parts)))
    return sorted(documented)
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import re
from ..model.enums import RuleMode, RuleRisk
from .GenericWildcardPrincipalRule import GenericWildcardPrincipalRule
# Variant of GenericWildcardPrincipalRule that overrides FULL_REGEX, the
# reason text, the rule mode and the risk value.
class PartialWildcardPrincipalRule(GenericWildcardPrincipalRule):

    RULE_MODE = RuleMode.MONITOR
    RISK_VALUE = RuleRisk.MEDIUM

    REASON_WILCARD_PRINCIPAL = "{} should not allow wildcard in principals or account-wide principals (principal: '{}')"

    # Will catch IAM ARNs whose last segment ends in "*" or is exactly "root", e.g.
    # - Principal: arn:aws:iam::12345:12345*
    # - Principal: arn:aws:iam::12345:root
    FULL_REGEX = re.compile(r"^arn:aws:iam::.*:(.*\*|root)$")
import logging
import re
from cfripper.model.enums import RuleMode
from cfripper.model.rule import Rule
# Module-level logger, keyed on this file's path (`__file__`).
logger = logging.getLogger(__file__)
# Attribute names treated as policy documents; not referenced in the part of
# the file visible here - presumably consulted by check_for_wildcards (confirm).
POLICY_DOCUMENT_TYPES = ("policy_document", "key_policy", "assume_role_policy_document")
class GenericWildcardPrincipal(Rule):
REASON_WILCARD_PRINCIPAL = "{} should not allow wildcard in principals or account-wide principals (principal: '{}')"
REASON_NOT_ALLOWED_PRINCIPAL = "{} contains an unknown principal: {}"
RULE_MODE = RuleMode.MONITOR
IAM_PATTERN = r"arn:aws:iam::(\d*|\*):.*"
FULL_REGEX = r"^((\w*:){0,1}\*|arn:aws:iam::(\d*|\*):.*)$"
def invoke(self, resources, parameters):
    """Run ``check_for_wildcards`` on every resource and on each of its attached policies.

    ``resources`` is a mapping whose values are iterables of resources.
    ``parameters`` is accepted but not used here.
    """
    for bucket in resources.values():
        for item in bucket:
            # Policies attached to the resource, if it has any.
            attached = getattr(item, "policies", [])
            for attached_policy in attached:
                # Fall back to the owning resource's id when the policy is unnamed.
                identifier = getattr(attached_policy, "policy_name", item.logical_id)
                self.check_for_wildcards(resource=attached_policy, resource_id=identifier)
            # The resource itself may be a policy.
            self.check_for_wildcards(resource=item, resource_id=item.logical_id)
def check_for_wildcards(self, resource, resource_id):