Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@resources.register('kms')
class KeyAlias(QueryResourceManager):
    """Resource manager registered under the ``kms`` resource name.

    Its metadata points at the KMS ``list_aliases`` call, so the records
    handled here are aliases rather than keys.
    """

    class resource_type(TypeInfo):
        # Declarative metadata for the resource; presumably read by the
        # QueryResourceManager machinery -- confirm in c7n.query.
        service = 'kms'
        arn_type = 'alias'
        enum_spec = ('list_aliases', 'Aliases', None)
        name = "AliasName"
        id = "AliasArn"

    def augment(self, resources):
        """Return only the records that contain a ``TargetKeyId`` key.

        :param resources: iterable of alias records (dict-like).
        :returns: a new list, in the original order, of the records that
            have a ``TargetKeyId`` entry.
        """
        bound_aliases = []
        for alias in resources:
            if 'TargetKeyId' in alias:
                bound_aliases.append(alias)
        return bound_aliases
@resources.register('kms-key')
class Key(QueryResourceManager):
class resource_type(TypeInfo):
service = 'kms'
arn_type = "key"
enum_spec = ('list_keys', 'Keys', None)
name = "KeyId"
id = "KeyArn"
universal_taggable = True
def augment(self, resources):
client = local_session(self.session_factory).client('kms')
for r in resources:
try:
key_id = r.get('KeyArn')
info = client.describe_key(KeyId=key_id)['KeyMetadata']
class resource_type(query.TypeInfo):
    # Metadata describing EC2 subnets, enumerated through the
    # ``describe_subnets`` call and its ``Subnets`` result key.
    service = 'ec2'
    arn_type = 'subnet'
    enum_spec = ('describe_subnets', 'Subnets', None)

    # The subnet id is used as both the display name and the identifier.
    name = 'SubnetId'
    id = name

    filter_name = 'SubnetIds'
    filter_type = 'list'
    config_type = 'AWS::EC2::Subnet'
    id_prefix = "subnet-"
# Register FlowLogFilter in Subnet's filter registry under the name 'flow-logs'.
Subnet.filter_registry.register('flow-logs', FlowLogFilter)
@resources.register('security-group')
class SecurityGroup(query.QueryResourceManager):
    """Resource manager registered under the ``security-group`` name."""

    class resource_type(query.TypeInfo):
        # Metadata describing EC2 security groups, enumerated through
        # ``describe_security_groups`` / ``SecurityGroups``.
        service = 'ec2'
        arn_type = 'security-group'
        enum_spec = ('describe_security_groups', 'SecurityGroups', None)

        # The group id is used as both the display name and the identifier.
        name = 'GroupId'
        id = name

        filter_name = "GroupIds"
        filter_type = 'list'
        config_type = "AWS::EC2::SecurityGroup"
        id_prefix = "sg-"

    def get_source(self, source_type):
        """Return the source object for ``source_type``.

        The ``'config'`` source is served by ``ConfigSG``; any other value
        is handled by the base manager's ``get_source``.
        """
        if source_type != 'config':
            return super(SecurityGroup, self).get_source(source_type)
        return ConfigSG(self)
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from c7n.manager import resources
from c7n.query import QueryResourceManager, TypeInfo
from c7n.actions import BaseAction
from c7n.tags import Tag, TagDelayedAction, RemoveTag, coalesce_copy_user_tags, TagActionFilter
from c7n.utils import local_session, type_schema
from c7n.filters.kms import KmsRelatedFilter
@resources.register('fsx')
class FSx(QueryResourceManager):
    """Resource manager registered under the ``fsx`` resource name."""

    class resource_type(TypeInfo):
        # Metadata describing FSx file systems, enumerated through
        # ``describe_file_systems`` / ``FileSystems``.
        service = 'fsx'
        enum_spec = ('describe_file_systems', 'FileSystems', None)

        # The file system id is used as both display name and identifier.
        name = 'FileSystemId'
        id = name

        arn = "ResourceARN"
        date = 'CreationTime'
@resources.register('fsx-backup')
class FSxBackup(QueryResourceManager):
    """Resource manager registered under the ``fsx-backup`` resource name."""

    class resource_type(TypeInfo):
        # Metadata describing FSx backups, enumerated through
        # ``describe_backups`` / ``Backups``.
        service = 'fsx'
        enum_spec = ('describe_backups', 'Backups', None)

        # The backup id is used as both display name and identifier.
        name = 'BackupId'
        id = name
group_name = self.data['group']
state = self.data['state']
client = local_session(self.manager.session_factory).client('iam')
op_map = {
'add': client.add_user_to_group,
'remove': client.remove_user_from_group
}
for r in resources:
try:
op_map[state](GroupName=group_name, UserName=r['UserName'])
except client.exceptions.NoSuchEntityException:
continue
@resources.register('iam-policy')
class Policy(QueryResourceManager):
    """Resource manager registered under the ``iam-policy`` resource name."""

    class resource_type(TypeInfo):
        # Metadata describing IAM managed policies, enumerated through
        # ``list_policies`` / ``Policies``.
        service = 'iam'
        arn_type = 'policy'
        enum_spec = ('list_policies', 'Policies', None)
        id = 'PolicyId'
        name = 'PolicyName'
        date = 'CreateDate'
        config_type = "AWS::IAM::Policy"
        # Denotes this resource type exists across regions
        global_resource = True
        arn = 'Arn'

    def get_source(self, source_type):
        """Return the source object for ``source_type``.

        The ``'describe'`` source is served by ``DescribePolicy``. Every
        other source type is delegated to the base manager; previously the
        method fell off the end and returned ``None`` for those values.

        :param source_type: name of the requested source.
        :returns: a ``DescribePolicy`` bound to this manager, or whatever
            the base class ``get_source`` returns.
        """
        if source_type == 'describe':
            return DescribePolicy(self)
        return super(Policy, self).get_source(source_type)
