Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# to fill it to minimum part size
upload_buffer.append(
current_intent, upload_buffer.end_offset + missing_length
)
# completely flush the upload buffer
for upload_buffer_part in self._buff_split(upload_buffer):
yield self._get_upload_part(upload_buffer_part)
# split current intent (copy source) to parts and yield
copy_parts = self._get_copy_parts(
current_intent,
start_offset=upload_buffer.end_offset,
end_offset=current_end,
)
for part in copy_parts:
yield part
upload_buffer = UploadBuffer(current_end)
else:
# this is an upload source or a "small copy" source - either way we just add it to the upload buffer
upload_buffer.append(current_intent, current_end)
upload_buffer_parts = list(self._buff_split(upload_buffer))
# we flush all parts excluding last one - we may want to extend
# this last part with "incoming" intent in next loop run
for upload_buffer_part in upload_buffer_parts[:-1]:
yield self._get_upload_part(upload_buffer_part)
upload_buffer = upload_buffer_parts[-1]
current_intent = intent
current_end = fragment_end
if current_intent is None:
# this is a sentinel - there would be no more fragments - we have to flush upload buffer
for upload_buffer_part in self._buff_split(upload_buffer):
yield self._get_upload_part(upload_buffer_part)
def _get_emerge_parts(self, intent_fragments_iterator):
# This is where the magic happens. Instead of describing typical inputs and outputs here,
# We've put them in tests. It is recommended to read those tests before trying to comprehend
# the implementation details of this function.
min_part_size = self.min_part_size
# this stores current intent that we need to process - we may get
# it in fragments so we want to glue just by updating `current_end`
current_intent = None
current_end = 0
upload_buffer = UploadBuffer(0)
for intent, fragment_end in intent_fragments_iterator:
if current_intent is None:
# this is a first loop run - just initialize current intent
current_intent = intent
current_end = fragment_end
continue
if intent is current_intent:
# new intent is the same as the previously processed intent, so let's glue them together
# this happens when the caller splits N overlapping intents into overlapping fragments
# and two fragments from the same intent end up streaming into here one after the other
current_end = fragment_end
continue
# incoming intent is different - this means that now we have to decide what to do:
# if this is a copy intent and we want to copy it server-side, then we have to
def _buff_partition(self, upload_buffer):
    """ Cut one upload part off the front of an upload buffer.

    Items of ``upload_buffer`` are moved, in order, into a fresh "left" buffer until the
    distance from the buffer's start offset reaches ``self.recommended_upload_part_size``.
    An item that would cross that limit is cut: its head goes to the left buffer and the
    right buffer starts at the cut offset within the same item.

    The left part therefore cannot be split any further; nothing can be assumed about the
    right part (it may be empty, or may itself still need splitting).

    :param upload_buffer: buffer to split; it is only read (``start_offset``,
                          ``iter_items()``, ``get_slice()``)
    :rtype tuple(b2sdk.transfer.emerge.planner.planner.UploadBuffer,
                 b2sdk.transfer.emerge.planner.planner.UploadBuffer):
    """
    origin = upload_buffer.start_offset
    head = UploadBuffer(origin)

    for position, item in enumerate(upload_buffer.iter_items()):
        source, item_end = item
        limit = self.recommended_upload_part_size
        size_so_far = item_end - origin

        if size_so_far < limit:
            # still room left in this part - take the whole item and keep going
            head.append(source, item_end)
            continue

        if size_so_far == limit:
            # the item ends exactly on the part boundary - the rest starts at the next item
            head.append(source, item_end)
            return head, upload_buffer.get_slice(start_idx=position + 1)

        # the item crosses the part boundary - keep only its head on the left side
        # and let the right side resume inside the same item, at the cut offset
        overflow = size_so_far - limit
        head.append(source, item_end - overflow)
        tail = upload_buffer.get_slice(start_idx=position, start_offset=head.end_offset)
        return head, tail

    # ran out of items before filling a whole part - the right side is an empty buffer
    return head, UploadBuffer(head.end_offset)
left_buff = UploadBuffer(upload_buffer.start_offset)
buff_start = upload_buffer.start_offset
for idx, (intent, fragment_end) in enumerate(upload_buffer.iter_items()):
candidate_size = fragment_end - buff_start
if candidate_size > self.recommended_upload_part_size:
right_fragment_size = candidate_size - self.recommended_upload_part_size
left_buff.append(intent, fragment_end - right_fragment_size)
return left_buff, upload_buffer.get_slice(
start_idx=idx, start_offset=left_buff.end_offset
)
else:
left_buff.append(intent, fragment_end)
if candidate_size == self.recommended_upload_part_size:
return left_buff, upload_buffer.get_slice(start_idx=idx + 1)
return left_buff, UploadBuffer(left_buff.end_offset)