Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_copyFileHandleAndchangeFileMetadata():
project_entity = syn.store(Project(name=str(uuid.uuid4())))
schedule_for_cleanup(project_entity.id)
filename = utils.make_bogus_data_file()
attachname = utils.make_bogus_data_file()
schedule_for_cleanup(filename)
schedule_for_cleanup(attachname)
file_entity = syn.store(File(filename, parent=project_entity))
schedule_for_cleanup(file_entity.id)
wiki = Wiki(owner=project_entity, title='A Test Wiki', markdown="testing",
attachments=[attachname])
wiki = syn.store(wiki)
wikiattachments = syn._getFileHandle(wiki.attachmentFileHandleIds[0])
# CHECK: Can batch copy two file handles (wiki attachments and file entity)
copiedFileHandles = synapseutils.copyFileHandles(syn, [file_entity.dataFileHandleId,
wiki.attachmentFileHandleIds[0]],
[file_entity.concreteType.split(".")[-1], "WikiAttachment"],
[file_entity.id, wiki.id],
def test_syncFromSynapse():
"""This function tests recursive download as defined in syncFromSynapse
most of the functionality of this function are already tested in the
tests/integration/test_command_line_client::test_command_get_recursive_and_query
which means that the only test if for path=None
"""
# Create a Project
project_entity = syn.store(synapseclient.Project(name=str(uuid.uuid4())))
schedule_for_cleanup(project_entity.id)
# Create a Folder in Project
folder_entity = syn.store(Folder(name=str(uuid.uuid4()), parent=project_entity))
# Create and upload two files in Folder
uploaded_paths = []
for i in range(2):
f = utils.make_bogus_data_file()
uploaded_paths.append(f)
schedule_for_cleanup(f)
syn.store(File(f, parent=folder_entity))
# Add a file in the project level as well
f = utils.make_bogus_data_file()
uploaded_paths.append(f)
schedule_for_cleanup(f)
old = syn.get(link_entity.id, followLink=False)
new = syn.get(copied_link[link_entity.id], followLink=False)
assert_equals(old.linksTo['targetId'], new.linksTo['targetId'])
schedule_for_cleanup(second_file_entity.id)
schedule_for_cleanup(link_entity.id)
schedule_for_cleanup(copied_link[link_entity.id])
time.sleep(3)
assert_raises(ValueError, synapseutils.copy, syn, link_entity.id, destinationId=second_folder.id)
# ------------------------------------
# TEST COPY TABLE
# ------------------------------------
second_project = syn.store(Project(name=str(uuid.uuid4())))
schedule_for_cleanup(second_project.id)
cols = [Column(name='n', columnType='DOUBLE', maximumSize=50),
Column(name='c', columnType='STRING', maximumSize=50),
Column(name='i', columnType='INTEGER')]
data = [[2.1, 'foo', 10],
[2.2, 'bar', 20],
[2.3, 'baz', 30]]
schema = syn.store(Schema(name='Testing', columns=cols, parent=project_entity.id))
syn.store(RowSet(schema=schema, rows=[Row(r) for r in data]))
table_map = synapseutils.copy(syn, schema.id, destinationId=second_project.id)
copied_table = syn.tableQuery('select * from %s' % table_map[schema.id])
rows = copied_table.asRowSet()['rows']
# TEST: Check if all values are the same
for i, row in enumerate(rows):
def test_syncFromSynapse__project_contains_empty_folder():
project = Project(name="the project", parent="whatever", id="syn123")
file = File(name="a file", parent=project, id="syn456")
folder = Folder(name="a folder", parent=project, id="syn789")
with patch.object(syn, "getChildren", side_effect=[[folder, file], []]) as patch_syn_get_children,\
patch.object(syn, "get", side_effect=[folder, file]) as patch_syn_get:
assert_equals([file], synapseutils.syncFromSynapse(syn, project))
expected_get_children_agrs = [call(project['id']), call(folder['id'])]
assert_list_equal(expected_get_children_agrs, patch_syn_get_children.call_args_list)
expected_get_args = [
call(folder['id'], downloadLocation=None, ifcollision='overwrite.local', followLink=False),
call(file['id'], downloadLocation=None, ifcollision='overwrite.local', followLink=False)]
assert_list_equal(expected_get_args, patch_syn_get.call_args_list)
schedule_for_cleanup(proj_for_cleanup)
project = Project(name, b=3, c=4)
project = syn.store(project)
assert_equals(project.a, [1])
assert_equals(project.b, [3])
assert_equals(project.c, [4])
project = syn.get(project.id)
assert_equals(project.a, [1])
assert_equals(project.b, [3])
assert_equals(project.c, [4])
project = Project(name, c=5, d=6)
assert_raises(Exception, syn.store, project, createOrUpdate=False)
def test_walk():
walked = []
firstfile = utils.make_bogus_data_file()
schedule_for_cleanup(firstfile)
project_entity = syn.store(Project(name=str(uuid.uuid4())))
schedule_for_cleanup(project_entity.id)
folder_entity = syn.store(Folder(name=str(uuid.uuid4()), parent=project_entity))
schedule_for_cleanup(folder_entity.id)
second_folder = syn.store(Folder(name=str(uuid.uuid4()), parent=project_entity))
schedule_for_cleanup(second_folder.id)
file_entity = syn.store(File(firstfile, parent=project_entity))
schedule_for_cleanup(file_entity.id)
walked.append(((project_entity.name, project_entity.id),
[(folder_entity.name, folder_entity.id), (second_folder.name, second_folder.id)],
[(file_entity.name, file_entity.id)]))
nested_folder = syn.store(Folder(name=str(uuid.uuid4()), parent=folder_entity))
schedule_for_cleanup(nested_folder.id)
secondfile = utils.make_bogus_data_file()
schedule_for_cleanup(secondfile)
def test_syncFromSynapse__children_contain_non_file():
proj = syn.store(Project(name="test_syncFromSynapse_children_non_file" + str(uuid.uuid4())))
schedule_for_cleanup(proj)
temp_file = utils.make_bogus_data_file()
schedule_for_cleanup(temp_file)
file_entity = syn.store(File(temp_file, name="temp_file_test_syncFromSynapse_children_non_file" + str(uuid.uuid4()),
parent=proj))
syn.store(Schema(name="table_test_syncFromSynapse", parent=proj))
temp_folder = tempfile.mkdtemp()
schedule_for_cleanup(temp_folder)
files_list = synapseutils.syncFromSynapse(syn, proj, temp_folder)
assert_equals(1, len(files_list))
assert_equals(file_entity, files_list[0])
def test_large_file_upload(file_to_upload_size=11*utils.KB, filepath=None):
clean_up_file = False
try:
project = syn.store(Project("File Upload Load Test " + datetime.now().strftime("%Y-%m-%d %H%M%S%f")))
if filepath:
## keep a file around so we don't have to regenerate it.
if not os.path.exists(filepath):
filepath = utils.make_bogus_binary_file(file_to_upload_size, filepath=filepath, printprogress=True)
else:
## generate a temporary file and clean it up when we're done
clean_up_file = True
filepath = utils.make_bogus_binary_file(file_to_upload_size, printprogress=True)
try:
junk = syn.store(File(filepath, parent=project))
fh = syn._getFileHandle(junk['dataFileHandleId'])
syn.printEntity(fh)