Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def add_file_to_cache(i, cache_root_dir):
    """
    Worker routine for test_cache_concurrent_access.

    Opens a cache rooted at ``cache_root_dir`` and walks five fixed file
    handle IDs (1001-1005) in shuffled order. For each ID it touches a junk
    file inside that ID's cache directory and adds the file to the cache.

    :param i:              integer formatted into each junk file's name
    :param cache_root_dir: root directory handed to ``cache.Cache``
    """
    worker_cache = cache.Cache(cache_root_dir=cache_root_dir)

    # Shuffle so the visiting order differs from call to call
    handle_ids = list(range(1001, 1006))
    random.shuffle(handle_ids)

    for handle_id in handle_ids:
        target_dir = worker_cache.get_cache_dir(handle_id)
        junk_name = "file_handle_%d_process_%02d.junk" % (handle_id, i)
        junk_path = os.path.join(target_dir, junk_name)
        utils.touch(junk_path)
        worker_cache.add(handle_id, junk_path)
def test_round_trip():
fhid = None
filepath = utils.make_bogus_binary_file(multipart_upload_module.MIN_PART_SIZE + 777771)
try:
fhid = multipart_upload(syn, filepath)
# Download the file and compare it with the original
junk = File(parent=project, dataFileHandleId=fhid)
junk.properties.update(syn._createEntity(junk.properties))
(tmp_f, tmp_path) = tempfile.mkstemp()
schedule_for_cleanup(tmp_path)
junk['path'] = syn._downloadFileHandle(fhid, junk['id'], 'FileEntity', tmp_path)
assert_true(filecmp.cmp(filepath, junk.path))
finally:
try:
if 'junk' in locals():
syn.delete(junk)
@utils.memoize
def getUserProfile(self, id=None, sessionToken=None, refresh=False):
"""
Get the details about a Synapse user.
Retrieves information on the current user if 'id' is omitted.
:param id: The 'userId' (aka 'ownerId') of a user or the userName
:param sessionToken: The session token to use to find the user profile
:param refresh: If set to True will always fetch the data from Synapse, otherwise
will use cached information
:returns: JSON-object
Example::
my_profile = syn.getUserProfile()
def __getFromFile(self, filepath, limitSearch=None):
    """
    Fetch the Synapse entityBundle whose md5 matches a local file.
    See :py:func:`synapseclient.Synapse.get`.

    :param filepath:    path to local file
    :param limitSearch: when not None, only entities whose Synapse path
                        satisfies ``utils.is_in_path(limitSearch, path)``
                        are kept as candidates

    :returns: the bundle obtained from ``_getEntityBundle`` for the first
              remaining candidate
    :raises SynapseError: when no candidate remains

    If several candidates remain, a warning is written to stderr and the
    first one is used.
    """
    md5_hex = utils.md5_for_file(filepath).hexdigest()
    matches = self.restGET('/entity/md5/%s' % md5_hex)['results']

    if limitSearch is not None:
        # Look up the path of every match before filtering any of them out
        entity_paths = [self.restGET('/entity/%s/path' % match['id'])
                        for match in matches]
        # Keep only the matches whose path contains limitSearch
        matches = [match
                   for match, entity_path in zip(matches, entity_paths)
                   if utils.is_in_path(limitSearch, entity_path)]

    match_count = len(matches)
    if match_count == 0:
        raise SynapseError('File %s not found in Synapse' % (filepath,))
    if match_count > 1:
        warning = ('\nWARNING: The file %s is associated with many entities in Synapse. '
                   'You can limit to a specific project or folder by setting the '
                   'limitSearch to a synapse Id. Will use the first one returned: '
                   '%s version %i\n')
        sys.stderr.write(warning % (filepath,
                                    matches[0]['id'],
                                    matches[0]['versionNumber']))

    bundle = self._getEntityBundle(matches[0])
    # Hand the local path plus the bundle's entity fields to the cache module
    cache.add_local_file_to_cache(path=filepath, **bundle['entity'])
    return bundle
Locationables contain a signed S3 URL, which expires after a time,
so the Entity object passed to this method must have been recently acquired from Synapse.
**Deprecated** in favor of FileEntities, but still supported.
:returns: A file info dictionary with keys path, cacheDir, files
"""
results = DictObject()
if 'locations' not in entity or len(entity['locations']) == 0:
return results
location = entity['locations'][0] ## TODO: verify that this doesn't fail for unattached files
url = location['path']
utils.download_file(url, filename)
results.path = filename
if entity['contentType'] == 'application/zip':
# Unpack file
filepath = os.path.join(os.path.dirname(filename), os.path.basename(filename) + '_unpacked')
## TODO: !!!FIX THIS TO BE PATH SAFE! DON'T ALLOW ARBITRARY UNZIPING
z = zipfile.ZipFile(filename, 'r')
z.extractall(filepath) #WARNING!!!NOT SAFE
results['cacheDir'] = filepath
results['files'] = z.namelist()
else:
results['cacheDir'] = os.path.dirname(filename)
results['files'] = [os.path.basename(filename)]
return results
def _get_user_name(self, user_id):
    """
    Return the user name for ``user_id``, caching it on first lookup.

    A value already stored in ``self._user_name_cache`` is returned as is.
    Otherwise the profile is fetched with ``self.getUserProfile``, the name
    is taken from it with ``utils.extract_user_name``, and the result is
    stored in the cache before being returned.

    :param user_id: key identifying the user, passed to ``getUserProfile``
    :returns: the cached or newly extracted user name
    """
    if user_id in self._user_name_cache:
        return self._user_name_cache[user_id]

    # Cache miss: fetch the profile once and remember the extracted name
    profile = self.getUserProfile(user_id)
    self._user_name_cache[user_id] = utils.extract_user_name(profile)
    return self._user_name_cache[user_id]
