Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@describeRoute(
Description('Get something.')
)
def getResource(self, params):
return ['custom REST route']
@describeRoute(
Description('Test canceling a queued task'))
def test_traditional_task_cancel_in_queue(self, params):
# Fill up queue
blockers = []
for _ in range(0, multiprocessing.cpu_count()):
blockers .append(cancelable.delay(sleep_interval=0.1))
jobModel = self.model('job', 'jobs')
job = jobModel.createJob(
title='test_traditional_task_cancel',
type='worker', handler='worker_handler',
user=self.getCurrentUser(), public=False, args=(self.girder_worker_run_cancelable,),
kwargs={'inputs': {},
'outputs': {}})
job['kwargs']['jobInfo'] = utils.jobInfoSpec(job)
@describeRoute(
Description('Test basic docker_run.'))
def test_docker_run(self, params):
result = docker_run.delay(
TEST_IMAGE, pull_image=True, container_args=['stdio', '-m', 'hello docker!'],
remove_container=True)
return result.job
@describeRoute(
Description('Look up a resource in the data hierarchy by path.')
.param('path',
'The path of the resource. The path must be an absolute Unix '
'path starting with either "/user/[user name]", for a user\'s '
'resources or "/collection/[collection name]", for resources '
'under a collection.')
.errorResponse('Path is invalid.')
.errorResponse('Path refers to a resource that does not exist.')
.errorResponse('Read access was denied for the resource.', 403)
)
def lookup(self, params):
self.requireParams('path', params)
return self._lookUpPath(params['path'], self.getCurrentUser())
@access.public(scope=TokenScope.DATA_READ)
@loadmodel(model='aperio', plugin='digital_slide_archive',
level=AccessType.READ)
@describeRoute(
Description('Get an Aperio document by id')
.param('id', 'The id of the Aperio item', paramType='path')
)
def getAperio(self, aperio, params):
return aperio
@describeRoute(
Description('Delete cached associated image files from large_image items.')
)
@access.admin
def deleteAssociatedImages(self, params):
return self._deleteCachedImages(None, associatedImages=True)
@describeRoute(
Description('Returns all distinct values for all columns for a given table')
.param('assetstoreId', 'assetstore ID of the target database')
.param('table', 'Table name from the database')
)
def getAllValues(self, assetstore, params):
resp = {}
for i in self._getColumns(assetstore, params):
if i['name'] != 'geom' and i['datatype'] != 'number':
resp[i['name']] = self._getValues(assetstore, {
'column': i['name'],
'table': params['table']})
return resp
@describeRoute(
Description('Update the properties of a challenge.')
.param('id', 'The ID of the challenge.', paramType='path')
.param('name', 'The name for this challenge.', required=False)
.param('description', 'Description for this challenge.', required=False)
.param('instructions', 'Instructions to participants for this '
'challenge.', required=False)
.param('organizers', 'The organizers of the challenge.', required=False)
.param('startDate', 'The start date of the challenge '
'(ISO 8601 format).', dataType='dateTime', required=False)
.param('endDate', 'The end date of the challenge (ISO 8601 format).',
dataType='dateTime', required=False)
.errorResponse('ID was invalid.')
.errorResponse('Write permission denied on the challenge.', 403)
)
def updateChallenge(self, challenge, params):
challenge['name'] = params.get('name', challenge['name']).strip()
@describeRoute(
Description('Get the task ')
.param(
'id',
'The id of task',
required=True, paramType='path')
)
def get(self, task, params):
return task
@describeRoute(
Description('Delete the taskflow')
.param(
'id',
'The id of taskflow',
required=True, paramType='path')
)
def delete(self, taskflow, params):
user = self.getCurrentUser()
status = self._model.status(user, taskflow)
if status == TaskFlowState.RUNNING:
raise RestException('Taskflow is running', 400)
constructor = load_class(taskflow['taskFlowClass'])
token = ModelImporter.model('token').createToken(user=user, days=7)