Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
latent_time = compute_shared_time(t)
else:
roots = roots[:4]
latent_time = np.ones(shape=(len(roots), adata.n_obs))
for i, root in enumerate(roots):
t, t_ = root_time(t, root=root)
latent_time[i] = compute_shared_time(t)
latent_time = scale(np.mean(latent_time, axis=0))
if fates[0] is not None:
fates = fates[:4]
latent_time_ = np.ones(shape=(len(fates), adata.n_obs))
for i, fate in enumerate(fates):
t, t_ = root_time(t, root=fate)
latent_time_[i] = 1 - compute_shared_time(t)
latent_time = scale(latent_time + 0.2 * scale(np.mean(latent_time_, axis=0)))
tl = latent_time
tc = conn.dot(latent_time)
z = tl.dot(tc) / tc.dot(tc)
tl_conf = (1 - np.abs(tl / np.max(tl) - tc * z / np.max(tl))) ** 2
idx_low_confidence = tl_conf < min_confidence
if weight_diffusion is not None:
w = weight_diffusion
latent_time = (1 - w) * latent_time + w * vpt
latent_time[idx_low_confidence] = vpt[idx_low_confidence]
else:
conn_new = conn.copy()
conn_new[:, idx_low_confidence] = 0
conn_new.eliminate_zeros()
def set_iroot(self, root=None):
    """Resolve ``root`` to a single cell index and store it in ``self.iroot``.

    ``root`` is interpreted by the first rule that matches:

    1. A string naming a column of ``self._adata.obs`` whose maximum is
       non-zero: the column is multiplied by the matrix returned by
       ``get_connectivities(self._adata)``, the product is passed through
       ``scale`` and the position of its largest entry becomes the root.
    2. A string contained in ``self._adata.obs_names``: the position of its
       first occurrence becomes the root.
    3. An integer (Python ``int`` or NumPy integer) that is a valid index
       into ``self._adata.n_obs`` observations, i.e.
       ``-n_obs <= root < n_obs``: stored unchanged.

    Any other value (including ``None`` and out-of-range integers) sets
    ``self.iroot`` to ``None``.

    Parameters
    ----------
    root
        Column key in ``obs``, observation name, or integer index.
    """
    adata = self._adata

    if (
        isinstance(root, str)
        and root in adata.obs.keys()
        and adata.obs[root].max() != 0
    ):
        # NOTE(review): presumably this smooths the per-cell root score over
        # the neighbor graph before taking the best cell -- confirm against
        # get_connectivities / scale.
        smoothed = get_connectivities(adata).dot(adata.obs[root])
        self.iroot = scale(smoothed).argmax()
    elif isinstance(root, str) and root in adata.obs_names:
        # First matching position of the observation name.
        self.iroot = np.where(adata.obs_names == root)[0][0]
    elif (
        isinstance(root, (int, np.integer))
        and -adata.n_obs <= root < adata.n_obs
    ):
        # Two-sided bound: negative indices are still accepted (they count
        # from the end), but values below -n_obs are rejected here instead of
        # being stored and failing later when used as an index.
        self.iroot = root
    else:
        self.iroot = None
)
kwargs.update({"self_transitions": self_transitions})
categories = [None]
if groupby is not None and groups is None:
categories = adata.obs[groupby].cat.categories
for cat in categories:
groups = cat if cat is not None else groups
cell_subset = groups_to_bool(adata, groups=groups, groupby=groupby)
_adata = adata if groups is None else adata[cell_subset]
connectivities = get_connectivities(_adata, "distances")
T = transition_matrix(_adata, vkey=vkey, backward=True, **kwargs)
eigvecs_roots = eigs(T, eps=eps, perc=[2, 98], random_state=random_state)[1]
roots = csr_matrix.dot(connectivities, eigvecs_roots).sum(1)
roots = scale(np.clip(roots, 0, np.percentile(roots, 98)))
roots = verify_roots(_adata, roots)
write_to_obs(adata, "root_cells", roots, cell_subset)
T = transition_matrix(_adata, vkey=vkey, backward=False, **kwargs)
eigvecs_ends = eigs(T, eps=eps, perc=[2, 98], random_state=random_state)[1]
ends = csr_matrix.dot(connectivities, eigvecs_ends).sum(1)
ends = scale(np.clip(ends, 0, np.percentile(ends, 98)))
write_to_obs(adata, "end_points", ends, cell_subset)
n_roots, n_ends = eigvecs_roots.shape[1], eigvecs_ends.shape[1]
groups_str = f" ({groups})" if isinstance(groups, str) else ""
roots_str = f"{n_roots} {'regions' if n_roots > 1 else 'region'}"
ends_str = f"{n_ends} {'regions' if n_ends > 1 else 'region'}"
logg.info(
f" identified {roots_str} of root cells "
z = tl.dot(tc) / tc.dot(tc)
tl_conf = (1 - np.abs(tl / np.max(tl) - tc * z / np.max(tl))) ** 2
idx_low_confidence = tl_conf < min_confidence
if weight_diffusion is not None:
w = weight_diffusion
latent_time = (1 - w) * latent_time + w * vpt
latent_time[idx_low_confidence] = vpt[idx_low_confidence]
else:
conn_new = conn.copy()
conn_new[:, idx_low_confidence] = 0
conn_new.eliminate_zeros()
latent_time = conn_new.dot(latent_time)
latent_time = scale(latent_time)
if t_max is not None:
latent_time *= t_max
adata.obs["latent_time"] = latent_time
logg.info(" finished", time=True, end=" " if settings.verbosity > 2 else "\n")
logg.hint("added \n" " 'latent_time', shared time (adata.obs)")
return adata if copy else None
if min_corr_diffusion is not None:
corr = vcorrcoef(t.T, vpt)
t = t[:, np.array(corr >= min_corr_diffusion, dtype=bool)]
if root_key in adata.uns.keys():
root = adata.uns[root_key]
t, t_ = root_time(t, root=root)
latent_time = compute_shared_time(t)
else:
roots = roots[:4]
latent_time = np.ones(shape=(len(roots), adata.n_obs))
for i, root in enumerate(roots):
t, t_ = root_time(t, root=root)
latent_time[i] = compute_shared_time(t)
latent_time = scale(np.mean(latent_time, axis=0))
if fates[0] is not None:
fates = fates[:4]
latent_time_ = np.ones(shape=(len(fates), adata.n_obs))
for i, fate in enumerate(fates):
t, t_ = root_time(t, root=fate)
latent_time_[i] = 1 - compute_shared_time(t)
latent_time = scale(latent_time + 0.2 * scale(np.mean(latent_time_, axis=0)))
tl = latent_time
tc = conn.dot(latent_time)
z = tl.dot(tc) / tc.dot(tc)
tl_conf = (1 - np.abs(tl / np.max(tl) - tc * z / np.max(tl))) ** 2
idx_low_confidence = tl_conf < min_confidence
vpt.compute_transitions()
vpt.compute_eigen(n_comps=n_dcs)
vpt.set_iroot(root_key)
vpt.compute_pseudotime()
dpt_root = vpt.pseudotime
vpt.set_iroot(end_key)
vpt.compute_pseudotime(inverse=True)
dpt_end = vpt.pseudotime
# merge dpt_root and inverse dpt_end together
vpt.pseudotime = np.nan_to_num(dpt_root) + np.nan_to_num(dpt_end)
vpt.pseudotime[np.isfinite(dpt_root) & np.isfinite(dpt_end)] /= 2
vpt.pseudotime = scale(vpt.pseudotime)
vpt.pseudotime[np.isnan(dpt_root) & np.isnan(dpt_end)] = np.nan
if "n_branchings" in kwargs and kwargs["n_branchings"] > 0:
vpt.branchings_segments()
else:
vpt.indices = vpt.pseudotime.argsort()
if f"{vkey}_pseudotime" not in adata.obs.keys():
pseudotime = np.empty(adata.n_obs)
pseudotime[:] = np.nan
else:
pseudotime = adata.obs[f"{vkey}_pseudotime"].values
pseudotime[cell_subset] = vpt.pseudotime
adata.obs[f"{vkey}_pseudotime"] = np.array(pseudotime, dtype=np.float64)
if save_diffmap:
groups = cat if cat is not None else groups
cell_subset = groups_to_bool(adata, groups=groups, groupby=groupby)
_adata = adata if groups is None else adata[cell_subset]
connectivities = get_connectivities(_adata, "distances")
T = transition_matrix(_adata, vkey=vkey, backward=True, **kwargs)
eigvecs_roots = eigs(T, eps=eps, perc=[2, 98], random_state=random_state)[1]
roots = csr_matrix.dot(connectivities, eigvecs_roots).sum(1)
roots = scale(np.clip(roots, 0, np.percentile(roots, 98)))
roots = verify_roots(_adata, roots)
write_to_obs(adata, "root_cells", roots, cell_subset)
T = transition_matrix(_adata, vkey=vkey, backward=False, **kwargs)
eigvecs_ends = eigs(T, eps=eps, perc=[2, 98], random_state=random_state)[1]
ends = csr_matrix.dot(connectivities, eigvecs_ends).sum(1)
ends = scale(np.clip(ends, 0, np.percentile(ends, 98)))
write_to_obs(adata, "end_points", ends, cell_subset)
n_roots, n_ends = eigvecs_roots.shape[1], eigvecs_ends.shape[1]
groups_str = f" ({groups})" if isinstance(groups, str) else ""
roots_str = f"{n_roots} {'regions' if n_roots > 1 else 'region'}"
ends_str = f"{n_ends} {'regions' if n_ends > 1 else 'region'}"
logg.info(
f" identified {roots_str} of root cells "
f"and {ends_str} of end points {groups_str}."
)
logg.info(" finished", time=True, end=" " if settings.verbosity > 2 else "\n")
logg.hint(
"added\n"
" 'root_cells', root cells of Markov diffusion process (adata.obs)\n"