Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
parms['type'] = rptype
if(readonly == "true"):
parms['read_only'] = readonly
body = json.dumps(parms)
else:
parms = {
'name': snaplabel
}
if(readonly == "true"):
parms['read_only'] = readonly
body = json.dumps(parms)
# REST api call
(s, h) = common.service_json_request(
self.__ipAddr, self.__port,
"POST",
Snapshot.URI_SNAPSHOT_LIST.format(otype, typename, ouri), body)
o = common.json_decode(s)
task = None
if(otype == Snapshot.BLOCK):
task = o["task"][0]
else:
task = o
if(sync):
return (
self.block_until_complete(
otype,
task['resource']['id'],
def login_failed_ip_list(self):
(s, h) = common.service_json_request(self.__ipAddr,
self.__port, "GET", LoginFailedIP.URI_LOGIN_FAILED_IPS, None)
o = common.json_decode(s)
if(o):
return o
return None
def s3_bucket_list(namespace, projectname, uid, secret):
_headers = dict()
_headers['x-emc-namespace'] = namespace
_headers['Date'] = formatdate()
#use projectname if the api supports it
#check this below line
_headers = common.s3_hmac_base64_sig('GET', None, None , uid, secret , 'application/json' , _headers, None )
(s, h) = common.service_json_request(self.__ipAddr, S3_PORT , "GET",
self.URI_S3_SERVICE_BASE , None, None, False ,
'application/json', None, _headers)
o = common.json_decode(s)
return o
def get_shares_by_uri(self, uri):
'''
Get a snapshot shares based on snapshot uri
Parameters:
uri: uri of snapshot
'''
(s, h) = common.service_json_request(
self.__ipAddr, self.__port,
"GET",
Snapshot.URI_FILE_SNAPSHOT_SHARES.format(uri),
None)
if(not s):
return None
else:
return common.json_decode(s)
def delete(self, host_uri, detach_storage=False):
'''
Makes a REST API call to delete a storage system by its UUID
'''
uri = Host.URI_HOST_DEACTIVATE.format(host_uri)
if(detach_storage):
uri = uri + "?detach-storage=true"
(s, h) = common.service_json_request(
self.__ipAddr, self.__port, "POST",
uri,
None)
return
def list_all(self, tenant):
restapi = self.URI_COMPUTE_HOST
tenant_obj = Tenant(self.__ipAddr, self.__port)
if(tenant is None):
tenant_uri = tenant_obj.tenant_getid()
else:
tenant_uri = tenant_obj.tenant_query(tenant)
restapi = restapi + "?tenant=" + tenant_uri
(s, h) = common.service_json_request(
self.__ipAddr, self.__port,
"GET",
restapi,
None)
o = common.json_decode(s)
return o['host']
is_storageport_exist = False
else:
raise e
if(is_storageport_exist):
raise SOSError(SOSError.ENTRY_ALREADY_EXISTS_ERR,
"storageport with name " + port_name +
" already exists ")
body = json.dumps(
{'name': port_name,
'port_network_id': port_id,
'transport_type': transport_type
})
common.service_json_request(self.__ipAddr, self.__port,
"POST",
Storageport.URI_STORAGEPORT_LIST.format(storage_system_uri),
body)
def show_by_uri(self, uri, xml=False):
(s, h) = common.service_json_request(
self.__ipAddr, self.__port,
'GET', StorageProvider.URI_STORAGEPROVIDER_DETAILS.format(uri),
None, None)
o = common.json_decode(s)
if(not o["inactive"]):
if(xml):
(s, h) = common.service_json_request(
self.__ipAddr, self.__port, 'GET',
StorageProvider.URI_STORAGEPROVIDER_DETAILS.format(uri),
None, None, xml)
return s
else:
return o
return None
def computevpool_list(self):
(s, h) = common.service_json_request(
self.__ipAddr, self.__port,
"GET",
ComputeVpool.URI_COMPUTE_VPOOL,
None)
o = common.json_decode(s)
return o['computevirtualpool']
def delete_computesystem(self, computesystem):
uri = self.query_computesystem(computesystem)
(s, h) = common.service_json_request(self.__ipAddr,
self.__port, "POST",
ComputeSystem.URI_COMPUTE_SYSTEM_DELETE.format(uri),
None)
return common.json_decode(s)