Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
A dict with name-value pairs to associate with the
container as metadata. Example: {'Category': 'test'}
:type metadata: dict(str, str)
:param ~azure.storage.blob.models.PublicAccess public_access:
Possible values include: container, blob.
:param int timeout:
The timeout parameter is expressed in seconds.
:rtype: None
"""
headers = kwargs.pop('headers', {})
headers.update(add_metadata_headers(metadata))
try:
return self._client.container.create(
timeout=timeout,
access=public_access,
cls=return_response_headers,
headers=headers,
error_map=basic_error_map(),
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
**kwargs
):
# type: (...) -> Dict[str, Union[str, datetime]]
"""
:returns: Container-updated property dict (Etag and last modified).
"""
headers = kwargs.pop('headers', {})
headers.update(add_metadata_headers(metadata))
access_conditions = get_access_conditions(lease)
mod_conditions = get_modification_conditions(if_modified_since)
try:
return self._client.container.set_metadata(
timeout=timeout,
lease_access_conditions=access_conditions,
modified_access_conditions=mod_conditions,
cls=return_response_headers,
headers=headers,
error_map=basic_error_map(),
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
def renew(
        self, if_modified_since=None,  # type: Optional[datetime]
        if_unmodified_since=None,  # type: Optional[datetime]
        if_match=None,  # type: Optional[str]
        if_none_match=None,  # type: Optional[str]
        timeout=None,  # type: Optional[int]
        **kwargs
    ):
    # type: (...) -> None
    """
    Renews the lease identified by ``self.id``.

    The four conditional arguments are combined by
    ``get_modification_conditions`` and sent with the renew request. After
    the call, ``etag``, ``id`` and ``last_modified`` on this object are
    overwritten with the ``ETag``, ``x-ms-lease-id`` and ``Last-Modified``
    entries of the response.

    :param datetime if_modified_since:
        Passed to ``get_modification_conditions``.
    :param datetime if_unmodified_since:
        Passed to ``get_modification_conditions``.
    :param str if_match:
        Passed to ``get_modification_conditions``.
    :param str if_none_match:
        Passed to ``get_modification_conditions``.
    :param int timeout:
        Forwarded unchanged to the underlying ``renew_lease`` call.
    :rtype: None
    """
    conditions = get_modification_conditions(
        if_modified_since, if_unmodified_since, if_match, if_none_match)
    lease_headers = self._client.renew_lease(
        lease_id=self.id,
        timeout=timeout,
        modified_access_conditions=conditions,
        cls=return_response_headers,
        **kwargs)

    # Refresh the locally cached lease state from the response.
    self.etag = lease_headers.get('ETag')  # type: str
    self.id = lease_headers.get('x-ms-lease-id')  # type: str
    self.last_modified = lease_headers.get('Last-Modified')  # type: datetime
def _upload_chunk(self, chunk_start, chunk_data):
    """
    Uploads one chunk as a page range, unless the chunk is empty.

    A chunk for which ``self._is_chunk_empty`` returns true is skipped
    entirely. Otherwise the chunk is sent with ``upload_pages`` using the
    byte range ``chunk_start`` .. ``chunk_start + len(chunk_data) - 1`` and
    the returned headers are stored in ``self.response_headers``.

    :param int chunk_start:
        Offset of the first byte of this chunk, used to build the range.
    :param chunk_data:
        The chunk payload; its length determines the end of the range.
    :returns: Always None.
    """
    # Avoid uploading the empty pages.
    if self._is_chunk_empty(chunk_data):
        return None

    last_byte = chunk_start + len(chunk_data) - 1
    page_range = 'bytes={0}-{1}'.format(chunk_start, last_byte)
    self.response_headers = self.blob_service.upload_pages(
        chunk_data,
        content_length=len(chunk_data),
        transactional_content_md5=None,  # no MD5 is computed here
        timeout=self.timeout,
        range=page_range,
        lease_access_conditions=self.lease_access_conditions,
        modified_access_conditions=self.modified_access_conditions,
        validate_content=self.validate_content,
        cls=return_response_headers,
        **self.request_options
    )

    if not self.parallel:
        # For non-parallel uploads, the conditions used by the next upload
        # are rebuilt around the ETag returned by this one.
        self.modified_access_conditions = get_modification_conditions(
            if_match=self.response_headers['ETag'])
    return None
def get_account_information(self, timeout=None):
    # type: (Optional[int]) -> Dict[str, str]
    """
    Retrieves the SKU name and account kind of the storage account.

    :param int timeout:
        Not used by this implementation: the value is not forwarded to
        the underlying ``get_account_info`` call.
    :returns: A dict of account information (SKU and account type).
    :rtype: dict(str, str)
    """
    try:
        account_headers = self._client.service.get_account_info(
            cls=return_response_headers)
        sku_name = account_headers.get('x-ms-sku-name')
        account_kind = account_headers.get('x-ms-account-kind')
        return {'SKU': sku_name, 'AccountType': account_kind}
    except StorageErrorException as storage_error:
        # Delegate error translation to the shared storage error handler.
        process_storage_error(storage_error)
if_modified_since=None, # type: Optional[datetime]
if_unmodified_since=None, # type: Optional[datetime]
if_match=None, # type: Optional[str]
if_none_match=None, # type: Optional[str]
timeout=None, # type: Optional[int]
**kwargs
):
# type: (...) -> None
mod_conditions = get_modification_conditions(
if_modified_since, if_unmodified_since, if_match, if_none_match)
response = self._client.change_lease(
lease_id=self.id,
proposed_lease_id=proposed_lease_id,
timeout=timeout,
modified_access_conditions=mod_conditions,
cls=return_response_headers,
**kwargs)
self.etag = response.get('ETag') # type: str
self.id = response.get('x-ms-lease-id') # type: str
self.last_modified = response.get('Last-Modified') # type: datetime
def get_account_information(self, timeout=None):
    # type: (Optional[int]) -> Dict[str, str]
    """
    Gets information related to the storage account.

    The information can also be retrieved if the user has a SAS to a
    container or blob.

    :param int timeout:
        Not used by this implementation: the value is not forwarded to
        the underlying ``get_account_info`` call.
    :returns: A dict of account information (SKU and account type).
    :rtype: dict(str, str)
    """
    try:
        account_headers = self._client.service.get_account_info(
            cls=return_response_headers)
    except StorageErrorException as storage_error:
        # NOTE(review): the return below relies on this handler raising;
        # if it returned normally, ``account_headers`` would be unassigned.
        process_storage_error(storage_error)

    # Map each result key to the response header it is read from.
    header_for_field = (
        ('SKU', 'x-ms-sku-name'),
        ('AccountType', 'x-ms-account-kind'),
    )
    return {
        field: account_headers.get(header)
        for field, header in header_for_field
    }
:returns: Blob-updated property dict (Etag and last modified).
"""
if self.blob_type != BlobType.PageBlob:
raise TypeError("This operation is only available for PageBlob type blobs.")
access_conditions = get_access_conditions(lease)
mod_conditions = get_modification_conditions(
if_modified_since, if_unmodified_since, if_match, if_none_match)
if content_length is None:
raise ValueError("A content length must be specified for a Page Blob.")
try:
return self._client.page_blob.resize(
blob_content_length=content_length,
timeout=timeout,
lease_access_conditions=access_conditions,
modified_access_conditions=mod_conditions,
cls=return_response_headers,
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
def _upload_chunk(self, chunk_offset, chunk_data):
    """
    Appends one chunk to the blob with ``append_block``.

    On the first call (no ``current_length`` attribute exists yet) the
    chunk is appended as-is and ``current_length`` is set from the
    ``x-ms-blob-append-offset`` entry of the response. On later calls,
    ``self.append_conditions.append_position`` is first set to
    ``current_length + chunk_offset`` and the chunk is then appended.
    In both cases the response is stored in ``self.response_headers``.

    :param int chunk_offset:
        Offset of this chunk, added to ``current_length`` to form the
        append position on every call after the first.
    :param chunk_data:
        The chunk payload; its length is sent as ``content_length``.
    """
    is_first_chunk = not hasattr(self, 'current_length')

    if not is_first_chunk:
        # Pin the append position before sending the request.
        self.append_conditions.append_position = self.current_length + chunk_offset

    self.response_headers = self.blob_service.append_block(
        chunk_data,
        content_length=len(chunk_data),
        timeout=self.timeout,
        lease_access_conditions=self.lease_access_conditions,
        modified_access_conditions=self.modified_access_conditions,
        validate_content=self.validate_content,
        append_position_access_conditions=self.append_conditions,
        cls=return_response_headers,
        **self.request_options
    )

    if is_first_chunk:
        # Remember the offset reported for the first appended block.
        self.current_length = int(self.response_headers['x-ms-blob-append-offset'])