Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def delete(self, obj):
    """
    Removes an object from Synapse.

    :param obj: An existing object stored on Synapse
                such as Evaluation, File, Project, WikiPage etc

    :returns: the result of ``obj._synapse_delete(self)`` when the object
              provides that hook; otherwise None

    :raises SynapseError: if the object cannot be deleted because it does
                          not provide a ``deleteURI`` method
    """
    # Handle all strings as the Entity ID for backward compatibility
    if isinstance(obj, basestring):
        self.restDELETE(uri='/entity/%s' % id_of(obj))
    elif hasattr(obj, "_synapse_delete"):
        # The object knows how to delete itself; hand it this client.
        return obj._synapse_delete(self)
    else:
        try:
            self.restDELETE(obj.deleteURI())
        except AttributeError:
            # Previously the exception was constructed but never raised,
            # so an undeletable object was silently ignored.
            raise SynapseError("Can't delete a %s" % type(obj))
def _findEntityIdByNameAndParent(self, name, parent=None):
    """
    Look up the ID of an Entity from its name and its parent.

    :param name:   the name of the Entity to search for
    :param parent: the parent to search under; ROOT_ENTITY is used when None

    :returns: the Entity ID when the query reports exactly one result,
              otherwise None
    """
    parent_id = id_of(ROOT_ENTITY if parent is None else parent)
    query_text = 'select id from entity where name=="%s" and parentId=="%s"' % (name, parent_id)
    response = self.query(query_text)

    # Anything other than a single match is treated as "not found".
    if response.get('totalNumberOfResults', 0) != 1:
        return None
    return response['results'][0]['entity.id']
def _getEntity(self, entity, version=None):
    """
    Fetch an entity from Synapse.

    :param entity:  A Synapse ID, a dictionary representing an Entity, or a Synapse Entity object
    :param version: The version number to fetch; when falsy, no version
                    segment is added to the request URI

    :returns: the result of the GET request (a dictionary containing an Entity's properties)
    """
    path = '/entity/' + id_of(entity)
    if not version:
        return self.restGET(path)
    # A specific version was requested: address it explicitly.
    return self.restGET(path + '/version/%d' % version)
def getWikiHeaders(self, owner):
    """
    Retrieves the headers of all Wikis belonging to the owner.

    :param owner: An Evaluation or Entity

    :returns: A list of DictObjects, one per entry in the 'results' of the
              wiki header tree response (fields: id, title and parentId)
    """
    response = self.restGET('/entity/%s/wikiheadertree' % id_of(owner))
    headers = []
    for fields in response['results']:
        headers.append(DictObject(**fields))
    return headers
def getTableColumns(self, table, limit=100, offset=0):
    """
    Lazily yield the column models used in the given table schema.

    :param table:  the table whose columns are requested (anything accepted by id_of)
    :param limit:  passed through to the paginated GET
    :param offset: passed through to the paginated GET

    :returns: a generator of Column objects, one per paginated result
    """
    column_uri = '/entity/{id}/column'.format(id=id_of(table))
    pages = self._GET_paginated(column_uri, limit=limit, offset=offset)
    for column_fields in pages:
        yield Column(**column_fields)
:param associateObjectIds: List of associated object Ids: If copying a file, the objectId is the synapse id,
and if copying a wiki attachment, the object id is the wiki subpage id.
(Must be the same length as fileHandles)
:param contentTypes: List of content types (Can change a filetype of a filehandle).
:param fileNames: List of filenames (Can change a filename of a filehandle).
:return: List of batch filehandle copy results, can include failureCodes: UNAUTHORIZED and
NOT_FOUND
"""
if (len(fileHandles) != len(associateObjectTypes) or len(fileHandles) != len(associateObjectIds)
or len(fileHandles) != len(contentTypes) or len(fileHandles) != len(fileNames)):
raise ValueError("Length of fileHandles, associateObjectTypes, and associateObjectIds must be the same")
fileHandles = [synapseclient.utils.id_of(handle) for handle in fileHandles]
copyFileHandleRequest = {"copyRequests": []}
for filehandleId, contentType, fileName, associateObjectType, associateObjectId \
in zip(fileHandles, contentTypes, fileNames, associateObjectTypes, associateObjectIds):
copyFileHandleRequest['copyRequests'].append({"newContentType": contentType,
"newFileName": fileName,
"originalFile": {"associateObjectType": associateObjectType,
"fileHandleId": filehandleId,
"associateObjectId": associateObjectId}})
copiedFileHandles = syn.restPOST('/filehandles/copy', body=json.dumps(copyFileHandleRequest),
endpoint=syn.fileHandleEndpoint)
_copy_cached_file_handles(syn.cache, copiedFileHandles)
return copiedFileHandles
:returns: the new or updated ACL
.. code-block:: python
{'resourceAccess': [
{'accessType': ['READ'],
'principalId': 222222}
]}
"""
if hasattr(entity, 'putACLURI'):
return self.restPUT(entity.putACLURI(), json.dumps(acl))
else:
# Get benefactor. (An entity gets its ACL from its benefactor.)
entity_id = id_of(entity)
uri = '/entity/%s/benefactor' % entity_id
benefactor = self.restGET(uri)
# Update or create new ACL
uri = '/entity/%s/acl' % entity_id
if benefactor['id']==entity_id:
return self.restPUT(uri, json.dumps(acl))
else:
return self.restPOST(uri,json.dumps(acl))
def _getFileHandle(self, fileHandle):
    """Fetch a fileHandle from the fileHandle service (experimental)."""
    handle_id = id_of(fileHandle)
    return self.restGET("/fileHandle/%s" % (handle_id,), endpoint=self.fileHandleEndpoint)
:param modify_benefactor: Set as True when modifying a benefactor's ACL
:param warn_if_inherits: Set as False, when creating a new ACL.
Trying to modify the ACL of an Entity that
inherits its ACL will result in a warning
:param overwrite: By default this function overwrites existing
permissions for the specified user. Set this
flag to False to add new permissions nondestructively.
:returns: an Access Control List object
Valid access types are: CREATE, READ, UPDATE, DELETE, CHANGE_PERMISSIONS, DOWNLOAD, PARTICIPATE, SUBMIT
"""
benefactor = self._getBenefactor(entity)
if benefactor['id'] != id_of(entity):
if modify_benefactor:
entity = benefactor
elif warn_if_inherits:
sys.stderr.write('Warning: Creating an ACL for entity %s, '
'which formerly inherited access control '
'from a benefactor entity, "%s" (%s).\n'
% (id_of(entity), benefactor['name'], benefactor['id']))
acl = self._getACL(entity)
principalId = self._getUserbyPrincipalIdOrName(principalId)
# Find existing permissions
permissions_to_update = None
for permissions in acl['resourceAccess']:
if 'principalId' in permissions and permissions['principalId'] == principalId: