Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
self.download_size = self.file_size
except HttpResponseError as error:
if self.offset is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = self.service.download(
validate_content=self.validate_content,
data_stream_total=0,
download_stream_current=0,
**self.request_options
)
except HttpResponseError as error:
process_storage_error(error)
# Set the download size to empty
self.download_size = 0
self.file_size = 0
else:
process_storage_error(error)
# If the file is small, the download is complete at this point.
# If file size is large, download the rest of the file in chunks.
if response.properties.size != self.download_size:
# Lock on the etag. This can be overridden by the user by specifying '*'
if self.request_options.get("modified_access_conditions"):
if not self.request_options["modified_access_conditions"].if_match:
self.request_options["modified_access_conditions"].if_match = response.properties.etag
else:
key_value = self._impl.put_lock(
key=configuration_setting.key,
label=configuration_setting.label,
error_map=error_map,
**kwargs
)
else:
key_value = self._impl.delete_lock(
key=configuration_setting.key,
label=configuration_setting.label,
error_map=error_map,
**kwargs
)
return ConfigurationSetting._from_key_value(key_value)
except ErrorException as error:
raise HttpResponseError(message=error.message, response=error.response)
def get_next(next_link=None):
    """Run the request for a single page and hand back its HTTP response.

    The request is built by ``prepare_request(next_link)`` and sent through
    the client's pipeline. ``self``, ``kwargs``, ``prepare_request``,
    ``map_error`` and ``HttpResponseError`` all come from the enclosing scope.

    :param next_link: Value forwarded to ``prepare_request``; defaults to None.
    :return: The ``http_response`` of the pipeline result when its status
        code is 200.
    :raises HttpResponseError: When the status code is anything other than
        200 and ``map_error`` did not raise first (presumably it raises the
        exception mapped to the status code, if any -- confirm).
    """
    pipeline_result = self._client._pipeline.run(prepare_request(next_link))
    http_response = pipeline_result.http_response

    # NOTE(review): 'error_map' is popped on every call. If ``kwargs`` is the
    # same dict across calls, only the first call sees a caller-supplied map
    # and later calls get None -- confirm this is intended.
    error_map = kwargs.pop('error_map', None)

    # Success path first; everything below handles a failed request.
    if http_response.status_code in [200]:
        return http_response

    map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
    raise HttpResponseError(response=http_response)
bank_certificate.name, updated_certificate.properties.updated_on
)
)
print(
"Certificate with name '{0}' was updated with tags '{1}'".format(
bank_certificate.name, updated_certificate.properties.tags
)
)
# The bank account was closed, need to delete its credentials from the Key Vault.
print("\n.. Delete Certificate")
deleted_certificate = client.delete_certificate(name=bank_certificate.name)
print("Deleting Certificate..")
print("Certificate with name '{0}' was deleted.".format(deleted_certificate.name))
except HttpResponseError as e:
print("\nrun_sample has caught an error. {0}".format(e.message))
finally:
print("\nrun_sample done")
def process_content(blob, start_offset, end_offset, require_encryption, key_encryption_key, key_resolver_function):
    """Return the downloaded content as bytes, decrypting it when a key is configured.

    :param blob: Iterable of byte chunks; its ``response`` attribute is read
        only when decryption fails.
    :param start_offset: Forwarded unchanged to ``_decrypt_blob``.
    :param end_offset: Forwarded unchanged to ``_decrypt_blob``.
    :param require_encryption: Forwarded unchanged to ``_decrypt_blob``.
    :param key_encryption_key: Key object, or None.
    :param key_resolver_function: Key resolver callable, or None.
    :return: The result of ``_decrypt_blob`` when either the key or the
        resolver is given; otherwise all chunks of ``blob`` concatenated.
    :raises HttpResponseError: With message "Decryption failed." when
        ``_decrypt_blob`` raises any exception.
    """
    decryption_requested = key_encryption_key is not None or key_resolver_function is not None

    # No key material supplied: the chunks are returned as-is, joined.
    if not decryption_requested:
        chunks = list(blob)
        return b"".join(chunks)

    try:
        return _decrypt_blob(
            require_encryption,
            key_encryption_key,
            key_resolver_function,
            blob,
            start_offset,
            end_offset)
    except Exception as error:
        # Any decryption failure is surfaced as a single HTTP-level error
        # that carries the original exception along.
        raise HttpResponseError(
            message="Decryption failed.",
            response=blob.response,
            error=error)
async def _get_key(self, **kwargs: "Any") -> "Optional[KeyVaultKey]":
    """
    Return the client's :class:`~azure.keyvault.keys.KeyVaultKey`, fetching it on first use.

    The key is requested from the service only while no key is cached and no
    earlier attempt was rejected with a 403. A successful fetch also records
    the key's operations in ``self._allowed_ops``.

    Can be `None`, if the client lacks keys/get permission.

    :rtype: :class:`~azure.keyvault.keys.KeyVaultKey` or None
    """
    # Already have a key, or already know we are not allowed to get one.
    if self._key or self._keys_get_forbidden:
        return self._key

    try:
        key_id = self._key_id
        self._key = await self._client.get_key(key_id.vault_endpoint, key_id.name, key_id.version, **kwargs)
        self._allowed_ops = frozenset(self._key.key_operations)
    except HttpResponseError as ex:
        # A 403 means keys/get permission is missing, so remember that and
        # stop retrying; any other error may be transient and leaves the
        # flag False so a later call tries again.
        self._keys_get_forbidden = ex.status_code == 403

    return self._key
header_parameters['Sync-Token'] = self._serialize.header("self._config.sync_token", self._config.sync_token, 'str')
if accept_datetime is not None:
header_parameters['Accept-Datetime'] = self._serialize.header("accept_datetime", accept_datetime, 'str')
if if_match is not None:
header_parameters['If-Match'] = self._serialize.header("if_match", if_match, 'str')
if if_none_match is not None:
header_parameters['If-None-Match'] = self._serialize.header("if_none_match", if_none_match, 'str')
# Construct and send request
request = self._client.head(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
response_headers = {
'Sync-Token': self._deserialize('str', response.headers.get('Sync-Token')),
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('str', response.headers.get('Last-Modified')),
}
return cls(response, None, response_headers)
check_key_value.metadata = {'url': '/kv/{key}'}
if data is None:
raise ValueError("Response cannot be None.")
content = b"".join(list(data))
if content and encryption.get("key") is not None or encryption.get("resolver") is not None:
try:
return decrypt_blob(
encryption.get("required"),
encryption.get("key"),
encryption.get("resolver"),
content,
start_offset,
end_offset,
data.response.headers,
)
except Exception as error:
raise HttpResponseError(message="Decryption failed.", response=data.response, error=error)
return content
print("Backup created for secret with name '{0}'.".format(secret.name))
# The storage account secret is no longer in use, so you delete it.
client.delete_secret(secret.name)
# To ensure secret is deleted on the server side.
print("\nDeleting secret...")
time.sleep(20)
print("Deleted Secret with name '{0}'".format(secret.name))
# In future, if the secret is required again, we can use the backup value to restore it in the Key Vault.
print("\n3. Restore the secret using the backed up secret bytes")
secret = client.restore_secret(secret_backup)
print("Restored Secret with name '{0}'".format(secret.name))
except HttpResponseError as e:
print("\nrun_sample has caught an error. {0}".format(e.message))
finally:
print("\nrun_sample done")
if data is None:
raise ValueError("Response cannot be None.")
content = b"".join(list(data))
if content and encryption.get("key") is not None or encryption.get("resolver") is not None:
try:
return decrypt_blob(
encryption.get("required"),
encryption.get("key"),
encryption.get("resolver"),
content,
start_offset,
end_offset,
data.response.headers,
)
except Exception as error:
raise HttpResponseError(message="Decryption failed.", response=data.response, error=error)
return content