# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_update_workflow_name(self):
    """Renaming an imported workflow via update_workflow should take effect."""
    wf_path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    imported = self.gi.workflows.import_workflow_from_local_path(wf_path)
    renamed_to = 'new name'
    result = self.gi.workflows.update_workflow(imported['id'], name=renamed_to)
    self.assertEqual(result['name'], renamed_to)
import json
import os
import socket
import tempfile
import uuid
from ssl import SSLError
from urllib.error import URLError
from urllib.request import urlopen

import bioblend
import bioblend.galaxy.objects.galaxy_instance as galaxy_instance
import bioblend.galaxy.objects.wrappers as wrappers
from bioblend.galaxy import dataset_collections

from . import test_util
from .test_util import unittest
# Emit bioblend's own log messages at INFO level so test runs show client activity.
bioblend.set_stream_logger('test', level='INFO')
# Fail fast on hung network calls instead of letting the suite block indefinitely.
socket.setdefaulttimeout(10.0)
# Absolute paths to the sample Galaxy workflow (.ga) fixtures in the test data dir.
SAMPLE_FN = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
SAMPLE_WF_COLL_FN = test_util.get_abspath(os.path.join('data', 'paste_columns_collections.ga'))
SAMPLE_WF_PARAMETER_INPUT_FN = test_util.get_abspath(os.path.join('data', 'workflow_with_parameter_input.ga'))
# Small text payloads used as dataset contents in upload tests.
FOO_DATA = 'foo\nbar\n'
FOO_DATA_2 = 'foo2\nbar2\n'
SAMPLE_WF_DICT = {
'deleted': False,
'id': '9005c5112febe774',
'inputs': {
'571': {'label': 'Input Dataset', 'value': ''},
'572': {'label': 'Input Dataset', 'value': ''},
},
'model_class': 'StoredWorkflow',
'name': 'paste_columns',
'published': False,
'steps': {
'571': {
def test_import_publish_workflow_dict(self):
    """A workflow dict imported with publish=True should come back published."""
    wf_file = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    with open(wf_file, 'r') as fh:
        workflow_dict = json.load(fh)
    result = self.gi.workflows.import_workflow_dict(workflow_dict, publish=True)
    self.assertIsInstance(result, dict)
    self.assertFalse(result['deleted'])
    self.assertTrue(result['published'])
def test_get_workflows(self):
    """get_workflows should find an imported workflow both by id and by name."""
    wf_file = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    imported = self.gi.workflows.import_workflow_from_local_path(wf_file)
    # At least the workflow we just imported must be listed.
    self.assertGreater(len(self.gi.workflows.get_workflows()), 0)
    # Lookup by id returns the same record we imported.
    by_id = self.gi.workflows.get_workflows(workflow_id=imported['id'])[0]
    for key in ('id', 'name', 'url'):
        self.assertEqual(imported[key], by_id[key])
    # Lookup by name may match several workflows; ours must be among them.
    by_name = self.gi.workflows.get_workflows(name=imported['name'])
    self.assertTrue(any(entry['id'] == imported['id'] for entry in by_name))
def test_workflow_scheduling(self):
    """Invoking a workflow should create exactly one invocation record."""
    wf_file = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
    wf = self.gi.workflows.import_workflow_from_local_path(wf_file)
    wf_id = wf["id"]
    hist_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
    input_dataset_id = self._test_dataset(hist_id)
    # No invocations exist before the workflow is run.
    self.assertEqual(len(self.gi.workflows.get_invocations(wf_id)), 0)
    invocation = self.gi.workflows.invoke_workflow(
        wf_id,
        inputs={"0": {"src": "hda", "id": input_dataset_id}},
    )
    after = self.gi.workflows.get_invocations(wf_id)
    self.assertEqual(len(after), 1)
    self.assertEqual(after[0]["id"], invocation["id"])
def test_import_publish_workflow_from_local_path(self):
    """Importing a .ga file with publish=True should yield a published workflow."""
    wf_file = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    result = self.gi.workflows.import_workflow_from_local_path(wf_file, publish=True)
    self.assertIsInstance(result, dict)
    self.assertFalse(result['deleted'])
    self.assertTrue(result['published'])
def test_import_export_workflow_from_local_path(self):
    """Round-trip a workflow: import it from a .ga file, then export it to disk.

    NOTE(review): this block appears truncated in the chunk under review --
    the ``try:`` opened before the export has no visible ``except``/``finally``
    (cleanup of ``export_dir`` presumably follows); confirm against the full file.
    """
    # A missing path must be rejected by the import call.
    with self.assertRaises(Exception):
        self.gi.workflows.import_workflow_from_local_path(None)
    path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    imported_wf = self.gi.workflows.import_workflow_from_local_path(path)
    # A freshly imported workflow is a dict, unpublished and not deleted.
    self.assertIsInstance(imported_wf, dict)
    self.assertEqual(imported_wf['name'], 'paste_columns')
    self.assertTrue(imported_wf['url'].startswith('/api/workflows/'))
    self.assertFalse(imported_wf['deleted'])
    self.assertFalse(imported_wf['published'])
    # Export must likewise reject missing arguments.
    with self.assertRaises(Exception):
        self.gi.workflows.export_workflow_to_local_path(None, None, None)
    export_dir = tempfile.mkdtemp(prefix='bioblend_test_')
    try:
        self.gi.workflows.export_workflow_to_local_path(imported_wf['id'], export_dir)
        # Exactly one file (the exported .ga) should have been written.
        dir_contents = os.listdir(export_dir)
        self.assertEqual(len(dir_contents), 1)
        export_path = os.path.join(export_dir, dir_contents[0])
        with open(export_path, 'r') as f:
            exported_wf_dict = json.load(f)
def test_upload_file_dbkey(self):
    """upload_file should honor an explicitly requested dbkey (genome build)."""
    hist = self.gi.histories.create_history(name="test_upload_file history")
    src = test_util.get_abspath("test_util.py")
    upload_name = "test1"
    genome_build = "hg19"
    output = self.gi.tools.upload_file(
        src,
        history_id=hist["id"],
        file_name=upload_name,
        dbkey=genome_build,
        file_type="txt",
    )
    self._wait_for_and_verify_upload(output, upload_name, src, expected_dbkey=genome_build)