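These snippets are drawn from the synapseclient Python client's chunked-upload integration and load tests. They reference module-level setup that the excerpts omit; the sketch below shows plausible imports and fixtures, assuming the synapseclient 1.x module layout. The `syn` login, the scratch `project`, and the `schedule_for_cleanup` helper are assumptions inferred from how the functions use them, not code from the original file.

import filecmp
import os
import random
import tempfile
import traceback
from datetime import datetime

try:
    from urllib.parse import urlparse   # Python 3
except ImportError:
    from urlparse import urlparse       # Python 2

from nose.tools import assert_equals, assert_is_not_none

import synapseclient
import synapseclient.utils as utils
from synapseclient import File, Project
from synapseclient.utils import KB, MB
import synapseclient.multipart_upload as multipart_upload_module
from synapseclient.multipart_upload import multipart_upload

## assumed fixtures: a logged-in client and a scratch project to upload into
syn = synapseclient.login()
project = syn.store(Project('Chunked upload test project'))

_cleanup_items = []

def schedule_for_cleanup(item):
    ## stand-in for the integration suite's cleanup fixture
    _cleanup_items.append(item)
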
def test_large_file_upload(file_to_upload_size=11*utils.KB, filepath=None):
    clean_up_file = False
    try:
        project = syn.store(Project("File Upload Load Test " + datetime.now().strftime("%Y-%m-%d %H%M%S%f")))

        if filepath:
            ## keep a file around so we don't have to regenerate it
            if not os.path.exists(filepath):
                filepath = utils.make_bogus_binary_file(file_to_upload_size, filepath=filepath, printprogress=True)
        else:
            ## generate a temporary file and clean it up when we're done
            clean_up_file = True
            filepath = utils.make_bogus_binary_file(file_to_upload_size, printprogress=True)

        try:
            junk = syn.store(File(filepath, parent=project))
            fh = syn._getFileHandle(junk['dataFileHandleId'])
            syn.printEntity(fh)
        finally:
            try:
                if 'junk' in locals():
                    syn.delete(junk)
            except Exception:
                print(traceback.format_exc())
    finally:
        try:
            if 'filepath' in locals() and clean_up_file:
                os.remove(filepath)
        except Exception:
            print(traceback.format_exc())

def test_upload_speed(uploadSize=60 + 777771, threadCount=5):
    import time
    fh = None
    filepath = utils.make_bogus_binary_file(uploadSize*MB)
    try:
        t0 = time.time()
        fh = syn._uploadToFileHandleService(filepath, threadCount=threadCount)
        dt = time.time() - t0
    finally:
        try:
            os.remove(filepath)
        except Exception:
            print(traceback.format_exc())
        if fh:
            syn._deleteFileHandle(fh)
    return dt

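As a rough illustration (not part of the original suite), the helper above can be driven at several thread counts to compare wall-clock upload times. Note that `uploadSize` is multiplied by `MB` inside the function, so the small value passed here is hypothetical and chosen to keep the bogus file manageable.

## hypothetical driver: time a small upload at a few thread counts
for n in (1, 2, 5):
    elapsed = test_upload_speed(uploadSize=10, threadCount=n)
    print('threadCount=%d: %.1f seconds' % (n, elapsed))
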
def test_synStore_sftpIntegration():
    """Creates a File Entity on an sftp server and adds the external url."""
    filepath = utils.make_bogus_binary_file(1*MB - 777771)
    try:
        file = syn.store(File(filepath, parent=project))
        file2 = syn.get(file)
        assert_equals(file.externalURL, file2.externalURL)
        assert_equals(urlparse(file2.externalURL).scheme, 'sftp')

        tmpdir = tempfile.mkdtemp()
        schedule_for_cleanup(tmpdir)

        # test that we got an MD5 à la SYNPY-185
        assert_is_not_none(file2.md5)
        fh = syn._getFileHandle(file2.dataFileHandleId)
        assert_is_not_none(fh['contentMd5'])
        assert_equals(file2.md5, fh['contentMd5'])
    finally:
        try:
            os.remove(filepath)
        except Exception:
            print(traceback.format_exc())

def manually_check_retry_on_key_does_not_exist():
    ## This is a manual test -- don't know how to automate this one.
    ## To run: nosetests -vs tests/integration/test_chunked_upload.py:manually_check_retry_on_key_does_not_exist
    ## We're testing the retrying of key-does-not-exist errors from S3.
    ## Expected behavior: retries several times, getting the error message
    ## 'The specified key does not exist.', then fails with a stack trace.
    i = 1
    filepath = utils.make_bogus_binary_file(6*MB)
    try:
        token = syn._createChunkedFileUploadToken(filepath, 'application/octet-stream')
        chunkRequest, url = syn._createChunkedFileUploadChunkURL(i, token)
        ## never upload the chunk, so we will get an error 'The specified key does not exist.'
        chunkResult = syn._addChunkToFile(chunkRequest)
    finally:
        os.remove(filepath)

def test_randomly_failing_parts():
    FAILURE_RATE = 1.0/3.0
    fhid = None

    multipart_upload_module.MIN_PART_SIZE = 5*MB
    multipart_upload_module.MAX_RETRIES = 20

    filepath = utils.make_bogus_binary_file(multipart_upload_module.MIN_PART_SIZE*2 + 777771)

    normal_put_chunk = None

    def _put_chunk_or_fail_randomly(url, chunk, verbose=False):
        if random.random() < FAILURE_RATE:
            raise IOError("Ooops! Artificial upload failure for testing.")
        else:
            return normal_put_chunk(url, chunk, verbose)

    # Mock _put_chunk to fail randomly
    normal_put_chunk = multipart_upload_module._put_chunk
    multipart_upload_module._put_chunk = _put_chunk_or_fail_randomly

    try:
        fhid = multipart_upload(syn, filepath)
    finally:
        ## un-mock _put_chunk and clean up (the excerpt truncates here; this
        ## tail is the implied cleanup, mirroring the other tests)
        multipart_upload_module._put_chunk = normal_put_chunk
        if fhid is not None:
            syn._deleteFileHandle(fhid)
        os.remove(filepath)

def test_round_trip():
    fh = None
    filepath = utils.make_bogus_binary_file(6*MB + 777771)
    print('Made bogus file: ', filepath)
    try:
        fh = syn._chunkedUploadFile(filepath)
        # print('FileHandle:')
        # syn.printEntity(fh)

        # Download the file and compare it with the original
        junk = File(filepath, parent=project, dataFileHandleId=fh['id'])
        junk.properties.update(syn._createEntity(junk.properties))
        junk.update(syn._downloadFileEntity(junk, filepath))
        assert filecmp.cmp(filepath, junk.path)
    finally:
        try:
            if 'junk' in locals():
                syn.delete(junk)
        except Exception:
            print(traceback.format_exc())
        try:
            os.remove(filepath)
        except Exception:
            print(traceback.format_exc())
        if fh:
            syn._deleteFileHandle(fh)

def test_single_thread_upload():
    synapseclient.config.single_threaded = True
    try:
        filepath = utils.make_bogus_binary_file(multipart_upload_module.MIN_PART_SIZE * 2 + 1)
        assert_is_not_none(multipart_upload(syn, filepath))
    finally:
        synapseclient.config.single_threaded = False
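
The try/finally toggle in this last test generalizes naturally to a small context manager for user code that wants to force single-threaded uploads temporarily. This helper is a sketch built on the `synapseclient.config.single_threaded` flag the test uses, not part of synapseclient itself.

import contextlib

@contextlib.contextmanager
def single_threaded_uploads(config=synapseclient.config):
    ## temporarily force single-threaded uploads, restoring the prior value on exit
    previous = getattr(config, 'single_threaded', False)
    config.single_threaded = True
    try:
        yield
    finally:
        config.single_threaded = previous

## usage:
## with single_threaded_uploads():
##     multipart_upload(syn, filepath)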