Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_object_life_cycle(self):
    # Test object life cycle methods by using a volume.
    label = "cb-objlifecycle-{0}".format(helpers.get_uuid())
    # Bound before the ``with`` so the cleanup lambda always has a name to
    # resolve; it is rebound to the created volume inside the block.
    test_vol = None
    with cb_helpers.cleanup_action(lambda: test_vol.delete()):
        test_vol = self.provider.storage.volumes.create(
            label, 1,
            helpers.get_provider_test_data(self.provider, "placement"))

        # Waiting for an invalid timeout should raise an exception
        with self.assertRaises(AssertionError):
            test_vol.wait_for([VolumeState.ERROR], timeout=-1, interval=1)
        with self.assertRaises(AssertionError):
            test_vol.wait_for([VolumeState.ERROR], timeout=1, interval=-1)

        # If interval > timeout, an exception should be raised
        with self.assertRaises(AssertionError):
            test_vol.wait_for([VolumeState.ERROR], timeout=10, interval=20)

        test_vol.wait_till_ready()
        # Hitting a terminal state should raise an exception
        # NOTE(review): the assertion introduced by the comment above is not
        # part of this excerpt.
def cleanup_network(network):
    """
    Delete the supplied network, first deleting any contained subnets.

    A falsy ``network`` (e.g. ``None``) is ignored. Otherwise every subnet
    in ``network.subnets`` is handed to ``cleanup_subnet`` via
    ``cb_helpers.cleanup_action``, and the network itself is deleted in a
    ``finally`` clause so the delete is attempted even if subnet cleanup
    raises.
    """
    if not network:
        return
    try:
        for sn in network.subnets:
            # The lambda is invoked within this same iteration, so it sees
            # the current ``sn``.
            with cb_helpers.cleanup_action(lambda: cleanup_subnet(sn)):
                pass
    finally:
        network.delete()
        network.wait_for([NetworkState.UNKNOWN],
                         terminal_states=[NetworkState.ERROR])
def test_key_pair_properties(self):
    """
    Check that key pair material is present on creation and absent on
    a later lookup.
    """
    name = 'cb-kpprops-{0}'.format(helpers.get_uuid())
    kp = self.provider.security.key_pairs.create(name=name)
    with cb_helpers.cleanup_action(lambda: kp.delete()):
        self.assertIsNotNone(
            kp.material,
            "KeyPair material is empty but it should not be.")
        # get the keypair again - keypair material should now be empty
        kp = self.provider.security.key_pairs.get(kp.id)
        self.assertIsNone(kp.material,
                          "Keypair material should now be empty")
def wrapper(self, *args, **kwargs):
    """
    Skip the wrapped test when the running Python version matches the
    configured comparison, otherwise run it.

    ``op``, ``major``, ``minor``, ``func`` and ``stringToOperator`` come
    from the enclosing scope, which is not part of this excerpt.
    """
    op_func = stringToOperator(op)
    if op_func(sys.version_info, (major, minor)):
        self.skipTest(
            "Skipping test because python version {0} is {1} expected"
            " version {2}".format(sys.version_info[:2],
                                  op, (major, minor)))
    func(self, *args, **kwargs)
