Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_iam_username(cls):
    """Return the IAM user name for the active credentials, cached on ``cls``.

    The answer is stored in ``cls._default_iam_username`` and returned
    directly on later calls. It is resolved in this order:

    1. ``resources.iam.CurrentUser().user``: its ``name`` attribute, or the
       last ``/``-separated component of its ARN resource if there is no
       ``name``.
    2. If step 1 raises and the error text contains "Must specify userName",
       or both "assumed-role" and "botocore-session": the current user of
       the profile listed as ``source_profile`` for the active boto3 profile.
    3. If step 1 raises with any other error: the last ``/``-separated
       component of the STS caller identity ARN resource.

    If the fallback (step 2 or 3) raises as well, ``"unknown"`` is cached.
    """
    if cls._default_iam_username is not None:
        return cls._default_iam_username

    try:
        user = resources.iam.CurrentUser().user
        # Computed up front because it is handed to getattr() as the default.
        arn_tail = ARN(user.arn).resource.split("/")[-1]
        cls._default_iam_username = getattr(user, "name", arn_tail)
    except Exception as err:
        try:
            message = str(err)
            via_assumed_role = "assumed-role" in message and "botocore-session" in message
            if "Must specify userName" in message or via_assumed_role:
                # Ask the source profile's own session who its IAM user is.
                active = boto3.Session()._session
                profile_config = active.full_config["profiles"][active.profile]
                source = boto3.Session(profile_name=profile_config["source_profile"])
                cls._default_iam_username = source.resource("iam").CurrentUser().user.name
            else:
                identity_arn = ARN(clients.sts.get_caller_identity()["Arn"])
                cls._default_iam_username = identity_arn.resource.split("/")[-1]
        except Exception:
            # Best effort only: never let user-name discovery fail the caller.
            cls._default_iam_username = "unknown"
    return cls._default_iam_username
def ensure_iam_entity(iam_entity_name, policies, collection, constructor, **constructor_args):
    """Fetch or create an IAM entity and make sure the given policies are applied.

    ``collection.all()`` is scanned for the first entity whose ``name`` equals
    ``iam_entity_name``; if none is found, ``constructor(**constructor_args)``
    creates one.

    Each item of ``policies`` is then handled as follows:

    * an ``IAMPolicyBuilder`` is written as an inline policy named after this
      module (``__name__``);
    * anything else is formatted into ``arn:aws:iam::aws:policy/<item>`` and
      attached unless that ARN was already attached when the function started.

    Returns the found or newly created entity.
    """
    found = next((item for item in collection.all() if item.name == iam_entity_name), None)
    entity = constructor(**constructor_args) if found is None else found

    already_attached = {attached.arn for attached in entity.attached_policies.all()}
    for policy in policies:
        if isinstance(policy, IAMPolicyBuilder):
            entity.Policy(__name__).put(PolicyDocument=str(policy))
            continue
        policy_arn = "arn:aws:iam::aws:policy/{}".format(policy)
        if policy_arn not in already_attached:
            entity.attach_policy(PolicyArn=policy_arn)
    # TODO: accommodate IAM eventual consistency
    return entity
def ensure_iam_policy(name, doc):
    """Create the IAM policy ``name`` from ``doc``, or update it if it exists.

    On a fresh create, the result of ``resources.iam.create_policy`` is
    returned. When creation raises ``ClientError``, the error is passed to
    ``expect_error_codes`` with "EntityAlreadyExists" (presumably re-raising
    any other error code - confirm against that helper). The existing policy
    then receives ``doc`` as a new default version, every non-default version
    is deleted, and the ``Policy`` resource is returned.
    """
    try:
        return resources.iam.create_policy(PolicyName=name, PolicyDocument=str(doc))
    except ClientError as err:
        expect_error_codes(err, "EntityAlreadyExists")
        policy_arn = ARN(service="iam", region="", resource="policy/" + name)
        policy = resources.iam.Policy(str(policy_arn))
        policy.create_version(PolicyDocument=str(doc), SetAsDefault=True)
        # Keep only the version that was just made the default.
        for stale in policy.versions.all():
            if stale.is_default_version:
                continue
            stale.delete()
        return policy
