Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.assertTrue(results.is_truncated)
self.assertTrue(results.supports_total)
self.assertEqual(results.total_results, 4)
self.assertEqual(results.data, objects)
# A list with limit=2 and marker=2
results = ClientPagedResultList(self.provider, objects, 2, 2)
self.assertListEqual(results, list(itertools.islice(objects, 2, 4)))
self.assertEqual(results.marker, None)
self.assertFalse(results.is_truncated)
self.assertTrue(results.supports_total)
self.assertEqual(results.total_results, 4)
self.assertEqual(results.data, objects)
# A list with limit=2 and marker=3
results = ClientPagedResultList(self.provider, objects, 2, 3)
self.assertListEqual(results, list(itertools.islice(objects, 3, 4)))
self.assertFalse(results.is_truncated)
self.assertEqual(results.marker, None)
self.assertEqual(results.data, objects)
self.assertFalse(results.supports_server_paging, "Client paged result"
" lists should return False for server paging.")
def test_object_life_cycle(self):
# Test object life cycle methods by using a volume.
label = "cb-objlifecycle-{0}".format(helpers.get_uuid())
test_vol = None
with cb_helpers.cleanup_action(lambda: test_vol.delete()):
test_vol = self.provider.storage.volumes.create(
label, 1,
helpers.get_provider_test_data(self.provider, "placement"))
# Waiting for an invalid timeout should raise an exception
with self.assertRaises(AssertionError):
test_vol.wait_for([VolumeState.ERROR], timeout=-1, interval=1)
with self.assertRaises(AssertionError):
test_vol.wait_for([VolumeState.ERROR], timeout=1, interval=-1)
# If interval < timeout, an exception should be raised
with self.assertRaises(AssertionError):
test_vol.wait_for([VolumeState.ERROR], timeout=10, interval=20)
test_vol.wait_till_ready()
# Hitting a terminal state should raise an exception
def test_cleanup_action_body_and_cleanup_has_exception(self):
invoke_order = [""]
def cleanup_func():
invoke_order[0] += "cleanup"
raise Exception("test")
class CustomException(Exception):
pass
with self.assertRaises(CustomException):
with cb_helpers.cleanup_action(lambda: cleanup_func()):
invoke_order[0] += "body_"
raise CustomException()
self.assertEqual(invoke_order[0], "body_cleanup")
def test_upload_download_bucket_content(self):
name = "cbtestbucketobjs-{0}".format(helpers.get_uuid())
test_bucket = self.provider.storage.buckets.create(name)
with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
obj_name = "hello_upload_download.txt"
obj = test_bucket.objects.create(obj_name)
with cb_helpers.cleanup_action(lambda: obj.delete()):
content = b"Hello World. Here's some content."
# TODO: Upload and download methods accept different parameter
# types. Need to make this consistent - possibly provider
# multiple methods like upload_from_file, from_stream etc.
obj.upload(content)
target_stream = BytesIO()
obj.save_content(target_stream)
self.assertEqual(target_stream.getvalue(), content)
target_stream2 = BytesIO()
for data in obj.iter_content():
target_stream2.write(data)
self.assertEqual(target_stream2.getvalue(), content)
temp_dir = tempfile.gettempdir()
file_name = '6GigTest.tmp'
six_gig_file = os.path.join(temp_dir, file_name)
with open(six_gig_file, "wb") as out:
out.truncate(6 * 1024 * 1024 * 1024) # 6 Gig...
with cb_helpers.cleanup_action(lambda: os.remove(six_gig_file)):
download_file = "{0}/cbtestfile-{1}".format(temp_dir, file_name)
bucket_name = "cbtestbucketlargeobjs-{0}".format(
helpers.get_uuid())
test_bucket = self.provider.storage.buckets.create(bucket_name)
with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
test_obj = test_bucket.objects.create(file_name)
with cb_helpers.cleanup_action(lambda: test_obj.delete()):
file_uploaded = test_obj.upload_from_file(six_gig_file)
self.assertTrue(file_uploaded, "Could not upload object?")
with cb_helpers.cleanup_action(
lambda: os.remove(download_file)):
with open(download_file, 'wb') as f:
test_obj.save_content(f)
self.assertTrue(
filecmp.cmp(six_gig_file, download_file),
"Uploaded file != downloaded")
def test_cleanup_action_body_has_exception(self):
invoke_order = [""]
def cleanup_func():
invoke_order[0] += "cleanup"
class CustomException(Exception):
pass
with self.assertRaises(CustomException):
with cb_helpers.cleanup_action(lambda: cleanup_func()):
invoke_order[0] += "body_"
raise CustomException()
self.assertEqual(invoke_order[0], "body_cleanup")
def cleanup_network(network):
"""
Delete the supplied network, first deleting any contained subnets.
"""
if network:
try:
for sn in network.subnets:
with cb_helpers.cleanup_action(lambda: cleanup_subnet(sn)):
pass
finally:
network.delete()
network.wait_for([NetworkState.UNKNOWN],
terminal_states=[NetworkState.ERROR])
test_vol = self.provider.storage.volumes.create(
label, 1,
helpers.get_provider_test_data(self.provider,
"placement"))
with cb_helpers.cleanup_action(lambda: test_vol.delete()):
test_vol.wait_till_ready()
test_snap = test_vol.create_snapshot(label=label,
description=label)
def cleanup_snap(snap):
if snap:
snap.delete()
snap.wait_for([SnapshotState.UNKNOWN],
terminal_states=[SnapshotState.ERROR])
with cb_helpers.cleanup_action(lambda: cleanup_snap(test_snap)):
test_snap.wait_till_ready()
lc = self.provider.compute.instances.create_launch_config()
# Add a new blank volume
lc.add_volume_device(size=1, delete_on_terminate=True)
# Attach an existing volume
lc.add_volume_device(size=1, source=test_vol,
delete_on_terminate=True)
# Add a new volume based on a snapshot
lc.add_volume_device(size=1, source=test_snap,
delete_on_terminate=True)
# Override root volume size
def test_key_pair_properties(self):
name = 'cb-kpprops-{0}'.format(helpers.get_uuid())
kp = self.provider.security.key_pairs.create(name=name)
with cb_helpers.cleanup_action(lambda: kp.delete()):
self.assertIsNotNone(
kp.material,
"KeyPair material is empty but it should not be.")
# get the keypair again - keypair material should now be empty
kp = self.provider.security.key_pairs.get(kp.id)
self.assertIsNone(kp.material,
"Keypair material should now be empty")
def test_floating_ip_properties(self):
# Check floating IP address
gw = helpers.get_test_gateway(
self.provider)
fip = gw.floating_ips.create()
with cb_helpers.cleanup_action(
lambda: helpers.cleanup_gateway(gw)):
with cb_helpers.cleanup_action(lambda: fip.delete()):
fipl = list(gw.floating_ips)
self.assertIn(fip, fipl)
# 2016-08: address filtering not implemented in moto
# empty_ipl = self.provider.network.floating_ips('dummy-net')
# self.assertFalse(
# empty_ipl,
# "Bogus network should not have any floating IPs: {0}"
# .format(empty_ipl))
self.assertFalse(
fip.private_ip,
"Floating IP should not have a private IP value ({0})."
.format(fip.private_ip))
self.assertFalse(
fip.in_use,