Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@access.public
@rawResponse
@describeRoute(None)
def rawWithDecorator(self, params):
    """
    Return a fixed bytes payload.

    :param params: request parameters; not consulted by this handler.
    :returns: the constant bytes body.
    """
    payload = b'this is a raw response'
    return payload
@access.public
@loadmodel(model='item', level=AccessType.READ)
def getDataset(self, item, params):
    """
    Return the ``minerva`` metadata stored on an item.

    :param item: the item document handed to this handler (the ``loadmodel``
        decorator above is configured for ``model='item'`` at READ level).
    :param params: remaining request parameters; not consulted here.
    :returns: the value stored under ``item['meta']['minerva']``, or an empty
        dict when the item has no ``meta`` key or no ``minerva`` entry.
    """
    # Chained .get() calls so that an item lacking a 'meta' key yields {}
    # instead of raising KeyError.
    return item.get('meta', {}).get('minerva', {})
getDataset.description = (
@access.public
def geocode(self, params):
"""Return a list of geojson points matching the given name."""
folder = self.geonames_folder()
if folder is None:
raise RestException('Geocoding not configured')
results = list(self.model('item').textSearch(
params.get('name'),
user=self.getCurrentUser(),
filters={'folderId': folder},
limit=params.get('limit', 10)
))
geo = {
"type": "FeatureCollection",
"features": []
@access.user
@filtermodel(model='group')
@autoDescribeRoute(
    Description('Create a new group.')
    .responseClass('Group')
    .notes('Must be logged in.')
    .param('name', 'Unique name for the group.', strip=True)
    .param('description', 'Description of the group.', required=False,
           default='', strip=True)
    .param('public', 'Whether the group should be publicly visible.',
           required=False, dataType='boolean', default=False)
    .errorResponse()
    .errorResponse('Write access was denied on the parent', 403)
)
def createGroup(self, name, description, public, params):
    """
    Create a group owned by the requesting user.

    :param name: name for the new group.
    :param description: description text for the group.
    :param public: whether the group should be publicly visible.
    :param params: remaining request parameters; not consulted here.
    :returns: whatever the group model's ``createGroup`` returns.
    """
    # Bind the model method first, then resolve the creator, then call.
    makeGroup = self.model('group').createGroup
    currentUser = self.getCurrentUser()
    return makeGroup(
        name=name,
        creator=currentUser,
        description=description,
        public=public,
    )
@access.user
@loadmodel(map={'bsveSourceId': 'source'}, model='item',
level=AccessType.READ)
def createBsveSearchDataset(self, source, params):
self.requireParams('name', params)
name = params['name']
# get info from the source item
metadata = source.get('meta', {}).get('minerva', {})
if metadata.get('source_type') != 'bsve':
raise RestException('Invalid source type', 403)
bp = metadata
username = bp['username']
apikey = bp['apikey']
secretkey = decryptCredentials(bp['secretkey'])
@access.admin
@loadmodel(model='assetstore')
@describeRoute(
Description('Import a data hierarchy from an HDFS instance.')
.notes('Only site administrators may use this endpoint.')
.param('id', 'The ID of the assetstore representing the HDFS instance.',
paramType='path')
.param('parentId', 'The ID of the parent object in the Girder data '
'hierarchy under which to import the files.')
.param('parentType', 'The type of the parent object to import into.',
enum=('folder', 'user', 'collection'), required=False)
.param('path', 'Root of the directory structure (relative to the root '
'of the HDFS) to import.')
.param('progress', 'Whether to record progress on this operation ('
'default=False)', required=False, dataType='boolean')
.errorResponse()
.errorResponse('You are not an administrator.', 403)
@access.user
@filtermodel(JobModel)
@autoDescribeRoute(
    Description('Cancel a job by ID.')
    .modelParam(
        'id',
        'The ID of the job.',
        model=JobModel,
        level=AccessType.WRITE,
        includeLog=False,
    )
    .errorResponse('ID was invalid.')
    .errorResponse('Write access was denied for the job.', 403)
)
def cancelJob(self, job, params):
    """
    Cancel the given job through this resource's model.

    :param job: the job document to cancel.
    :param params: remaining request parameters; not consulted here.
    :returns: the result of ``self._model.cancelJob(job)``.
    """
    cancelled = self._model.cancelJob(job)
    return cancelled
@access.public
def _authorizeUploadStep(event):
    """
    Event handler run ahead of requests that operate on a partially
    completed upload.

    If the current token carries an ``authorizedUploadId`` equal to the
    ``uploadId`` request parameter, the user recorded on that token is loaded
    and installed as the current user for the request thread. Otherwise
    nothing is changed.

    :param event: the event; ``event.info['params']`` holds the request
        parameters.
    """
    currentToken = getCurrentToken()
    requestParams = event.info['params']
    # NOTE(review): a malformed 'uploadId' string would presumably make
    # ObjectId raise here - confirm how callers expect that to surface.
    requestedId = ObjectId(requestParams.get('uploadId'))

    tokenMatches = (
        currentToken
        and 'authorizedUploadId' in currentToken
        and currentToken['authorizedUploadId'] == requestedId
    )
    if not tokenMatches:
        return

    # force=True: load the token's creator without an access check.
    tokenOwner = User().load(currentToken['userId'], force=True)
    setCurrentUser(tokenOwner)
@autoDescribeRoute(
    Description('Get worker status and task information.')
    .notes('Return -1 if the broker is inaccessible.')
)
@access.user(scope=TokenScope.DATA_READ)
def getWorkerStatus(self):
    """
    Report the state of the Celery workers.

    :returns: ``-1`` when a connection to the broker cannot be established;
        otherwise a dict with the keys ``report``, ``stats``, ``ping``,
        ``active`` and ``reserved``, each holding the result of the
        same-named call on ``app.control.inspect()``.
    """
    app = getCeleryApp()

    # Probe broker reachability. The connection is only needed for this
    # check, so use it as a context manager to make sure it is released on
    # both the success and the failure path instead of being left open.
    with app.connection_for_read() as conn:
        try:
            conn.ensure_connection(max_retries=1)
        except celery.exceptions.OperationalError:
            return -1

    status = app.control.inspect()
    # Dict values are evaluated top to bottom, preserving the call order.
    return {
        'report': status.report(),
        'stats': status.stats(),
        'ping': status.ping(),
        'active': status.active(),
        'reserved': status.reserved(),
    }
@autoDescribeRoute(
    Description('Get worker status and task information.')
    .notes('Return -1 if the broker is inaccessible.')
)
@access.user(scope=TokenScope.DATA_READ)
def getWorkerStatus(self):
    """
    Report the state of the Celery workers.

    :returns: ``-1`` when a connection to the broker cannot be established;
        otherwise a dict with the keys ``report``, ``stats``, ``ping``,
        ``active`` and ``reserved``, each holding the result of the
        same-named call on ``app.control.inspect()``.
    """
    app = getCeleryApp()

    # Probe broker reachability. The connection is only needed for this
    # check, so use it as a context manager to make sure it is released on
    # both the success and the failure path instead of being left open.
    with app.connection_for_read() as conn:
        try:
            conn.ensure_connection(max_retries=1)
        except celery.exceptions.OperationalError:
            return -1

    status = app.control.inspect()
    # Dict values are evaluated top to bottom, preserving the call order.
    return {
        'report': status.report(),
        'stats': status.stats(),
        'ping': status.ping(),
        'active': status.active(),
        'reserved': status.reserved(),
    }