Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from c7n.actions import Action
from c7n.manager import resources
from c7n.query import QueryResourceManager, TypeInfo
from c7n.tags import Tag, RemoveTag
from c7n.utils import type_schema, local_session, dumps, chunks
@resources.register('step-machine')
class StepFunction(QueryResourceManager):
"""AWS Step Functions State Machine"""
class resource_type(TypeInfo):
service = 'stepfunctions'
enum_spec = ('list_state_machines', 'stateMachines', None)
arn = id = 'stateMachineArn'
arn_type = 'stateMachine'
name = 'name'
date = 'creationDate'
detail_spec = (
"describe_state_machine", "stateMachineArn",
'stateMachineArn', None)
class InvokeStepFunction(Action):
"""Invoke step function on resources.
By default this will invoke a step function for each resource
providing both the `policy` and `resource` as input.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from c7n.manager import resources
from c7n.query import QueryResourceManager, TypeInfo
@resources.register('dlm-policy')
class DLMPolicy(QueryResourceManager):
class resource_type(TypeInfo):
service = 'dlm'
id = name = 'PolicyId'
enum_spec = (
'get_lifecycle_policies', 'Policies', None)
detail_spec = ('get_lifecycle_policy', 'PolicyId', 'PolicyId', 'Policy')
filter_name = 'PolicyIds'
filter_type = 'list'
arn = False
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from c7n.actions import BaseAction
from c7n.manager import resources
from c7n.query import QueryResourceManager, DescribeSource, ConfigSource, TypeInfo
from c7n.tags import universal_augment, register_universal_tags
from c7n.utils import type_schema, local_session
@resources.register('acm-certificate')
class Certificate(QueryResourceManager):
class resource_type(TypeInfo):
service = 'acm'
enum_spec = ('list_certificates', 'CertificateSummaryList', None)
id = 'CertificateArn'
name = 'DomainName'
date = 'CreatedAt'
detail_spec = (
"describe_certificate", "CertificateArn",
'CertificateArn', 'Certificate')
config_type = "AWS::ACM::Certificate"
arn_type = 'certificate'
universal_taggable = object()
def get_source(self, source_type):
if source_type == 'describe':
return DescribeCertificate(self)
elif source_type == 'config':
service = 'redshift'
arn_type = 'redshift-subnet-group'
id = name = 'ClusterSubnetGroupName'
enum_spec = (
'describe_cluster_subnet_groups', 'ClusterSubnetGroups', None)
filter_name = 'ClusterSubnetGroupName'
filter_type = 'scalar'
config_type = "AWS::Redshift::ClusterSubnetGroup"
@resources.register('redshift-snapshot')
class RedshiftSnapshot(QueryResourceManager):
"""Resource manager for Redshift snapshots.
"""
class resource_type(TypeInfo):
service = 'redshift'
arn_type = 'snapshot'
arn_separator = ':'
enum_spec = ('describe_cluster_snapshots', 'Snapshots', None)
name = id = 'SnapshotIdentifier'
date = 'SnapshotCreateTime'
config_type = "AWS::Redshift::ClusterSnapshot"
universal_taggable = True
def get_arns(self, resources):
arns = []
for r in resources:
arns.append(self.generate_arn(r['ClusterIdentifier'] + '/' + r[self.get_model().id]))
return arns
import json
from c7n.actions import RemovePolicyBase
from c7n.filters import CrossAccountAccessFilter
from c7n.query import QueryResourceManager, TypeInfo
from c7n.manager import resources
from c7n.utils import get_retry, local_session
@resources.register('glacier')
class Glacier(QueryResourceManager):
permissions = ('glacier:ListTagsForVault',)
retry = staticmethod(get_retry(('Throttled',)))
class resource_type(TypeInfo):
service = 'glacier'
enum_spec = ('list_vaults', 'VaultList', None)
name = id = "VaultName"
arn = "VaultARN"
arn_type = 'vaults'
universal_taggable = True
def augment(self, resources):
def process_tags(resource):
client = local_session(self.session_factory).client('glacier')
tag_dict = self.retry(
client.list_tags_for_vault,
vaultName=resource[self.get_model().name])['Tags']
tag_list = []
for k, v in tag_dict.items():
tag_list.append({'Key': k, 'Value': v})
from c7n.actions import BaseAction, RemovePolicyBase, ModifyVpcSecurityGroupsAction
from c7n.filters import CrossAccountAccessFilter, ValueFilter
import c7n.filters.vpc as net_filters
from c7n.manager import resources
from c7n import query
from c7n.resources.iam import CheckPermissions
from c7n.tags import universal_augment
from c7n.utils import local_session, type_schema
ErrAccessDenied = "AccessDeniedException"
@resources.register('lambda')
class AWSLambda(query.QueryResourceManager):
class resource_type(query.TypeInfo):
service = 'lambda'
arn_type = 'function'
arn_separator = ":"
enum_spec = ('list_functions', 'Functions', None)
name = id = 'FunctionName'
date = 'LastModified'
dimension = 'FunctionName'
config_type = "AWS::Lambda::Function"
universal_taggable = object()
def get_source(self, source_type):
if source_type == 'describe':
return DescribeLambda(self)
elif source_type == 'config':
return ConfigLambda(self)
raise ValueError("Unsupported source: %s for %s" % (
class resource_type(query.TypeInfo):
service = 'ec2'
arn_type = 'vpc-connection'
enum_spec = ('describe_vpn_connections', 'VpnConnections', None)
name = id = 'VpnConnectionId'
filter_name = 'VpnConnectionIds'
filter_type = 'list'
config_type = 'AWS::EC2::VPNConnection'
id_prefix = "vpn-"
@resources.register('vpn-gateway')
class VPNGateway(query.QueryResourceManager):
class resource_type(query.TypeInfo):
service = 'ec2'
arn_type = 'vpc-gateway'
enum_spec = ('describe_vpn_gateways', 'VpnGateways', None)
name = id = 'VpnGatewayId'
filter_name = 'VpnGatewayIds'
filter_type = 'list'
config_type = 'AWS::EC2::VPNGateway'
id_prefix = "vgw-"
@resources.register('vpc-endpoint')
class VpcEndpoint(query.QueryResourceManager):
class resource_type(query.TypeInfo):
service = 'ec2'
arn_type = 'vpc-endpoint'
chunks,
get_retry,
local_session,
set_annotation,
type_schema,
QueryParser,
)
from c7n.resources.ami import AMI
log = logging.getLogger('custodian.ebs')
@resources.register('ebs-snapshot')
class Snapshot(QueryResourceManager):
class resource_type(TypeInfo):
service = 'ec2'
arn_type = 'snapshot'
enum_spec = (
'describe_snapshots', 'Snapshots', None)
id = 'SnapshotId'
filter_name = 'SnapshotIds'
filter_type = 'list'
name = 'SnapshotId'
date = 'StartTime'
default_report_fields = (
'SnapshotId',
'VolumeId',
'tag:InstanceId',
'VolumeSize',
'StartTime',
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from c7n.manager import resources
from c7n.query import QueryResourceManager, TypeInfo
from c7n.utils import local_session
@resources.register('backup-plan')
class BackupPlan(QueryResourceManager):
class resource_type(TypeInfo):
service = 'backup'
enum_spec = ('list_backup_plans', 'BackupPlansList', None)
detail_spec = ('get_backup_plan', 'BackupPlanId', 'BackupPlanId', 'BackupPlan')
id = 'BackupPlanName'
name = 'BackupPlanId'
arn = 'BackupPlanArn'
def augment(self, resources):
super(BackupPlan, self).augment(resources)
client = local_session(self.session_factory).client('backup')
results = []
for r in resources:
try:
tags = client.list_tags(ResourceArn=r['BackupPlanArn']).get('Tags', {})
except client.exceptions.ResourceNotFoundException:
continue
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from c7n.manager import resources
from c7n.query import QueryResourceManager, TypeInfo
@resources.register('waf')
class WAF(QueryResourceManager):
class resource_type(TypeInfo):
service = "waf"
enum_spec = ("list_web_acls", "WebACLs", None)
detail_spec = ("get_web_acl", "WebACLId", "WebACLId", "WebACL")
name = "Name"
id = "WebACLId"
dimension = "WebACL"
config_type = "AWS::WAF::WebACL"
arn_type = "webacl"
@resources.register('waf-regional')
class RegionalWAF(QueryResourceManager):
class resource_type(TypeInfo):
service = "waf-regional"
enum_spec = ("list_web_acls", "WebACLs", None)