Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def describe_group(record, region):
"""Attempts to describe group ids."""
account_id = record['account']
group_name = cloudwatch.filter_request_parameters('groupName', record)
vpc_id = cloudwatch.filter_request_parameters('vpcId', record)
group_id = cloudwatch.filter_request_parameters('groupId', record, look_in_response=True)
# Did this get collected already by the poller?
if cloudwatch.get_collected_details(record):
LOG.debug(f"[<--] Received already collected security group data: {record['detail']['collected']}")
return [record['detail']['collected']]
try:
# Always depend on Group ID first:
if group_id: # pylint: disable=R1705
return describe_security_groups(
account_number=account_id,
assume_role=HISTORICAL_ROLE,
region=region,
GroupIds=[group_id]
)['SecurityGroups']
elif vpc_id and group_name:
return describe_security_groups(
account_number=account_id,
assume_role=HISTORICAL_ROLE,
region=region,
Filters=[
{
'Name': 'group-name',
'Values': [group_name]
},
for record in records:
# Skip accounts that have role assumption errors:
try:
# Did we get a NextToken?
if record.get('NextToken'):
LOG.debug(f"[@] Received pagination token: {record['NextToken']}")
groups = describe_security_groups(
account_number=record['account_id'],
assume_role=HISTORICAL_ROLE,
region=record['region'],
MaxResults=200,
NextToken=record['NextToken']
)
else:
groups = describe_security_groups(
account_number=record['account_id'],
assume_role=HISTORICAL_ROLE,
region=record['region'],
MaxResults=200
)
# FIRST THINGS FIRST: Did we get a `NextToken`? If so, we need to enqueue that ASAP because
# `NextToken`s expire in 60 seconds!
if groups.get('NextToken'):
logging.debug(f"[-->] Pagination required {groups['NextToken']}. Tasking continuation.")
produce_events(
[poller_task_schema.serialize_me(record['account_id'], record['region'],
next_token=groups['NextToken'])],
takser_queue_url
)
"""
LOG.debug('[@] Running Poller...')
collector_poller_queue_url = get_queue_url(os.environ.get('POLLER_QUEUE_NAME', 'HistoricalSecurityGroupPoller'))
takser_queue_url = get_queue_url(os.environ.get('POLLER_TASKER_QUEUE_NAME', 'HistoricalSecurityGroupPollerTasker'))
poller_task_schema = HistoricalPollerTaskEventModel()
records = deserialize_records(event['Records'])
for record in records:
# Skip accounts that have role assumption errors:
try:
# Did we get a NextToken?
if record.get('NextToken'):
LOG.debug(f"[@] Received pagination token: {record['NextToken']}")
groups = describe_security_groups(
account_number=record['account_id'],
assume_role=HISTORICAL_ROLE,
region=record['region'],
MaxResults=200,
NextToken=record['NextToken']
)
else:
groups = describe_security_groups(
account_number=record['account_id'],
assume_role=HISTORICAL_ROLE,
region=record['region'],
MaxResults=200
)
# FIRST THINGS FIRST: Did we get a `NextToken`? If so, we need to enqueue that ASAP because
# `NextToken`s expire in 60 seconds!
if cloudwatch.get_collected_details(record):
LOG.debug(f"[<--] Received already collected security group data: {record['detail']['collected']}")
return [record['detail']['collected']]
try:
# Always depend on Group ID first:
if group_id: # pylint: disable=R1705
return describe_security_groups(
account_number=account_id,
assume_role=HISTORICAL_ROLE,
region=region,
GroupIds=[group_id]
)['SecurityGroups']
elif vpc_id and group_name:
return describe_security_groups(
account_number=account_id,
assume_role=HISTORICAL_ROLE,
region=region,
Filters=[
{
'Name': 'group-name',
'Values': [group_name]
},
{
'Name': 'vpc-id',
'Values': [vpc_id]
}
]
)['SecurityGroups']
else: