Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
    """Create a Galaxy user and verify that an FTP/SFTP upload reaches the server.

    Connection settings (web protocol, host name, FTP protocol, port and API
    key) come from ``parse_args()``. If an HTTP auth username is supplied, the
    ``GalaxyInstance`` request methods are replaced by versions wrapped with
    ``inject_auth``. A small temporary file is uploaded with the credentials
    returned by ``create_user``; if ``successfull_upload`` reports failure the
    process exits with an error message.
    """
    args = parse_args()
    web_protocol, fqdn, ftp_protocol, port, api_key = (
        args.protocol, args.fqdn, args.ftp_protocol, args.port, args.api_key
    )

    if args.http_auth_username:
        # NOTE(review): the get-request wrapper starts from the class attribute
        # while the other three start from bare names -- confirm this is intended.
        GalaxyInstance.make_delete_request = inject_auth(make_delete_request, args.http_auth_username, args.http_auth_password)
        GalaxyInstance.make_post_request = inject_auth(make_post_request, args.http_auth_username, args.http_auth_password)
        GalaxyInstance.make_put_request = inject_auth(make_put_request, args.http_auth_username, args.http_auth_password)
        GalaxyInstance.make_get_request = inject_auth(GalaxyInstance.make_get_request, args.http_auth_username, args.http_auth_password)

    url = web_protocol + "://" + fqdn
    new_api_key, user_email, password = create_user(url, api_key)

    # The context manager guarantees the temporary file is closed (and thereby
    # removed) on every exit path, including sys.exit() below.
    with NamedTemporaryFile() as tmpfile:
        # NamedTemporaryFile defaults to binary mode ("w+b"), so the payload
        # must be bytes; a str raises TypeError on Python 3.
        tmpfile.write(b"1\n2\n3\n")
        # The upload helpers only receive the file *name*, so the buffered
        # data has to be on disk before they run.
        tmpfile.flush()

        if ftp_protocol == "ftp":
            ftp_upload_file(fqdn, user_email, password, port, tmpfile.name)
        else:
            sftp_upload_file(fqdn, user_email, password, port, tmpfile.name)

        if not successfull_upload(url, new_api_key, tmpfile.name):
            sys.exit("{ftp_protocol} upload to galaxy server {fqdn} failed.".format(ftp_protocol = ftp_protocol, fqdn = fqdn))
def _gi(args):
    """Return a ``GalaxyInstance`` that authenticates as a freshly created user.

    The connection built from ``args.host`` and ``args.api_key`` is used to
    create a local user named ``histexport-user-<random int>`` and to generate
    an API key for it. The returned instance is built with that new key.
    """
    bootstrap_gi = galaxy.GalaxyInstance(args.host, key=args.api_key)

    # Random suffix; presumably there to avoid name clashes between runs.
    username = "histexport-user-%d" % random.randint(0, 1000000)
    email = "%s@galaxytesting.dev" % username

    created = bootstrap_gi.users.create_local_user(username, email, "pass123")
    user_key = bootstrap_gi.users.create_user_apikey(created["id"])
    return galaxy.GalaxyInstance(args.host, user_key)
def _gi(args):
    """Return a ``GalaxyInstance`` that authenticates as a freshly created user.

    The connection built from ``args.host`` and ``args.api_key`` is used to
    create a local user named ``wftest-user-<random int>`` and to generate an
    API key for it. The returned instance is built with that new key.
    """
    bootstrap_gi = galaxy.GalaxyInstance(args.host, key=args.api_key)

    # Random suffix; presumably there to avoid name clashes between runs.
    username = "wftest-user-%d" % random.randint(0, 1000000)
    email = "%s@galaxytesting.dev" % username

    created = bootstrap_gi.users.create_local_user(username, email, "pass123")
    user_key = bootstrap_gi.users.create_user_apikey(created["id"])
    return galaxy.GalaxyInstance(args.host, user_key)
def setUp(self):
    """Set up random IDs, a Galaxy instance fixture and ``HistoryClient`` mocks."""
    # Identifiers and constants shared by the tests.
    self.GALAXY_HISTORY_ID = str(uuid.uuid4())
    self.GALAXY_DATASET_ID = str(uuid.uuid4())
    self.GALAXY_DATASET_FILESIZE = 1024
    self.MISCELLANEOUS_STRING = "Coffee is tasty"

    random_api_key = str(uuid.uuid4())
    self.galaxy_instance = GalaxyInstanceFactory(api_key=random_api_key)

    # Both patchers are started here; this method does not stop them.
    history_client = galaxy.histories.HistoryClient
    self.show_history_mock = mock.patch.object(history_client, "show_history").start()
    self.show_dataset_mock = mock.patch.object(history_client, "show_dataset").start()

    # Sample history-content entry that refers to the dataset ID above.
    self.history_content_entry = {
        "name": "Test History Content Entry",
        "url": "www.example.com/history_content_entry",
        "type": "file",
        "id": self.GALAXY_DATASET_ID,
    }
