Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
super(BaseUploadInputManagerTest, self).setUp()
self.osutil = OSUtils()
self.config = TransferConfig()
self.recording_subscriber = RecordingSubscriber()
self.subscribers.append(self.recording_subscriber)
def test_uses_bandwidth_limiter(self):
self.content = b'a' * 1024 * 1024
self.stream = six.BytesIO(self.content)
self.config = TransferConfig(
max_request_concurrency=1, max_bandwidth=len(self.content)/2)
self._manager = TransferManager(self.client, self.config)
self.add_head_object_response()
self.add_successful_get_object_responses()
start = time.time()
future = self.manager.download(
self.bucket, self.key, self.filename, self.extra_args)
future.result()
# This is just a smoke test to make sure that the limiter is
# being used and not necessary its exactness. So we set the maximum
# bandwidth to len(content)/2 per sec and make sure that it is
# noticeably slower. Ideally it will take more than two seconds, but
# given tracking at the beginning of transfers are not entirely
# accurate setting at the initial start of a transfer, we give us
def test_upload_with_bandwidth_limiter(self):
self.content = b'a' * 1024 * 1024
with open(self.filename, 'wb') as f:
f.write(self.content)
self.config = TransferConfig(
max_request_concurrency=1, max_bandwidth=len(self.content)/2)
self._manager = TransferManager(self.client, self.config)
self.add_put_object_response_with_default_expected_params()
start = time.time()
future = self.manager.upload(self.filename, self.bucket, self.key)
future.result()
# This is just a smoke test to make sure that the limiter is
# being used and not necessary its exactness. So we set the maximum
# bandwidth to len(content)/2 per sec and make sure that it is
# noticeably slower. Ideally it will take more than two seconds, but
# given tracking at the beginning of transfers are not entirely
# accurate setting at the initial start of a transfer, we give us
# some flexibility by setting the expected time to half of the
# theoretical time to take.
self.assertGreaterEqual(time.time() - start, 1)
def setUp(self):
super(BaseDownloadTest, self).setUp()
self.config = TransferConfig(max_request_concurrency=1)
self._manager = TransferManager(self.client, self.config)
# Create a temporary directory to write to
self.tempdir = tempfile.mkdtemp()
self.filename = os.path.join(self.tempdir, 'myfile')
# Initialize some default arguments
self.bucket = 'mybucket'
self.key = 'mykey'
self.extra_args = {}
self.subscribers = []
# Create a stream to read from
self.content = b'my content'
self.stream = six.BytesIO(self.content)
def setUp(self):
super(BaseCopyTest, self).setUp()
self.config = TransferConfig(
max_request_concurrency=1,
multipart_chunksize=MIN_UPLOAD_CHUNKSIZE,
multipart_threshold=MIN_UPLOAD_CHUNKSIZE * 4
)
self._manager = TransferManager(self.client, self.config)
# Initialize some default arguments
self.bucket = 'mybucket'
self.key = 'mykey'
self.copy_source = {
'Bucket': 'mysourcebucket',
'Key': 'mysourcekey'
}
self.extra_args = {}
self.subscribers = []
def setUp(self):
super(TestDownload, self).setUp()
self.multipart_threshold = 5 * 1024 * 1024
self.config = TransferConfig(
multipart_threshold=self.multipart_threshold
)
# The purpose of this test is to make sure if an error is raised
# in the body of the context manager, incomplete transfers will
# be cancelled with value of the exception wrapped by a CancelledError
# NOTE: The fact that delete() was chosen to test this is arbitrary
# other than it is the easiet to set up for the stubber.
# The specific operation is not important to the purpose of this test.
num_transfers = 100
futures = []
for _ in range(num_transfers):
self.stubber.add_response('delete_object', {})
manager = TransferManager(
self.client,
TransferConfig(
max_request_concurrency=1, max_submission_concurrency=1)
)
try:
with manager:
for i in range(num_transfers):
futures.append(manager.delete('mybucket', 'mykey'))
raise KeyboardInterrupt()
except KeyboardInterrupt:
# At least one of the submitted futures should have been
# cancelled.
with self.assertRaisesRegexp(
CancelledError, 'KeyboardInterrupt()'):
for future in futures:
future.result()
def setUp(self):
super(BaseUploadTest, self).setUp()
# TODO: We do not want to use the real MIN_UPLOAD_CHUNKSIZE
# when we're adjusting parts.
# This is really wasteful and fails CI builds because self.contents
# would normally use 10MB+ of memory.
# Until there's an API to configure this, we're patching this with
# a min size of 1. We can't patch MIN_UPLOAD_CHUNKSIZE directly
# because it's already bound to a default value in the
# chunksize adjuster. Instead we need to patch out the
# chunksize adjuster class.
self.adjuster_patch = mock.patch(
's3transfer.upload.ChunksizeAdjuster',
lambda: ChunksizeAdjuster(min_size=1))
self.adjuster_patch.start()
self.config = TransferConfig(max_request_concurrency=1)
self._manager = TransferManager(self.client, self.config)
# Create a temporary directory with files to read from
self.tempdir = tempfile.mkdtemp()
self.filename = os.path.join(self.tempdir, 'myfile')
self.content = b'my content'
with open(self.filename, 'wb') as f:
f.write(self.content)
# Initialize some default arguments
self.bucket = 'mybucket'
self.key = 'mykey'
self.extra_args = {}
self.subscribers = []
def __init__(self, client, config=None, osutil=None, executor_cls=None):
"""A transfer manager interface for Amazon S3
:param client: Client to be used by the manager
:param config: TransferConfig to associate specific configurations
:param osutil: OSUtils object to use for os-related behavior when
using with transfer manager.
:type executor_cls: s3transfer.futures.BaseExecutor
:param executor_cls: The class of executor to use with the transfer
manager. By default, concurrent.futures.ThreadPoolExecutor is used.
"""
self._client = client
self._config = config
if config is None:
self._config = TransferConfig()
self._osutil = osutil
if osutil is None:
self._osutil = OSUtils()
self._coordinator_controller = TransferCoordinatorController()
# A counter to create unique id's for each transfer submitted.
self._id_counter = 0
# The executor responsible for making S3 API transfer requests
self._request_executor = BoundedExecutor(
max_size=self._config.max_request_queue_size,
max_num_threads=self._config.max_request_concurrency,
tag_semaphores={
IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
self._config.max_in_memory_upload_chunks),
IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
self._config.max_in_memory_download_chunks)
:type config: boto3.s3.transfer.TransferConfig
:param config: The transfer config to use
:type osutil: s3transfer.utils.OSUtils
:param osutil: The os utility to use
:rtype: s3transfer.manager.TransferManager
:returns: A transfer manager based on parameters provided
"""
executor_cls = None
if not config.use_threads:
executor_cls = NonThreadedExecutor
return TransferManager(client, config, osutil, executor_cls)
class TransferConfig(S3TransferConfig):
ALIAS = {
'max_concurrency': 'max_request_concurrency',
'max_io_queue': 'max_io_queue_size'
}
def __init__(self,
multipart_threshold=8 * MB,
max_concurrency=10,
multipart_chunksize=8 * MB,
num_download_attempts=5,
max_io_queue=100,
io_chunksize=256 * KB,
use_threads=True):
"""Configuration object for managed S3 transfers
:param multipart_threshold: The transfer size threshold for which