chunks, local_session, set_annotation, type_schema, dumps)
# Module-level logger for this resource module.
log = logging.getLogger('custodian.s3')

# Filter registry and the filter registered on it.
filters = FilterRegistry('s3.filters')
filters.register('marked-for-op', TagActionFilter)

# Action registry and the actions registered on it.
actions = ActionRegistry('s3.actions')
actions.register('auto-tag-user', AutoTagUser)
actions.register('put-metric', PutMetric)

# 2 * 1024**3 == 2147483648; presumably a byte count (2 GiB) -- TODO confirm
# against the code that reads MAX_COPY_SIZE.
MAX_COPY_SIZE = 2 * 1024 ** 3
@resources.register('s3')
class S3(query.QueryResourceManager):
class resource_type(object):
service = 's3'
type = 'bucket'
enum_spec = ('list_buckets', 'Buckets[]', None)
detail_spec = ('list_objects', 'Bucket', 'Contents[]')
name = id = 'Name'
filter_name = None
date = 'CreationDate'
dimension = 'BucketName'
config_type = 'AWS::S3::Bucket'
filter_registry = filters
action_registry = actions
def __init__(self, ctx, data):
self.manager.session_factory).client('elasticache')
groups = super(
ElasticacheClusterModifyVpcSecurityGroups, self).get_groups(
clusters)
for idx, c in enumerate(clusters):
# build map of Replication Groups to Security Groups
replication_group_map[c['ReplicationGroupId']] = groups[idx]
for idx, r in enumerate(replication_group_map.keys()):
client.modify_replication_group(
ReplicationGroupId=r,
SecurityGroupIds=replication_group_map[r])
@resources.register('cache-subnet-group')
class ElastiCacheSubnetGroup(QueryResourceManager):
    """Resource manager registered under the ``cache-subnet-group`` name."""

    class resource_type(TypeInfo):
        # Metadata describing ElastiCache subnet groups, enumerated through
        # ``describe_cache_subnet_groups`` / ``CacheSubnetGroups``.
        service = 'elasticache'
        arn_type = 'subnet-group'
        enum_spec = ('describe_cache_subnet_groups', 'CacheSubnetGroups', None)

        # The group name is used as both display name and identifier.
        name = 'CacheSubnetGroupName'
        id = name

        filter_name = 'CacheSubnetGroupName'
        filter_type = 'scalar'
@resources.register('cache-snapshot')
class ElastiCacheSnapshot(QueryResourceManager):
    """Resource manager registered under the ``cache-snapshot`` name."""

    class resource_type(TypeInfo):
        # Only the owning service is declared in the visible metadata.
        service = 'elasticache'
from c7n.actions import Action
from c7n.exceptions import PolicyValidationError
from c7n.filters import ValueFilter, AgeFilter, Filter
from c7n.filters.offhours import OffHour, OnHour
import c7n.filters.vpc as net_filters
from c7n.manager import resources
from c7n import query
from c7n.tags import TagActionFilter, DEFAULT_TAG, TagCountFilter, TagTrim, TagDelayedAction
from c7n.utils import local_session, type_schema, chunks, get_retry
from .ec2 import deserialize_user_data
@resources.register('asg')
class ASG(query.QueryResourceManager):
class resource_type(query.TypeInfo):
service = 'autoscaling'
arn = 'AutoScalingGroupARN'
arn_type = 'autoScalingGroup'
arn_separator = ":"
id = name = 'AutoScalingGroupName'
date = 'CreatedTime'
dimension = 'AutoScalingGroupName'
enum_spec = ('describe_auto_scaling_groups', 'AutoScalingGroups', None)
filter_name = 'AutoScalingGroupNames'
filter_type = 'list'
config_type = 'AWS::AutoScaling::AutoScalingGroup'
default_report_fields = (
'AutoScalingGroupName',
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
import json
from c7n.actions import RemovePolicyBase, Action
from c7n.exceptions import PolicyValidationError
from c7n.filters import CrossAccountAccessFilter, Filter, ValueFilter
from c7n.manager import resources
from c7n.query import QueryResourceManager, TypeInfo
from c7n import tags
from c7n.utils import local_session, type_schema
@resources.register('ecr')
class ECR(QueryResourceManager):
class resource_type(TypeInfo):
service = 'ecr'
enum_spec = ('describe_repositories', 'repositories', None)
name = "repositoryName"
arn = id = "repositoryArn"
arn_type = 'repository'
filter_name = 'repositoryNames'
filter_type = 'list'
def augment(self, resources):
client = local_session(self.session_factory).client('ecr')
results = []
for r in resources:
try:
r['Tags'] = client.list_tags_for_resource(