Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_retry_errors(self):
    """Retry helper should re-invoke a failing call max_attempts times, then re-raise."""
    # Neutralize backoff sleeps so the test runs instantly.
    self.patch(time, "sleep", lambda x: x)
    self.count = 0

    def always_throttled():
        # Every invocation fails with the retryable error code.
        self.count += 1
        raise ClientError({"Error": {"Code": 42}}, "something")

    retry = utils.get_retry((42,), 5)
    raised = False
    try:
        retry(always_throttled)
    except ClientError:
        raised = True
    if not raised:
        self.fail("should have raised")
    self.assertEqual(self.count, 5)
from botocore.paginate import Paginator
from c7n.query import QueryResourceManager, ChildResourceManager, TypeInfo
from c7n.manager import resources
from c7n.utils import chunks, get_retry, generate_arn, local_session, type_schema
from c7n.actions import BaseAction
from c7n.filters import Filter
from c7n.resources.shield import IsShieldProtected, SetShieldProtection
from c7n.tags import RemoveTag, Tag
class Route53Base(object):
permissions = ('route53:ListTagsForResources',)
retry = staticmethod(get_retry(('Throttled',)))
@property
def generate_arn(self):
    """Lazily build and cache an ARN-formatting callable for this resource type."""
    if self._generate_arn is not None:
        return self._generate_arn
    model = self.get_model()
    # Bind the service and resource type once; callers then supply only the id.
    self._generate_arn = functools.partial(
        generate_arn, model.service, resource_type=model.arn_type)
    return self._generate_arn
def get_arn(self, r):
    """Build the ARN for resource record *r* from the trailing segment of its id."""
    raw_id = r[self.get_model().id]
    # Ids may carry a path prefix (e.g. "/hostedzone/Z..."); keep the final segment.
    short_id = raw_id.rsplit("/", 1)[-1]
    return self.generate_arn(short_id)
