Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def xtest_policy_run(self):
    """Run one policy against the ``dummy`` resource and check its metrics.

    Registers ``DummyResource`` under the ``dummy`` key, executes a single
    policy with a temporary output directory, and asserts that three
    metric entries were recorded.  Both the registration and the temporary
    directory are undone through ``addCleanup``.
    """
    # NOTE(review): the leading "x" presumably keeps this method from being
    # collected as a test -- confirm whether it should be re-enabled.
    manager.resources.register("dummy", DummyResource)
    self.addCleanup(manager.resources.unregister, "dummy")

    self.output_dir = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, self.output_dir)

    policy_data = {"policies": [{"name": "process-instances", "resource": "dummy"}]}
    run_config = {"output_dir": self.output_dir}
    policy = self.load_policy_set(policy_data, run_config).policies[0]
    policy()

    recorded = policy.ctx.metrics.data
    self.assertEqual(len(recorded), 3)
import c7n.resources.secretsmanager
import c7n.resources.sfn
import c7n.resources.shield
import c7n.resources.simpledb
import c7n.resources.snowball
import c7n.resources.sns
import c7n.resources.storagegw
import c7n.resources.sqs
import c7n.resources.ssm
import c7n.resources.support
import c7n.resources.vpc
import c7n.resources.waf
# Load external plugins (private sdks etc)
from c7n.manager import resources
resources.load_plugins()
# Emit EVENT_FINAL on the registry once the plugins have been loaded.
# NOTE(review): callbacks attached with resources.subscribe(EVENT_FINAL, ...)
# are presumably run by this call -- confirm against the registry code.
resources.notify(resources.EVENT_FINAL)
# TODO: tech debt, please fix -- securityhub is imported here but never
# referenced in the visible code; presumably the import is kept for its
# side effects (confirm before removing).
from c7n.actions import securityhub
# Set the LOADED flag; presumably read elsewhere to avoid loading twice
# -- TODO confirm.
LOADED = True
# IAM instance profiles, registered with the resource registry under the
# 'iam-profile' key.
@resources.register('iam-profile')
class InstanceProfile(QueryResourceManager):

    class resource_type(TypeInfo):
        service = 'iam'
        arn_type = 'instance-profile'
        enum_spec = ('list_instance_profiles', 'InstanceProfiles', None)
        id = 'InstanceProfileId'
        # The display name is the profile's own name; the previous value
        # repeated the opaque InstanceProfileId, so `name` carried no
        # information beyond `id`.
        name = 'InstanceProfileName'
        date = 'CreateDate'
        # Denotes this resource type exists across regions
        global_resource = True
        arn = 'Arn'
# IAM server certificates, registered with the resource registry under the
# 'iam-certificate' key.
@resources.register('iam-certificate')
class ServerCertificate(QueryResourceManager):

    class resource_type(TypeInfo):
        # Marks the resource type as existing across regions.
        global_resource = True
        service = 'iam'
        arn_type = 'server-certificate'
        enum_spec = ('list_server_certificates', 'ServerCertificateMetadataList', None)
        id = 'ServerCertificateId'
        name = 'ServerCertificateName'
        # The expiration timestamp is the date field for this resource.
        date = 'Expiration'
@User.filter_registry.register('usage')
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from c7n.manager import resources
from c7n.query import QueryResourceManager
# Firehose delivery streams, registered with the resource registry under the
# 'firehose' key.
@resources.register('firehose')
class DeliveryStream(QueryResourceManager):
    # The resource type is given by its dotted name instead of an inline
    # TypeInfo class.
    resource_type = 'aws.firehose.deliverystream'
params['Payload'] = utils.dumps(payload)
result = client.invoke(**params)
result['Payload'] = result['Payload'].read()
if isinstance(result['Payload'], bytes):
result['Payload'] = result['Payload'].decode('utf-8')
results.append(result)
return results
def register_action_invoke_lambda(registry, _):
    """Add the ``invoke-lambda`` action to every resource type in *registry*.

    The second positional argument is accepted but ignored.
    """
    for type_name in registry.keys():
        registry.get(type_name).action_registry.register(
            'invoke-lambda', LambdaInvoke)
