Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
s3.complete_multipart_upload(
Bucket=bucket_name, Key=key['Key'], UploadId=upload_id,
MultipartUpload={'Parts': parts})
return key['Key']
def restore_complete(restore):
    """Report whether a restore status string indicates a finished restore.

    Only the text before the first comma is inspected; anything after it
    is ignored.

    :param restore: status string (presumably the S3 object ``Restore``
        value, e.g. ``ongoing-request="false", expiry-date="..."`` --
        confirm against the caller).
    :returns: ``True`` when the inspected part contains ``'false'``.
    """
    # partition() yields the whole string when there is no comma, so the
    # remainder never has to be unpacked into an unused name.
    ongoing = restore.partition(',')[0]
    return 'false' in ongoing
@filters.register('is-log-target')
class LogTarget(Filter):
"""Filter and return buckets are log destinations.
Not suitable for use in lambda on large accounts. This is an API-heavy
process that scans all possible log sources.
Sources:
- elb (Access Log)
- s3 (Access Log)
- cfn (Template writes)
- cloudtrail
:example:
.. code-block:: yaml
policies:
def process(self, resources, event=None):
    """Return the resources that have S3 access logging enabled.

    When the filter data carries a ``bucket`` and/or ``prefix`` value, a
    resource is kept only if its corresponding access-log attribute is
    equal to that value.

    :param resources: iterable of resource dicts, each with an
        ``Attributes`` mapping.
    :param event: unused here.
    :returns: list of the matching resources, in input order.
    """
    # NOTE(review): presumably this is what populates ``Attributes`` on
    # each resource -- confirm in the attribute filter base class.
    self.initialize(resources)
    bucket_name = self.data.get('bucket')
    bucket_prefix = self.data.get('prefix')

    def is_logging(attrs):
        # Each guard mirrors one clause of the original conjunction.
        if not attrs['access_logs.s3.enabled']:
            return False
        if bucket_name and bucket_name != attrs.get('access_logs.s3.bucket'):
            return False
        if bucket_prefix and bucket_prefix != attrs.get('access_logs.s3.prefix'):
            return False
        return True

    return [alb for alb in resources if is_logging(alb['Attributes'])]
@AppELB.filter_registry.register('is-not-logging')
class IsNotLoggingFilter(Filter, AppELBAttributeFilterBase):
""" Matches AppELBs that are NOT logging to S3.
or do not match the optional bucket and/or prefix.
:example:
.. code-block:: yaml
policies:
- name: alb-is-not-logging-test
resource: app-elb
filters:
- type: is-not-logging
- name: alb-is-not-logging-bucket-and-prefix-test
resource: app-elb
filters:
@Role.filter_registry.register('cross-account')
class RoleCrossAccountAccess(CrossAccountAccessFilter):
    """Cross-account access filter registered on ``Role`` as ``cross-account``.

    This class only supplies configuration; the filtering logic itself
    comes from ``CrossAccountAccessFilter``.
    """

    # Name of the resource attribute holding the policy document
    # (presumably read by the base class -- confirm there).
    policy_attribute = 'AssumeRolePolicyDocument'
    permissions = ('iam:ListRoles',)

    schema = type_schema(
        'cross-account',
        # white list accounts
        whitelist_from=ValuesFrom.schema,
        whitelist={'type': 'array', 'items': {'type': 'string'}})
@Role.filter_registry.register('has-inline-policy')
class IamRoleInlinePolicy(Filter):
    """Filter IAM roles that have an inline-policy attached

    True: Filter roles that have an inline-policy
    False: Filter roles that do not have an inline-policy

    :example:

    .. code-block:: yaml

        policies:
          - name: iam-roles-with-inline-policies
            resource: iam-role
            filters:
              - type: has-inline-policy
                value: True
    """
def get_type_protections(client, model):
    """Return the listed protections whose ``ResourceArn`` matches ``model``.

    :param client: client passed to ``get_protections_paginator``; its
        ``exceptions.ResourceNotFoundException`` is the error treated as
        "nothing is protected".
    :param model: object whose ``arn_type`` is tested for containment in
        each protection's ``ResourceArn``.
    :returns: list of matching protection dicts; ``[]`` when the listing
        raises ``ResourceNotFoundException``.
    """
    paginator = get_protections_paginator(client)
    paginator.PAGE_ITERATOR_CLS = RetryPageIterator
    try:
        full_result = paginator.paginate().build_full_result()
    except client.exceptions.ResourceNotFoundException:
        # shield is not enabled in the account, so all resources are not protected
        return []
    protections = full_result.get('Protections', [])
    return [p for p in protections if model.arn_type in p['ResourceArn']]
