Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_task_process(self, task):
    """Build the process object used to run ``task``.

    The ``"batch_system"`` setting is read through ``get_setting`` (with
    ``BatchSystems.lsf`` as the default) and converted to a ``BatchSystems``
    member.  ``BatchSystems.local`` is delegated to the parent class after
    ``create_output_dirs(task)`` has been called; the other members handled
    here each map to a dedicated process class.

    Raises:
        NotImplementedError: if the selected member has no entry here.
    """
    selected = BatchSystems(get_setting("batch_system", default=BatchSystems.lsf, task=task))

    # The local case never builds a batch process: it defers to the parent.
    if selected == BatchSystems.local:
        create_output_dirs(task)
        return super()._create_task_process(task)

    process_by_system = {
        BatchSystems.lsf: LSFProcess,
        BatchSystems.htcondor: HTCondorProcess,
        BatchSystems.gbasf2: Gbasf2Process,
        BatchSystems.test: TestProcess,
    }
    if selected not in process_by_system:
        raise NotImplementedError

    return process_by_system[selected](
        task=task,
        scheduler=self._scheduler,
        result_queue=self._task_result_queue,
        worker_timeout=self._config.timeout,
    )
def _create_task_process(self, task):
    """Return the process instance that will execute ``task``.

    Which class is instantiated depends on the ``BatchSystems`` member
    obtained from the ``"batch_system"`` setting (default
    ``BatchSystems.lsf``).  For ``BatchSystems.local`` the method calls
    ``create_output_dirs(task)`` and returns whatever the superclass
    implementation returns instead.

    Raises:
        NotImplementedError: for a member without a matching process class.
    """
    setting_value = get_setting("batch_system", default=BatchSystems.lsf, task=task)
    wanted = BatchSystems(setting_value)

    if wanted == BatchSystems.local:
        create_output_dirs(task)
        return super()._create_task_process(task)

    # (member, process class) pairs, compared with ``==`` in listed order.
    known_pairs = (
        (BatchSystems.lsf, LSFProcess),
        (BatchSystems.htcondor, HTCondorProcess),
        (BatchSystems.gbasf2, Gbasf2Process),
        (BatchSystems.test, TestProcess),
    )
    process_class = next(
        (candidate for member, candidate in known_pairs if wanted == member),
        None,
    )
    if process_class is None:
        raise NotImplementedError

    return process_class(
        task=task,
        scheduler=self._scheduler,
        result_queue=self._task_result_queue,
        worker_timeout=self._config.timeout,
    )
def _create_task_process(self, task):
    """Create the process that runs ``task`` for the configured batch system.

    The ``"batch_system"`` setting (default ``BatchSystems.lsf``) is turned
    into a ``BatchSystems`` member and each supported member is answered
    with an early return.  ``BatchSystems.local`` calls
    ``create_output_dirs(task)`` and hands over to the superclass method.

    Raises:
        NotImplementedError: if none of the branches below matches.
    """

    def build(process_class):
        # Every batch process class is constructed with the same arguments.
        return process_class(
            task=task,
            scheduler=self._scheduler,
            result_queue=self._task_result_queue,
            worker_timeout=self._config.timeout,
        )

    system = BatchSystems(get_setting("batch_system", default=BatchSystems.lsf, task=task))

    if system == BatchSystems.lsf:
        return build(LSFProcess)
    if system == BatchSystems.htcondor:
        return build(HTCondorProcess)
    if system == BatchSystems.gbasf2:
        return build(Gbasf2Process)
    if system == BatchSystems.test:
        return build(TestProcess)
    if system == BatchSystems.local:
        create_output_dirs(task)
        return super()._create_task_process(task)

    raise NotImplementedError
def _create_task_process(self, task):
    """Pick and instantiate the process class for ``task``.

    The ``BatchSystems`` member comes from the ``"batch_system"`` setting,
    with ``BatchSystems.lsf`` used as the default passed to ``get_setting``.
    ``BatchSystems.local`` results in a call to ``create_output_dirs(task)``
    followed by the superclass implementation; the remaining supported
    members are searched in a small table.

    Raises:
        NotImplementedError: when the table contains no matching member.
    """
    requested = BatchSystems(get_setting("batch_system", default=BatchSystems.lsf, task=task))

    if requested == BatchSystems.local:
        create_output_dirs(task)
        return super()._create_task_process(task)

    supported = (
        (BatchSystems.lsf, LSFProcess),
        (BatchSystems.htcondor, HTCondorProcess),
        (BatchSystems.gbasf2, Gbasf2Process),
        (BatchSystems.test, TestProcess),
    )
    for member, candidate in supported:
        if requested == member:
            process_class = candidate
            break
    else:
        # The loop finished without a match.
        raise NotImplementedError

    return process_class(
        task=task,
        scheduler=self._scheduler,
        result_queue=self._task_result_queue,
        worker_timeout=self._config.timeout,
    )
def _create_task_process(self, task):
    """Instantiate the process object responsible for ``task``.

    ``get_setting("batch_system", ...)`` (default ``BatchSystems.lsf``) is
    wrapped in ``BatchSystems`` to select the implementation.  Members with
    a dedicated process class are constructed right away; for
    ``BatchSystems.local`` the method calls ``create_output_dirs(task)``
    and returns the result of the superclass method.

    Raises:
        NotImplementedError: for any other ``BatchSystems`` member.
    """
    chosen = BatchSystems(get_setting("batch_system", default=BatchSystems.lsf, task=task))

    batch_process_class = {
        BatchSystems.lsf: LSFProcess,
        BatchSystems.htcondor: HTCondorProcess,
        BatchSystems.gbasf2: Gbasf2Process,
        BatchSystems.test: TestProcess,
    }.get(chosen)

    if batch_process_class is not None:
        process_kwargs = {
            "task": task,
            "scheduler": self._scheduler,
            "result_queue": self._task_result_queue,
            "worker_timeout": self._config.timeout,
        }
        return batch_process_class(**process_kwargs)

    # No batch class matched: only the local member is still accepted.
    if chosen != BatchSystems.local:
        raise NotImplementedError

    create_output_dirs(task)
    return super()._create_task_process(task)
def _create_task_process(self, task):
batch_system = BatchSystems(get_setting("batch_system", default=BatchSystems.lsf, task=task))
if batch_system == BatchSystems.lsf:
process_class = LSFProcess
elif batch_system == BatchSystems.htcondor:
process_class = HTCondorProcess
elif batch_system == BatchSystems.gbasf2:
process_class = Gbasf2Process
elif batch_system == BatchSystems.test:
process_class = TestProcess
elif batch_system == BatchSystems.local:
create_output_dirs(task)
return super()._create_task_process(task)
else:
raise NotImplementedError
return process_class(task=task, scheduler=self._scheduler,