Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_density_map():
"""
Test for create_density_map functionality
"""
from pynets.dmri import track
base_dir = str(Path(__file__).parent/"examples")
dir_path = base_dir + '/001/dmri'
dwi_file = dir_path + '/HARDI150.nii.gz'
dwi_img = nib.load(dwi_file)
# Load output from test_filter_streamlines: dictionary of streamline info
streamlines_trk = dir_path + '/tractography/streamlines_Default_csa_10_5mm_curv[2_4_6]_step[0.1_0.2_0.5].trk'
streamlines = nib.streamlines.load(streamlines_trk).streamlines
conn_model = 'csa'
target_samples = 10
node_size = 5
curv_thr_list = [2, 4, 6]
step_list = [0.1, 0.2, 0.5]
network = 'Default'
roi = None
directget = 'prob'
max_length = 200
[streams, dir_path, dm_path] = track.create_density_map(dwi_img, dir_path, streamlines, conn_model,
target_samples, node_size, curv_thr_list, step_list,
network, roi, directget, max_length)
assert streams is not None
def main():
parser = _build_arg_parser()
args = parser.parse_args()
assert_inputs_exist(parser, args.in_bundle)
assert_outputs_exist(parser, args, args.out_bundle, args.remaining_bundle)
if args.alpha <= 0 or args.alpha > 1:
parser.error('--alpha should be ]0, 1]')
tractogram = nib.streamlines.load(args.in_bundle)
if int(tractogram.header['nb_streamlines']) == 0:
logging.warning("Bundle file contains no streamline")
return
check_tracts_same_format(parser, [args.in_bundle, args.out_bundle,
args.remaining_bundle])
streamlines = tractogram.streamlines
summary = outliers_removal_using_hierarchical_quickbundles(streamlines)
outliers, inliers = prune(streamlines, args.alpha, summary)
inliers_streamlines = tractogram.streamlines[inliers]
inliers_data_per_streamline = tractogram.tractogram.data_per_streamline[inliers]
inliers_data_per_point = tractogram.tractogram.data_per_point[inliers]
bundles using local and global streamline-based registration
and clustering, NeuroImage, 2017.
"""
io_it = self.get_io_iterator()
logging.info("QuickBundlesX clustering is in use")
logging.info('QBX thresholds {0}'.format(qbx_thr))
for static_file, moving_file, out_moved_file, out_affine_file, \
static_centroids_file, moving_centroids_file, \
moved_centroids_file in io_it:
logging.info('Loading static file {0}'.format(static_file))
logging.info('Loading moving file {0}'.format(moving_file))
static_obj = nib.streamlines.load(static_file)
moving_obj = nib.streamlines.load(moving_file)
static, static_header = static_obj.streamlines, static_obj.header
moving, moving_header = moving_obj.streamlines, moving_obj.header
moved, affine, centroids_static, centroids_moving = \
slr_with_qbx(
static, moving, x0, rm_small_clusters=rm_small_clusters,
greater_than=greater_than, less_than=less_than,
qbx_thr=qbx_thr)
logging.info('Saving output file {0}'.format(out_moved_file))
new_tractogram = nib.streamlines.Tractogram(moved,
affine_to_rasmm=np.eye(4))
nib.streamlines.save(new_tractogram, out_moved_file,
header=moving_header)
Parameters
----------
stream_path : str
Path for the input streamline.trk file(s).
qa_out_path_path : str
Path for the output QA imgae file(s)
brain_path : str
Path for the reference brain scan, in order to get the scan volume.
"""
#Use window to visualize the streamlines
r = window.renderer()
#Load the streamline.trk file
streamlines_mni_load = nib.streamlines.load(stream_path).streamlines
streamlines_mni_in = Streamlines(streamlines_mni_load)
streamlines_actor = actor.line(
streamlines_mni_in,
colormap.line_colors(streamlines_mni_in),
lod_points=10000,
depth_cue=True,
linewidth=0.2,
fake_tube=True,
opacity=0.3,
)
r.add(streamlines_actor)
#window.show(r)
showmng = window.ShowManager(r)
#window.record function can rotate the 3D-image, then get the snapshot of the specific angle.
# Use "trk_legacy" for zenodo dataset v1.1.0 and below
# Use "trk" for zenodo dataset v1.2.0
tracking_format = "trk"
ref_img = nib.load(ref_img_path)
ref_affine = ref_img.get_affine()
ref_shape = ref_img.get_data().shape
streams, hdr = trackvis.read(file_in)
streamlines = [s[0] for s in streams] # list of 2d ndarrays
if tracking_format == "trk_legacy":
streams, hdr = trackvis.read(file_in)
streamlines = [s[0] for s in streams]
else:
sl_file = nib.streamlines.load(file_in)
streamlines = sl_file.streamlines
#Upsample Streamlines (very important, especially when using DensityMap Threshold. Without upsampling eroded results)
max_seq_len = abs(ref_affine[0, 0] / 4)
streamlines = list(utils_trk.subsegment(streamlines, max_seq_len))
# Remember: Does not count if a fibers has no node inside of a voxel -> upsampling helps, but not perfect
# Counts the number of unique streamlines that pass through each voxel -> oversampling does not distort result
dm = utils_trk.density_map(streamlines, ref_shape, affine=ref_affine)
# Create Binary Map
dm_binary = dm > 0 # Using higher Threshold problematic, because tends to remove valid parts (sparse fibers)
dm_binary_c = dm_binary
#Filter Blobs (might remove valid parts) -> do not use
#dm_binary_c = remove_small_blobs(dm_binary_c, threshold=10)
- dimensions ndarray (3,), int16, volume shape for each axis
- voxel_sizes ndarray (3,), float32, size of voxel for each axis
- voxel_order, string, Typically 'RAS' or 'LPS'
"""
is_nifti = False
is_trk = False
is_sft = False
if isinstance(reference, str):
try:
header = nib.load(reference).header
is_nifti = True
except nib.filebasedimages.ImageFileError:
pass
try:
header = nib.streamlines.load(reference, lazy_load=True).header
_, extension = os.path.splitext(reference)
if extension == '.trk':
is_trk = True
except ValueError:
pass
elif isinstance(reference, nib.nifti1.Nifti1Image):
header = reference.header
is_nifti = True
elif isinstance(reference, nib.streamlines.trk.TrkFile):
header = reference.header
is_trk = True
elif isinstance(reference, nib.nifti1.Nifti1Header):
header = reference
is_nifti = True
elif isinstance(reference, dict) and 'magic_number' in reference:
header = reference
parser = _build_arg_parser()
args = parser.parse_args()
assert_inputs_exist(parser, args.in_tractogram)
assert_outputs_exist(parser, args, args.out_tractogram)
check_tracts_same_format(parser, args.in_tractogram, args.out_tractogram)
if args.error_rate < 0.001 or args.error_rate > 1:
logging.warning(
'You are using an error rate of {}.\n'
'We recommend setting it between 0.001 and 1.\n'
'0.001 will do almost nothing to the streamlines\n'
'while 1 will highly compress/linearize the streamlines'
.format(args.error_rate))
in_tractogram = nib.streamlines.load(args.in_tractogram, lazy_load=True)
compressed_streamlines = compress_streamlines_wrapper(in_tractogram,
args.error_rate)
out_tractogram = LazyTractogram(compressed_streamlines,
affine_to_rasmm=np.eye(4))
nib.streamlines.save(out_tractogram, args.out_tractogram,
header=in_tractogram.header)
assert_inputs_exist(parser, [args.tractogram_filename])
assert_outputs_exist(parser, args, [args.seed_density_filename])
tracts_format = detect_format(args.tractogram_filename)
if tracts_format is TckFile:
raise ValueError("Invalid input streamline file format " +
"(must be trk): {0}".format(args.tractogram_filename))
max_ = np.iinfo(np.int16).max
if args.binary is not None and (args.binary <= 0 or args.binary > max_):
parser.error('The value of --binary ({}) '
'must be greater than 0 and smaller or equal to {}'
.format(args.binary, max_))
# Load tractogram and load seeds
tracts_file = load(args.tractogram_filename, args.lazy_load)
if 'seeds' in tracts_file.tractogram.data_per_streamline:
seeds = tracts_file.tractogram.data_per_streamline['seeds']
else:
parser.error('Tractogram does not contain seeds')
# Transform seeds if they're all in memory
if not args.lazy_load:
seeds = apply_affine(np.linalg.inv(tracts_file.affine), seeds)
# Create seed density map
shape = tracts_file.header[Field.DIMENSIONS]
seed_density = np.zeros(shape, dtype=np.int32)
for seed in seeds:
# If seeds are lazily loaded, we have to transform them
# as they get loaded
if args.lazy_load:
trk_file = "tmp.trk"
tractogram = Tractogram(points,
data_per_point={'scalars': scalars},
affine_to_rasmm=np.eye(4))
TrkFile(tractogram).save(trk_file)
streamlines_old = [d[0] - 0.5
for d in tv.read(trk_file, points_space="rasmm")[0]]
scalars_old = [d[1]
for d in tv.read(trk_file, points_space="rasmm")[0]]
mtime_old = measure('tv.read(trk_file, points_space="rasmm")', repeat)
msg = "Old: Loaded {:,} streamlines with scalars in {:6.2f}"
print(msg.format(NB_STREAMLINES, mtime_old))
trk = nib.streamlines.load(trk_file, lazy_load=False)
scalars_new = trk.tractogram.data_per_point['scalars']
mtime_new = measure('nib.streamlines.load(trk_file, lazy_load=False)',
repeat)
msg = "New: Loaded {:,} streamlines with scalars in {:6.2f}"
print(msg.format(NB_STREAMLINES, mtime_new))
print(f"Speedup of {mtime_old / mtime_new:2f}")
for s1, s2 in zip(scalars_new, scalars_old):
assert_array_equal(s1, s2)
metrics = img_utils.scale_to_range(metrics, range=(0, 99)) # range needs to be same as segments in colormap
orientation = dataset_specific_utils.get_optimal_orientation_for_bundle(bundle)
# Load mask
beginnings_img = nib.load(endings_path)
beginnings = beginnings_img.get_data()
for i in range(1):
beginnings = binary_dilation(beginnings)
# Load trackings
if tracking_format == "trk_legacy":
streams, hdr = trackvis.read(bundle_path)
streamlines = [s[0] for s in streams]
else:
sl_file = nib.streamlines.load(bundle_path)
streamlines = sl_file.streamlines
streamlines = list(transform_streamlines(streamlines, np.linalg.inv(beginnings_img.affine)))
# Reduce streamline count
streamlines = streamlines[::2]
# Reorder to make all streamlines have same start region
streamlines = fiber_utils.add_to_each_streamline(streamlines, 0.5)
streamlines_new = []
for idx, sl in enumerate(streamlines):
startpoint = sl[0]
# Flip streamline if not in right order
if beginnings[int(startpoint[0]), int(startpoint[1]), int(startpoint[2])] == 0:
sl = sl[::-1, :]
streamlines_new.append(sl)
streamlines = fiber_utils.add_to_each_streamline(streamlines_new, -0.5)