Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
lambda: ChunksizeAdjuster(min_size=1))
self.adjuster_patch.start()
def get_chunksize(filesize):
    """Return the chunk size chosen for a file of the given size.

    Starts from ``DEFAULT_CHUNKSIZE`` and lets a fresh ``ChunksizeAdjuster``
    adjust it for ``filesize``.
    """
    adjuster = ChunksizeAdjuster()
    adjusted_chunksize = adjuster.adjust_chunksize(DEFAULT_CHUNKSIZE, filesize)
    return adjusted_chunksize
'extra_args': call_args.extra_args,
}
)
)
# Submit requests to upload the parts of the file.
part_futures = []
extra_part_args = self._extra_upload_part_args(call_args.extra_args)
# Get any tags that need to be associated with the submitted task
# for uploading the data
upload_part_tag = self._get_upload_task_tag(
upload_input_manager, 'upload_part')
size = transfer_future.meta.size
adjuster = ChunksizeAdjuster()
chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size)
part_iterator = upload_input_manager.yield_upload_part_bodies(
transfer_future, chunksize)
for part_number, fileobj in part_iterator:
part_futures.append(
self._transfer_coordinator.submit(
request_executor,
UploadPartTask(
transfer_coordinator=self._transfer_coordinator,
main_kwargs={
'client': client,
'fileobj': fileobj,
'bucket': call_args.bucket,
'key': call_args.key,
'part_number': part_number,
def _calculate_etag(file_path):
    """
    Attempts to calculate a local file's ETag the way S3 does:
    - Normal uploads: MD5 of the file
    - Multi-part uploads: MD5 of the (binary) MD5s of the parts, dash, number of parts
    We can't know how the file was actually uploaded - but we're assuming it was done using
    the default settings, which we get from `s3_transfer_config`.

    :param file_path: path of the local file to hash.
    :return: the ETag string, wrapped in double quotes.
    :raises OSError: if the file cannot be stat'ed or opened.
    """
    size = pathlib.Path(file_path).stat().st_size
    with open(file_path, 'rb') as fd:
        # s3transfer switches to a multipart upload as soon as
        # size >= multipart_threshold, so a file exactly at the threshold
        # must be hashed the multipart way (strict `<` here, not `<=`).
        if size < s3_transfer_config.multipart_threshold:
            etag = hashlib.md5(fd.read()).hexdigest()
        else:
            adjuster = ChunksizeAdjuster()
            chunksize = adjuster.adjust_chunksize(s3_transfer_config.multipart_chunksize, size)
            # Read fixed-size parts until read() returns b'' (EOF) and keep
            # the binary MD5 digest of each part.
            part_digests = [
                hashlib.md5(part).digest()
                for part in iter(lambda: fd.read(chunksize), b'')
            ]
            etag = '%s-%d' % (hashlib.md5(b''.join(part_digests)).hexdigest(), len(part_digests))
    return '"%s"' % etag
if extra_args:
params.update(extra_args)
resp = s3_client.copy_object(**params)
ctx.progress(size)
version_id = resp.get('VersionId') # Absent in unversioned buckets.
ctx.done(make_s3_url(dest_bucket, dest_key, version_id))
else:
resp = s3_client.create_multipart_upload(
Bucket=dest_bucket,
Key=dest_key,
)
upload_id = resp['UploadId']
adjuster = ChunksizeAdjuster()
chunksize = adjuster.adjust_chunksize(s3_transfer_config.multipart_chunksize, size)
chunk_offsets = list(range(0, size, chunksize))
lock = Lock()
remaining = len(chunk_offsets)
parts = [None] * remaining
def upload_part(i, start, end):
nonlocal remaining
part_id = i + 1
part = s3_client.upload_part_copy(
CopySource=src_params,
CopySourceRange=f'bytes={start}-{end-1}',
Bucket=dest_bucket,
Key=dest_key,
resp = s3_client.put_object(
Body=fd,
Bucket=dest_bucket,
Key=dest_key,
)
version_id = resp.get('VersionId') # Absent in unversioned buckets.
ctx.done(make_s3_url(dest_bucket, dest_key, version_id))
else:
resp = s3_client.create_multipart_upload(
Bucket=dest_bucket,
Key=dest_key,
)
upload_id = resp['UploadId']
adjuster = ChunksizeAdjuster()
chunksize = adjuster.adjust_chunksize(s3_transfer_config.multipart_chunksize, size)
chunk_offsets = list(range(0, size, chunksize))
lock = Lock()
remaining = len(chunk_offsets)
parts = [None] * remaining
def upload_part(i, start, end):
nonlocal remaining
part_id = i + 1
with OSUtils().open_file_chunk_reader(src_path, start, end-start, [ctx.progress]) as fd:
part = s3_client.upload_part(
Body=fd,
Bucket=dest_bucket,
Key=dest_key,
request_executor,
CreateMultipartUploadTask(
transfer_coordinator=self._transfer_coordinator,
main_kwargs={
'client': client,
'bucket': call_args.bucket,
'key': call_args.key,
'extra_args': create_multipart_extra_args,
}
)
)
# Determine how many parts are needed based on filesize and
# desired chunksize.
part_size = config.multipart_chunksize
adjuster = ChunksizeAdjuster()
part_size = adjuster.adjust_chunksize(
part_size, transfer_future.meta.size)
num_parts = int(
math.ceil(transfer_future.meta.size / float(part_size)))
# Submit requests to upload the parts of the file.
part_futures = []
progress_callbacks = get_callbacks(transfer_future, 'progress')
for part_number in range(1, num_parts + 1):
extra_part_args = self._extra_upload_part_args(
call_args.extra_args)
# The part number for upload part starts at 1 while the
# range parameter starts at zero, so just subtract 1 off of
# the part number
extra_part_args['CopySourceRange'] = calculate_range_parameter(