Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_none(self):
prov = stub.StubJobProvider()
job_suc = {'job-id': 'job_suc', 'status': ('SUCCESS', '123')}
job_fail = {'job-id': 'job_fail', 'status': ('FAILURE', '123')}
prov.set_operations([job_suc, job_fail])
tasks = prov.lookup_job_tasks(None)
self.assertEqual(raw_ops(tasks), [job_suc, job_fail])
def test_already_succeeded(self):
prov = stub.StubJobProvider()
prov.set_operations([{'job-id': 'myjob', 'status': ('SUCCESS', '123')}])
establish_chronology(nothing_happens())
ret = dsub_command.wait_after(prov, ['myjob'], 1, True)
self.assertEqual(ret, [])
def test_job_1(self):
self.prov = stub.StubJobProvider()
establish_chronology(self.progressive_chronology())
ret = dsub_command.wait_after(self.prov, ['job-1'], 1, True)
self.assertEqual(ret, [])
def test_get_several(self):
prov = stub.StubJobProvider()
job_suc = {'job-id': 'job_suc', 'status': ('SUCCESS', '123')}
job_fail = {'job-id': 'job_fail', 'status': ('FAILURE', '123')}
job_run = {'job-id': 'job_run', 'status': ('RUNNING', '123')}
prov.set_operations([job_suc, job_fail, job_run])
tasks = prov.lookup_job_tasks(['SUCCESS', 'FAILURE'])
self.assertEqual(raw_ops(tasks), [job_suc, job_fail])
def test_job_2(self):
self.prov = stub.StubJobProvider()
establish_chronology(self.progressive_chronology())
ret = dsub_command.wait_after(self.prov, ['job-2'], 1, True)
self.assertEqual(ret, [['failed to frob']])
def test_succeeds(self):
self.prov = stub.StubJobProvider()
establish_chronology(self.progressive_chronology())
ret = dsub_command.wait_for_any_job(self.prov, ['job-1'], 1)
self.assertEqual(ret, set([]))
id (str): Job ID to be aborted
Returns: None
"""
# Attempt is unused in aborting because only one attempt can be running at
# a time.
proj_id, job_id, task_id, _ = job_ids.api_to_dsub(id, _provider_type())
provider = providers.get_provider(_provider_type(), proj_id, _auth_token())
# TODO(bryancrampton): Add flag to ddel to support deleting only
# 'singleton' tasks.
status = get_job(id).status
# TODO(https://github.com/googlegenomics/dsub/issues/81): Remove this
# provider-specific logic
if isinstance(provider, stub.StubJobProvider):
status = status[0]
if status != job_statuses.ApiStatus.RUNNING:
raise PreconditionFailed(
'Job already in terminal status `{}`'.format(status))
# TODO(https://github.com/googlegenomics/dsub/issues/92): Remove this
# hacky re-routing of stdout once dsub removes it from the python API
deleted = execute_redirect_stdout(
lambda: ddel.ddel_tasks(provider=provider,
job_ids={job_id},
task_ids={task_id} if task_id else None))
if len(deleted) != 1:
raise InternalServerError('Failed to abort dsub job')
Returns:
dict: raw JSON metadata for the aborted job or task
"""
# If task-id is not specified, pass None instead of [None]
task_list = [task_id] if task_id else None
# TODO(bryancrampton): Add flag to ddel to support deleting only
# 'singleton' tasks. For now, this will raise an error if more than one
# jobs or no jobs are found for the given job-id and task-id. Also
# ensure status is not terminal before aborting.
job = self.get_job(provider, job_id, task_id)
status = job['status']
# TODO(https://github.com/googlegenomics/dsub/issues/81): Remove this
# provider-specific logic
if isinstance(provider, stub.StubJobProvider):
status = status[0]
if status != DsubStatus.RUNNING:
raise PreconditionFailed(
'Job already in terminal status `{}`'.format(job['status']))
# TODO(https://github.com/googlegenomics/dsub/issues/92): Remove this
# hacky re-routing of stdout once dsub removes it from the python API
deleted = execute_redirect_stdout(lambda:
ddel.ddel_tasks(
provider=provider, job_list=[job_id], task_list=task_list))
if len(deleted) != 1:
raise InternalServerError('Failed to abort dsub job')
Returns:
JobProvider: Instance of LocalJobProvider, GoogleJobProvider, or
StubJobProvider.
"""
if provider_type in [ProviderType.GOOGLE, ProviderType.GOOGLE_V2]:
return _get_google_provider(project_id, auth_token, provider_type)
elif project_id or auth_token:
raise BadRequest(
'The Local provider does not support the `{}` field .'.format(
'authToken' if auth_token else 'parentId'))
elif provider_type == ProviderType.LOCAL:
# TODO(https://github.com/googlegenomics/dsub/issues/93): Remove
# resources parameter and import
return local.LocalJobProvider(resources)
elif provider_type == ProviderType.STUB:
return stub.StubJobProvider()