Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ecs(self):
# init client
client = AcsClient(
os.environ['ACCESS_KEY_ID'],
os.environ['ACCESS_KEY_SECRET'],
"cn-hangzhou"
)
mylogger.info("Init client success")
# get demo instance attributes
image_id, security_group_id = TestEcsIntegration.get_demo_ecs_attributes(client)
# create
instance_id = TestEcsIntegration.create_instance(client, image_id, security_group_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# start
TestEcsIntegration.start_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Running')
# stop
TestEcsIntegration.stop_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
os.environ['ACCESS_KEY_SECRET'],
"cn-hangzhou"
)
mylogger.info("Init client success")
# get demo instance attributes
image_id, security_group_id = TestEcsIntegration.get_demo_ecs_attributes(client)
# create
instance_id = TestEcsIntegration.create_instance(client, image_id, security_group_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# start
TestEcsIntegration.start_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Running')
# stop
TestEcsIntegration.stop_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# delete
TestEcsIntegration.delete_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Deleted')
image_id, security_group_id = TestEcsIntegration.get_demo_ecs_attributes(client)
# create
instance_id = TestEcsIntegration.create_instance(client, image_id, security_group_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# start
TestEcsIntegration.start_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Running')
# stop
TestEcsIntegration.stop_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# delete
TestEcsIntegration.delete_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Deleted')
def test_ecs(self):
# init client
client = AcsClient(
os.environ['ACCESS_KEY_ID'],
os.environ['ACCESS_KEY_SECRET'],
"cn-hangzhou"
)
mylogger.info("Init client success")
# get demo instance attributes
image_id, security_group_id = TestEcsIntegration.get_demo_ecs_attributes(client)
# create
instance_id = TestEcsIntegration.create_instance(client, image_id, security_group_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# start
TestEcsIntegration.start_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Running')
# stop
TestEcsIntegration.stop_instance(client, instance_id)
# init client
client = AcsClient(
os.environ['ACCESS_KEY_ID'],
os.environ['ACCESS_KEY_SECRET'],
"cn-hangzhou"
)
mylogger.info("Init client success")
# get demo instance attributes
image_id, security_group_id = TestEcsIntegration.get_demo_ecs_attributes(client)
# create
instance_id = TestEcsIntegration.create_instance(client, image_id, security_group_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# start
TestEcsIntegration.start_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Running')
# stop
TestEcsIntegration.stop_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# delete
TestEcsIntegration.delete_instance(client, instance_id)
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# start
TestEcsIntegration.start_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Running')
# stop
TestEcsIntegration.stop_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Stopped')
# delete
TestEcsIntegration.delete_instance(client, instance_id)
# wait
TestEcsIntegration.wait_for_instance(client, instance_id, 'Deleted')
"""
Generate signed header
:param region_id: String
:param ak: String
:param secret: String
:return: Dict
"""
sign_params = self._get_sign_params()
if self.get_content() is not None:
md5_str = md5_tool.get_md5_base64_str(self.get_content())
self.add_header('Content-MD5', md5_str)
if 'RegionId' not in sign_params.keys():
sign_params['RegionId'] = region_id
self.add_header('x-acs-region-id', region_id)
signed_headers = roa_signer.get_signature_headers(
sign_params,
ak,
secret,
self.get_accept_format(),
self.get_headers(),
self.get_uri_pattern(),
self.get_path_params(),
self.get_method())
return signed_headers
def get_url(self, region_id, ak=None, secret=None):
"""
Compose request url without domain
:param region_id: String
:return: String
"""
sign_params = self.get_query_params()
# if region_id not in sign_params.keys():
# sign_params['RegionId'] = region_id
url = roa_signer.get_url(
self.get_uri_pattern(),
sign_params,
self.get_path_params())
return url
def __build_query_string(uri, queries):
sorted_map = sorted(list(queries.items()), key=lambda queries: queries[0])
if len(sorted_map) > 0:
uri += "?"
for (k, v) in sorted_map:
uri += k
if v is not None:
uri += "="
uri += v
uri += roa_signature_composer.QUERY_SEPARATOR
if uri.find(roa_signature_composer.QUERY_SEPARATOR) >= 0:
uri = uri[0:(len(uri) - 1)]
return uri
def __build_query_string(uri, queries):
sorted_map = sorted(list(queries.items()), key=lambda queries: queries[0])
if len(sorted_map) > 0:
uri += "?"
for (k, v) in sorted_map:
uri += k
if v is not None:
uri += "="
uri += v
uri += roa_signature_composer.QUERY_SEPARATOR
if uri.find(roa_signature_composer.QUERY_SEPARATOR) >= 0:
uri = uri[0:(len(uri) - 1)]
return uri