return wrapper
return wrap
# Default test data (image, VM type, placement) for each provider, keyed by
# provider name strings.
# Every value goes through cb_helpers.get_env(<variable name>, <fallback>);
# presumably an environment variable of that name overrides the fallback —
# confirm against cb_helpers.
# NOTE(review): the closing braces of the GCP entry and of this dict are not
# part of this excerpt.
TEST_DATA_CONFIG = {
"AWSCloudProvider": {
# Match the ami value with entry in custom_amis.json for use with moto
"image": cb_helpers.get_env('CB_IMAGE_AWS', 'ami-aa2ea6d0'),
"vm_type": cb_helpers.get_env('CB_VM_TYPE_AWS', 't2.nano'),
"placement": cb_helpers.get_env('CB_PLACEMENT_AWS', 'us-east-1a'),
},
'OpenStackCloudProvider': {
'image': cb_helpers.get_env('CB_IMAGE_OS',
'c66bdfa1-62b1-43be-8964-e9ce208ac6a5'),
"vm_type": cb_helpers.get_env('CB_VM_TYPE_OS', 'm1.tiny'),
"placement": cb_helpers.get_env('CB_PLACEMENT_OS', 'nova'),
},
'GCPCloudProvider': {
'image': cb_helpers.get_env(
'CB_IMAGE_GCP',
'https://www.googleapis.com/compute/v1/projects/ubuntu-os-cloud/'
'global/images/ubuntu-1710-artful-v20180126'),
'vm_type': cb_helpers.get_env('CB_VM_TYPE_GCP', 'f1-micro'),
# NOTE(review): this entry reads GCP_DEFAULT_ZONE, unlike the CB_PLACEMENT_*
# names used by the other providers — confirm this is intentional.
'placement': cb_helpers.get_env('GCP_DEFAULT_ZONE', 'us-central1-a'),
def create_provider_instance(self):
    """
    Build and return a provider instance for the test run.

    The provider name is obtained from ``cb_helpers.get_env`` with
    ``"CB_TEST_PROVIDER"`` and ``"aws"`` as arguments, resolved to a class
    through ``CloudProviderFactory``, and instantiated with a config dict
    holding the wait interval and a result limit of 5.
    """
    provider_name = cb_helpers.get_env("CB_TEST_PROVIDER", "aws")
    factory = CloudProviderFactory()
    provider_class = factory.get_provider_class(provider_name)
    config = {
        'default_wait_interval':
            self.get_provider_wait_interval(provider_class),
        'default_result_limit': 5,
    }
    return provider_class(config)
# Placeholder binding; create_bucket_obj reads this name when it is called.
test_bucket = None


def create_bucket_obj(name):
    """
    Create an object called ``name`` in ``test_bucket``, upload placeholder
    content to it, and return it.
    """
    obj = test_bucket.objects.create(name)
    # TODO: This is wrong. We shouldn't have to have a separate
    # call to upload some content before being able to delete
    # the content. Maybe the create_object method should accept
    # the file content as a parameter.
    obj.upload("dummy content")
    return obj
def cleanup_bucket_obj(bucket_obj):
    """
    Delete ``bucket_obj`` if it is truthy; do nothing otherwise.
    """
    if bucket_obj:
        bucket_obj.delete()
# test_bucket is assigned inside the block; the lambda resolves the name only
# when it is called, so it picks up whatever test_bucket is bound to then.
with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
    name = "cb-crudbucketobj-{0}".format(helpers.get_uuid())
    test_bucket = self.provider.storage.buckets.create(name)

    # Run the shared CRUD checks against the bucket's object collection.
    sit.check_crud(self, test_bucket.objects, BucketObject,
                   "cb-bucketobj", create_bucket_obj,
                   cleanup_bucket_obj, skip_name_check=True)
def cleanup_router(router, gateway):
    """
    Detach the subnet and gateway from ``router``, with deletion of the
    gateway and the router registered as cleanup actions.

    ``subnet`` is taken from the enclosing scope, which is not part of
    this excerpt.
    """
    with cb_helpers.cleanup_action(lambda: router.delete()):
        with cb_helpers.cleanup_action(lambda: gateway.delete()):
            router.detach_subnet(subnet)
            router.detach_gateway(gateway)
@cb_helpers.deprecated_alias(old_param='new_param')
def custom_func(new_param=None, old_param=None):
    """
    Record the received argument values in ``param_values`` so the caller
    can inspect what the decorator passed through.
    """
    param_values['new_param'] = new_param
    param_values['old_param'] = old_param
def create(self, name, public_key_material=None):
    """
    Create a key pair called ``name`` and return it as an
    ``OpenStackKeyPair``.

    When ``public_key_material`` is not supplied, a new key pair is
    generated with ``cb_helpers.generate_key_pair`` and its private key is
    set as the returned object's ``material``; otherwise ``material`` is
    ``None``.

    Raises ``DuplicateResourceException`` if ``self.find`` already returns
    a key pair for ``name``.
    """
    OpenStackKeyPair.assert_valid_resource_name(name)
    existing_kp = self.find(name=name)
    if existing_kp:
        raise DuplicateResourceException(
            'Keypair already exists with name {0}'.format(name))

    private_key = None
    if not public_key_material:
        public_key_material, private_key = cb_helpers.generate_key_pair()

    kp = self.provider.nova.keypairs.create(
        name, public_key=public_key_material)
    cb_kp = OpenStackKeyPair(self.provider, kp)
    cb_kp.material = private_key
    return cb_kp