# Retry helper built by get_retry for the 'ThrottlingException' error code
# (presumably wraps calls and retries them on that error -- confirm in get_retry).
ShieldRetry = get_retry(('ThrottlingException',))
class IsShieldProtected(Filter):
permissions = ('shield:ListProtections',)
schema = type_schema('shield-enabled', state={'type': 'boolean'})
def process(self, resources, event=None):
client = local_session(self.manager.session_factory).client(
'shield', region_name='us-east-1')
protections = get_type_protections(client, self.manager.get_model())
protected_resources = {p['ResourceArn'] for p in protections}
state = self.data.get('state', False)
results = []
for arn, r in zip(self.manager.get_arns(resources), resources):
r['c7n:ShieldProtected'] = shielded = arn in protected_resources
self.log.warning("could not parse tag:%s value:%s on %s" % (
tag, v, i['InstanceId']))
if self.current_date is None:
self.current_date = datetime.now()
if action_date.tzinfo:
# if action_date is timezone aware, set to timezone provided
action_date = action_date.astimezone(tz)
self.current_date = datetime.now(tz=tz)
return self.current_date >= (
action_date - timedelta(days=skew, hours=skew_hours))
class TagCountFilter(Filter):
"""Simplify tag counting..
ie. these two blocks are equivalent
.. code-block :: yaml
- filters:
- type: value
op: gte
count: 8
- filters:
- type: tag-count
count: 8
"""
schema = utils.type_schema(
results = []
profiles = self.instance_profile_usage()
for r in resources:
if (r['Arn'] not in profiles or r['InstanceProfileName'] not in profiles):
results.append(r)
self.log.info(
"%d of %d instance profiles currently not in use." % (
len(results), len(resources)))
return results
###################
# IAM Users #
###################
class CredentialReport(Filter):
"""Use IAM Credential report to filter users.
The IAM Credential report ( https://goo.gl/sbEPtM ) aggregates
multiple pieces of information on iam users. This makes it highly
efficient for querying multiple aspects of a user that would
otherwise require per user api calls.
For example if we wanted to retrieve all users with mfa who have
never used their password but have active access keys from the
last month
.. code-block:: yaml
- name: iam-mfa-active-keys-no-login
resource: iam-user
filters:
'ReadWriteType': tconfig.get('type', 'All'),
'DataResources': [{
'Type': 'AWS::S3::Object',
'Values': ['arn:aws:s3:::']}]})
client.put_event_selectors(
TrailName=trail['Name'],
EventSelectors=events)
if added:
client.start_logging(Name=tconfig['name'])
resources[0]['c7n_data_trail'] = trail
@filters.register('shield-enabled')
class ShieldEnabled(Filter):
permissions = ('shield:DescribeSubscription',)
schema = type_schema(
'shield-enabled',
state={'type': 'boolean'})
def process(self, resources, event=None):
state = self.data.get('state', False)
client = local_session(self.manager.session_factory).client('shield')
try:
subscription = client.describe_subscription().get(
'Subscription', None)
except ClientError as e:
if e.response['Error']['Code'] != 'ResourceNotFoundException':
raise
query._derserializer._deserialize = lambda target, data: \
original(target, self.fix_wrap_rest_response(data))
result_list = list(query)[0]
result_list = [{result_list.columns[i].name: v for i, v in enumerate(row)}
for row in result_list.rows]
for r in result_list:
if 'ResourceGroupName' in r:
r['ResourceId'] = scope + '/resourcegroups/' + r.pop('ResourceGroupName')
r['ResourceId'] = r['ResourceId'].lower()
return result_list
class ParentFilter(Filter):
"""
Meta filter that allows you to filter child resources by applying filters to their
parent resources.
You can use any filter supported by corresponding parent resource type.
:examples:
Find Azure KeyVault Keys from Key Vaults with ``owner:ProjectA`` tag.
.. code-block:: yaml
policies:
- name: kv-keys-from-tagged-keyvaults
resource: azure.keyvault-key
filters:
class resource_type(object):
    """Static metadata describing the IAM ``server-certificate`` resource."""

    service = 'iam'
    type = 'server-certificate'
    # NOTE(review): looks like (list call name, result key, extra args) --
    # confirm against the code that consumes enum_spec.
    enum_spec = ('list_server_certificates',
                 'ServerCertificateMetadataList',
                 None)
    id = 'ServerCertificateId'
    filter_name = None
    name = 'ServerCertificateName'
    date = 'Expiration'
    dimension = None
    # Denotes this resource type exists across regions
    global_resource = True
class IamRoleUsage(Filter):
def get_permissions(self):
    """Return the permissions this filter needs as one flat list.

    Gathers ``get_permissions()`` from the ``lambda``, ``launch-config``
    and ``ec2`` resource managers, then appends the two ECS describe
    permissions.

    :returns: list of permission strings.
    """
    # chain.from_iterable flattens the per-manager lists; passing the
    # list of lists to chain() directly would leave them nested.
    perms = list(itertools.chain.from_iterable(
        self.manager.get_resource_manager(m).get_permissions()
        for m in ['lambda', 'launch-config', 'ec2']))
    perms.extend(['ecs:DescribeClusters', 'ecs:DescribeServices'])
    return perms
def service_role_usage(self):
    """Return the union of roles reported by the three role scans.

    Calls ``scan_lambda_roles``, ``scan_ecs_roles`` and
    ``collect_profile_roles`` in that order and merges their results.

    :returns: set containing every item yielded by the three calls.
    """
    roles = set()
    for scan in (self.scan_lambda_roles,
                 self.scan_ecs_roles,
                 self.collect_profile_roles):
        roles.update(scan())
    return roles
def instance_profile_usage(self):
'Status': attrs.get('status'),
'Notifications': [{'Arn': a} for a in attrs.get('topics', ())]})
modified = []
for r in resources:
for k, v in attrs.items():
if k not in r or r[k] != v:
modified.append(r)
self.log.debug("Updating %d of %d ops items", len(modified), len(resources))
client = local_session(self.manager.session_factory).client('ssm')
for m in modified:
client.update_ops_item(OpsItemId=m['OpsItemId'], **attrs)
class OpsItemFilter(Filter):
    """Filter resources associated to extant OpsCenter operational items.

    :example:

    Find ec2 instances with open ops items.

    .. code-block:: yaml

        policies:
          - name: ec2-instances-ops-items
            resource: ec2
            filters:
              - type: ops-item
                # we can filter on source, title, priority
                priority: [1, 2]
    """