def augment(self, resources):
_describe_route53_tags(
self.get_model(), resources, self.session_factory,
With automatic credential renewal.
Args:
role_arn: iam role arn to assume
session_name: client session identifier
session: an optional extant session, note session is captured
in a function closure for renewing the sts assumed role.
:return: a boto3 session using the sts assumed role credentials
Notes: We have to poke at botocore internals a few times
"""
if session is None:
session = Session()
retry = get_retry(('Throttling',))
def refresh():
parameters = {"RoleArn": role_arn, "RoleSessionName": session_name}
if external_id is not None:
parameters['ExternalId'] = external_id
credentials = retry(
session.client(
'sts', endpoint_url=sts_regional_endpoint(region)
).assume_role, **parameters)['Credentials']
return dict(
access_key=credentials['AccessKeyId'],
secret_key=credentials['SecretAccessKey'],
token=credentials['SessionToken'],
def filter_last_write(client, groups, start):
"""Filter log groups where the last write was before the start date.
"""
retry = get_retry(('ThrottlingException',))
def process_group(group_set):
matched = []
for g in group_set:
streams = retry(
client.describe_log_streams,
logGroupName=g['logGroupName'],
orderBy='LastEventTime',
limit=1, descending=True)
if not streams.get('logStreams'):
continue
stream = streams['logStreams'][0]
if stream['storedBytes'] == 0 and datetime.fromtimestamp(
stream['creationTime'] / 1000) > start:
matched.append(g)
elif 'lastIngestionTime' in stream and datetime.fromtimestamp(
def get_related_ids(self, resources):
    """Resolve the security-group ids attached to the given EFS mount targets.

    Results are cached on the instance so repeated invocations avoid
    re-querying the EFS API.
    """
    # Fast path: answer from the per-instance cache when it is populated.
    if self.efs_group_cache:
        cached = set()
        for resource in resources:
            cached.update(
                self.efs_group_cache.get(resource['MountTargetId'], ()))
        return list(cached)

    client = local_session(self.manager.session_factory).client('efs')
    # Retry on the 'Throttled' error code, up to 12 attempts.
    retry = get_retry(('Throttled',), 12)
    sg_by_target = {}
    for resource in resources:
        target_id = resource['MountTargetId']
        sg_by_target[target_id] = retry(
            client.describe_mount_target_security_groups,
            MountTargetId=target_id)['SecurityGroups']
    self.efs_group_cache = sg_by_target
    collected = set()
    for sgs in sg_by_target.values():
        collected.update(sgs)
    return list(collected)
{'input_token': 'NextToken', 'output_token': 'NextToken', 'result_key': 'Protections'},
client.meta.service_model.operation_model('ListProtections'))
def get_type_protections(client, model):
    """List Shield protections whose resource ARN matches *model*'s arn_type."""
    paginator = get_protections_paginator(client)
    # Swap in the retrying page iterator so throttled pages get retried.
    paginator.PAGE_ITERATOR_CLS = RetryPageIterator
    try:
        result = paginator.paginate().build_full_result()
    except client.exceptions.ResourceNotFoundException:
        # shield is not enabled in the account, so all resources are not protected
        return []
    matched = []
    for protection in result.get('Protections', []):
        if model.arn_type in protection['ResourceArn']:
            matched.append(protection)
    return matched
# Module-level retry helper for Shield API calls that hit rate limiting.
ShieldRetry = get_retry(('ThrottlingException',))
class IsShieldProtected(Filter):
permissions = ('shield:ListProtections',)
schema = type_schema('shield-enabled', state={'type': 'boolean'})
def process(self, resources, event=None):
client = local_session(self.manager.session_factory).client(
'shield', region_name='us-east-1')
protections = get_type_protections(client, self.manager.get_model())
protected_resources = {p['ResourceArn'] for p in protections}
state = self.data.get('state', False)
results = []
try:
self.manager.retry(
asg_client.suspend_processes,
ScalingProcesses=processes,
AutoScalingGroupName=asg['AutoScalingGroupName'])
except ClientError as e:
if e.response['Error']['Code'] == 'ValidationError':
return
raise
ec2_client = session.client('ec2')
try:
instance_ids = [i['InstanceId'] for i in asg['Instances']]
if not instance_ids:
return
retry = get_retry((
'RequestLimitExceeded', 'Client.RequestLimitExceeded'))
retry(ec2_client.stop_instances, InstanceIds=instance_ids)
except ClientError as e:
if e.response['Error']['Code'] in (
'InvalidInstanceID.NotFound',
'IncorrectInstanceState'):
self.log.warning("Erroring stopping asg instances %s %s" % (
asg['AutoScalingGroupName'], e))
return
raise
from botocore.exceptions import ClientError
import json
from c7n.actions import RemovePolicyBase
from c7n.filters import CrossAccountAccessFilter
from c7n.query import QueryResourceManager, TypeInfo
from c7n.manager import resources
from c7n.utils import get_retry, local_session
@resources.register('glacier')
class Glacier(QueryResourceManager):
permissions = ('glacier:ListTagsForVault',)
retry = staticmethod(get_retry(('Throttled',)))
class resource_type(TypeInfo):
    # Type metadata describing how to enumerate and identify Glacier vaults.
    service = 'glacier'
    # API operation, response list key, and extra args used for enumeration.
    enum_spec = ('list_vaults', 'VaultList', None)
    # The vault name serves as both display name and unique id.
    name = id = "VaultName"
    arn = "VaultARN"
    # ARN resource-type segment used when constructing vault ARNs.
    arn_type = 'vaults'
    # Opt in to the universal tagging actions/filters.
    universal_taggable = True
def augment(self, resources):
def process_tags(resource):
client = local_session(self.session_factory).client('glacier')
tag_dict = self.retry(
client.list_tags_for_vault,
vaultName=resource[self.get_model().name])['Tags']
tag_list = []
if (type_name in ('asg', 'ecs-task') and
"%s%s" % (klass.resource_type.arn_type, klass.resource_type.arn_separator)
in arn.resource_type):
return type_name
elif (klass.resource_type.arn_type is not None and
klass.resource_type.arn_type == arn.resource_type):
return type_name
@metrics_outputs.register('aws')
class MetricsOutput(Metrics):
"""Send metrics data to cloudwatch
"""
permissions = ("cloudWatch:PutMetricData",)
retry = staticmethod(utils.get_retry(('Throttling',)))
def __init__(self, ctx, config=None):
    """Initialize the cloudwatch metrics output.

    Args:
        ctx: execution context, passed through to the Metrics base class.
        config: optional mapping; 'namespace', 'region', 'scheme', and
            'netloc' keys are consulted here.
    """
    super(MetricsOutput, self).__init__(ctx, config)
    self.namespace = self.config.get('namespace', DEFAULT_NAMESPACE)
    self.region = self.config.get('region')
    # Route metrics to the 'master' account only for aws://master style URIs.
    # A conditional expression replaces the legacy `cond and X or Y` idiom,
    # which silently breaks whenever the truthy branch is itself falsy.
    self.destination = (
        'master'
        if self.config.scheme == 'aws' and self.config.get('netloc') == 'master'
        else None)
def _format_metric(self, key, value, unit, dimensions):
d = {
"MetricName": key,
"Timestamp": datetime.datetime.utcnow(),
"Value": value,
"Unit": unit}
d["Dimensions"] = [
def process_instance_set(self, client, instances):
    """Reboot a set of instances, retrying on capacity and rate-limit errors.

    Returns True when a retryable error still fails after all attempts
    (signalling the set was not processed); re-raises any other ClientError.
    """
    # Setup retry with insufficient capacity as well.
    # Bug fix: the original had a stray trailing comma after this tuple,
    # which wrapped the codes in an outer 1-tuple — so get_retry never
    # matched an error code and the `in retryable` check below could never
    # match a string, letting throttling errors propagate unretried.
    retryable = (
        'InsufficientInstanceCapacity',
        'RequestLimitExceeded',
        'Client.RequestLimitExceeded')
    retry = utils.get_retry(retryable, max_attempts=5)
    instance_ids = [i['InstanceId'] for i in instances]
    try:
        retry(client.reboot_instances, InstanceIds=instance_ids)
    except ClientError as e:
        if e.response['Error']['Code'] in retryable:
            return True
        raise