# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
exit(1)
else:
job_order_object = {"id": args.workflow}
del cmd_line["job_order"]
job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})
if secret_store and secrets_req:
secret_store.store(
[shortname(sc) for sc in cast(List[str], secrets_req["secrets"])],
job_order_object,
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Parsed job order from command line: %s",
json_dumps(job_order_object, indent=4),
)
for inp in process.tool["inputs"]:
if "default" in inp and (
not job_order_object or shortname(inp["id"]) not in job_order_object
):
if not job_order_object:
job_order_object = {}
job_order_object[shortname(inp["id"])] = inp["default"]
if job_order_object is None:
if process.tool["inputs"]:
if toolparser is not None:
elif obj["class"] == "File":
path = cast(str, obj["location"])
ab = abspath(path, basedir)
if "contents" in obj and path.startswith("_:"):
self._pathmap[path] = MapperEnt(
obj["contents"],
tgt,
"CreateWritableFile" if copy else "CreateFile",
staged,
)
else:
with SourceLine(
obj,
"location",
ValidationException,
_logger.isEnabledFor(logging.DEBUG),
):
deref = ab
if urllib.parse.urlsplit(deref).scheme in ["http", "https"]:
deref = downloadHttpFile(path)
else:
# Dereference symbolic links
st = os.lstat(deref)
while stat.S_ISLNK(st.st_mode):
rl = os.readlink(deref)
deref = (
rl
if os.path.isabs(rl)
else os.path.join(os.path.dirname(deref), rl)
)
st = os.lstat(deref)
def copy_job_order(
    job: Union[Process, JobsType], job_order_object: CWLObjectType
) -> CWLObjectType:
    """Create a copy of the job order for inclusion in the provenance record.

    For a direct command line tool execution (``job`` is not a
    :class:`WorkflowJob`) the original job order is returned unchanged.
    For a workflow job, a new mapping is built that holds, for each input
    declared by the tool, a deep copy of either the supplied value or the
    input's declared default.  Inputs with neither a supplied value nor a
    default are omitted from the copy.

    :param job: the process or job whose inputs drive the copy.
    :param job_order_object: the live job order supplied to the run.
    :return: a provenance-safe copy (or the original for non-workflow jobs).
    """
    if not isinstance(job, WorkflowJob):
        # Direct command line tool execution: record the job order as-is.
        return job_order_object
    customised_job = {}  # type: CWLObjectType
    for index, inp in enumerate(job.tool["inputs"]):
        with SourceLine(
            job.tool["inputs"],
            index,
            WorkflowException,
            _logger.isEnabledFor(logging.DEBUG),
        ):
            iid = shortname(inp["id"])
            if iid in job_order_object:
                # Deep copy so later mutation of the live job order cannot
                # alter the provenance record.
                customised_job[iid] = copy.deepcopy(job_order_object[iid])
            elif "default" in inp:
                customised_job[iid] = copy.deepcopy(inp["default"])
            # Inputs with no value and no default are intentionally skipped.
    return customised_job
def _connect(self, url):  # type: (Text) -> Optional[ftplib.FTP]
    """Return a (possibly cached) FTPS connection for ``url``.

    Returns ``None`` when the URL scheme is not ``ftp``.  A cached
    connection is probed with ``pwd()``; if the probe raises (stale or
    dropped socket) the entry is evicted and a fresh connection is made
    instead of propagating the error to the caller.

    :param url: the ftp:// URL to connect to.
    :return: a live :class:`ftplib.FTP_TLS` instance, or ``None``.
    """
    parse = urllib.parse.urlparse(url)
    if parse.scheme != 'ftp':
        return None
    host, user, passwd, _ = self._parse_url(url)
    key = (host, user, passwd)
    if key in self.cache:
        try:
            # Probe the cached connection; a dead socket raises here.
            if self.cache[key].pwd():
                return self.cache[key]
        except ftplib.all_errors:
            # Stale connection: drop it and fall through to reconnect.
            del self.cache[key]
    ftp = ftplib.FTP_TLS()
    ftp.set_debuglevel(1 if _logger.isEnabledFor(logging.DEBUG) else 0)
    ftp.connect(host)
    # secure=True upgrades the control channel to TLS before sending
    # credentials, unless the user explicitly opted into insecure mode.
    ftp.login(user, passwd, secure=not self.insecure)
    self.cache[key] = ftp
    return ftp
def collect_output(
self,
schema: CWLObjectType,
builder: Builder,
outdir: str,
fs_access: StdFsAccess,
compute_checksum: bool = True,
) -> Optional[CWLOutputType]:
r = [] # type: List[CWLOutputType]
empty_and_optional = False
debug = _logger.isEnabledFor(logging.DEBUG)
if "outputBinding" in schema:
binding = cast(
MutableMapping[str, Union[bool, str, List[str]]],
schema["outputBinding"],
)
globpatterns = [] # type: List[str]
revmap = partial(revmap_file, builder, outdir)
if "glob" in binding:
with SourceLine(binding, "glob", WorkflowException, debug):
for gb in aslist(binding["glob"]):
gb = builder.do_eval(gb)
if gb:
globpatterns.extend(aslist(gb))
j = self.make_job_runner(runtimeContext)(
builder,
builder.job,
self.make_path_mapper,
self.requirements,
self.hints,
jobname,
)
j.prov_obj = self.prov_obj
j.successCodes = self.tool.get("successCodes", [])
j.temporaryFailCodes = self.tool.get("temporaryFailCodes", [])
j.permanentFailCodes = self.tool.get("permanentFailCodes", [])
debug = _logger.isEnabledFor(logging.DEBUG)
if debug:
_logger.debug(
"[job %s] initializing from %s%s",
j.name,
self.tool.get("id", ""),
" as part of %s" % runtimeContext.part_of
if runtimeContext.part_of
else "",
)
_logger.debug("[job %s] %s", j.name, json_dumps(builder.job, indent=4))
builder.pathmapper = self.make_path_mapper(
reffiles, builder.stagedir, runtimeContext, True
)
builder.requirements = j.requirements