def test_workflow_scheduling(self):
    path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
    workflow = self.gi.workflows.import_workflow_from_local_path(path)
    workflow_id = workflow["id"]
    history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
    dataset1_id = self._test_dataset(history_id)
    invocations = self.gi.workflows.get_invocations(workflow_id)
    self.assertEqual(len(invocations), 0)
    invocation = self.gi.workflows.invoke_workflow(
        workflow_id,
        inputs={"0": {"src": "hda", "id": dataset1_id}},
    )
    invocation_id = invocation["id"]
    invocations = self.gi.workflows.get_invocations(workflow_id)
    self.assertEqual(len(invocations), 1)
    self.assertEqual(invocations[0]["id"], invocation_id)
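# Note: invoke_workflow only schedules the invocation; it does not block until
# the workflow finishes. A minimal polling sketch (illustrative, not part of
# the original test; assumes the invocation eventually leaves the "new" state):
import time

def wait_for_invocation_state(gi, invocation_id, timeout=60):
    """Poll an invocation until its state is no longer "new", or time out."""
    end = time.time() + timeout
    while time.time() < end:
        state = gi.invocations.show_invocation(invocation_id)["state"]
        if state != "new":
            return state
        time.sleep(1)
    raise TimeoutError(f"Invocation {invocation_id} still 'new' after {timeout}s")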
def test_import_publish_workflow_from_local_path(self):
    path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    imported_wf = self.gi.workflows.import_workflow_from_local_path(path, publish=True)
    self.assertIsInstance(imported_wf, dict)
    self.assertFalse(imported_wf['deleted'])
    self.assertTrue(imported_wf['published'])
@test_util.skip_unless_galaxy('release_19.09')
def test_update_dataset_tags(self):
    datasets = self.gi.libraries.upload_file_contents(self.library['id'], FOO_DATA)
    dataset_show = self.gi.libraries.show_dataset(self.library['id'], datasets[0]['id'])
    self.assertEqual(dataset_show['tags'], "")
    updated_dataset = self.gi.libraries.update_library_dataset(datasets[0]['id'], tags=["name:foobar", "barfoo"])
    dataset_show = self.gi.libraries.show_dataset(self.library['id'], updated_dataset['id'])
    self.assertEqual(dataset_show['tags'], 'name:foobar, barfoo')
@test_util.skip_unless_galaxy('release_19.09')
def test_upload_file_contents_with_tags(self):
    datasets = self.gi.libraries.upload_file_contents(self.library['id'], FOO_DATA, tags=["name:foobar", "barfoo"])
    dataset_show = self.gi.libraries.show_dataset(self.library['id'], datasets[0]['id'])
    self.assertEqual(dataset_show['tags'], 'name:foobar, barfoo')
"""
Tests the functionality of the Blend CloudMan API. These tests require working
credentials to supported cloud infrastructure.
"""
from bioblend.cloudman import CloudManConfig, CloudManInstance
from . import CloudmanTestBase, test_util
@test_util.skip_unless_cloudman()
class TestCloudmanServices(CloudmanTestBase.CloudmanTestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.cfg = CloudManConfig(cls.access_key, cls.secret_key, cls.cluster_name, cls.ami_id, cls.instance_type, cls.password, cloud_metadata=cls.cloud_metadata)
cls.cmi = CloudManInstance.launch_instance(cls.cfg)
def setUp(self):
self.cmi = self.__class__.cmi
def test_get_status(self):
status = self.cmi.get_status()
self.assertIsNotNone(status)
def test_get_nodes(self):
def test_invocation(self):
    path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    workflow = self.gi.workflows.import_workflow_from_local_path(path)
    history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
    dataset1_id = self._test_dataset(history_id)
    dataset = {'src': 'hda', 'id': dataset1_id}
    invocation = self.gi.workflows.invoke_workflow(
        workflow['id'],
        inputs={'Input 1': dataset, 'Input 2': dataset},
        history_id=history_id,
        inputs_by='name',
    )
    invocation_id = invocation["id"]
    invocations = self.gi.invocations.get_invocations()
    self.assertEqual(len(invocations), 1)
    self.assertEqual(invocations[0]["id"], invocation_id)
import os

import bioblend
import bioblend.galaxy

from . import test_util
from .test_util import unittest

bioblend.set_stream_logger('test', level='INFO')

BIOBLEND_TEST_JOB_TIMEOUT = int(os.environ.get("BIOBLEND_TEST_JOB_TIMEOUT", "60"))


@test_util.skip_unless_galaxy()
class GalaxyTestBase(unittest.TestCase):

    def setUp(self):
        galaxy_key = os.environ['BIOBLEND_GALAXY_API_KEY']
        galaxy_url = os.environ['BIOBLEND_GALAXY_URL']
        self.gi = bioblend.galaxy.GalaxyInstance(url=galaxy_url, key=galaxy_key)

    def _test_dataset(self, history_id, contents="1\t2\t3", **kwds):
        tool_output = self.gi.tools.paste_content(contents, history_id, **kwds)
        return tool_output["outputs"][0]["id"]

    def _wait_and_verify_dataset(self, dataset_id, expected_contents, timeout_seconds=BIOBLEND_TEST_JOB_TIMEOUT):
        dataset_contents = self.gi.datasets.download_dataset(dataset_id, maxwait=timeout_seconds)
        self.assertEqual(dataset_contents, expected_contents)
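
# For context, a minimal sketch of how a subclass can use the helpers above.
# The class and test names are illustrative, not from the original suite;
# download_dataset returns bytes, so the expected contents are bytes too.
class ExampleDatasetTest(GalaxyTestBase):
    def test_paste_and_verify(self):
        history_id = self.gi.histories.create_history(name="example history")["id"]
        dataset_id = self._test_dataset(history_id, contents="1\t2\t3")
        self._wait_and_verify_dataset(dataset_id, b"1\t2\t3\n")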
def test_upload_file(self):
    history = self.gi.histories.create_history(name="test_upload_file history")
    fn = test_util.get_abspath("test_util.py")
    file_name = "test1"
    tool_output = self.gi.tools.upload_file(
        fn,
        # First param could be a regular path also of course...
        history_id=history["id"],
        file_name=file_name,
        dbkey="?",
        file_type="txt",
    )
    self._wait_for_and_verify_upload(tool_output, file_name, fn, expected_dbkey="?")
def test_import_history(self):
    path = test_util.get_abspath(os.path.join('data', 'Galaxy-History-test.tar.gz'))
    self.gi.histories.import_history(file_path=path)
@test_util.skip_unless_tool("cat")
def test_workflow_scheduling(self):
    path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
    workflow = self.gi.workflows.import_workflow_from_local_path(path)
    history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
    dataset1_id = self._test_dataset(history_id)
    invocation = self.gi.workflows.invoke_workflow(
        workflow["id"],
        inputs={"0": {"src": "hda", "id": dataset1_id}},
    )
    invocation_id = invocation["id"]

    def invocation_steps_by_order_index():
        invocation = self.gi.invocations.show_invocation(invocation_id)
        return {s["order_index"]: s for s in invocation["steps"]}
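
    # The original snippet is truncated here. Illustrative continuation only
    # (assumes "import time" at module level, as in the polling sketch earlier
    # on this page): poll with the helper above until the invocation reports
    # at least one step.
    steps = {}
    for _ in range(BIOBLEND_TEST_JOB_TIMEOUT):
        steps = invocation_steps_by_order_index()
        if steps:
            break
        time.sleep(1)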