# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Next, we need to set up our two direction getters
"""

"""
Example #1: Bootstrap direction getter with CSD Model
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""

# Residual-bootstrap direction getter: directions are drawn from the CSD
# model refit on resampled data at every step (probabilistic tracking).
boot_dg_csd = BootDirectionGetter.from_data(
    data, csd_model, max_angle=30., sphere=small_sphere)

# Walk out from each seed with a fixed 0.5 (voxel-unit) step.
boot_streamline_generator = LocalTracking(
    boot_dg_csd, stopping_criterion, seeds, affine, step_size=.5)
streamlines = Streamlines(boot_streamline_generator)

# Persist the tractogram in RAS+ mm space for use by other tools.
sft = StatefulTractogram(streamlines, hardi_img, Space.RASMM)
save_trk(sft, "tractogram_bootstrap_dg.trk")

# Render a snapshot (and optionally an interactive window) when fury is
# available.
if has_fury:
    r = window.Renderer()
    r.add(actor.line(streamlines, colormap.line_colors(streamlines)))
    window.record(r, out_path='tractogram_bootstrap_dg.png', size=(800, 800))
    if interactive:
        window.show(r)
"""
.. figure:: tractogram_bootstrap_dg.png
   :align: center

   **Corpus Callosum Bootstrap Probabilistic Direction Getter**
)
# NOTE(review): fragment — the enclosing method's `def` line (and the call
# that the `)` on the previous line closes) is outside this view; original
# indentation appears to have been stripped. The code builds a particle
# filtering tractography generator from attributes presumably prepared by
# earlier setup methods (self.pdg, self.tiss_classifier, ...) — confirm
# against the full class.
self.streamline_generator = ParticleFilteringTracking(
self.pdg,
self.tiss_classifier,
self.seeds,
self.stream_affine,
max_cross=maxcrossing,
step_size=0.5,
maxlen=1000,
pft_back_tracking_dist=2,
pft_front_tracking_dist=1,
particle_count=15,
return_all=True,
)
print("Reconstructing tractogram streamlines...")
# Materialize the lazy generator into an ArraySequence of streamlines and
# hand it back to the caller.
self.streamlines = Streamlines(self.streamline_generator)
return self.streamlines
# Particle Filtering Tractography
# PFT uses a particle filter around premature stopping points
# (back/front tracking distances are in mm) and, with return_all=False,
# keeps only streamlines that terminate in valid tissue.
pft_streamline_generator = ParticleFilteringTracking(
    dg,
    cmc_classifier,
    seeds,
    affine,
    max_cross=1,
    step_size=step_size,
    maxlen=1000,
    pft_back_tracking_dist=2,
    pft_front_tracking_dist=1,
    particle_count=15,
    return_all=False)
streamlines = Streamlines(pft_streamline_generator)
save_trk("pft_streamline.trk", streamlines, affine, shape)

# Render and record a snapshot of the PFT result.
renderer.clear()
renderer.add(actor.line(streamlines, cmap.line_colors(streamlines)))
window.record(renderer, out_path='pft_streamlines.png', size=(600, 600))
"""
.. figure:: pft_streamlines.png
:align: center
**Particle Filtering Tractography**
"""
# Local Probabilistic Tractography
prob_streamline_generator = LocalTracking(dg,
# NOTE(review): fragment — indentation was stripped and the opening
# `if algorithm == "equal_dist":` branch head with its `for` loop is outside
# this view (compare the duplicate copy of this logic later in the file).
# Each branch assigns, per streamline point, a segment index used for
# along-tract grouping.
segment_idxs.append(list(range(NR_SEGMENTS * ANTI_INTERPOL_MULT)))
segment_idxs = np.array(segment_idxs)
# "distance_map": index each point by its nearest point on the QuickBundles
# centroid (threshold 100 forces essentially a single cluster).
elif algorithm == "distance_map":
metric = AveragePointwiseEuclideanMetric()
qb = QuickBundles(threshold=100., metric=metric)
clusters = qb.cluster(streamlines)
centroids = Streamlines(clusters.centroids)
_, segment_idxs = cKDTree(centroids.data, 1, copy_data=True).query(streamlines, k=1)
# "cutting_plane": find the centroid's middle point, locate each
# streamline's closest point to it, then index points relative to that
# anchor so all streamlines align at the "cutting plane".
elif algorithm == "cutting_plane":
streamlines_resamp = fiber_utils.resample_fibers(streamlines, NR_SEGMENTS * ANTI_INTERPOL_MULT)
metric = AveragePointwiseEuclideanMetric()
qb = QuickBundles(threshold=100., metric=metric)
clusters = qb.cluster(streamlines_resamp)
centroid = Streamlines(clusters.centroids)[0]
# index of the middle cluster
middle_idx = int(NR_SEGMENTS / 2) * ANTI_INTERPOL_MULT
middle_point = centroid[middle_idx]
segment_idxs = fiber_utils.get_idxs_of_closest_points(streamlines, middle_point)
# Align along the middle and assign indices
segment_idxs_eqlen = []
for idx, sl in enumerate(streamlines):
sl_middle_pos = segment_idxs[idx]
before_elems = sl_middle_pos
after_elems = len(sl) - sl_middle_pos
base_idx = 1000  # use higher index to avoid negative numbers for area below middle
r = range((base_idx - before_elems), (base_idx + after_elems))
segment_idxs_eqlen.append(r)
segment_idxs = segment_idxs_eqlen
# Add extra point otherwise coloring BUG
return np.logical_or(warped_roi1, warped_roi2)
print("Generating Seed Masks...")
seed_masks = mask_from_bundle_ROI(bundles, mapping)

print("Getting Tracks...")
# Track only when no cached tractogram exists on disk; otherwise reload the
# saved tractogram and map it back from world (mm) to image space.
if not op.exists('dti_streamlines.trk'):
    streamlines = list(aft.track(dti_params['params'], seed_mask=seed_masks))
    aus.write_trk('./dti_streamlines.trk', streamlines, affine=img.affine)
else:
    tg = nib.streamlines.load('./dti_streamlines.trk').tractogram
    streamlines = tg.apply_affine(np.linalg.inv(img.affine)).streamlines

# Drop short tracks (<= 100 points) and move the rest into voxel space.
streamlines = dts.Streamlines(dtu.move_streamlines(
    [s for s in streamlines if s.shape[0] > 100],
    np.linalg.inv(img.affine)))

# Visualize streamlines together with the seed-mask ROI contour.
scene = window.Scene()
scene.add(actor.line(streamlines, line_colors(streamlines)))
scene.add(actor.contour_from_roi(seed_masks, img.affine, [0, 1, 1], 0.5))
window.show(scene)
# NOTE(review): fragment — the enclosing `def` line is outside this view.
# This is the body of a van-der-Corput-style radix-inverse routine: it
# mirrors the digits of `n` in the given `base` about the radix point,
# accumulating remainder/denom into a value in [0, 1).
vdc, denom = 0, 1
while n:
denom *= base
n, remainder = divmod(n, base)
vdc += remainder / denom
return vdc
# Multipliers from the van der Corput sequence; used to flip/scale the
# translation part of the affine below.
[x_mul, y_mul, z_mul] = [vdc(i) for i in range(1, 4)]
adjusted_affine = affine_map.affine.copy()
adjusted_affine[0][3] = -adjusted_affine[0][3] * x_mul
adjusted_affine[1][3] = -adjusted_affine[1][3] * y_mul
adjusted_affine[2][3] = -adjusted_affine[2][3] * z_mul

# Deform streamlines, isocenter, and remove streamlines outside brain.
# Step 1: sample the forward deformation field at each streamline point and
# add the displacement to the point.
displacements = values_from_volume(
    mapping.get_forward_field(), streams_in_curr_grid, ref_grid_aff)
warped = [sum(d, s) for d, s in zip(displacements, streams_in_curr_grid)]
# Step 2: undo the adjusted affine, then the warped-FA image affine.
warped = transform_streamlines(warped, np.linalg.inv(adjusted_affine))
warped = transform_streamlines(warped, np.linalg.inv(warped_fa_img.affine))
# Step 3: keep only streamlines intersecting the brain mask.
streams_final_filt = Streamlines(utils.target_line_based(
    warped, np.eye(4), brain_mask, include=True))

# Remove streamlines with negative voxel indices.
lin_T, offset = _mapping_to_voxel(np.eye(4))
streams_final_filt_final = []
for sl in streams_final_filt:
    inds = np.dot(sl, lin_T)
    inds += offset
    if not inds.min().round(decimals=6) < 0:
        streams_final_filt_final.append(sl)

# Save streamlines as a stateful tractogram in RAS+ mm space.
stf = StatefulTractogram(streams_final_filt_final, reference=warped_fa_img,
                         space=Space.RASMM, shifted_origin=True)
# NOTE(review): fragment — interior of a neighbor-search method; the
# surrounding `def`, the `centroid_matrix` computation, and the timer `t`
# are outside this view. It selects clusters whose centroid distance is
# finite and returns their streamlines plus per-cluster indices.
mins = np.min(centroid_matrix, axis=0)
# A column that is all-inf means that cluster is too far away to matter.
close_clusters_indices = list(np.where(mins != np.inf)[0])
close_clusters = self.cluster_map[close_clusters_indices]
neighb_indices = [cluster.indices for cluster in close_clusters]
# Flatten the close clusters into a single streamline collection.
neighb_streamlines = Streamlines(chain(*close_clusters))
nb_neighb_streamlines = len(neighb_streamlines)
# No neighbors at all: report (when verbose) and return empty results.
if nb_neighb_streamlines == 0:
if self.verbose:
logger.info('You have no neighbor streamlines... ' +
'No bundle recognition')
return Streamlines([]), []
if self.verbose:
logger.info(' Number of neighbor streamlines %d' %
(nb_neighb_streamlines,))
logger.info(' Duration %0.3f sec. \n' % (time() - t,))
return neighb_streamlines, neighb_indices
"""
save_trk("bootstrap_dg_CSD.trk", streamlines, affine, labels.shape)
"""
Example #2: Closest peak direction getter with CSD Model
"""
from dipy.direction import ClosestPeakDirectionGetter
pmf = csd_fit.odf(small_sphere).clip(min=0)
peak_dg = ClosestPeakDirectionGetter.from_pmf(pmf, max_angle=30.,
sphere=small_sphere)
peak_streamline_generator = LocalTracking(peak_dg, classifier, seeds, affine,
step_size=.5)
streamlines = Streamlines(peak_streamline_generator)
renderer.clear()
renderer.add(actor.line(streamlines, cmap.line_colors(streamlines)))
window.record(renderer, out_path='closest_peak_dg_CSD.png', size=(600, 600))
"""
.. figure:: closest_peak_dg_CSD.png
:align: center
**Corpus Callosum Closest Peak Deterministic Direction Getter**
We have created a set of streamlines using the closest peak direction getter,
which is a type of deterministic tracking. If you repeat the fiber tracking
(keeping all inputs the same) you will get exactly the same set of streamlines.
We can save the streamlines as a Trackvis file so it can be loaded into other
software for visualization or further analysis.
"""
# NOTE(review): fragment — interior of a streamline-registration setup
# function; the `def` line and the `greater_than`/`less_than`/
# `select_random` parameters are outside this view. It length-filters the
# static and moving streamline sets and optionally subsamples them.
if rng is None:
rng = np.random.RandomState()
if verbose:
logger.info('Static streamlines size {}'.format(len(static)))
logger.info('Moving streamlines size {}'.format(len(moving)))
# The default arguments bind greater_than/less_than at definition time, so
# each call uses the thresholds passed to the enclosing function.
def check_range(streamline, gt=greater_than, lt=less_than):
if (length(streamline) > gt) & (length(streamline) < lt):
return True
else:
return False
# Keep only streamlines whose arc length lies strictly inside (gt, lt).
streamlines1 = Streamlines(static[np.array([check_range(s)
for s in static])])
streamlines2 = Streamlines(moving[np.array([check_range(s)
for s in moving])])
if verbose:
logger.info('Static streamlines after length reduction {}'
.format(len(streamlines1)))
logger.info('Moving streamlines after length reduction {}'
.format(len(streamlines2)))
# Optionally subsample the static set for faster registration.
if select_random is not None:
rstreamlines1 = select_random_set_of_streamlines(streamlines1,
select_random,
rng=rng)
else:
rstreamlines1 = streamlines1
# NOTE(review): fragment — a second copy of the per-point segment-index
# assignment seen earlier in the file; indentation was stripped and the
# "cutting_plane" branch is truncated at the end of this view.
# streamlines = FiberUtils.resample_fibers(streamlines, NR_SEGMENTS * ANTI_INTERPOL_MULT)
# remove = int((NR_SEGMENTS * ANTI_INTERPOL_MULT) * 0.15) # remove X% in beginning and end
# streamlines = np.array(streamlines)[:, remove:-remove, :]
# streamlines = list(streamlines)
# "equal_dist": every streamline simply gets indices 0..N-1 in order.
if algorithm == "equal_dist":
segment_idxs = []
for i in range(len(streamlines)):
segment_idxs.append(list(range(NR_SEGMENTS * ANTI_INTERPOL_MULT)))
segment_idxs = np.array(segment_idxs)
# "distance_map": index each point by its nearest point on the
# QuickBundles centroid (threshold 100 forces essentially one cluster).
elif algorithm == "distance_map":
metric = AveragePointwiseEuclideanMetric()
qb = QuickBundles(threshold=100., metric=metric)
clusters = qb.cluster(streamlines)
centroids = Streamlines(clusters.centroids)
_, segment_idxs = cKDTree(centroids.data, 1, copy_data=True).query(streamlines, k=1)
# "cutting_plane": anchor each streamline at its point closest to the
# centroid's middle point, then index relative to that anchor.
elif algorithm == "cutting_plane":
streamlines_resamp = fiber_utils.resample_fibers(streamlines, NR_SEGMENTS * ANTI_INTERPOL_MULT)
metric = AveragePointwiseEuclideanMetric()
qb = QuickBundles(threshold=100., metric=metric)
clusters = qb.cluster(streamlines_resamp)
centroid = Streamlines(clusters.centroids)[0]
# index of the middle cluster
middle_idx = int(NR_SEGMENTS / 2) * ANTI_INTERPOL_MULT
middle_point = centroid[middle_idx]
segment_idxs = fiber_utils.get_idxs_of_closest_points(streamlines, middle_point)
# Align along the middle and assign indices
segment_idxs_eqlen = []
for idx, sl in enumerate(streamlines):
sl_middle_pos = segment_idxs[idx]