Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"QueueUrl"
]
self.addCleanup(client.delete_queue, QueueUrl=reduce_queue)
with SQSExecutor(session_factory, map_queue, reduce_queue) as w:
w.op_sequence_start = 699723
w.op_sequence = 699723
# Submit work
futures = []
for i in range(10):
futures.append(w.submit(int_processor, i))
# Manually process and send results
messages = MessageIterator(client, map_queue, limit=10)
for m in messages:
d = utils.loads(m["Body"])
self.assertEqual(
m["MessageAttributes"]["op"]["StringValue"],
"tests.test_sqsexec:int_processor",
)
client.send_message(
QueueUrl=reduce_queue,
MessageBody=utils.dumps([d["args"], int_processor(*d["args"])]),
MessageAttributes=m["MessageAttributes"],
)
w.gather()
results = [
json.loads(r.result()["Body"]) for r in list(as_completed(futures))
]
self.assertEqual(list(sorted(results))[-1], [[9], 18])
def _get_storage_management_client_api_string(self):
    """Return the storage management client's default API version string.

    Reads ``DEFAULT_API_VERSION`` from the
    ``azure.mgmt.storage.StorageManagementClient`` obtained through the
    local session and replaces every ``-`` with ``_``.
    """
    storage_client = local_session(Session).client(
        'azure.mgmt.storage.StorageManagementClient')
    dashed_version = storage_client.DEFAULT_API_VERSION
    return dashed_version.replace("-", "_")
'actions': [
{
'type': 'save-throughput-state',
'state-tag': 'test-store-throughput'
}
]
})
collections = p.run()
self.assertEqual(len(collections), 1)
account_name = collections[0]['c7n:parent']['name']
self.sleep_in_live_mode()
client = local_session(Session).client(
'azure.mgmt.cosmosdb.CosmosDBManagementClient')
cosmos_account = client.database_accounts.get('test_cosmosdb', account_name)
self.assertTrue('test-store-throughput' in cosmos_account.tags)
tag_value = cosmos_account.tags['test-store-throughput']
expected_throughput = collections[0]['c7n:offer']['content']['offerThroughput']
expected_scaled_throughput = int(expected_throughput / THROUGHPUT_MULTIPLIER)
expected_tag_value = '{}:{}'.format(collections[0]['_rid'], expected_scaled_throughput)
self.assertEqual(expected_tag_value, tag_value)
"filters": [
{"type": "value", "key": k, "value": "allow-all", "op": "contains"}
],
"actions": [
{"type": "set-protocols", "ViewerProtocolPolicy": "https-only"}
],
},
session_factory=factory,
)
resources = p.run()
self.assertEqual(len(resources), 1)
expr = jmespath.compile(k)
r = expr.search(resources[0])
self.assertTrue("allow-all" in r)
client = local_session(factory).client("cloudfront")
resp = client.list_distributions()
self.assertEqual(
resp["DistributionList"]["Items"][0]["DefaultCacheBehavior"][
"ViewerProtocolPolicy"
],
"https-only",
)
def test_azure_function_event_mode_incorrect_event_type(self):
    """Loading a function-event-mode policy with 'CosmosDbWrite' must fail.

    With the sign-out patch active, validating the policy is expected to
    raise ``PolicyValidationError``.
    """
    with self.sign_out_patch(), self.assertRaises(PolicyValidationError):
        event_mode = {
            'type': FUNCTION_EVENT_TRIGGER_MODE,
            'events': ['CosmosDbWrite'],
        }
        policy_data = {
            'name': 'test-azure-serverless-mode',
            'resource': 'azure.vm',
            'mode': event_mode,
        }
        self.load_policy(policy_data, validate=True)
def test_filter_validation_no_blacklist(self):
    """An elb policy whose ssl-policy filter carries only its type must be rejected.

    ``load_policy`` is expected to raise ``PolicyValidationError`` even
    though it is called with ``validate=False``.
    """
    policy_data = {
        "name": "test-ssl-ciphers",
        "resource": "elb",
        "filters": [{"type": "ssl-policy"}],
    }
    with self.assertRaises(PolicyValidationError):
        self.load_policy(policy_data, session_factory=None, validate=False)
def test_extra_keys(self):
    """Validating a document with only an 'accounts' key raises a top-level-keys error."""
    parser = StructureParser()
    with self.assertRaises(PolicyValidationError) as caught:
        parser.validate({'accounts': []})
    # Checked after the block: the context manager only exposes the
    # exception once the ``with`` body has exited.
    message = str(caught.exception)
    self.assertTrue(message.startswith('Policy files top level keys'))
def test_error_unregistered_action_type(self):
    """Asking the registry's factory for the unknown type "foo" raises PolicyValidationError."""
    # Build the registry outside the ``with`` so that only the factory call
    # itself is covered by the expected-exception check.
    make_action = ActionRegistry("test.actions").factory
    with self.assertRaises(PolicyValidationError):
        make_action("foo", None)
def xtest_policy_run(self):
    """Run a single policy against the "dummy" resource and count recorded metrics.

    NOTE(review): the ``x`` prefix presumably keeps this test from being
    collected -- confirm before relying on it.
    """
    # Register the dummy resource type and undo the registration afterwards.
    manager.resources.register("dummy", DummyResource)
    self.addCleanup(manager.resources.unregister, "dummy")

    # Temporary output directory, removed when the test finishes.
    self.output_dir = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, self.output_dir)

    policy_data = {
        "policies": [{"name": "process-instances", "resource": "dummy"}]
    }
    run_config = {"output_dir": self.output_dir}
    policy_set = self.load_policy_set(policy_data, run_config)

    policy = policy_set.policies[0]
    policy()
    recorded_metrics = policy.ctx.metrics.data
    self.assertEqual(len(recorded_metrics), 3)
def test_format_event(self):
    """``utils.format_event`` output parses to the same JSON object as the reference text."""
    sample_event = {"message": "This is a test", "timestamp": 1234567891011}
    reference_text = (
        '{\n "timestamp": 1234567891011, \n'
        ' "message": "This is a test"\n}'
    )
    # Compare parsed objects rather than raw text so key order and
    # whitespace in the formatted output do not matter.
    formatted = json.loads(utils.format_event(sample_event))
    reference = json.loads(reference_text)
    self.assertEqual(formatted, reference)