Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
{
"resource": "s3",
"name": "s3-bucket-policy",
"mode": {"type": "cloudtrail", "events": ["CreateBucket"]},
"filters": [
{
"type": "missing-policy-statement",
"statement_ids": ["RequireEncryptedPutObject"],
}
],
"actions": ["no-op"],
},
Config.empty(),
)
pl = PolicyLambda(p)
mgr = LambdaManager(session_factory)
self.addCleanup(mgr.remove, pl)
result = mgr.publish(pl, "Dev", role=ROLE)
events = pl.get_events(session_factory)
self.assertEqual(len(events), 1)
event = events.pop()
self.assertEqual(
json.loads(event.render_event_pattern()),
{
u"detail": {
u"eventName": [u"CreateBucket"],
u"eventSource": [u"s3.amazonaws.com"],
},
u"detail-type": ["AWS API Call via CloudTrail"],
},
)
def test_config_rule_provision(self):
session_factory = self.replay_flight_data("test_config_rule")
p = Policy(
{
"resource": "security-group",
"name": "sg-modified",
"mode": {"type": "config-rule"},
},
Config.empty(),
)
pl = PolicyLambda(p)
mgr = LambdaManager(session_factory)
result = mgr.publish(pl, "Dev", role=ROLE)
self.assertEqual(result["FunctionName"], "custodian-sg-modified")
self.addCleanup(mgr.remove, pl)
session_factory = self.replay_flight_data(flight, zdata=True)
mode = {
"type": "config-rule", "role": "arn:aws:iam::644160558196:role/custodian-mu"
}
mode.update(extra)
p = Policy(
{
"resource": "s3",
"name": "hello-world",
"actions": ["no-op"],
"mode": mode,
},
Config.empty(),
)
pl = PolicyLambda(p)
mgr = LambdaManager(session_factory)
def cleanup():
mgr.remove(pl)
if self.recording:
time.sleep(60)
self.addCleanup(cleanup)
return mgr, mgr.publish(pl)
def test_empty(self):
assert LambdaManager.diff_tags({}, {}) == ({}, [])
lname = "custodian-test-log-sub"
self.addCleanup(client.delete_log_group, logGroupName=lname)
client.create_log_group(logGroupName=lname)
linfo = client.describe_log_groups(logGroupNamePrefix=lname)["logGroups"][0]
params = dict(
session_factory=session_factory,
name="c7n-log-sub",
role=ROLE,
sns_topic="arn:",
log_groups=[linfo],
)
func = logsub.get_function(**params)
manager = LambdaManager(session_factory)
finfo = manager.publish(func)
self.addCleanup(manager.remove, func)
results = client.describe_subscription_filters(logGroupName=lname)
self.assertEqual(len(results["subscriptionFilters"]), 1)
self.assertEqual(
results["subscriptionFilters"][0]["destinationArn"], finfo["FunctionArn"]
)
def get_metrics(self, start, end, period):
from c7n.mu import LambdaManager, PolicyLambda
manager = LambdaManager(self.policy.session_factory)
values = manager.metrics(
[PolicyLambda(self.policy)], start, end, period)[0]
values.update(
super(LambdaMode, self).get_metrics(start, end, period))
return values
def region_gc(options, region, policy_config, policies):
session_factory = SessionFactory(
region=region,
assume_role=policy_config.assume_role,
profile=policy_config.profile,
external_id=policy_config.external_id)
manager = mu.LambdaManager(session_factory)
funcs = list(manager.list_functions(options.prefix))
client = session_factory().client('lambda')
remove = []
current_policies = [p.name for p in policies]
pattern = re.compile(options.policy_regex)
for f in funcs:
if not pattern.match(f['FunctionName']):
continue
match = False
for pn in current_policies:
if f['FunctionName'].endswith(pn):
match = True
if options.present:
if match:
remove.append(f)
def provision(self):
# auto tag lambda policies with mode and version, we use the
# version in mugc to effect cleanups.
tags = self.policy.data['mode'].setdefault('tags', {})
tags['custodian-info'] = "mode=%s:version=%s" % (
self.policy.data['mode']['type'], version)
from c7n import mu
with self.policy.ctx:
self.policy.log.info(
"Provisioning policy lambda %s", self.policy.name)
try:
manager = mu.LambdaManager(self.policy.session_factory)
except ClientError:
# For cli usage by normal users, don't assume the role just use
# it for the lambda
manager = mu.LambdaManager(
lambda assume=False: self.policy.session_factory(assume))
return manager.publish(
mu.PolicyLambda(self.policy),
role=self.policy.options.assume_role)
def region_gc(options, region, policy_config, policies):
session_factory = SessionFactory(
region=region,
assume_role=policy_config.assume_role,
profile=policy_config.profile,
external_id=policy_config.external_id)
manager = mu.LambdaManager(session_factory)
funcs = list(manager.list_functions(options.prefix))
client = session_factory().client('lambda')
remove = []
current_policies = [p.name for p in policies]
pattern = re.compile(options.policy_regex)
for f in funcs:
if not pattern.match(f['FunctionName']):
continue
match = False
for pn in current_policies:
if f['FunctionName'].endswith(pn):
match = True
if options.present:
if match:
remove.append(f)