Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@loadmodel(model='volume', plugin='cumulus', level=AccessType.ADMIN)
def attach(self, volume, cluster, params):
body = getBodyJson()
self.requireParams(['path'], body)
path = body['path']
profile_id = parse('profileId').find(volume)[0].value
profile, secret_key = _get_profile(profile_id)
girder_callback_info = {
'girder_api_url': cumulus.config.girder.baseUrl,
'girder_token': get_task_token()['_id']}
log_write_url = '%s/volumes/%s/log' % (cumulus.config.girder.baseUrl,
volume['_id'])
p = CloudProvider(dict(secretAccessKey=secret_key, **profile))
@access.user(scope=TokenScope.DATA_WRITE)
@loadmodel(model='case', plugin='digital_slide_archive',
level=AccessType.WRITE)
@describeRoute(
Description('Create or replace case metadata')
.param('id', 'The id of the case', paramType='path')
.param('table', 'The table to update', paramType='path')
.param('body', 'A JSON object containing the metadata to create',
paramType='body')
)
def setCaseMetadata(self, case, table, params):
metadata = self.getBodyJson()
caseModel = self.model('case', 'digital_slide_archive')
for k in metadata:
if not len(k) or '.' in k or k[0] == '$':
raise RestException(
'Invalid key name'
)
@loadmodel(model='annotation', plugin='large_image', getElements=False, level=AccessType.WRITE)
def deleteAnnotation(self, annotation, params):
# Ensure that we have write access to the parent item
item = Item().load(annotation.get('itemId'), force=True)
if item is not None:
Item().requireAccess(
item, user=self.getCurrentUser(), level=AccessType.WRITE)
Annotation().remove(annotation)
@loadmodel(map={'userId': 'user'}, model='user', level=AccessType.READ)
def getSessionFolder(self, user, params):
folder = findSessionFolder(self.getCurrentUser(), user, create=True)
return {'folder': folder}
getSessionFolder.description = (
@loadmodel(model='assetstore')
def create_file(self, assetstore, params):
params = getBodyJson()
self.requireParams(('name', 'itemId', 'size', 'path'), params)
name = params['name']
item_id = params['itemId']
size = int(params['size'])
path = params['path']
user = self.getCurrentUser()
mime_type = params.get('mimeType')
item = ModelImporter.model('item').load(id=item_id, user=user,
level=AccessType.WRITE, exc=True)
file = ModelImporter.model('file').createFile(
name=name, creator=user, item=item, reuseExisting=True,
assetstore=assetstore, mimeType=mime_type, size=size)
@access.public
@loadmodel(model='cohort', plugin='digital_slide_archive',
level=AccessType.READ)
@describeRoute(
Description('List slides in a cohort')
.param('id', 'The id of the cohort', paramType='path')
.pagingParams(defaultSort='name')
)
def cohortListSlides(self, cohort, params):
limit, offset, sort = self.getPagingParameters(params, 'name')
cursor = self.model('slide', 'digital_slide_archive').find({
'tcga.cohort': cohort['name']
}, cursor=True, limit=limit, offset=offset, sort=sort)
return pagedResponse(cursor, limit, offset, sort)
@loadmodel(model='challenge', plugin='covalic', level=AccessType.ADMIN)
@filtermodel(model='challenge', plugin='covalic', addFields={'access'})
@describeRoute(
Description('Set the access control list for a challenge.')
.param('id', 'The ID of the challenge.', paramType='path')
.param('access', 'The access control list as JSON.')
.param('public', 'Whether the challenge should be publicly visible.',
dataType='boolean')
.errorResponse('ID was invalid.')
.errorResponse('Admin permission denied on the challenge.', 403)
)
def updateAccess(self, challenge, params):
self.requireParams('access', params)
public = self.boolParam('public', params, default=False)
Challenge().setPublic(challenge, public)
@loadmodel(map={'userId': 'user'}, model='user', level=AccessType.READ)
def getSourceFolder(self, user, params):
folder = findSourceFolder(self.getCurrentUser(), user, create=True)
return {'folder': folder}
getSourceFolder.description = (
@access.public
@loadmodel(model='item', level=AccessType.WRITE)
def geocodeTweet(self, item, params):
# output is a file id for a geojson file in the item
fileId = self._findFileId(item)
itemId = str(item['_id'])
tmpdir = self._createWorkDir(itemId)
self._downloadItemFiles(itemId, tmpdir)
# WARNING: Expecting only one file per item
num_files = self.model('item').childFiles(item).count()
files = self.model('item').childFiles(item)
if (num_files == 1):
geocodedFile = self._geocodeTweet(files[0], item, tmpdir)
self._addFileToItem(itemId, geocodedFile)
self._cleanWorkDir(tmpdir)
fileId = self._findFileId(item)
return {'_id': fileId}
@loadmodel(model='user', level=AccessType.READ)
def get_profiles(user, params):
user = getCurrentUser()
model = ModelImporter.model('aws', 'cumulus')
limit = params.get('limit', 50)
profiles = model.find_profiles(user['_id'])
profiles = model.filterResultsByPermission(profiles, user, AccessType.READ,
limit=int(limit))
return [model.filter(profile, user) for profile in profiles]