Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_store_isRestricted_flag():
    # Storing a File with isRestricted=True must go through
    # Synapse._createAccessRequirementIfNone.
    bogus_path = utils.make_bogus_binary_file()
    schedule_for_cleanup(bogus_path)
    restricted_file = File(bogus_path, name='Secret human data', parent=project)

    # Intercept the access-requirement call so that no test emails reach ACT
    patch_target = 'synapseclient.client.Synapse._createAccessRequirementIfNone'
    with patch(patch_target) as access_requirement_mock:
        restricted_file = syn.store(restricted_file, isRestricted=True)
        assert_true(access_requirement_mock.called)
# NOTE(review): fragment - the enclosing test's `def` line and the setup of `output`, `file_entity`,
# `externalURL_entity`, `second_folder`, `prov`, `project_entity` and `folder_entity` are not visible here.
# Asking for the provenance of the entity recorded in `output` must raise SynapseHTTPError
assert_raises(SynapseHTTPError, syn.getProvenance, output[file_entity.id])
schedule_for_cleanup(output[file_entity.id])
# Test: setProvenance = Existing
output_URL = synapseutils.copy(syn, externalURL_entity.id, destinationId=second_folder.id, setProvenance="existing")
output_prov = syn.getProvenance(output_URL[externalURL_entity.id])
schedule_for_cleanup(output_URL[externalURL_entity.id])
# The provenance of the copy must match `prov` in both its name and its used list
assert_equals(output_prov['name'], prov['name'])
assert_equals(output_prov['used'], prov['used'])
# ------------------------------------
# TEST COPY LINKS
# ------------------------------------
second_file = utils.make_bogus_data_file()
# NOTE(review): the cleanup call below is commented out and names `filename`, not `second_file`;
# in the visible lines the local file `second_file` is never scheduled for cleanup.
# schedule_for_cleanup(filename)
second_file_entity = syn.store(File(second_file, parent=project_entity))
# Store a Link to the new file inside `folder_entity`, then copy that Link into `second_folder`
link_entity = Link(second_file_entity.id, parent=folder_entity.id)
link_entity = syn.store(link_entity)
copied_link = synapseutils.copy(syn, link_entity.id, destinationId=second_folder.id)
# Fetch both Links without following them and check that they point at the same target
old = syn.get(link_entity.id, followLink=False)
new = syn.get(copied_link[link_entity.id], followLink=False)
assert_equals(old.linksTo['targetId'], new.linksTo['targetId'])
schedule_for_cleanup(second_file_entity.id)
schedule_for_cleanup(link_entity.id)
schedule_for_cleanup(copied_link[link_entity.id])
# NOTE(review): fixed 3 second pause; the reason is not visible here - presumably it gives the
# server time to register the first copy before the duplicate attempt. Confirm.
time.sleep(3)
# Copying the same Link into the same destination a second time must raise ValueError
assert_raises(ValueError, synapseutils.copy, syn, link_entity.id, destinationId=second_folder.id)
"Oaxaca", "Cancún", "Curaçao", "जोधपुर",
"অসম", "ལྷ་ས།", "ཐིམ་ཕུ་", "دبي", "አዲስ አበባ",
"São Paulo", "Buenos Aires", "Cartagena",
"Amsterdam", "Venice", "Rome", "Dubrovnik",
"Sarajevo", "Madrid", "Barcelona", "Paris",
"Αθήνα", "Ρόδος", "København", "Zürich",
"金沢市", "서울", "แม่ฮ่องสอน", "Москва"]
# NOTE(review): fragment - the enclosing test's `def` line and the opening of the `cities` list
# (whose closing bracket is on the line just above) are not visible here.
text = "Places I wanna go:\n"
# Keep appending lines of 5000 randomly chosen city names until the UTF-8 encoding of the text
# is at least multipart_upload_module.MIN_PART_SIZE bytes long
while len(text.encode('utf-8')) < multipart_upload_module.MIN_PART_SIZE:
text += ", ".join(random.choice(cities) for i in range(5000)) + "\n"
# Upload the string; the return value is used below as a file handle ID
fhid = multipart_upload_string(syn, text)
# Download the file and compare it with the original
junk = File(parent=project, dataFileHandleId=fhid)
junk.properties.update(syn._createEntity(junk.properties))
# NOTE(review): mkstemp() also returns an open OS-level file descriptor (tmp_f) which is never
# closed in the visible lines; only the path is scheduled for cleanup.
(tmp_f, tmp_path) = tempfile.mkstemp()
schedule_for_cleanup(tmp_path)
junk['path'] = syn._downloadFileHandle(fhid, junk['id'], "FileEntity", tmp_path)
# Read the download back as UTF-8 so it can be compared with the uploaded text
with open(junk.path, encoding='utf-8') as f:
retrieved_text = f.read()
assert_equals(retrieved_text, text)
def test_syncFromSynapse__project_contains_empty_folder():
    # A project holding one file and one folder without children:
    # syncFromSynapse is expected to return just the file.
    project = Project(name="the project", parent="whatever", id="syn123")
    file_entity = File(name="a file", parent=project, id="syn456")
    empty_folder = Folder(name="a folder", parent=project, id="syn789")

    # First getChildren call answers for the project, the second for the folder
    children_per_call = [[empty_folder, file_entity], []]
    entity_per_get = [empty_folder, file_entity]
    get_kwargs = dict(downloadLocation=None, ifcollision='overwrite.local', followLink=False)

    with patch.object(syn, "getChildren", side_effect=children_per_call) as get_children_mock, \
            patch.object(syn, "get", side_effect=entity_per_get) as get_mock:
        assert_equals([file_entity], synapseutils.syncFromSynapse(syn, project))

        # getChildren must have been asked about the project and then the folder
        assert_list_equal(
            [call(project['id']), call(empty_folder['id'])],
            get_children_mock.call_args_list)

        # get must have been called for the folder and then the file, with identical options
        assert_list_equal(
            [call(entity['id'], **get_kwargs) for entity in (empty_folder, file_entity)],
            get_mock.call_args_list)