for result in results:
results_found = True
fmt_fields = {'name' : result['entity.name'],
'id' : result['entity.id'],
'padding' : ' ' * indent,
'slash_or_not' : '/' if is_container(result) else ''}
fmt_string = "{id}"
if long_format:
fmt_fields['createdOn'] = utils.from_unix_epoch_time(result['entity.createdOn']).strftime("%Y-%m-%d %H:%M")
fmt_fields['createdBy'] = self._get_user_name(result['entity.createdByPrincipalId'])[:18]
fmt_fields['version'] = result['entity.versionNumber']
fmt_string += " {version:3} {createdBy:>18} {createdOn}"
if show_modified:
fmt_fields['modifiedOn'] = utils.from_unix_epoch_time(result['entity.modifiedOn']).strftime("%Y-%m-%d %H:%M")
fmt_fields['modifiedBy'] = self._get_user_name(result['entity.modifiedByPrincipalId'])[:18]
fmt_string += " {modifiedBy:>18} {modifiedOn}"
fmt_string += " {padding}{name}{slash_or_not}\n"
out.write(fmt_string.format(**fmt_fields))
if (indent==0 or recursive) and is_container(result):
self._list(result['entity.id'], recursive=recursive, long_format=long_format, show_modified=show_modified, indent=indent+2, out=out)
if indent==0 and not results_found:
out.write('No results visible to {username} found for id {id}\n'.format(username=self.username, id=id_of(parent)))
def _generateSignedHeaders(self, url, headers=None):
    """
    Generate headers signed with the API key.

    The signature is the base64-encoded HMAC-SHA1 digest, keyed with
    ``self.apiKey``, of the user name, the path component of ``url`` and
    the current GMT timestamp concatenated in that order.

    :param url:     URL being requested; only its path is signed
    :param headers: dict of headers to extend; it is updated in place.
                    When None, a copy of ``self.default_headers`` is used.

    :returns: the headers dict with the user agent and the ``userId``,
              ``signatureTimestamp`` and ``signature`` entries added
    :raises SynapseAuthenticationError: if the user name or API key is unset
    """
    if self.username is None or self.apiKey is None:
        raise SynapseAuthenticationError("Please login")

    signed = dict(self.default_headers) if headers is None else headers
    signed.update(synapseclient.USER_AGENT)

    timestamp = time.strftime(utils.ISO_FORMAT, time.gmtime())
    request_path = urlparse.urlparse(url).path
    message = self.username + request_path + timestamp
    digest = hmac.new(self.apiKey, message, hashlib.sha1).digest()

    signed.update({
        'userId': self.username,
        'signatureTimestamp': timestamp,
        'signature': base64.b64encode(digest),
    })
    return signed