# When column specs are supplied, derive the header names from them and
# prepend a "Row" column (typed "float") to both the names and the specs.
if column_specs is not None:
column_names = ["Row"]
column_names.extend([col["name"] for col in column_specs])
column_specs = [{"name": "Row", "type": "float"}] + column_specs
# Build the header row. Column 0 and a column named "id" (case-insensitive)
# are passed a limit of 99 to ansi_truncate; every other column is passed
# max_col_width. col_widths[i] keeps the largest length seen for column i,
# measured on the text returned by strip_ansi_codes.
if column_names is not None:
for i in range(len(column_names)):
if column_names[i].lower() == "id":
id_column = i
my_col = ansi_truncate(str(column_names[i]), max_col_width if i not in {0, id_column} else 99)
my_col_names.append(my_col)
col_widths[i] = max(col_widths[i], len(strip_ansi_codes(my_col)))
# Apply the same per-column limits to every cell of every row, collecting the
# processed rows in trunc_table and updating col_widths as we go.
trunc_table = []
for row in table:
my_row = []
for i in range(len(row)):
my_item = ansi_truncate(str(row[i]), max_col_width if i not in {0, id_column} else 99)
my_row.append(my_item)
col_widths[i] = max(col_widths[i], len(strip_ansi_codes(my_item)))
trunc_table.append(my_row)
# Map column type names to the values returned by the colour helpers.
type_colormap = {"boolean": BLUE(),
"integer": YELLOW(),
"float": WHITE(),
"string": GREEN()}
# The sized integer type names reuse the "integer" entry, and "double"
# reuses the "float" entry.
for i in "uint8", "int16", "uint16", "int32", "uint32", "int64":
type_colormap[i] = type_colormap["integer"]
type_colormap["double"] = type_colormap["float"]
def col_head(i):
if column_specs is not None:
return BOLD() + type_colormap[column_specs[i]["type"]] + column_names[i] + ENDC()
else:
def ensure_iam_policy(name, doc):
    """Create the IAM policy ``name`` from ``doc``, or update it if it exists.

    On a fresh create, the result of ``resources.iam.create_policy`` is
    returned. When creation raises ``ClientError``, the error is passed to
    ``expect_error_codes`` with "EntityAlreadyExists" (presumably re-raising
    any other error code - confirm against that helper). The existing policy
    then receives ``doc`` as a new default version, every non-default version
    is deleted, and the ``Policy`` resource is returned.
    """
    try:
        return resources.iam.create_policy(PolicyName=name, PolicyDocument=str(doc))
    except ClientError as err:
        expect_error_codes(err, "EntityAlreadyExists")
        policy_arn = ARN(service="iam", region="", resource="policy/" + name)
        policy = resources.iam.Policy(str(policy_arn))
        policy.create_version(PolicyDocument=str(doc), SetAsDefault=True)
        # Keep only the version that was just made the default.
        for stale in policy.versions.all():
            if stale.is_default_version:
                continue
            stale.delete()
        return policy
def ensure_s3_bucket(name=None, policy=None, lifecycle=None):
    """Return the S3 bucket ``name``, creating it first if ``head_bucket`` fails.

    Args:
        name: Bucket name. Defaults to ``aegea-assets-<account id>``, using
            ``ARN.get_account_id()``.
        policy: If truthy, ``str(policy)`` is put as the bucket policy.
        lifecycle: If truthy, ``dict(lifecycle)`` is put as the bucket
            lifecycle configuration.

    Any ``ClientError`` from ``head_bucket`` is logged at debug level and
    followed by a create call. Outside ``us-east-1`` the create call carries
    the current region as ``LocationConstraint``. The function waits for the
    bucket to exist before applying ``policy`` and ``lifecycle``.
    """
    bucket_name = "aegea-assets-{}".format(ARN.get_account_id()) if name is None else name
    bucket = resources.s3.Bucket(bucket_name)

    try:
        clients.s3.head_bucket(Bucket=bucket.name)
    except ClientError as err:
        logger.debug(err)
        if ARN.get_region() != "us-east-1":
            location = dict(LocationConstraint=ARN.get_region())
            bucket.create(CreateBucketConfiguration=location)
        else:
            # us-east-1 is created without a CreateBucketConfiguration.
            bucket.create()
    bucket.wait_until_exists()

    if policy:
        bucket.Policy().put(Policy=str(policy))
    if lifecycle:
        bucket.LifecycleConfiguration().put(LifecycleConfiguration=dict(lifecycle))
    return bucket