def test_store_with_flags():
    # -- CreateOrUpdate flag for Projects --
    # Storing a Project under an existing name should turn into an update
    first_note = 'Yep, sho\'nuf it\'s updated!'
    updated_project = Project(project.name)
    updated_project.updatedThing = first_note
    updated_project = syn.store(updated_project, createOrUpdate=True)
    assert_equals(project.id, updated_project.id)
    # The annotation is compared against a one-element list
    assert_equals(updated_project.updatedThing, [first_note])

    # Store a File; it is expected to be at version 1
    bogus_path = utils.make_bogus_binary_file()
    schedule_for_cleanup(bogus_path)
    stored_file = syn.store(
        File(bogus_path, name='Bogus Test File', parent=project),
        createOrUpdate=True)
    assert_equals(stored_file.versionNumber, 1)

    # Modify existing annotations by createOrUpdate, after removing
    # 'parentId' and 'id' from the entity
    for key in ('parentId', 'id'):
        del updated_project[key]
    updated_project.updatedThing = 'Updated again'
    updated_project.addedThing = 'Something new'
    updated_project = syn.store(updated_project, createOrUpdate=True)
    assert_equals(project.id, updated_project.id)
    assert_equals(updated_project.updatedThing, ['Updated again'])

    # -- ForceVersion flag --
    # Re-storing the same File with forceVersion=False must not bump the version
    restored_file = syn.store(stored_file, forceVersion=False)
    assert_equals(restored_file.versionNumber, 1)