def test_update_workflow_name(self):
    """Updating a workflow with a new name returns that name."""
    workflow_file = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    imported = self.gi.workflows.import_workflow_from_local_path(workflow_file)

    renamed_to = 'new name'
    result = self.gi.workflows.update_workflow(imported['id'], name=renamed_to)

    self.assertEqual(result['name'], renamed_to)
import tempfile
import uuid
from ssl import SSLError
from urllib.error import URLError
from urllib.request import urlopen
import bioblend
import bioblend.galaxy.objects.galaxy_instance as galaxy_instance
import bioblend.galaxy.objects.wrappers as wrappers
from bioblend.galaxy import dataset_collections
from . import test_util
from .test_util import unittest
# Attach a stream logger named "test" at INFO level.
bioblend.set_stream_logger("test", level="INFO")
# Default timeout, in seconds, for sockets created from here on.
socket.setdefaulttimeout(10.0)

# Absolute paths of the workflow files kept under the "data" directory.
SAMPLE_FN = test_util.get_abspath(
    os.path.join("data", "paste_columns.ga")
)
SAMPLE_WF_COLL_FN = test_util.get_abspath(
    os.path.join("data", "paste_columns_collections.ga")
)
SAMPLE_WF_PARAMETER_INPUT_FN = test_util.get_abspath(
    os.path.join("data", "workflow_with_parameter_input.ga")
)

# Small two-line text payloads.
FOO_DATA = "foo\nbar\n"
FOO_DATA_2 = "foo2\nbar2\n"
SAMPLE_WF_DICT = {
'deleted': False,
'id': '9005c5112febe774',
'inputs': {
'571': {'label': 'Input Dataset', 'value': ''},
'572': {'label': 'Input Dataset', 'value': ''},
},
'model_class': 'StoredWorkflow',
'name': 'paste_columns',
'published': False,
'steps': {
'571': {
def test_import_publish_workflow_dict(self):
    """Importing a workflow dict with ``publish=True`` yields a published, undeleted workflow."""
    workflow_file = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    with open(workflow_file, 'r') as handle:
        workflow_dict = json.load(handle)

    result = self.gi.workflows.import_workflow_dict(workflow_dict, publish=True)

    self.assertIsInstance(result, dict)
    self.assertFalse(result['deleted'])
    self.assertTrue(result['published'])
def test_get_workflows(self):
    """An imported workflow can be found in the full list, by ID and by name."""
    workflow_file = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
    imported = self.gi.workflows.import_workflow_from_local_path(workflow_file)

    # Unfiltered listing must contain at least one workflow.
    everything = self.gi.workflows.get_workflows()
    self.assertGreater(len(everything), 0)

    # Lookup by ID: the first hit must match the imported workflow.
    by_id = self.gi.workflows.get_workflows(workflow_id=imported['id'])[0]
    self.assertEqual(imported['id'], by_id['id'])
    self.assertEqual(imported['name'], by_id['name'])
    self.assertEqual(imported['url'], by_id['url'])

    # Lookup by name: the imported workflow must be among the results.
    by_name = self.gi.workflows.get_workflows(name=imported['name'])
    self.assertTrue(any(entry['id'] == imported['id'] for entry in by_name))
def test_workflow_scheduling(self):
    """Invoking a workflow adds exactly one invocation with the returned ID."""
    workflow_file = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
    workflow = self.gi.workflows.import_workflow_from_local_path(workflow_file)
    workflow_id = workflow["id"]

    new_history = self.gi.histories.create_history(name="TestWorkflowState")
    history_id = new_history["id"]
    dataset1_id = self._test_dataset(history_id)

    # No invocation exists before the workflow is invoked.
    before = self.gi.workflows.get_invocations(workflow_id)
    self.assertEqual(len(before), 0)

    workflow_inputs = {"0": {"src": "hda", "id": dataset1_id}}
    invocation = self.gi.workflows.invoke_workflow(workflow_id, inputs=workflow_inputs)
    invocation_id = invocation["id"]

    # Exactly one invocation afterwards, and it is the one just created.
    after = self.gi.workflows.get_invocations(workflow_id)
    self.assertEqual(len(after), 1)
    self.assertEqual(after[0]["id"], invocation_id)
def test_import_publish_workflow_from_local_path(self):
    """Importing from a local path with ``publish=True`` yields a published, undeleted workflow."""
    workflow_file = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))

    result = self.gi.workflows.import_workflow_from_local_path(workflow_file, publish=True)

    self.assertIsInstance(result, dict)
    self.assertFalse(result['deleted'])
    self.assertTrue(result['published'])