Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Loads roles specified in file and calls _remove_permissions_from_role() for each one.
Args:
permissions (list)
role_filename (string)
commit (bool)
Returns:
None
"""
roles = list()
with open(role_filename, "r") as fd:
roles = json.load(fd)
for role_arn in tqdm(roles):
arn = ARN(role_arn)
if arn.error:
LOGGER.error("INVALID ARN: {arn}".format(arn=role_arn))
return
account_number = arn.account_number
role_name = arn.name.split("/")[-1]
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
role = Role(get_role_data(dynamo_table, role_id))
_remove_permissions_from_role(
account_number,
permissions,
role,
role_id,
dynamo_table,
def inspect_entity_arn(self, entity, same, item):
    """Classify the entity named by an ARN-valued policy entry.

    A bare ``'*'`` is reported as ``{'UNKNOWN'}`` without any parsing.
    Any other value is parsed with ``ARN``; a parse error is reported via
    ``record_arn_parse_issue`` but does NOT stop processing -- the lookup
    below still runs with whatever fields the parser populated.

    Args:
        entity: object whose ``value`` attribute holds the ARN string.
        same: forwarded unchanged to the S3 / account inspection helpers.
        item: forwarded to ``record_arn_parse_issue`` on a parse error.

    Returns:
        ``{'UNKNOWN'}`` for the wildcard, the result of
        ``inspect_entity_s3`` for S3 ARNs, otherwise a one-element set
        holding the result of ``inspect_entity_account``.
    """
    raw_arn = entity.value
    if raw_arn == '*':
        return {'UNKNOWN'}

    parsed = ARN(raw_arn)
    if parsed.error:
        # Record the problem, then fall through on purpose.
        self.record_arn_parse_issue(item, raw_arn)

    if parsed.tech != 's3':
        verdict = self.inspect_entity_account(entity, parsed.account_number, same)
        return {verdict}

    # S3 ARNs are resolved by name rather than by account number.
    return self.inspect_entity_s3(entity, parsed.name, same)
def _arn_internet_accessible(self, arn_input):
    """Return True when ``arn_input`` should be treated as open to anyone.

    Decision order:
      1. The literal ``'*'`` is accessible.
      2. An ARN that fails to parse is logged; it is accessible only if
         the raw string contains ``'*'``.
      3. An ARN whose ``tech`` is ``'s3'`` is not accessible.
      4. An ARN with neither an account number nor a service is logged
         and treated as accessible.
      5. Otherwise it is accessible only when the account number is
         ``'*'``.

    Args:
        arn_input: the ARN string (or ``'*'``) to evaluate.

    Returns:
        bool
    """
    if arn_input == '*':
        return True

    parsed = ARN(arn_input)
    if parsed.error:
        logger.warning('Auditor could not parse ARN {arn}.'.format(arn=arn_input))
        return '*' in arn_input

    if parsed.tech == 's3':
        # S3 ARNs typically don't have account numbers.
        return False

    if not (parsed.account_number or parsed.service):
        logger.warning('Auditor could not parse Account Number from ARN {arn}.'.format(arn=arn_input))
        return True

    return parsed.account_number == '*'
def check_unknown_cross_account(self, item):
    """Record every 'Allow' grant in ``item``'s resource policies that
    resolves to an UNKNOWN entity.

    Policies reporting ``is_internet_accessible()`` are skipped entirely,
    as are non-'Allow' statements, the ``'*'`` principal, and principals
    whose parsed ARN has a ``service`` set (service principals).

    For each remaining entry, an ``Entity`` is built from the tuple and
    passed to ``inspect_entity``; when ``'UNKNOWN'`` is in the result,
    ``record_unknown_access`` is called with the statement's actions.

    Args:
        item: the item whose resource policies are audited.

    Returns:
        None
    """
    # Was misspelled ``load_resoruce_policies``; use the same loader name
    # that check_root_cross_account calls.
    policies = self.load_resource_policies(item)
    for policy in policies:
        if policy.is_internet_accessible():
            continue
        for statement in policy.statements:
            if statement.effect != 'Allow':
                continue
            for who in statement.whos_allowed():
                if who.value == '*' and who.category == 'principal':
                    continue

                # Ignore Service Principals
                if who.category == 'principal':
                    arn = ARN(who.value)
                    if arn.service:
                        continue

                entity = Entity.from_tuple(who)
                if 'UNKNOWN' in self.inspect_entity(entity, item):
                    self.record_unknown_access(item, entity, list(statement.actions))
def check_root_cross_account(self, item):
    """Record 'Allow' grants in ``item``'s resource policies that are made
    to a root ARN belonging to another account.

    Only entries whose category is ``'arn'`` or ``'principal'`` and whose
    value is not ``'*'`` are examined. An entry is recorded through
    ``record_cross_account_root`` when its parsed ARN has ``root`` set and
    ``inspect_entity`` returns at least one of FRIENDLY, THIRDPARTY or
    UNKNOWN.

    Args:
        item: the item whose resource policies are audited.

    Returns:
        None
    """
    flagged_kinds = {'FRIENDLY', 'THIRDPARTY', 'UNKNOWN'}
    arn_categories = ('arn', 'principal')

    for policy in self.load_resource_policies(item):
        for statement in policy.statements:
            if statement.effect != 'Allow':
                continue

            for who in statement.whos_allowed():
                if who.category not in arn_categories or who.value == '*':
                    continue

                parsed = ARN(who.value)
                entity = Entity.from_tuple(who)
                if not parsed.root:
                    continue

                # Only inspect the entity once it is known to be a root ARN.
                if self.inspect_entity(entity, item).intersection(flagged_kinds):
                    self.record_cross_account_root(item, entity, list(statement.actions))