def test_store_activity():
    # Build a File together with the Activity to attach to it
    path = utils.make_bogus_binary_file()
    schedule_for_cleanup(path)
    honking_file = File(path, name='Hinkle horn honking holes', parent=project)
    activity = Activity(
        name='Hinkle horn honking',
        description='Nettlebed Cave is a limestone cave located on the South Island of New Zealand.')
    for photo_url in ('http://www.flickr.com/photos/bevanbfree/3482259379/',
                      'http://www.flickr.com/photos/bevanbfree/3482185673/'):
        activity.used(photo_url)

    # Storing with the Activity attached doesn't set the ID of the Activity ...
    honking_file = syn.store(honking_file, activity=activity)
    # ... but reading the provenance back does
    provenance = syn.getProvenance(honking_file.id)

    # Verify the Activity
    assert_equals(provenance['name'], 'Hinkle horn honking')
    used_items = provenance['used']
    assert_equals(len(used_items), 2)
    first_used = used_items[0]
    assert_equals(first_used['concreteType'], 'org.sagebionetworks.repo.model.provenance.UsedURL')
    assert_false(first_used['wasExecuted'])
# NOTE(review): fragment - the enclosing test's `def` line and the earlier assignment of `bogus`
# are not visible here; `path` is presumably the local file stored earlier in that test. Confirm.
# Build the file:// URL expected for `path`; a leading letter followed by ':' is treated as a Windows drive
if path[0].isalpha() and path[1] == ':':
# A Windows file URL looks like this: file:///c:/foo/bar/bat.txt
expected_url = 'file:///' + path.replace("\\", "/")
else:
expected_url = 'file://' + path
# The third argument is the message shown on failure: it reports both URLs
assert_equals(bogus.externalURL, expected_url, 'URL: %s\nExpected %s' % (bogus.externalURL, expected_url))
# A file path that doesn't exist should still work
bogus = File('/path/to/local/file1.xyz', parentId=project.id, synapseStore=False)
bogus = syn.store(bogus)
# Storing succeeds, but fetching the entity must raise IOError
assert_raises(IOError, syn.get, bogus)
assert_false(bogus.synapseStore)
# Try a URL
bogus = File('http://dev-versions.synapse.sagebase.org/synapsePythonClient', parent=project, synapseStore=False)
bogus = syn.store(bogus)
bogus = syn.get(bogus)
assert_false(bogus.synapseStore)
def synapseStore(file_dirs, root, parent_syn, executed_urls, used):
    """Store each entry of ``file_dirs`` as a Synapse File under ``parent_syn``.

    Every entry is joined to ``root`` with a ``/`` to form the path handed to
    ``synapseclient.File``; ``executed_urls`` and ``used`` are passed through
    to ``syn.store`` as its ``executed`` and ``used`` arguments. Nothing is
    returned.
    """
    for name in file_dirs:
        local_path = root + '/' + name
        entity = synapseclient.File(local_path, parent=parent_syn)
        syn.store(entity, executed=executed_urls, used=used)
# NOTE(review): fragment - the enclosing function's `def` line and the definitions of `bundle`, `profile`,
# `ent`, `destinationId`, `skipCopyAnnotations` and `act` are not visible here.
# Look up the data file handle in `bundle` and who created it
fileHandle = synapseclient.utils.find_data_file_handle(bundle)
createdBy = fileHandle['createdBy']
# CHECK: If the user created the file, copy the file by using fileHandleId else copy the fileHandle
if profile.ownerId == createdBy:
newdataFileHandleId = ent.dataFileHandleId
else:
copiedFileHandle = copyFileHandles(syn, [fileHandle], ["FileEntity"], [bundle['entity']['id']],
[fileHandle['contentType']], [fileHandle['fileName']])
# Only the first copy result is inspected; a non-None failureCode aborts with ValueError
copyResult = copiedFileHandle['copyResults'][0]
if copyResult.get("failureCode") is not None:
raise ValueError("%s dataFileHandleId: %s" % (copyResult["failureCode"],
copyResult['originalFileHandleId']))
newdataFileHandleId = copyResult['newFileHandle']['id']
# Build the copy as a new File that reuses the name of `ent` and is parented at `destinationId`
new_ent = File(dataFileHandleId=newdataFileHandleId, name=ent.name, parentId=destinationId)
# Set annotations here
if not skipCopyAnnotations:
new_ent.annotations = ent.annotations
# Store provenance if act is not None
if act is not None:
new_ent = syn.store(new_ent, activity=act)
else:
new_ent = syn.store(new_ent)
# Leave this return statement for test
return new_ent['id']