Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def export_workflow_file(self):
    """Copy or extract workflow file and add it to the tutorial directory.

    Ensures ``self.wf_dir`` exists and contains an ``index.md`` stub (an
    existing ``index.md`` is left untouched), then populates ``self.wf_fp``:

    * if ``self.init_wf_fp`` is set, that local file is copied to
      ``self.wf_fp``;
    * otherwise, if ``self.init_wf_id`` is set, the workflow with that id is
      exported from the Galaxy instance at ``self.training.galaxy_url``
      (authenticated with ``self.training.galaxy_api_key``) to ``self.wf_fp``.

    When neither attribute is set, only the directory and index stub are
    created.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(self.wf_dir, exist_ok=True)

    index_fp = os.path.join(self.wf_dir, 'index.md')
    if not os.path.exists(index_fp):
        with open(index_fp, 'w') as handle:
            handle.write('---\nlayout: workflow-list\n---\n')

    if self.init_wf_fp:
        shutil.copy(self.init_wf_fp, self.wf_fp)
    elif self.init_wf_id:
        gi = galaxy.GalaxyInstance(self.training.galaxy_url, key=self.training.galaxy_api_key)
        # use_default_filename=False: self.wf_fp is passed as the full target path.
        gi.workflows.export_workflow_to_local_path(
            self.init_wf_id,
            self.wf_fp,
            use_default_filename=False)
def gi(port=None, url=None, key=None):
    """Build a bioblend ``GalaxyInstance`` from a local port or an explicit URL.

    ``ensure_module()`` is called first. When ``port`` is given the instance
    targets ``http://localhost:<port>`` and any ``url`` argument is ignored;
    otherwise ``url`` is passed through unchanged. A ``key`` of ``None`` falls
    back to ``DEFAULT_MASTER_API_KEY``.
    """
    ensure_module()
    api_key = DEFAULT_MASTER_API_KEY if key is None else key
    # A port, when supplied, takes precedence over the url argument.
    target_url = url if port is None else "http://localhost:%d" % int(port)
    return galaxy.GalaxyInstance(url=target_url, key=api_key)
def get_wf_tools_from_running_galaxy(kwds):
    """Export a workflow from a running Galaxy instance and describe its tools.

    Reads ``galaxy_url``, ``galaxy_api_key`` and ``workflow_id`` from ``kwds``.
    Returns a 2-tuple: the exported workflow dictionary and the value produced
    by ``get_wf_tool_description`` for that workflow.
    """
    instance = galaxy.GalaxyInstance(
        kwds['galaxy_url'],
        key=kwds['galaxy_api_key'],
    )
    workflow = instance.workflows.export_workflow_dict(kwds['workflow_id'])
    return workflow, get_wf_tool_description(workflow, instance)
def get_hands_on_boxes_from_running_galaxy(wf_id, galaxy_url, galaxy_api_key):
    """Format the steps of a workflow hosted on a running Galaxy instance.

    Connects to ``galaxy_url`` with ``galaxy_api_key``, exports the workflow
    ``wf_id`` as a dictionary and returns the result of ``format_wf_steps``
    applied to it.
    """
    instance = galaxy.GalaxyInstance(galaxy_url, key=galaxy_api_key)
    workflow = instance.workflows.export_workflow_dict(wf_id)
    return format_wf_steps(workflow, instance)
def add_workflow_file(kwds, tuto_dir):
    """Copy or extract workflow file and add it to the tutorial directory.

    The workflow is written to ``<tuto_dir>/workflows/init_workflow.ga``:

    * if ``kwds["workflow"]`` is truthy, that local file is copied;
    * otherwise the workflow ``kwds['workflow_id']`` is exported from the
      Galaxy instance at ``kwds['galaxy_url']`` (authenticated with
      ``kwds['galaxy_api_key']``).

    Afterwards ``<tuto_dir>/workflows/empty_workflow.ga`` is deleted if it
    exists.
    """
    wf_dir = os.path.join(tuto_dir, "workflows")
    # Create the target directory up front so the copy/export below does not
    # fail when the tutorial skeleton lacks a "workflows" folder.
    os.makedirs(wf_dir, exist_ok=True)

    # copy / extract workflow
    wf_filepath = os.path.join(wf_dir, "init_workflow.ga")
    if kwds["workflow"]:
        shutil.copy(kwds["workflow"], wf_filepath)
    else:
        galaxy_instance = galaxy.GalaxyInstance(kwds['galaxy_url'], key=kwds['galaxy_api_key'])
        # use_default_filename=False: wf_filepath is passed as the full target path.
        galaxy_instance.workflows.export_workflow_to_local_path(
            kwds['workflow_id'],
            wf_filepath,
            use_default_filename=False)

    # remove empty workflow file if there
    empty_wf_filepath = os.path.join(wf_dir, "empty_workflow.ga")
    if os.path.exists(empty_wf_filepath):
        os.remove(empty_wf_filepath)