path = os.path.join(folder, checksum)
# os.rename assumed safe, as our temp file should
# be in the same file system as our temp folder
if not os.path.isdir(folder):
    os.makedirs(folder)
os.rename(tmp.name, path)

# Relative posix path
# (to avoid \ on Windows)
rel_path = posix_path(os.path.relpath(path, self.folder))

# Register in bagit checksum
if Hasher == hashlib.sha1:
    self._add_to_bagit(rel_path, sha1=checksum)
else:
    _logger.warning(
        "[provenance] Unknown hash method %s for bagit manifest", Hasher
    )
    # Inefficient: the bagit support will need to checksum the file again
    self._add_to_bagit(rel_path)
_logger.debug("[provenance] Added data file %s", path)
if timestamp is not None:
    createdOn, createdBy = self._self_made(timestamp)
    self._file_provenance[rel_path] = cast(
        Aggregate, {"createdOn": createdOn, "createdBy": createdBy}
    )
_logger.debug("[provenance] Relative path for data file %s", rel_path)

if content_type is not None:
    self._content_types[rel_path] = content_type
return rel_path
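# The fragment above stores each payload under a content-addressed path,
# data/<first two hex digits>/<sha1 checksum>, before registering it in the
# bagit manifest. A minimal standalone sketch of that layout follows; the
# store_blob() helper is hypothetical and not part of cwltool:
import hashlib
import os
import shutil
import tempfile


def store_blob(stream, base_folder):
    # Spool the stream to a temp file while hashing it.
    hasher = hashlib.sha1()
    with tempfile.NamedTemporaryFile(dir=base_folder, delete=False) as tmp:
        for chunk in iter(lambda: stream.read(8192), b""):
            hasher.update(chunk)
            tmp.write(chunk)
    checksum = hasher.hexdigest()
    folder = os.path.join(base_folder, "data", checksum[0:2])
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, checksum)
    # Move into the content-addressed location, mirroring the os.rename() above.
    shutil.move(tmp.name, path)
    return os.path.relpath(path, base_folder)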
    if not (relative_path is not None and "location" in structure):
        # Register in RO; but why was this not picked
        # up by used_artefacts?
        _logger.info("[provenance] Adding to RO %s", structure["location"])
        with self.fsaccess.open(
            cast(str, structure["location"]), "rb"
        ) as fp:
            relative_path = self.add_data_file(fp)
            checksum = PurePosixPath(relative_path).name
            structure["checksum"] = "%s$%s" % (SHA1, checksum)
    if relative_path is not None:
        # RO-relative path as new location
        structure["location"] = str(PurePosixPath("..") / relative_path)
    else:
        _logger.warning(
            "Could not determine RO path for file %s", structure
        )
    if "path" in structure:
        del structure["path"]

if structure.get("class") == "Directory":
    # TODO: Generate an anonymous Directory with a "listing"
    # pointing to the hashed files
    del structure["location"]

for val in structure.values():
    try:
        self._relativise_files(cast(CWLOutputType, val))
    except OSError:
        pass
return
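# Once the bytes are stored via add_data_file(), the File object's "location"
# is rewritten to point back into the research object, relative to the
# workflow folder (hence the leading ".."). A rough standalone sketch of that
# rewrite; relativise_file_location() is a hypothetical name for illustration:
from pathlib import PurePosixPath


def relativise_file_location(structure, checksum):
    # e.g. checksum "ab12..." maps to ../data/ab/ab12...
    rel = PurePosixPath("data") / checksum[0:2] / checksum
    structure["location"] = str(PurePosixPath("..") / rel)
    structure["checksum"] = "sha1$%s" % checksum
    structure.pop("path", None)
    return structure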
value["@id"] = entity.identifier.uri
return entity
coll_id = value.setdefault("@id", uuid.uuid4().urn)
# some other kind of dictionary?
# TODO: also Save as JSON
coll = self.document.entity(
coll_id,
[
(PROV_TYPE, WFPROV["Artifact"]),
(PROV_TYPE, PROV["Collection"]),
(PROV_TYPE, PROV["Dictionary"]),
],
)
if value.get("class"):
_logger.warning("Unknown data class %s.", value["class"])
# FIXME: The class might be "http://example.com/somethingelse"
coll.add_asserted_type(CWLPROV[value["class"]])
# Let's iterate and recurse
coll_attribs = [] # type: List[Tuple[Identifier, ProvEntity]]
for (key, val) in value.items():
v_ent = self.declare_artefact(val)
self.document.membership(coll, v_ent)
m_entity = self.document.entity(uuid.uuid4().urn)
# Note: only support PROV-O style dictionary
# https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
# as prov.py do not easily allow PROV-N extensions
m_entity.add_asserted_type(PROV["KeyEntityPair"])
m_entity.add_attributes(
{PROV["pairKey"]: str(key), PROV["pairEntity"]: v_ent}
)
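# The loop above models a plain Python dict as a PROV collection: one entity
# for the dict itself, one entity per value linked via prov:hadMember, and a
# key/entity pair entity carrying the dict key. A minimal standalone sketch
# with the prov package; the "ex" namespace and input dict are invented for
# illustration, and the prov:Dictionary / prov:KeyEntityPair typing that
# cwltool asserts is only noted in comments here:
import uuid

from prov.model import ProvDocument

doc = ProvDocument()
doc.add_namespace("ex", "http://example.org/#")

value = {"threads": 4, "label": "demo"}
coll = doc.entity(uuid.uuid4().urn)  # cwltool also asserts prov:Dictionary here
for key, val in value.items():
    v_ent = doc.entity(uuid.uuid4().urn, {"ex:value": str(val)})
    doc.membership(coll, v_ent)  # prov:hadMember
    pair = doc.entity(uuid.uuid4().urn)  # cwltool asserts prov:KeyEntityPair here
    pair.add_attributes({"ex:pairKey": str(key)})
print(doc.get_provn())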
visit_class(out, ("File", "Directory"), loc_to_path)
# Unsetting the Generation from final output object
visit_class(out, ("File",), MutationManager().unset_generation)
if isinstance(out, str):
stdout.write(out)
else:
stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
stdout.write("\n")
if hasattr(stdout, "flush"):
stdout.flush()
if status != "success":
_logger.warning("Final process status is %s", status)
return 1
_logger.info("Final process status is %s", status)
return 0
except (ValidationException) as exc:
_logger.error(
"Input object failed validation:\n%s", str(exc), exc_info=args.debug
)
return 1
except UnsupportedRequirement as exc:
_logger.error(
"Workflow or tool uses unsupported feature:\n%s",
str(exc),
exc_info=args.debug,
)
return 33
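# The handlers above give each failure mode its own exit status: 1 for input
# validation errors and 33 for UnsupportedRequirement, so wrappers can tell
# "bad input" apart from "feature not supported by this runner". A rough
# standalone sketch of the same pattern; run_workflow() and the exception
# classes are stand-ins, not the cwltool API:
import sys


class ValidationError(Exception):
    pass


class UnsupportedFeature(Exception):
    pass


def run_workflow():
    raise UnsupportedFeature("InplaceUpdateRequirement is not supported")


def main() -> int:
    try:
        run_workflow()
        return 0
    except ValidationError as exc:
        print("Input object failed validation: %s" % exc, file=sys.stderr)
        return 1
    except UnsupportedFeature as exc:
        print("Workflow or tool uses unsupported feature: %s" % exc, file=sys.stderr)
        return 33


if __name__ == "__main__":
    sys.exit(main())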
                if fs_access.isfile(g)
                else "Directory",
            }
            for g in sorted(
                fs_access.glob(fs_access.join(outdir, gb)),
                key=cmp_to_key(
                    cast(
                        Callable[[str, str], int],
                        locale.strcoll,
                    )
                ),
            )
        ]
    )
except OSError as e:
    _logger.warning(str(e))
except Exception:
    _logger.error(
        "Unexpected error from fs_access", exc_info=True
    )
    raise

for files in cast(List[Dict[str, Optional[CWLOutputType]]], r):
    rfile = files.copy()
    revmap(rfile)
    if files["class"] == "Directory":
        ll = schema.get("loadListing") or builder.loadListing
        if ll and ll != "no_listing":
            get_listing(fs_access, files, (ll == "deep_listing"))
    else:
        if binding.get("loadContents"):
            with fs_access.open(
if k not in job:
    continue
v = job[k]
dircount = [0]

def inc(d):  # type: (List[int]) -> None
    d[0] += 1

visit_class(v, ("Directory",), lambda x: inc(dircount))
if dircount[0] == 0:
    continue
filecount = [0]
visit_class(v, ("File",), lambda x: inc(filecount))
if filecount[0] > FILE_COUNT_WARNING:
    # Long lines in this message are okay, will be reflowed based on terminal columns.
    _logger.warning(
        strip_dup_lineno(
            SourceLine(self.tool["inputs"], i, str).makeError(
                """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use.

If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior:

  $namespaces:
    cwltool: "http://commonwl.org/cwltool#"
  hints:
    cwltool:LoadListingRequirement:
      loadListing: shallow_listing
"""
                % (filecount[0], k)
            )
        )
    )
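# The one-element lists above (dircount, filecount) are a small trick that
# lets the lambdas passed to visit_class() increment a counter, since a
# lambda cannot contain an assignment. A standalone sketch of the same
# counting idea; walk() is a hypothetical stand-in for cwltool's visit_class:
from typing import Any, Callable


def walk(value: Any, cls: str, op: Callable[[dict], None]) -> None:
    # Recurse through dicts/lists, calling op() on every mapping whose
    # "class" field matches cls.
    if isinstance(value, dict):
        if value.get("class") == cls:
            op(value)
        for v in value.values():
            walk(v, cls, op)
    elif isinstance(value, list):
        for v in value:
            walk(v, cls, op)


def inc(counter):  # increment in place, since lambdas cannot assign
    counter[0] += 1


job_input = {"class": "Directory", "listing": [{"class": "File", "basename": "a.txt"}]}
filecount = [0]
walk(job_input, "File", lambda x: inc(filecount))
print(filecount[0])  # -> 1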
def print_js_hint_messages(js_hint_messages, source_line):
    # type: (List[Text], Optional[SourceLine]) -> None
    if source_line is not None:
        for js_hint_message in js_hint_messages:
            _logger.warning(source_line.makeError(js_hint_message))
def var_spool_cwl_detector(
    obj: CWLOutputType, item: Optional[Any] = None, obj_key: Optional[Any] = None,
) -> bool:
    """Detect any textual reference to /var/spool/cwl."""
    r = False
    if isinstance(obj, str):
        if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
            _logger.warning(
                SourceLine(item=item, key=obj_key, raise_type=str).makeError(
                    _VAR_SPOOL_ERROR.format(obj)
                )
            )
            r = True
    elif isinstance(obj, MutableMapping):
        for mkey, mvalue in obj.items():
            r = var_spool_cwl_detector(cast(CWLOutputType, mvalue), obj, mkey) or r
    elif isinstance(obj, MutableSequence):
        for lkey, lvalue in enumerate(obj):
            r = var_spool_cwl_detector(cast(CWLOutputType, lvalue), obj, lkey) or r
    return r
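# A quick usage sketch: the detector walks nested mappings and sequences and
# returns True if any string mentions /var/spool/cwl, except under the
# allowed dockerOutputDirectory key. This assumes the function is importable
# from cwltool.process, as in recent cwltool releases; the tool fragment
# below is invented for illustration:
from cwltool.process import var_spool_cwl_detector

tool_fragment = {
    "baseCommand": ["cat", "/var/spool/cwl/input.txt"],
    "requirements": {"DockerRequirement": {"dockerOutputDirectory": "/var/spool/cwl"}},
}
print(var_spool_cwl_detector(tool_fragment))  # True: the baseCommand string matches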
cidfile_dir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
if runtimeContext.cidfile_prefix is not None:
    cidfile_name = str(runtimeContext.cidfile_prefix + "-" + cidfile_name)
cidfile_path = os.path.join(cidfile_dir, cidfile_name)
runtime.append("--cidfile=%s" % cidfile_path)

for key, value in self.environment.items():
    runtime.append("--env=%s=%s" % (key, value))

if runtimeContext.strict_memory_limit and not user_space_docker_cmd:
    runtime.append("--memory=%dm" % self.builder.resources["ram"])
elif not user_space_docker_cmd:
    res_req, _ = self.builder.get_requirement("ResourceRequirement")
    if res_req and ("ramMin" in res_req or "ramMax" in res_req):
        _logger.warning(
            "[job %s] Skipping Docker software container '--memory' limit "
            "despite presence of ResourceRequirement with ramMin "
            "and/or ramMax setting. Consider running with "
            "--strict-memory-limit for increased portability "
            "assurance.",
            self.name,
        )
return runtime, cidfile_path
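# The fragment above assembles extra "docker run" arguments: a --cidfile so
# the container ID can be tracked, --env for each environment variable, and
# an optional --memory cap derived from the CWL ResourceRequirement. A
# minimal standalone sketch of that argument assembly; build_runtime_args()
# and its inputs are invented for illustration, and cwltool's real logic
# also handles user-space Docker replacements such as udocker:
import datetime
import os
import tempfile


def build_runtime_args(environment, ram_mb, strict_memory_limit, cidfile_prefix=None):
    runtime = ["docker", "run", "-i"]
    cidfile_dir = tempfile.mkdtemp()
    cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
    if cidfile_prefix is not None:
        cidfile_name = cidfile_prefix + "-" + cidfile_name
    cidfile_path = os.path.join(cidfile_dir, cidfile_name)
    runtime.append("--cidfile=%s" % cidfile_path)
    for key, value in environment.items():
        runtime.append("--env=%s=%s" % (key, value))
    if strict_memory_limit:
        runtime.append("--memory=%dm" % ram_mb)
    return runtime, cidfile_path


args, cidfile = build_runtime_args({"TMPDIR": "/tmp"}, ram_mb=1024, strict_memory_limit=True)
print(" ".join(args))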