Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Run an instance of Girder worker, connected to rabbitmq. The rabbitmq
service must be running.
"""
broker = 'amqp://guest@127.0.0.1'
Setting().set(WorkerSettings.BROKER, broker)
Setting().set(WorkerSettings.BACKEND, broker)
env = os.environ.copy()
env['C_FORCE_ROOT'] = 'true'
proc = subprocess.Popen([
'celery', '-A', 'girder_worker.app', 'worker', '--broker', broker, '--concurrency=1'],
close_fds=True, env=env)
yield True
proc.terminate()
proc.wait()
Setting().unset(WorkerSettings.BROKER)
Setting().unset(WorkerSettings.BACKEND)
Description('Test girder client with no token can\'t access protected resources'))
def test_celery_girder_client_bad_token_fails(self, params):
result = request_private_path.delay('admin', girder_client_token='')
return result.job
def assertAccessDenied(self, response, level, modelName, user=None):
if level == AccessType.READ:
ls = 'Read'
elif level == AccessType.WRITE:
ls = 'Write'
else:
ls = 'Admin'
if user is None:
self.assertStatus(response, 401)
else:
self.assertStatus(response, 403)
self.assertEqual('%s access denied for %s.' % (ls, modelName),
response.json['message'])
@access.public
@describeRoute(
Description('Get something.')
)
def getResource(self, params):
return ['custom REST route']
@access.public
@describeRoute(
Description('Get something.')
)
def getResource(self, params):
return ['custom REST route']
@access.public
@rawResponse
@describeRoute(None)
def rawReturningText(self, params):
setResponseHeader('Content-Type', 'text/plain')
return u'this is not encoded \U0001F44D'
@describeRoute(
Description('Get something.')
)
def getResource(self, params):
return ['custom REST route']
@describeRoute(
Description('Test canceling a queued task'))
def test_traditional_task_cancel_in_queue(self, params):
# Fill up queue
blockers = []
for _ in range(0, multiprocessing.cpu_count()):
blockers .append(cancelable.delay(sleep_interval=0.1))
jobModel = self.model('job', 'jobs')
job = jobModel.createJob(
title='test_traditional_task_cancel',
type='worker', handler='worker_handler',
user=self.getCurrentUser(), public=False, args=(self.girder_worker_run_cancelable,),
kwargs={'inputs': {},
'outputs': {}})
job['kwargs']['jobInfo'] = utils.jobInfoSpec(job)
@describeRoute(
Description('Test basic docker_run.'))
def test_docker_run(self, params):
result = docker_run.delay(
TEST_IMAGE, pull_image=True, container_args=['stdio', '-m', 'hello docker!'],
remove_container=True)
return result.job
@access.public
@rawResponse
@describeRoute(None)
def rawWithDecorator(self, params):
return b'this is a raw response'