Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'module_path': 'test-puppet-modules'})
provisioner.provision()
assert mock_copy_dir.call_count == 2
assert (Path('test-puppet-modules'),
PurePosixPath(provisioner._guest_module_path)) in {
mock_copy_dir.call_args_list[0][0],
mock_copy_dir.call_args_list[1][0]}
assert mock_run.call_count == 2
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
"puppet apply --modulepath {}:{} --detailed-exitcodes --manifestdir {} {}".format(
PurePosixPath(provisioner._guest_module_path),
PurePosixPath(provisioner._guest_default_module_path),
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'mani.pp')]
if ".cwlprov" in rel_path:
# Our own!
conformsTo = [
prov_conforms_to[extension],
CWLPROV_VERSION,
]
else:
# Some other PROV
# TODO: Recognize ProvOne etc.
conformsTo = prov_conforms_to[extension]
return (mediatype, conformsTo)
aggregates = [] # type: List[Aggregate]
for path in self.bagged_size.keys():
temp_path = PurePosixPath(path)
folder = temp_path.parent
filename = temp_path.name
# NOTE: Here we end up aggregating the abstract
# data items by their sha1 hash, so that it matches
# the entity() in the prov files.
# TODO: Change to nih:sha-256; hashes
# https://tools.ietf.org/html/rfc6920#section-7
aggregate_dict = {
"uri": "urn:hash::sha1:" + filename,
"bundledAs": {
# The arcp URI is suitable ORE proxy; local to this Research Object.
# (as long as we don't also aggregate it by relative path!)
"uri": self.base_uri + path,
# relate it to the data/ path
def enumerate_http_resources(package, package_path):
with (package_path / 'resource.json').open() as json_file:
resource = json.load(json_file)
for name, url in resource.get('images', {}).items():
if name != 'screenshots':
yield url, pathlib.PurePosixPath(package, 'images')
for name, url in resource.get('assets', {}).get('uris', {}).items():
yield url, pathlib.PurePosixPath(package, 'uris')
def _process_sfdx_release(self, zip_file, version):
"""Process an SFDX ZIP file for objects and fields"""
for f in zip_file.namelist():
path = PurePosixPath(f)
if f.startswith("force-app/main/default/objects"):
if path.suffixes == [".object-meta", ".xml"]:
sobject_name = path.name[: -len(".object-meta.xml")]
self._process_object_element(
sobject_name, ET.fromstring(zip_file.read(f)), version
)
elif path.suffixes == [".field-meta", ".xml"]:
# To get the sObject name, we need to remove the `/fields/SomeField.field-meta.xml`
# and take the last path component
sobject_name = path.parent.parent.stem
self._process_field_element(
sobject_name, ET.fromstring(zip_file.read(f)), version
)
relative_path = pathlib.PurePosixPath(tarinfo.name)
else:
relative_path = pathlib.PurePosixPath(tarinfo.name).relative_to(relative_to) # pylint: disable=redefined-variable-type
if str(relative_path) in ignore_files:
ignore_files.remove(str(relative_path))
else:
destination = destination_dir.resolve() / pathlib.Path(*relative_path.parts)
if tarinfo.issym() and not symlink_supported:
# In this situation, TarFile.makelink() will try to create a copy of the
# target. But this fails because TarFile.members is empty
# But if symlinks are not supported, it's safe to assume that symlinks
# aren't needed. The only situation where this happens is on Windows.
continue
if tarinfo.islnk():
# Derived from TarFile.extract()
relative_target = pathlib.PurePosixPath(
tarinfo.linkname).relative_to(relative_to)
tarinfo._link_target = str( # pylint: disable=protected-access
destination_dir.resolve() / pathlib.Path(*relative_target.parts))
if destination.is_symlink():
destination.unlink()
tar_file_obj._extract_member(tarinfo, str(destination)) # pylint: disable=protected-access
except Exception as exc:
print("Exception thrown for tar member {}".format(tarinfo.name))
raise exc
# add_courses
for semester, sn in self._edit['add_courses']:
path = PurePosixPath('/', s['dir_root_courses'], semester)
node = self.open(path, edit_check=False)
course = WebCourseDirectory(self, node, semester, sn)
course.fetch()
assert course.ready == True
node.add(course.name, course)
node.add(sn, InternalLink(self, node, course.name))
# delete_files
for path in self._edit['delete_files']:
node = self.open(path, edit_check=False)
if node is self.root:
raise ValueError('不可以刪除根資料夾')
node.parent.unlink(PurePosixPath(path).name)
# 完成。下次 open 不會再進來了
del self._edit
# Get the actual cat picture
async with self.bot.http.get(cat_url) as resp:
# Get the data as a byte array (bytes object)
cat_data = await resp.read()
# Construct a byte stream from the data.
# This is necessary because the bytes object is immutable, but we need to add a "name" attribute to set the
# filename. This facilitates the setting of said attribute without altering behavior.
cat_stream = io.BytesIO(cat_data)
# Set the name of the cat picture before sending.
# This is necessary for Telethon to detect the file type and send it as a photo/GIF rather than just a plain
# unnamed file that doesn't render as media in clients.
# We abuse pathlib to extract the filename section here for convenience, since URLs are *mostly* POSIX paths
# with the exception of the protocol part, which we don't care about here.
cat_stream.name = PurePosixPath(cat_url).name
return cat_stream
def resolve_file_path(doc_uri, target_path):
_path = pathlib.PurePosixPath(target_path)
if not _path.is_absolute():
base_path = pathlib.Path(urllib.parse.urlparse(doc_uri).path).parent
else:
base_path = "."
_path = pathlib.Path(base_path / _path).resolve().absolute()
logger.debug(f"Resolved URI: {_path.as_uri()}")
return _path
def api_attach_log(self, task: dict, file_obj: FileType) -> bool:
"""Store the POSTed task log as a file in the storage backend.
Also updates the task itself to have a reference to the file.
:return: Whether this file was new (False) or overwrote a pre-existing
log file (True).
"""
blob = self.logfile_blob(task)
self._log.debug('Storing log for task %s in storage blob %s of project %s',
task['_id'], blob.name, task['project'])
preexisting = blob.exists()
blob.create_from_file(file_obj, content_type='application/gzip')
blob.update_filename(pathlib.PurePosixPath(blob.name).name,
is_attachment=False)
blob.update_content_type('text/plain', 'gzip')
self._log.info('Stored log for task %s in storage blob %s of project %s',
task['_id'], blob.name, task['project'])
tasks_coll = self.collection()
tasks_coll.update_one({'_id': task['_id']}, {'$set': {
'log_file': {
'backend': blob.bucket.backend_name,
'file_path': blob.name,
},
}})
return preexisting
img = np.transpose(img,(1,0,2,3))
img = img[args.InputCh,:,:,:]
img = input_normalization(img, args)
if len(args.ResizeRatio)>0:
img = resize(img, (1, args.ResizeRatio[0], args.ResizeRatio[1], args.ResizeRatio[2]), method='cubic')
for ch_idx in range(img.shape[0]):
struct_img = img[ch_idx,:,:,:] # note that struct_img is only a view of img, so changes made on struct_img also affects img
struct_img = (struct_img - struct_img.min())/(struct_img.max() - struct_img.min())
img[ch_idx,:,:,:] = struct_img
# apply the model
output_img = model_inference(model, img, softmax, args)
for ch_idx in range(len(args.OutputCh)//2):
writer = omeTifWriter.OmeTifWriter(args.OutputDir + pathlib.PurePosixPath(fn).stem +'_seg_'+ str(args.OutputCh[2*ch_idx])+'.ome.tif')
if args.Threshold<0:
writer.save(output_img[ch_idx].astype(float))
else:
out = output_img[ch_idx] > args.Threshold
out = out.astype(np.uint8)
out[out>0]=255
writer.save(out)
print(f'Image {fn} has been segmented')