# Subscribe register_action_invoke_lambda to the registry's EVENT_FINAL event.
resources.subscribe(resources.EVENT_FINAL, register_action_invoke_lambda)
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from c7n.manager import resources
from c7n.query import QueryResourceManager, TypeInfo
from c7n import utils
from c7n import tags
from c7n.utils import local_session, type_schema
from c7n.actions import BaseAction
log = logging.getLogger('custodian.elasticbeanstalk')
# Elastic Beanstalk applications, registered with the resource registry under
# the 'elasticbeanstalk' key.
@resources.register('elasticbeanstalk')
class ElasticBeanstalk(QueryResourceManager):

    class resource_type(TypeInfo):
        service = 'elasticbeanstalk'
        enum_spec = ('describe_applications', 'Applications', None)
        # The application name is both the identifier and the display name.
        id = name = 'ApplicationName'
        arn = 'ApplicationArn'
        arn_type = 'application'
        filter_name = 'ApplicationNames'
        filter_type = 'list'
        default_report_fields = ('ApplicationName', 'DateCreated', 'DateUpdated')
from c7n.filters import (
Filter, FilterRegistry, DefaultVpcBase, MetricsFilter, ValueFilter)
import c7n.filters.vpc as net_filters
from c7n import tags
from c7n.manager import resources
from c7n.query import QueryResourceManager, DescribeSource, ConfigSource, TypeInfo
from c7n.utils import (
local_session, chunks, type_schema, get_retry, set_annotation)
from c7n.resources.shield import IsShieldProtected, SetShieldProtection
log = logging.getLogger('custodian.app-elb')
# Registered with the resource registry under the 'app-elb' key.
@resources.register('app-elb')
class AppELB(QueryResourceManager):
"""Resource manager for v2 ELBs (AKA ALBs and NLBs).
"""
# Static type metadata for the elbv2 service.
class resource_type(TypeInfo):
service = 'elbv2'
enum_spec = ('describe_load_balancers', 'LoadBalancers', None)
name = 'LoadBalancerName'
# The load balancer ARN is used as both the id and the arn field.
id = 'LoadBalancerArn'
filter_name = "Names"
filter_type = "list"
# presumably the metrics dimension name for this resource -- TODO confirm
dimension = "LoadBalancer"
date = 'CreatedTime'
config_type = 'AWS::ElasticLoadBalancingV2::LoadBalancer'
arn = "LoadBalancerArn"
# The suffix varies by type of loadbalancer (app vs net)
@classmethod
def register_resources(klass, registry, resource_class):
    """Meta model subscriber invoked on resource registration.

    For every entry in *registry* that has an ARN and no ``post-finding``
    action, registers this SecurityHub findings filter on *resource_class*
    under the ``finding`` key.
    """
    # NOTE(review): the checks run against each manager in the registry,
    # yet the filter is always registered on resource_class -- confirm
    # whether the checks were meant to apply to resource_class itself.
    for _, manager in registry.items():
        if manager.has_arn() and 'post-finding' not in manager.action_registry:
            resource_class.filter_registry.register('finding', klass)
# Subscribe SecurityHubFindingFilter.register_resources to the registry's
# EVENT_REGISTER event.
resources.subscribe(resources.EVENT_REGISTER, SecurityHubFindingFilter.register_resources)
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from botocore.exceptions import ClientError
import json
from c7n.actions import RemovePolicyBase
from c7n.filters import CrossAccountAccessFilter
from c7n.query import QueryResourceManager, TypeInfo
from c7n.manager import resources
from c7n.utils import get_retry, local_session
@resources.register('glacier')
class Glacier(QueryResourceManager):
permissions = ('glacier:ListTagsForVault',)
retry = staticmethod(get_retry(('Throttled',)))
class resource_type(TypeInfo):
service = 'glacier'
enum_spec = ('list_vaults', 'VaultList', None)
name = id = "VaultName"
arn = "VaultARN"
arn_type = 'vaults'
universal_taggable = True
def augment(self, resources):
def process_tags(resource):
client = local_session(self.session_factory).client('glacier')
@classmethod
def register_resources(klass, registry, resource_class):
    """Model resource subscriber invoked on resource registration.

    When the newly registered resource type declares a ``config_type``
    (i.e. it supports AWS Config), the ``config-compliance`` filter is
    registered on it automatically.  Resource types without one are left
    untouched.
    """
    type_info = resource_class.resource_type
    if getattr(type_info, 'config_type', None) is not None:
        resource_class.filter_registry.register('config-compliance', klass)
# Subscribe ConfigCompliance.register_resources to the registry's
# EVENT_REGISTER event.
resources.subscribe(resources.EVENT_REGISTER, ConfigCompliance.register_resources)