Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Tail of the `set_properties` operation; its `def` line and the setup of `url`,
# `query_parameters`, `header_parameters` and `error_map` are outside this excerpt.
# NOTE(review): indentation was lost in this excerpt, so nesting under each `if`
# is inferred from syntax only.
# Request headers: XML content type plus the version taken from self._config.
header_parameters['Content-Type'] = 'application/xml; charset=utf-8'
header_parameters['x-ms-version'] = self._serialize.header("self._config.version", self._config.version, 'str')
# The client request id header is only added when the caller supplied one.
if request_id is not None:
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
# Construct body
# The properties object is serialized under the 'StorageServiceProperties' type name.
body_content = self._serialize.body(storage_service_properties, 'StorageServiceProperties')
# Construct and send request
# PUT with the serialized body, run through the client pipeline with stream=False.
request = self._client.put(url, query_parameters, header_parameters, body_content)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
# Any status other than 202 is a failure: map_error is called first (presumably it
# raises an exception mapped from error_map -- confirm against its definition),
# then a StorageErrorException is raised.
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
# When a `cls` callback is supplied, it receives the raw response, None as the
# body, and the deserialized response headers; its result is returned.
if cls:
response_headers = {
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
# Attach the operation's 'url' metadata to the function object.
set_properties.metadata = {'url': '/'}
# Tail of the `change_lease` operation; its `def` line and earlier header setup
# are outside this excerpt.
# NOTE(review): indentation was lost in this excerpt. The first assignment below
# is presumably the body of an `if if_modified_since is not None:` guard that was
# cut off above -- confirm against the full file.
header_parameters['If-Modified-Since'] = self._serialize.header("if_modified_since", if_modified_since, 'rfc-1123')
# Each remaining conditional header is only added when its value was supplied.
# The two date conditions are serialized as 'rfc-1123', the ETag conditions as 'str'.
if if_unmodified_since is not None:
header_parameters['If-Unmodified-Since'] = self._serialize.header("if_unmodified_since", if_unmodified_since, 'rfc-1123')
if if_match is not None:
header_parameters['If-Match'] = self._serialize.header("if_match", if_match, 'str')
if if_none_match is not None:
header_parameters['If-None-Match'] = self._serialize.header("if_none_match", if_none_match, 'str')
# Construct and send request
# PUT without a body argument, run through the client pipeline with stream=False.
request = self._client.put(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
# Any status other than 200 is a failure: map_error is called first (presumably it
# raises an exception mapped from error_map -- confirm against its definition),
# then a StorageErrorException is raised.
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
# When a `cls` callback is supplied, it receives the raw response, None as the
# body, and the deserialized response headers (including 'x-ms-lease-id');
# its result is returned.
if cls:
response_headers = {
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-lease-id': self._deserialize('str', response.headers.get('x-ms-lease-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
# Attach the operation's 'url' metadata to the function object.
change_lease.metadata = {'url': '/{containerName}/{blob}'}
# Tail of the `get_account_info` operation; its `def` line and the setup of `url`,
# `query_parameters`, `header_parameters` and `error_map` are outside this excerpt.
# NOTE(review): indentation was lost in this excerpt, so nesting under each `if`
# is inferred from syntax only.
# Construct and send request
# GET run through the client pipeline with stream=False.
request = self._client.get(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
# Any status other than 200 is a failure: map_error is called first (presumably it
# raises an exception mapped from error_map -- confirm against its definition),
# then a StorageErrorException is raised.
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
# When a `cls` callback is supplied, it receives the raw response, None as the
# body, and the deserialized response headers; its result is returned.
# 'x-ms-sku-name' and 'x-ms-account-kind' are deserialized with models.SkuName
# and models.AccountKind rather than a plain 'str'.
if cls:
response_headers = {
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-sku-name': self._deserialize(models.SkuName, response.headers.get('x-ms-sku-name')),
'x-ms-account-kind': self._deserialize(models.AccountKind, response.headers.get('x-ms-account-kind')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
# Attach the operation's 'url' metadata to the function object.
get_account_info.metadata = {'url': '/'}
# Tail of a `get_properties` operation; its `def` line and the setup of `url`,
# `query_parameters`, `header_parameters` and `error_map` are outside this excerpt.
# NOTE(review): indentation was lost in this excerpt, so nesting under each `if`
# is inferred from syntax only.
# Construct and send request
# GET run through the client pipeline with stream=False.
request = self._client.get(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
# Any status other than 200 is a failure: map_error is called first (presumably it
# raises an exception mapped from error_map -- confirm against its definition),
# then a StorageErrorException is raised.
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
# When a `cls` callback is supplied, it receives the raw response, None as the
# body, and the deserialized response headers; its result is returned.
# 'x-ms-meta' uses the '{str}' type spec, the three lease headers use the
# models.Lease*Type types, and the two 'x-ms-has-*' flags are read as 'bool'.
if cls:
response_headers = {
'x-ms-meta': self._deserialize('{str}', response.headers.get('x-ms-meta')),
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-lease-duration': self._deserialize(models.LeaseDurationType, response.headers.get('x-ms-lease-duration')),
'x-ms-lease-state': self._deserialize(models.LeaseStateType, response.headers.get('x-ms-lease-state')),
'x-ms-lease-status': self._deserialize(models.LeaseStatusType, response.headers.get('x-ms-lease-status')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-blob-public-access': self._deserialize('str', response.headers.get('x-ms-blob-public-access')),
'x-ms-has-immutability-policy': self._deserialize('bool', response.headers.get('x-ms-has-immutability-policy')),
'x-ms-has-legal-hold': self._deserialize('bool', response.headers.get('x-ms-has-legal-hold')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
# Attach the operation's 'url' metadata to the function object.
get_properties.metadata = {'url': '/{containerName}'}
# Tail of the `start_copy_from_url` operation; its `def` line and the construction
# of `request` and `error_map` are outside this excerpt.
# NOTE(review): indentation was lost in this excerpt, so nesting under each `if`
# is inferred from syntax only.
# The pipeline run is awaited here, so this excerpt comes from a coroutine.
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
# Any status other than 202 is a failure: map_error is called first (presumably it
# raises an exception mapped from error_map -- confirm against its definition),
# then a StorageErrorException is raised.
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
# When a `cls` callback is supplied, it receives the raw response, None as the
# body, and the deserialized response headers; its result is returned.
# 'x-ms-copy-status' is deserialized with models.CopyStatusType.
if cls:
response_headers = {
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-copy-id': self._deserialize('str', response.headers.get('x-ms-copy-id')),
'x-ms-copy-status': self._deserialize(models.CopyStatusType, response.headers.get('x-ms-copy-status')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
# Attach the operation's 'url' metadata to the function object.
start_copy_from_url.metadata = {'url': '/{containerName}/{blob}'}
# Tail of the `create` operation; its `def` line and the setup of `url`,
# `query_parameters`, `header_parameters` and `error_map` are outside this excerpt.
# NOTE(review): indentation was lost in this excerpt, so nesting under each `if`
# is inferred from syntax only.
# Optional headers are only added when their value was supplied; both metadata
# and access are serialized with the 'str' type spec.
if metadata is not None:
header_parameters['x-ms-meta'] = self._serialize.header("metadata", metadata, 'str')
if access is not None:
header_parameters['x-ms-blob-public-access'] = self._serialize.header("access", access, 'str')
# The version header comes from self._config.
header_parameters['x-ms-version'] = self._serialize.header("self._config.version", self._config.version, 'str')
if request_id is not None:
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
# Construct and send request
# PUT without a body argument, run through the client pipeline with stream=False.
request = self._client.put(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
# Any status other than 201 is a failure: map_error is called first (presumably it
# raises an exception mapped from error_map -- confirm against its definition),
# then a StorageErrorException is raised.
if response.status_code not in [201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
# When a `cls` callback is supplied, it receives the raw response, None as the
# body, and the deserialized response headers; its result is returned.
if cls:
response_headers = {
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
# Attach the operation's 'url' metadata to the function object.
create.metadata = {'url': '/{containerName}'}
# Tail of a `get_properties` operation; its `def` line and the setup of `url`,
# `query_parameters`, `header_parameters` and `error_map` are outside this excerpt.
# NOTE(review): indentation was lost in this excerpt, so nesting under each `if`
# is inferred from syntax only.
# Construct and send request
# GET run through the client pipeline with stream=False.
request = self._client.get(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
# Any status other than 200 is a failure: map_error is called first (presumably it
# raises an exception mapped from error_map -- confirm against its definition),
# then a StorageErrorException is raised.
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
# When a `cls` callback is supplied, it receives the raw response, None as the
# body, and the deserialized response headers; its result is returned.
# 'x-ms-meta' uses the '{str}' type spec, the three lease headers use the
# models.Lease*Type types, and the two 'x-ms-has-*' flags are read as 'bool'.
if cls:
response_headers = {
'x-ms-meta': self._deserialize('{str}', response.headers.get('x-ms-meta')),
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-lease-duration': self._deserialize(models.LeaseDurationType, response.headers.get('x-ms-lease-duration')),
'x-ms-lease-state': self._deserialize(models.LeaseStateType, response.headers.get('x-ms-lease-state')),
'x-ms-lease-status': self._deserialize(models.LeaseStatusType, response.headers.get('x-ms-lease-status')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-blob-public-access': self._deserialize('str', response.headers.get('x-ms-blob-public-access')),
'x-ms-has-immutability-policy': self._deserialize('bool', response.headers.get('x-ms-has-immutability-policy')),
'x-ms-has-legal-hold': self._deserialize('bool', response.headers.get('x-ms-has-legal-hold')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
# Attach the operation's 'url' metadata to the function object.
get_properties.metadata = {'url': '/{containerName}'}
header_parameters['If-Modified-Since'] = self._serialize.header("if_modified_since", if_modified_since, 'rfc-1123')
if if_unmodified_since is not None:
header_parameters['If-Unmodified-Since'] = self._serialize.header("if_unmodified_since", if_unmodified_since, 'rfc-1123')
if if_match is not None:
header_parameters['If-Match'] = self._serialize.header("if_match", if_match, 'str')
if if_none_match is not None:
header_parameters['If-None-Match'] = self._serialize.header("if_none_match", if_none_match, 'str')
# Construct and send request
request = self._client.get(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 206]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
header_dict = {}
deserialized = None
if response.status_code == 200:
deserialized = response.stream_download()
header_dict = {
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-meta': self._deserialize('{str}', response.headers.get('x-ms-meta')),
'Content-Length': self._deserialize('long', response.headers.get('Content-Length')),
'Content-Type': self._deserialize('str', response.headers.get('Content-Type')),
'Content-Range': self._deserialize('str', response.headers.get('Content-Range')),
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Content-MD5': self._deserialize('bytearray', response.headers.get('Content-MD5')),
'Content-Encoding': self._deserialize('str', response.headers.get('Content-Encoding')),
'Cache-Control': self._deserialize('str', response.headers.get('Cache-Control')),
'Content-Disposition': self._deserialize('str', response.headers.get('Content-Disposition')),