Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def resolve_job_ref(job_id, name, describe=None):
    """
    :param job_id: ID of the job whose output is being referenced
    :type job_id: string
    :param name: Name of the output field, optionally followed by ".N" where N is an integer index
    :type name: string
    :param describe: Hash that is updated in place with the job's project under the "project" key; a new hash is created when omitted
    :type describe: dict or None
    :raises: :exc:`ResolutionError` if the job cannot be described or is not in the "done" state

    Describes the job, checks that it has finished, and splits a
    trailing integer index off *name* when one is present.
    """
    # Create the hash per call: a mutable default would be shared (and
    # mutated below) across every call that omits the argument.
    if describe is None:
        describe = {}
    try:
        job_desc = dxpy.api.job_describe(job_id)
    except Exception as details:
        raise ResolutionError(str(details))
    project = job_desc['project']
    describe['project'] = project
    if job_desc['state'] != 'done':
        raise ResolutionError('the job ' + job_id + ' is ' + job_desc['state'] + ', and it must be in the done state for its outputs to be accessed')
    index = None
    if '.' in name:
        try:
            actual_name, str_index = name.rsplit('.', 1)
            index = int(str_index)
            name = actual_name
        except ValueError:
            # The suffix after the last '.' is not an integer, so the
            # whole string is kept as the field name and index stays None.
            pass
def invite(self, invitee, level, send_email=True, **kwargs):
    """
    :param invitee: Username (of the form "user-USERNAME") or email address of the person to invite; use "PUBLIC" to make the project publicly available (in which case level must be "VIEW")
    :type invitee: string
    :param level: Permissions level to grant the invitee ("VIEW", "UPLOAD", "CONTRIBUTE", or "ADMINISTER")
    :type level: string
    :param send_email: Whether the user receives email notifications about the project invitation
    :type send_email: boolean

    Invites the specified user to have access to the project. Any
    additional keyword arguments are passed through to the API call.
    """
    invitation = {
        "invitee": invitee,
        "level": level,
        # The API field is the inverse of send_email.
        "suppressEmailNotification": not send_email,
    }
    return dxpy.api.project_invite(self._dxid, invitation, **kwargs)
elif exect.startswith("app-"):
app_regional_options = dxpy.api.app_describe(exect, input_params={"fields": {"regionalOptions": True}})
app_regions = set(app_regional_options['regionalOptions'].keys())
if not workflow_enabled_regions.issubset(app_regions):
additional_workflow_regions = workflow_enabled_regions - app_regions
mesg = "The app {} is enabled in regions {} while the global workflow in {}.".format(
exect, ", ".join(app_regions), ", ".join(workflow_enabled_regions))
mesg += " The workflow will not be able to run in {}.".format(", ".join(additional_workflow_regions))
mesg += " If you are a developer of the app, you can enable the app in {} to run the workflow in that region(s).".format(
", ".join(additional_workflow_regions))
logger.warn(mesg)
elif exect.startswith("workflow-"):
# We recurse to check the regions of the executables of the inner workflow
inner_workflow_spec = dxpy.api.workflow_describe(exect)
_assert_executable_regions_match(workflow_enabled_regions, inner_workflow_spec)
elif exect.startswith("globalworkflow-"):
raise WorkflowBuilderException("Building a global workflow with nested global workflows is not yet supported")
def get_dxlink_filesizes(dxfiles):
    """Fetch the sizes of a list of DNAnexus files with a single bulk
    describe request.

    Args:
        dxfiles (list): DXFile objects, dxlinks, or file IDs

    Returns:
        list: file sizes in bytes, one per entry of ``dxfiles`` and in
            the same order
    """
    file_ids = [as_dxfile_id(dxfile) for dxfile in dxfiles]
    response = dxpy.api.system_describe_data_objects(
        {"objects": file_ids}, always_retry=True
    )
    # Index the sizes by file ID so the output order follows the input
    # order rather than the order of the API results.
    size_by_id = {}
    for result in response["results"]:
        description = result["describe"]
        size_by_id[description["id"]] = description["size"]
    return [size_by_id[file_id] for file_id in file_ids]
def decrease_perms(self, member, level, **kwargs):
    """
    :param member: Username (of the form "user-USERNAME") of the project member whose permissions are to be lowered
    :type member: string
    :param level: Permissions level the member will hold afterwards (None, "VIEW", "UPLOAD", or "CONTRIBUTE")
    :type level: string or None

    Decreases the permissions that the specified user has in the
    project. Any additional keyword arguments are passed through to the
    API call.
    """
    # The API input maps the member directly to the new level.
    new_permissions = {member: level}
    return dxpy.api.project_decrease_permissions(self._dxid, new_permissions, **kwargs)
