Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def set_policy_document(self, doc):
self.__policy_document = doc
self.__policy_summary = Policy(doc)
def audit_s3_buckets(findings, region):
buckets_json = query_aws(region.account, "s3-list-buckets", region)
buckets = pyjq.all(".Buckets[].Name", buckets_json)
for bucket in buckets:
# Check policy
try:
policy_file_json = get_parameter_file(
region, "s3", "get-bucket-policy", bucket
)
if policy_file_json is not None:
# Find the entity we need
policy_string = policy_file_json["Policy"]
# Load the string value as json
policy = json.loads(policy_string)
policy = Policy(policy)
if policy.is_internet_accessible():
if (
len(policy.statements) == 1
and len(policy.statements[0].actions) == 1
and "s3:GetObject" in policy.statements[0].actions
):
findings.add(
Finding(region, "S3_PUBLIC_POLICY_GETOBJECT_ONLY", bucket)
)
else:
findings.add(
Finding(
region,
"S3_PUBLIC_POLICY",
bucket,
resource_details=policy_string,
reasons_for_being_admin.append("Custom policy: {}".format(policy["PolicyName"]))
findings.add(
Finding(
region,
"IAM_CUSTOM_POLICY_ALLOWS_ADMIN",
role["Arn"],
resource_details={
"comment": "Role has custom policy allowing admin",
"policy": policy_doc,
},
)
)
# Check if role is accessible from anywhere
policy = Policy(role["AssumeRolePolicyDocument"])
if policy.is_internet_accessible():
findings.add(
Finding(
region,
"IAM_ROLE_ALLOWS_ASSUMPTION_FROM_ANYWHERE",
role["Arn"],
resource_details={"statement": role["AssumeRolePolicyDocument"]},
)
)
# Check if anything looks malformed
for stmt in role["AssumeRolePolicyDocument"]["Statement"]:
if stmt["Effect"] != "Allow":
findings.add(
Finding(
region,
def _process_access_policy(neo4j_session, domain_id, domain_data):
"""
Link the ES domain to its DNS FQDN endpoint and create associated nodes in the graph
if needed
:param neo4j_session: Neo4j session object
:param domain_id: ES domain id
:param domain_data: domain data
"""
tag_es = "MATCH (es:ESDomain{id: {DomainId}}) SET es.exposed_internet = {InternetExposed}"
exposed_internet = False
if domain_data.get("Endpoint") and domain_data.get("AccessPolicies"):
policy = Policy(json.loads(domain_data['AccessPolicies']))
if policy.is_internet_accessible():
exposed_internet = True
neo4j_session.run(tag_es, DomainId=domain_id, InternetExposed=exposed_internet)
# Check for publicly accessible functions. They should be called from apigateway or something else.
json_blob = query_aws(region.account, "lambda-list-functions", region)
for function in json_blob.get("Functions", []):
name = function["FunctionName"]
# Check policy
policy_file_json = get_parameter_file(region, "lambda", "get-policy", name)
if policy_file_json is None:
# No policy
continue
# Find the entity we need
policy_string = policy_file_json["Policy"]
# Load the string value as json
policy = json.loads(policy_string)
policy = Policy(policy)
if policy.is_internet_accessible():
findings.add(
Finding(region, "LAMBDA_PUBLIC", name, resource_details=policy_string)
)
)
if queue_attributes is None:
# No policy
continue
# Find the entity we need
attributes = queue_attributes["Attributes"]
if "Policy" in attributes:
policy_string = attributes["Policy"]
else:
# No policy set
continue
# Load the string value as json
policy = json.loads(policy_string)
policy = Policy(policy)
if policy.is_internet_accessible():
findings.add(
Finding(
region, "SQS_PUBLIC", queue_name, resource_details=policy_string
)
from policyuniverse.policy import Policy
policies = list()
for key in policy_keys:
try:
policy = dpath.util.values(item.config, key, separator='$')
if isinstance(policy, list):
for p in policy:
if not p:
continue
if isinstance(p, list):
policies.extend([Policy(pp) for pp in p])
else:
policies.append(Policy(p))
else:
policies.append(Policy(policy))
except PathNotFound:
continue
return policies
"""
author: spiper
description: Detect resources being made public.
playbook: (a) identify the AWS account in the log
(b) identify what resource(s) are impacted by the API call
(c) determine if the intent is valid, malicious or accidental
"""
# Check S3
if rec['eventName'] == 'PutBucketPolicy':
# S3 doesn't use a policy string, but actual json, unlike all
# other commands
policy = rec.get('requestParameters', {}).get('bucketPolicy', None)
if not policy:
return False
policy = Policy(policy)
if policy.is_internet_accessible():
return True
# Get the policy string for each resource
policy_string = ''
# Check ElasticSearch
if rec['eventName'] == 'CreateElasticsearchDomain':
policy_string = rec.get('requestParameters', {}).get('accessPolicies', '')
elif rec['eventName'] == 'UpdateElasticsearchDomainConfig':
policy_string = rec.get('requestParameters', {}).get('accessPolicies', '')
# Check Glacier Vaults
elif rec['eventName'] == 'SetVaultAccessPolicy':
policy_string = (
rec.get('requestParameters', {}).get('policy', {}).get('policy', '')
policy_string = rec.get('requestParameters', {}).get('policyText', '')
# Check KMS
elif rec['eventName'] == 'PutKeyPolicy':
policy_string = rec.get('requestParameters', {}).get('policy', '')
elif rec['eventName'] == 'CreateKey':
policy_string = rec.get('requestParameters', {}).get('policy', '')
# Check SecretsManager
elif rec['eventName'] == 'PutResourcePolicy':
policy_string = rec.get('requestParameters', {}).get('resourcePolicy', '')
# Check the policy
if policy_string:
policy = json.loads(policy_string)
policy = Policy(policy)
if policy.is_internet_accessible():
return True
return False
return
for key in json_blob.get("Keys", []):
name = key["KeyId"]
# Check policy
policy_file_json = get_parameter_file(region, "kms", "get-key-policy", name)
if policy_file_json is None:
# No policy
continue
# Find the entity we need
policy_string = policy_file_json["Policy"]
# Load the string value as json
policy = json.loads(policy_string)
policy = Policy(policy)
if policy.is_internet_accessible():
findings.add(
Finding(region, "KMS_PUBLIC", name, resource_details=policy_string)
)