def _get_name_from_structure(item, default):
    """
    Resolve an item's name from a possibly sparsely populated item dictionary.

    The value stored under ``default`` is used when it is present and truthy.
    Otherwise the name is taken from the parsed ``Arn`` entry.

    :param item: dict expected to carry the ``default`` key and/or an ``Arn`` key
    :param default: key in ``item`` that is consulted first
    :return: item name
    :raises CloudAuxException: if ``ARN`` flags the ``Arn`` value as an error
    :raises MissingFieldException: if neither ``default`` nor ``Arn`` holds a truthy value
    """
    name = item.get(default)
    if name:
        return name

    raw_arn = item.get('Arn')
    if not raw_arn:
        # NOTE(review): the message embeds the whole item dict; confirm items never
        # carry sensitive fields before this exception text reaches logs.
        raise MissingFieldException('Cannot extract item name from input: {input}.'.format(input=item))

    parsed = ARN(raw_arn)
    if parsed.error:
        # Report the string that failed to parse, not the parsed object.
        raise CloudAuxException('Bad ARN: {arn}'.format(arn=raw_arn))
    return parsed.parsed_name
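def _conn_from_arn(arn):
    """
    Extracts the account number from an ARN.

    :param arn: Amazon ARN containing account number.
    :return: dictionary with a single account_number key that can be merged with an existing
        connection dictionary containing fields such as assume_role, session_name, region.
    :raises CloudAuxException: if ``ARN`` flags the input as an error.
    """
    # Keep the caller's string under its own name. Rebinding ``arn`` to the parsed object
    # made the 'Bad ARN' message format that object instead of the input that failed to
    # parse; _get_name_from_structure already reports the raw string.
    parsed_arn = ARN(arn)
    if parsed_arn.error:
        raise CloudAuxException('Bad ARN: {arn}'.format(arn=arn))

    # NOTE(review): account_number comes straight from the supplied ARN. Callers that merge
    # it into a connection dict should confirm it is an account they intend to operate on.
    return dict(
        account_number=parsed_arn.account_number,
    )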
def revoke_rule(rule, group, account_number=None, region=None, assume_role=None, client=None):
    """
    Revoke one CIDR-based ingress rule from a security group.

    :param rule: rule object; ``direction``, ``protocol``, ``from_port``, ``to_port`` and
        ``cidr`` are read from it
    :param group: security group object; ``aws_group_id`` is read from it
    :param account_number: not used in this body (presumably consumed by a connection
        wrapper outside this view -- TODO confirm)
    :param region: not used in this body (same assumption as ``account_number``)
    :param assume_role: not used in this body (same assumption as ``account_number``)
    :param client: object exposing ``revoke_security_group_ingress``
    :raises CloudAuxException: if ``rule.direction`` is ``'egress'``
    """
    # Egress revocation is not implemented; fail loudly rather than silently do nothing.
    if rule.direction == 'egress':
        raise CloudAuxException("Modifying egress rules is not yet supported.")

    # NOTE(review): ``CidrIp`` is always sent, even when ``rule.cidr`` is empty.
    # authorize_rule has a separate branch for rules whose source is a security group;
    # this function does not. Confirm such rules are never passed here, otherwise the
    # revoke may fail and leave the grant in place.
    revoke_ingress = client.revoke_security_group_ingress
    revoke_ingress(
        GroupId=group.aws_group_id,
        IpProtocol=rule.protocol,
        FromPort=rule.from_port,
        ToPort=rule.to_port,
        CidrIp=rule.cidr,
    )
def authorize_rule(rule, group, account_number=None, region=None, assume_role=None, client=None):
if rule.direction == 'egress':
# response = client.authorize_security_group_egress()
raise CloudAuxException("Modifying egress rules is not yet supported.")
else:
if rule.cidr:
client.authorize_security_group_ingress(
GroupId=group.aws_group_id,
IpProtocol=rule.protocol,
FromPort=rule.from_port,
ToPort=rule.to_port,
CidrIp=rule.cidr
)
else:
client.authorize_security_group_ingress(
GroupId=group.aws_group_id,
IpProtocol=rule.protocol,
FromPort=rule.from_port,
ToPort=rule.to_port,
SourceSecurityGroupName=rule.source_security_group.name,
"RouteTables": ...,
"NetworkAcls": ...,
"FlowLogs": ...,
"Subnets": ...,
"Attributes": ...,
"_version": 1
}
:param vpc_id: The ID of the VPC
:param flags:
:param conn:
:return:
"""
# Is the account number that's passed in the same as in the connection dictionary?
if not conn.get("account_number"):
raise CloudAuxException({"message": "Must supply account number in the connection dict to construct "
"the VPC ARN.",
"vpc_id": vpc_id})
if not conn.get("region"):
raise CloudAuxException({"message": "Must supply region in the connection dict to construct "
"the VPC ARN.",
"vpc_id": vpc_id})
start = {
'arn': "arn:aws:ec2:{region}:{account}:vpc/{vpc_id}".format(region=conn["region"],
account=conn["account_number"],
vpc_id=vpc_id),
'id': vpc_id
}
return registry.build_out(flags, start_with=start, pass_datastructure=True, **conn)