Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def cleanup(items):
"""cleanup junk created during testing"""
for item in reversed(items):
if isinstance(item, Entity) or utils.is_synapse_id(item) or hasattr(item, 'deleteURI'):
try:
syn.delete(item)
except Exception as ex:
if hasattr(ex, 'response') and ex.response.status_code in [404, 403]:
pass
else:
print("Error cleaning up entity: " + str(ex))
elif isinstance(item, six.string_types):
if os.path.exists(item):
try:
if os.path.isdir(item):
shutil.rmtree(item)
else: # Assume that remove will work on anything besides folders
os.remove(item)
except Exception as ex:
print(ex)
def printEntity(self, entity):
"""Pretty prints an Entity."""
if utils.is_synapse_id(entity):
entity = self._getEntity(entity)
try:
print json.dumps(entity, sort_keys=True, indent=2)
except TypeError:
print str(entity)
def _getBenefactor(self, entity):
"""An Entity gets its ACL from its benefactor."""
if utils.is_synapse_id(entity) or synapseclient.entity.is_synapse_entity(entity):
return self.restGET('/entity/%s/benefactor' % id_of(entity))
return entity
:param x: a list of column headers, a Schema, a TableSchema's Synapse ID, or a string prefix
:Return: a generator of Column objects
"""
if x is None:
uri = '/column'
for result in self._GET_paginated(uri, limit=limit, offset=offset):
yield Column(**result)
elif isinstance(x, (list, tuple)):
for header in x:
try:
int(header)
yield self.getColumn(header)
except ValueError:
pass
elif isinstance(x, Schema) or utils.is_synapse_id(x):
uri = '/entity/{id}/column'.format(id=id_of(x))
for result in self._GET_paginated(uri, limit=limit, offset=offset):
yield Column(**result)
elif isinstance(x, basestring):
uri = '/column?prefix=' + x
for result in self._GET_paginated(uri, limit=limit, offset=offset):
yield Column(**result)
else:
ValueError("Can't get columns for a %s" % type(x))
- :py:func:`synapseutils.sync.syncToSynapse`
Example:
Download and print the paths of all downloaded files::
entities = syncFromSynapse(syn, "syn1234")
for f in entities:
print(f.path)
"""
# initialize the result list
if allFiles is None:
allFiles = list()
# perform validation check on user input
if is_synapse_id(entity):
entity = syn.get(entity, downloadLocation=path, ifcollision=ifcollision, followLink=followLink)
if isinstance(entity, File):
allFiles.append(entity)
return allFiles
entity_id = id_of(entity)
if not is_container(entity):
raise ValueError("The provided id: %s is neither a container nor a File" % entity_id)
# get the immediate children as iterator
children = syn.getChildren(entity_id)
# process each child
for child in children:
if is_container(child):