Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _delete_bucket(self, b):
    """
    Empty and delete a bucket, treating an already-missing bucket as success.

    Pending multipart uploads are cancelled through the handle passed in. The objects,
    the object versions and finally the bucket itself are then removed through the
    bucket of the same name obtained from ``s3_boto3_resource``.

    :param b: bucket handle exposing ``name`` and ``list_multipart_uploads()``.

    :raises S3ResponseError: re-raised when its error code is anything other than
            'NoSuchBucket'.
    """
    for attempt in retry_s3():
        with attempt:
            try:
                # TODO: upgrade this portion to boto3
                for pending_upload in b.list_multipart_uploads():
                    pending_upload.cancel_upload()
                target = s3_boto3_resource.Bucket(compat_bytes(b.name))
                target.objects.all().delete()
                target.object_versions.delete()
                target.delete()
            except s3_boto3_resource.meta.client.exceptions.NoSuchBucket:
                # Nothing left to delete: the bucket is already gone.
                pass
            except S3ResponseError as err:
                if err.error_code == 'NoSuchBucket':
                    # Same situation, reported by the older client.
                    pass
                else:
                    raise
"""
assert self.minBucketNameLen <= len(bucket_name) <= self.maxBucketNameLen
assert self.bucketNameRe.match(bucket_name)
log.debug("Binding to job store bucket '%s'.", bucket_name)
def bucket_creation_pending(e):
    """
    Tell whether an exception carries one of the error codes this retry loop waits out.

    Used as the ``predicate`` argument of ``retry_s3``.

    :param e: the exception to inspect.

    :return: True if ``e`` is an S3CreateError or S3ResponseError whose error code is
             'BucketAlreadyOwnedByYou', 'OperationAborted' or 'NoSuchBucket',
             False otherwise.
    """
    # https://github.com/BD2KGenomics/toil/issues/955
    # https://github.com/BD2KGenomics/toil/issues/995
    # https://github.com/BD2KGenomics/toil/issues/1093
    if not isinstance(e, (S3CreateError, S3ResponseError)):
        return False
    return e.error_code in ('BucketAlreadyOwnedByYou',
                            'OperationAborted',
                            'NoSuchBucket')
bucketExisted = True
for attempt in retry_s3(predicate=bucket_creation_pending):
with attempt:
try:
bucket = self.s3.get_bucket(bucket_name, validate=True)
except S3ResponseError as e:
if e.error_code == 'NoSuchBucket':
bucketExisted = False
log.debug("Bucket '%s' does not exist.", bucket_name)
if create:
log.debug("Creating bucket '%s'.", bucket_name)
location = region_to_bucket_location(self.region)
bucket = self.s3.create_bucket(bucket_name, location=location)
# It is possible for create_bucket to return but
# for an immediate request for the bucket region to
# produce an S3ResponseError with code
# NoSuchBucket. We let that kick us back up to the
# main retry loop.
consistent_read=True,
query="select version from `%s` where ownerID='%s'" % (
self.filesDomain.name, jobStoreID)))
assert items is not None
if items:
log.debug("Deleting %d file(s) associated with job %s", len(items), jobStoreID)
n = self.itemsPerBatchDelete
batches = [items[i:i + n] for i in range(0, len(items), n)]
for batch in batches:
itemsDict = {item.name: None for item in batch}
for attempt in retry_sdb():
with attempt:
self.filesDomain.batch_delete_attributes(itemsDict)
for item in items:
version = item.get('version')
for attempt in retry_s3():
with attempt:
if version:
self.filesBucket.delete_key(key_name=bytes(item.name), version_id=version)
else:
self.filesBucket.delete_key(key_name=bytes(item.name))
:raises S3ResponseError: If `block` is True and the bucket still doesn't exist after the
retry timeout expires.
"""
assert self.minBucketNameLen <= len(bucket_name) <= self.maxBucketNameLen
assert self.bucketNameRe.match(bucket_name)
log.debug("Binding to job store bucket '%s'.", bucket_name)
def bucket_creation_pending(e):
    """
    Tell whether an exception carries one of the error codes this retry loop waits out.

    Used as the ``predicate`` argument of ``retry_s3``.

    :param e: the exception to inspect.

    :return: True if ``e`` is an S3CreateError or S3ResponseError whose error code is
             'BucketAlreadyOwnedByYou' or 'OperationAborted', False otherwise.
    """
    # https://github.com/BD2KGenomics/toil/issues/955
    # https://github.com/BD2KGenomics/toil/issues/995
    # https://github.com/BD2KGenomics/toil/issues/1093
    pending_codes = ('BucketAlreadyOwnedByYou', 'OperationAborted')
    if isinstance(e, (S3CreateError, S3ResponseError)):
        return e.error_code in pending_codes
    return False