def ensure_iam_policy(name, doc):
    """Create the IAM policy ``name`` from ``doc``, or update it if it exists.

    On a fresh create, the result of ``resources.iam.create_policy`` is
    returned. When creation raises ``ClientError``, the error is passed to
    ``expect_error_codes`` with "EntityAlreadyExists" (presumably re-raising
    any other error code - confirm against that helper). The existing policy
    then receives ``doc`` as a new default version, every non-default version
    is deleted, and the ``Policy`` resource is returned.
    """
    try:
        return resources.iam.create_policy(PolicyName=name, PolicyDocument=str(doc))
    except ClientError as err:
        expect_error_codes(err, "EntityAlreadyExists")
        policy_arn = ARN(service="iam", region="", resource="policy/" + name)
        policy = resources.iam.Policy(str(policy_arn))
        policy.create_version(PolicyDocument=str(doc), SetAsDefault=True)
        # Keep only the version that was just made the default.
        for stale in policy.versions.all():
            if stale.is_default_version:
                continue
            stale.delete()
        return policy
def ensure_iam_role(name, policies=frozenset(), trust=frozenset()):
    """Fetch or create the IAM role ``name`` and bring its trust policy up to date.

    Args:
        name: Role name; also passed as ``RoleName`` when the role is created.
        policies: Policies forwarded to ``ensure_iam_entity``.
        trust: Principals passed to
            ``IAMPolicyBuilder.add_assume_role_principals``.

    A new role is created with an assume-role document built from ``trust``.
    The role's current assume-role document is then loaded into a builder,
    ``trust`` is added to it, and the role's trust policy is updated only if
    the resulting policy differs from the current document.

    Returns the role.
    """
    initial_trust = IAMPolicyBuilder()
    initial_trust.add_assume_role_principals(trust)
    role = ensure_iam_entity(
        name,
        policies=policies,
        collection=resources.iam.roles,
        constructor=resources.iam.create_role,
        RoleName=name,
        AssumeRolePolicyDocument=str(initial_trust),
    )

    merged_trust = IAMPolicyBuilder(role.assume_role_policy_document)
    merged_trust.add_assume_role_principals(trust)
    needs_update = merged_trust.policy != role.assume_role_policy_document
    if not needs_update:
        return role

    logger.debug("Updating trust policy for %s", role)
    role.AssumeRolePolicy().update(PolicyDocument=str(merged_trust))
    return role
# Size the per-column width tracker: from the first row when the table has
# rows, otherwise from column_specs (+1 for the "Row" column prepended
# below), otherwise from column_names.
# NOTE(review): if table is empty and both column_specs and column_names are
# None, col_widths is not assigned by this chain - confirm callers never do that.
if len(table) > 0:
col_widths = [0] * len(table[0])
elif column_specs is not None:
col_widths = [0] * (len(column_specs) + 1)
elif column_names is not None:
col_widths = [0] * len(column_names)
my_col_names, id_column = [], None
# When column specs are supplied, derive the header names from them and
# prepend a "Row" column (typed "float") to both the names and the specs.
if column_specs is not None:
column_names = ["Row"]
column_names.extend([col["name"] for col in column_specs])
column_specs = [{"name": "Row", "type": "float"}] + column_specs
# Build the header row. Column 0 and a column named "id" (case-insensitive)
# are passed a limit of 99 to ansi_truncate; every other column is passed
# max_col_width. col_widths[i] keeps the largest length seen for column i,
# measured on the text returned by strip_ansi_codes.
if column_names is not None:
for i in range(len(column_names)):
if column_names[i].lower() == "id":
id_column = i
my_col = ansi_truncate(str(column_names[i]), max_col_width if i not in {0, id_column} else 99)
my_col_names.append(my_col)
col_widths[i] = max(col_widths[i], len(strip_ansi_codes(my_col)))
# Apply the same per-column limits to every cell of every row, collecting the
# processed rows in trunc_table and updating col_widths as we go.
trunc_table = []
for row in table:
my_row = []
for i in range(len(row)):
my_item = ansi_truncate(str(row[i]), max_col_width if i not in {0, id_column} else 99)
my_row.append(my_item)
col_widths[i] = max(col_widths[i], len(strip_ansi_codes(my_item)))
trunc_table.append(my_row)
# Map column type names to the values returned by the colour helpers.
type_colormap = {"boolean": BLUE(),
"integer": YELLOW(),
"float": WHITE(),
"string": GREEN()}
for i in "uint8", "int16", "uint16", "int32", "uint32", "int64":