Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_blocklist_from_bucket(bucket_config):
try:
s3_resource = boto3_cached_conn('s3', service_type='resource',
account_number=bucket_config.get('account_number'),
assume_role=bucket_config.get('assume_role', None),
session_name='repokid',
region=bucket_config.get('region', 'us-west-2'))
s3_obj = s3_resource.Object(bucket_name=bucket_config['bucket_name'], key=bucket_config['key'])
blocklist = s3_obj.get()['Body'].read().decode("utf-8")
blocklist_json = json.loads(blocklist)
# Blocklist problems are really bad and we should quit rather than silently continue
except (botocore.exceptions.ClientError, AttributeError):
LOGGER.error("S3 blocklist config was set but unable to connect retrieve object, quitting")
sys.exit(1)
except ValueError:
LOGGER.error("S3 blocklist config was set but the returned file is bad, quitting")
sys.exit(1)
if set(blocklist_json.keys()) != set(['arns', 'names']):
def _get_arns(self):
"""
Gets a list of all Role ARNs in a given account, optionally limited by
class property ARN filter
:return: list of role ARNs
"""
client = boto3_cached_conn(
'iam', service_type='client', **self.conn_details)
account_arns = set()
for role in list_roles(**self.conn_details):
account_arns.add(role['Arn'])
for user in list_users(**self.conn_details):
account_arns.add(user['Arn'])
for page in client.get_paginator('list_policies').paginate(Scope='Local'):
for policy in page['Policies']:
account_arns.add(policy['Arn'])
for page in client.get_paginator('list_groups').paginate():
for group in page['Groups']:
def _get_client(self):
"""
Assumes into the target account and obtains IAM client
:return: boto3 IAM client in target account & role
"""
client = boto3_cached_conn(
'iam', account_number=self.account_number, assume_role=self.role_name)
return client
'account_number': account,
'region': region,
'session_name': session_name,
'assume_role': assume_role,
'service_type': service_type,
'external_id': external_id,
'arn_partition': arn_partition,
'read_only': read_only
}
if conn_type == 'cloudaux':
kwargs['cloudaux'] = CloudAux(**conn_dict)
elif conn_type == 'dict':
kwargs['conn_dict'] = conn_dict
elif conn_type == 'boto3':
del conn_dict['tech']
kwargs['conn'] = boto3_cached_conn(service, **conn_dict)
result = func(*args, **kwargs)
if result:
threads.append(result)
result = []
for thread in threads:
result.append(thread)
return result
return decorated_function
dynamo_config (kwargs):
account_number (string)
assume_role (string) optional
session_name (string)
region (string)
endpoint (string)
Returns:
dynamo_table object
"""
if "localhost" in dynamo_config["endpoint"]:
resource = boto3.resource(
"dynamodb", region_name="us-east-1", endpoint_url=dynamo_config["endpoint"]
)
else:
resource = boto3_cached_conn(
"dynamodb",
service_type="resource",
account_number=dynamo_config["account_number"],
assume_role=dynamo_config.get("assume_role", None),
session_name=dynamo_config["session_name"],
region=dynamo_config["region"],
)
for table in resource.tables.all():
if table.name == "repokid_roles":
return table
table = None
try:
table = resource.create_table(
TableName="repokid_roles",