bucketExisted = True
for attempt in retry_s3(predicate=bucket_creation_pending):
with attempt:
try:
bucket = self.s3.get_bucket(bucket_name, validate=True)
except S3ResponseError as e:
if e.error_code == 'NoSuchBucket':
bucketExisted = False
log.debug("Bucket '%s' does not exist.", bucket_name)
if create:
log.debug("Creating bucket '%s'.", bucket_name)
location = region_to_bucket_location(self.region)
bucket = self.s3.create_bucket(bucket_name, location=location)
assert self.__getBucketRegion(bucket) == self.region
elif block:
raise
else:
return None
def _delete_bucket(self, bucket):
    """
    Empty and delete a bucket, treating an already-missing bucket as success.

    Pending multipart uploads are cancelled first. Every entry reported by
    ``list_versions()`` is then deleted by (name, version id) pair, and finally the
    bucket itself is removed.

    :param bucket: bucket handle to delete.

    :raises S3ResponseError: re-raised when its error code is anything other than
            'NoSuchBucket'.
    """
    for attempt in retry_s3():
        with attempt:
            try:
                for pending_upload in bucket.list_multipart_uploads():
                    pending_upload.cancel_upload()
                versioned_keys = [(entry.name, entry.version_id)
                                  for entry in bucket.list_versions()]
                bucket.delete_keys(versioned_keys, quiet=True)
                bucket.delete()
            except S3ResponseError as err:
                # A bucket that no longer exists is the desired end state.
                if err.error_code != 'NoSuchBucket':
                    raise
def __getBucketVersioning(self, bucket):
    """
    Return the versioning state of a bucket, translated through ``self.versionings``.

    A newly created bucket makes get_versioning_status return an empty dict, and None has
    also been observed in that situation; both are reported as False.

    Otherwise the 'Versioning' entry of the returned dictionary is 'Enabled', 'Suspended'
    or 'Disabled', which map to True, None and False respectively. A status of 'Disabled'
    has never actually been seen, only the empty dictionary. Note that calling
    configure_versioning with False makes get_versioning_status report 'Suspended'
    afterwards, even for a new bucket that never had versioning enabled.

    :param bucket: the bucket to query.
    """
    for attempt in retry_s3():
        with attempt:
            status = bucket.get_versioning_status()
            if not status:
                # Empty dict or None: versioning was never configured.
                return False
            return self.versionings[status['Versioning']]
def getSize(self):
    """
    Return the size of the referenced item in bytes.

    If ``self.content`` is set, its length is returned. Otherwise, if ``self.version``
    is set, the size is read from the key named ``self.fileID`` in the files bucket.
    If neither is set, the size is 0.

    :return: the size in bytes.

    :raises RuntimeError: if ``self.version`` is set but the files bucket has no key
            for this file.
    """
    if self.content is not None:
        return len(self.content)
    if not self.version:
        return 0
    for attempt in retry_s3():
        with attempt:
            # The key must be validated against the service. With validate=False boto
            # only builds an in-memory key without contacting S3, so its size is never
            # populated and this method would return None instead of a byte count.
            key = self.outer.filesBucket.get_key(compat_bytes(self.fileID))
            if key is None:
                raise RuntimeError("No key found in files bucket for file '%s'."
                                   % self.fileID)
            return key.size
def __getBucketRegion(self, bucket):
    """
    Return the bucket's location, translated by ``bucket_location_to_region``.

    :param bucket: the bucket whose location is queried.
    """
    for attempt in retry_s3():
        with attempt:
            location = bucket.get_location()
            return bucket_location_to_region(location)