Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@test_util.skip_unless_tool("cat")
def test_workflow_scheduling(self):
path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
workflow = self.gi.workflows.import_workflow_from_local_path(path)
workflow_id = workflow["id"]
history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
dataset1_id = self._test_dataset(history_id)
invocations = self.gi.workflows.get_invocations(workflow_id)
self.assertEqual(len(invocations), 0)
invocation = self.gi.workflows.invoke_workflow(
workflow["id"],
inputs={"0": {"src": "hda", "id": dataset1_id}},
)
invocation_id = invocation["id"]
invocations = self.gi.workflows.get_invocations(workflow_id)
@test_util.skip_unless_tool("random_lines1")
def test_run_random_lines(self):
# Run second test case from randomlines.xml
history_id = self.gi.histories.create_history(name="test_run_random_lines history")["id"]
with open(test_util.get_abspath(os.path.join("data", "1.bed"))) as f:
contents = f.read()
dataset_id = self._test_dataset(history_id, contents=contents)
tool_inputs = inputs().set(
"num_lines", "1"
).set(
"input", dataset(dataset_id)
).set(
"seed_source", conditional().set(
"seed_source_selector", "set_seed"
).set(
"seed", "asdf"
)
@test_util.skip_unless_tool("cat1")
@test_util.skip_unless_tool("cat")
def test_workflow_scheduling(self):
path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
workflow = self.gi.workflows.import_workflow_from_local_path(path)
workflow_id = workflow["id"]
history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
dataset1_id = self._test_dataset(history_id)
invocations = self.gi.workflows.get_invocations(workflow_id)
self.assertEqual(len(invocations), 0)
invocation = self.gi.workflows.invoke_workflow(
workflow["id"],
inputs={"0": {"src": "hda", "id": dataset1_id}},
)
invocation_id = invocation["id"]
@test_util.skip_unless_tool("cat1")
def test_run_cat1(self):
history_id = self.gi.histories.create_history(name="test_run_cat1 history")["id"]
dataset1_id = self._test_dataset(history_id, contents="1 2 3")
dataset2_id = self._test_dataset(history_id, contents="4 5 6")
dataset3_id = self._test_dataset(history_id, contents="7 8 9")
tool_inputs = inputs().set(
"input1", dataset(dataset1_id)
).set(
"queries", repeat().instance(
inputs().set("input2", dataset(dataset2_id))
).instance(
inputs().set("input2", dataset(dataset3_id))
)
)
tool_output = self.gi.tools.run_tool(
history_id=history_id,
@test_util.skip_unless_tool("cat1")
@test_util.skip_unless_tool("cat")
def test_workflow_scheduling(self):
path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
workflow = self.gi.workflows.import_workflow_from_local_path(path)
history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
dataset1_id = self._test_dataset(history_id)
invocation = self.gi.workflows.invoke_workflow(
workflow["id"],
inputs={"0": {"src": "hda", "id": dataset1_id}},
)
invocation_id = invocation["id"]
def invocation_steps_by_order_index():
invocation = self.gi.invocations.show_invocation(invocation_id)
return dict((s["order_index"], s) for s in invocation["steps"])
@test_util.skip_unless_tool("cat")
def test_cancelling_workflow_scheduling(self):
path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
workflow = self.gi.workflows.import_workflow_from_local_path(path)
workflow_id = workflow["id"]
history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
dataset1_id = self._test_dataset(history_id)
invocations = self.gi.workflows.get_invocations(workflow_id)
self.assertEqual(len(invocations), 0)
invocation = self.gi.workflows.invoke_workflow(
workflow["id"],
inputs={"0": {"src": "hda", "id": dataset1_id}},
)
invocation_id = invocation["id"]
invocations = self.gi.workflows.get_invocations(workflow_id)
@test_util.skip_unless_tool("cat1")
@test_util.skip_unless_tool("cat")
def test_workflow_scheduling(self):
path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
workflow = self.gi.workflows.import_workflow_from_local_path(path)
workflow_id = workflow["id"]
history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
dataset1_id = self._test_dataset(history_id)
invocations = self.gi.workflows.get_invocations(workflow_id)
self.assertEqual(len(invocations), 0)
invocation = self.gi.workflows.invoke_workflow(
workflow["id"],
inputs={"0": {"src": "hda", "id": dataset1_id}},
)
invocation_id = invocation["id"]
@test_util.skip_unless_tool("cat")
def test_workflow_scheduling(self):
path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
workflow = self.gi.workflows.import_workflow_from_local_path(path)
workflow_id = workflow["id"]
history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
dataset1_id = self._test_dataset(history_id)
invocations = self.gi.workflows.get_invocations(workflow_id)
self.assertEqual(len(invocations), 0)
invocation = self.gi.workflows.invoke_workflow(
workflow["id"],
inputs={"0": {"src": "hda", "id": dataset1_id}},
)
invocation_id = invocation["id"]
invocations = self.gi.workflows.get_invocations(workflow_id)
@test_util.skip_unless_tool("cat")
def test_cancelling_workflow_scheduling(self):
path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
workflow = self.gi.workflows.import_workflow_from_local_path(path)
workflow_id = workflow["id"]
history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
dataset1_id = self._test_dataset(history_id)
invocations = self.gi.workflows.get_invocations(workflow_id)
self.assertEqual(len(invocations), 0)
invocation = self.gi.workflows.invoke_workflow(
workflow["id"],
inputs={"0": {"src": "hda", "id": dataset1_id}},
)
invocation_id = invocation["id"]
invocations = self.gi.workflows.get_invocations(workflow_id)