if modified_after is not None:
query["modified"]["after"] = dxpy.utils.normalize_time_input(modified_after)
if modified_before is not None:
query["modified"]["before"] = dxpy.utils.normalize_time_input(modified_before)
if created_after is not None or created_before is not None:
query["created"] = {}
if created_after is not None:
query["created"]["after"] = dxpy.utils.normalize_time_input(created_after)
if created_before is not None:
query["created"]["before"] = dxpy.utils.normalize_time_input(created_before)
if describe is not None:
query["describe"] = describe
if limit is not None:
query["limit"] = limit
return _find(dxpy.api.systemFindApps, query, limit, return_handler, **kwargs)
:rtype: dict or None
This method parses a string that is expected to perhaps refer to
an app object. If found, its describe hash will be returned. For
more information on the contents of this hash, see the API
documentation. [TODO: external link here]
'''
alias = None
if not path.startswith('app-'):
path = 'app-' + path
if '/' in path:
alias = path[path.find('/') + 1:]
path = path[:path.find('/')]
try:
return dxpy.api.app_describe(path, alias=alias)
except dxpy.DXAPIError:
return None
raise AppBuilderException('Could not create %s' % (tried_versions,))
# Set categories appropriately.
categories_to_set = app_spec.get("categories", [])
existing_categories = dxpy.api.app_list_categories(app_id)['categories']
categories_to_add = set(categories_to_set).difference(set(existing_categories))
categories_to_remove = set(existing_categories).difference(set(categories_to_set))
if categories_to_add:
dxpy.api.app_add_categories(app_id, input_params={'categories': list(categories_to_add)})
if categories_to_remove:
dxpy.api.app_remove_categories(app_id, input_params={'categories': list(categories_to_remove)})
# Set developers list appropriately, but only if provided.
developers_to_set = app_spec.get("developers")
if developers_to_set is not None:
existing_developers = dxpy.api.app_list_developers(app_id)['developers']
developers_to_add = set(developers_to_set) - set(existing_developers)
developers_to_remove = set(existing_developers) - set(developers_to_set)
skip_updating_developers = False
if developers_to_add or developers_to_remove:
parts = []
if developers_to_add:
parts.append('the following developers will be added: ' + ', '.join(sorted(developers_to_add)))
if developers_to_remove:
parts.append('the following developers will be removed: ' + ', '.join(sorted(developers_to_remove)))
developer_change_message = '; and '.join(parts)
if confirm:
if INTERACTIVE_CLI:
try:
print('***')
print(fill('WARNING: ' + developer_change_message))
def add_tags(self, tags, **kwargs):
    """
    :param tags: Tags to add to the app
    :type tags: array

    Adds the specified application name tags (aliases) to this app.

    The current user must be a developer of the app.
    """
    tag_params = {"tags": tags}
    if self._dxid is None:
        # No app ID on this handler: address the app by name and alias.
        return dxpy.api.app_add_tags('app-' + self._name, alias=self._alias,
                                     input_params=tag_params, **kwargs)
    return dxpy.api.app_add_tags(self._dxid, input_params=tag_params, **kwargs)
for i, val in enumerate(line):
if i not in index_2_column:
continue
col_name = index_2_column[i]
klass = input_classes[col_name]
val_w_correct_type, ref_files = _type_convert(val.strip(), klass)
all_files += ref_files
columns[col_name].append(val_w_correct_type)
# Create an array of batch_ids
batch_ids = [line[batch_index].strip() for line in lines]
# call validate
#
# Output: list of dictionaries, each dictionary corresponds to expanded batch call
expanded_args = dxpy.api.applet_validate_batch(executable.get_id(),
{ "batchInput": columns,
"commonInput": input_json,
"files": all_files,
"instanceTypes": [] })
## future proofing
if isinstance(expanded_args, dict):
assert('expandedBatch' in expanded_args)
launch_args = expanded_args['expandedBatch']
else:
launch_args = expanded_args
if len(launch_args) != len(batch_ids):
raise Exception("Mismatch in number of launch_args vs. batch_ids ({} != {})"
.format(len(launch_args), len(batch_ids)))
return { "launch_args": launch_args,