Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
policy = analyze_policy_string(
"""{
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "s3:listbucket",
"Resource": "arn:aws:s3:::bucket-name",
"Condition": {
"DateGreaterThan" :{"aws:CurrentTime" : "bad"},
"StringEquals": {"s3:prefix":["home/${aws:username}/*"]}
} }}"""
)
assert_false(len(policy.findings) == 0, "First condition is bad")
# Second bad
policy = analyze_policy_string(
"""{
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "s3:listbucket",
"Resource": "arn:aws:s3:::bucket-name",
"Condition": {
"DateGreaterThan" :{"aws:CurrentTime" : "2019-07-16T12:00:00Z"},
"StringEquals": {"s3:x":["home/${aws:username}/*"]}
} }}"""
)
assert_false(len(policy.findings) == 0, "Second condition is bad")
def test_analyze_policy_string_no_statement(self):
policy = analyze_policy_string(
"""{
"Version": "2012-10-17" }"""
)
assert_false(len(policy.findings) == 0, "Policy has no Statement")
def test_analyze_policy_string_opposites(self):
# Policy contains Action and NotAction
policy = analyze_policy_string(
"""{
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "s3:listallmybuckets",
"NotAction": "s3:listallmybuckets",
"Resource": "*"}}"""
)
assert_false(len(policy.findings) == 0, "Policy contains Action and NotAction")
def test_resource_policy_privilege_escalation_at_bucket_level(self):
policy = analyze_policy_string(
"""{
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutBucketPolicy"],
"Resource": ["arn:aws:s3:::bucket", "arn:aws:s3:::bucket/*"]
}}"""
)
assert_false(
len(policy.findings) == 0,
"Resource policy privilege escalation",
)
policy = analyze_policy_string(
"""{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["s3:*Bucket*", "s3:*Object*"],
"Resource": ["arn:aws:s3:::bucket1", "arn:aws:s3:::bucket1/*"]
},
{
"Effect": "Allow",
"Action": ["s3:*Object"],
"Resource": ["arn:aws:s3:::bucket2/*"]
}]}"""
)
print(policy.findings)
# There is one finding for "No resources match for s3:ListAllMyBuckets which requires a resource format of *"
def test_analyze_policy_string_correct_multiple_statements_and_actions(self):
policy = analyze_policy_string(
"""{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": "s3:listallmybuckets",
"Resource": "*"},
{
"Effect": "Allow",
"Action": "iam:listusers",
"Resource": "*"}]}"""
)
assert_equal(len(policy.findings), 0)
def test_condition_action_specific(self):
policy = analyze_policy_string(
"""{
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "s3:listbucket",
"Resource": "arn:aws:s3:::bucket-name",
"Condition": {"StringEquals": {"s3:prefix":["home/${aws:username}/*"]}} }}"""
)
assert_equal(len(policy.findings), 0)
# The key s3:x-amz-storage-class is not allowed for ListBucket,
# but is for other S3 actions
policy = analyze_policy_string(
"""{
"Version": "2012-10-17",
"Statement": {
"Statement":[
{
"Effect":"Allow",
"Principal":{"AWS":"000000000000"},
"Action":["s3:GetObject"],
"Resource":["arn:aws:s3:::examplebucket/*"]
}
]
}"""
)
assert_true(
len(policy.findings) == 0,
"S3 bucket policy with one account granted access via ID",
)
policy = analyze_policy_string(
"""{
"Version":"2012-10-17",
"Statement":[
{
"Effect":"Allow",
"Principal": { "AWS": "arn:aws:iam::000000000000:user/alice" },
"Action":["s3:GetObject"],
"Resource":["arn:aws:s3:::examplebucket/*"]
}
]
}"""
)
assert_true(len(policy.findings) == 0, "S3 bucket policy with ARN of user")
policy = analyze_policy_string(
"""{
def test_condition(self):
policy = analyze_policy_string(
"""{
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "s3:listbucket",
"Resource": "arn:aws:s3:::bucket-name",
"Condition": {"DateGreaterThan" :{"aws:CurrentTime" : "2019-07-16T12:00:00Z"}} }}"""
)
assert_equal(len(policy.findings), 0)
json.dumps(version["Document"]), role["Arn"]
)
findings.extend(policy.findings)
for group in auth_details_json["GroupDetailList"]:
for policy in group.get("GroupPolicyList", []):
policy = analyze_policy_string(
json.dumps(version["Document"]), group["Arn"]
)
findings.extend(policy.findings)
elif args.string:
policy = analyze_policy_string(args.string)
findings.extend(policy.findings)
elif args.file:
with open(args.file) as f:
contents = f.read()
policy = analyze_policy_string(contents, args.file)
findings.extend(policy.findings)
else:
parser.print_help()
exit(-1)
filtered_findings = []
override_config(args.config)
for finding in findings:
finding = enhance_finding(finding)
if not is_finding_filtered(finding, args.minimum_severity):
filtered_findings.append(finding)
if len(filtered_findings) == 0:
# Return with exit code 0 if no findings
return
if args.aws_managed_policies:
filenames = [
f
for f in listdir(args.aws_managed_policies)
if isfile(join(args.aws_managed_policies, f))
]
for filename in filenames:
filepath = join(args.aws_managed_policies, filename)
with open(filepath) as f:
contents = f.read()
policy_file_json = json.loads(contents)
policy_string = json.dumps(
policy_file_json["PolicyVersion"]["Document"]
)
policy = analyze_policy_string(policy_string, filepath)
findings.extend(policy.findings)
elif args.auth_details_file:
with open(args.auth_details_file) as f:
contents = f.read()
auth_details_json = json.loads(contents)
for policy in auth_details_json["Policies"]:
# Ignore AWS defined policies
if "arn:aws:iam::aws:" not in policy["Arn"]:
continue
for version in policy["PolicyVersionList"]:
if not version["IsDefaultVersion"]:
continue
policy = analyze_policy_string(
json.dumps(version["Document"]), policy["Arn"]