import numpy as np


def test_einsum():
    from scvelo.tools.utils import prod_sum_obs, prod_sum_var, norm

    Ms, Mu = np.random.rand(5, 4), np.random.rand(5, 4)
    assert np.allclose(prod_sum_obs(Ms, Mu), np.sum(Ms * Mu, 0))
    assert np.allclose(prod_sum_var(Ms, Mu), np.sum(Ms * Mu, 1))
    assert np.allclose(norm(Ms), np.linalg.norm(Ms, axis=1))
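# For reference, the identities exercised by the test can be written directly
# with np.einsum. A minimal standalone sketch (plain NumPy, no scvelo needed;
# variable names are illustrative only):
import numpy as np

Ms, Mu = np.random.rand(5, 4), np.random.rand(5, 4)
per_var = np.einsum("ij,ij->j", Ms, Mu)   # == np.sum(Ms * Mu, 0), one value per gene
per_obs = np.einsum("ij,ij->i", Ms, Mu)   # == np.sum(Ms * Mu, 1), one value per cell
row_norm = np.sqrt(np.einsum("ij,ij->i", Ms, Ms))  # == np.linalg.norm(Ms, axis=1)
assert np.allclose(per_var, np.sum(Ms * Mu, 0))
assert np.allclose(per_obs, np.sum(Ms * Mu, 1))
assert np.allclose(row_norm, np.linalg.norm(Ms, axis=1))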
# -- fragment of scvelo's velocity_embedding: project velocities onto the embedding --
# (the head of the conditional is reconstructed from the transition_matrix call
#  used in velocity_confidence_transition below)
T = (
    transition_matrix(adata, vkey=vkey, scale=scale)
    if T is None
    else T
)
T.setdiag(0)
T.eliminate_zeros()

densify = adata.n_obs < 1e4
TA = T.A if densify else None

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    for i in range(adata.n_obs):
        indices = T[i].indices
        dX = X_emb[indices] - X_emb[i, None]  # shape (n_neighbors, 2)
        if not retain_scale:
            dX /= norm(dX)[:, None]
        dX[np.isnan(dX)] = 0  # zero diff in a steady-state
        probs = TA[i, indices] if densify else T[i].data
        # expected displacement under T, minus the uniform baseline
        V_emb[i] = probs.dot(dX) - probs.mean() * dX.sum(0)

if retain_scale:
    X = (
        adata.layers["Ms"]
        if "Ms" in adata.layers.keys()
        else adata.layers["spliced"]
    )
    delta = T.dot(X[:, vgenes]) - X[:, vgenes]
    if issparse(delta):
        delta = delta.A
    cos_proj = (V * delta).sum(1) / norm(delta)
    V_emb *= np.clip(cos_proj[:, None] * 10, 0, 1)
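# A standalone toy sketch of the projection step above (hypothetical arrays, not
# scvelo's API): the embedded velocity of a cell is the transition-probability-
# weighted mean of embedding differences, centered by the uniform expectation.
import numpy as np

rng = np.random.default_rng(0)
x_i = rng.normal(size=2)             # embedding position of cell i
x_neighs = rng.normal(size=(10, 2))  # embedding positions of its neighbors
probs = rng.random(10)
probs /= probs.sum()                 # transition probabilities, as in T[i].data

dX = x_neighs - x_i                  # shape (n_neighbors, 2)
dX /= np.linalg.norm(dX, axis=1)[:, None]  # unit directions (retain_scale=False)
v_i = probs.dot(dX) - probs.mean() * dX.sum(0)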
# -- scvelo's velocity_confidence: per-cell coherence of the velocity field --
# (signature abridged from scvelo.tl.velocity_confidence)
def velocity_confidence(data, vkey="velocity", copy=False):
    adata = data.copy() if copy else data
    if vkey not in adata.layers.keys():
        raise ValueError("You need to run `tl.velocity` first.")

    V = np.array(adata.layers[vkey])

    # keep genes with finite velocities that passed the velocity-gene filters
    tmp_filter = np.invert(np.isnan(np.sum(V, axis=0)))
    if f"{vkey}_genes" in adata.var.keys():
        tmp_filter &= np.array(adata.var[f"{vkey}_genes"], dtype=bool)
    if "spearmans_score" in adata.var.keys():
        tmp_filter &= adata.var["spearmans_score"].values > 0.1

    V = V[:, tmp_filter]
    V -= V.mean(1)[:, None]
    V_norm = norm(V)

    R = np.zeros(adata.n_obs)
    indices = get_indices(dist=get_neighs(adata, "distances"))[0]
    for i in range(adata.n_obs):
        # mean correlation of cell i's velocity with its neighbors' velocities
        Vi_neighs = V[indices[i]]
        Vi_neighs -= Vi_neighs.mean(1)[:, None]
        R[i] = np.mean(
            np.einsum("ij, j", Vi_neighs, V[i])
            / (norm(Vi_neighs) * V_norm[i])[None, :]
        )

    adata.obs[f"{vkey}_length"] = V_norm.round(2)
    adata.obs[f"{vkey}_confidence"] = np.clip(R, 0, None)

    logg.hint(f"added '{vkey}_length' (adata.obs)")
    logg.hint(f"added '{vkey}_confidence' (adata.obs)")

    if f"{vkey}_confidence_transition" not in adata.obs.keys():
        velocity_confidence_transition(adata, vkey)

    return adata if copy else None
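# The per-cell confidence above is the mean (centered) cosine between a cell's
# velocity and those of its nearest neighbors. A toy sketch with made-up data:
import numpy as np

rng = np.random.default_rng(1)
v_i = rng.normal(size=20)            # velocity of cell i across 20 genes
V_neighs = rng.normal(size=(5, 20))  # velocities of its 5 neighbors

v_i -= v_i.mean()
V_neighs -= V_neighs.mean(1)[:, None]
cos = V_neighs @ v_i / (np.linalg.norm(V_neighs, axis=1) * np.linalg.norm(v_i))
confidence = max(cos.mean(), 0)      # clipped at zero, as in np.clip(R, 0, None)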
# -- scvelo's velocity_confidence_transition: agreement of velocities with the
#    displacement predicted by the transition matrix (signature abridged; the
#    lines defining adata, X and V are reconstructed from the fragment) --
def velocity_confidence_transition(data, vkey="velocity", scale=10, copy=False):
    adata = data.copy() if copy else data
    X, V = np.array(adata.layers["Ms"]), np.array(adata.layers[vkey])

    tmp_filter = np.invert(np.isnan(np.sum(V, axis=0)))
    if f"{vkey}_genes" in adata.var.keys():
        tmp_filter &= np.array(adata.var[f"{vkey}_genes"], dtype=bool)
    if "spearmans_score" in adata.var.keys():
        tmp_filter &= adata.var["spearmans_score"].values > 0.1

    V = V[:, tmp_filter]
    X = X[:, tmp_filter]

    T = transition_matrix(adata, vkey=vkey, scale=scale)
    dX = T.dot(X) - X  # displacement predicted by the transition matrix
    dX -= dX.mean(1)[:, None]
    V -= V.mean(1)[:, None]

    norms = norm(dX) * norm(V)
    norms += norms == 0  # guard against division by zero

    adata.obs[f"{vkey}_confidence_transition"] = prod_sum_var(dX, V) / norms
    logg.hint(f"added '{vkey}_confidence_transition' (adata.obs)")

    return adata if copy else None
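# The transition-based confidence is the row-wise cosine between the predicted
# displacement T.dot(X) - X and the velocity V. A toy standalone sketch (random
# matrices stand in for the real layers):
import numpy as np

rng = np.random.default_rng(2)
n_obs, n_genes = 6, 20
X = rng.random((n_obs, n_genes))       # expression, e.g. the "Ms" layer
V = rng.normal(size=(n_obs, n_genes))  # velocities
T = rng.random((n_obs, n_obs))
T /= T.sum(1)[:, None]                 # row-stochastic transition matrix

dX = T.dot(X) - X
dX -= dX.mean(1)[:, None]
V -= V.mean(1)[:, None]
norms = np.linalg.norm(dX, axis=1) * np.linalg.norm(V, axis=1)
norms += norms == 0                    # avoid division by zero
conf_transition = (dX * V).sum(1) / norms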
# -- scvelo's score_robustness: velocity agreement with a random subsample --
# (signature abridged; the leading `if` is reconstructed from the dangling `else`)
def score_robustness(data, adata_subset=None, fraction=0.5, vkey="velocity", copy=False):
    adata = data.copy() if copy else data

    if adata_subset is None:
        # re-estimate velocities on a random subsample of the cells
        logg.switch_verbosity("off")
        adata_subset = adata.copy()
        subset = random_subsample(adata_subset, fraction=fraction, return_subset=True)
        neighbors(adata_subset)
        moments(adata_subset)
        velocity(adata_subset, vkey=vkey)
        logg.switch_verbosity("on")
    else:
        subset = adata.obs_names.isin(adata_subset.obs_names)

    V = adata[subset].layers[vkey]
    V_subset = adata_subset.layers[vkey]

    score = np.full(adata.n_obs, np.nan)  # NaN for cells outside the subsample
    score[subset] = prod_sum_var(V, V_subset) / (norm(V) * norm(V_subset))
    adata.obs[f"{vkey}_score_robustness"] = score

    return adata_subset if copy else None
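# The robustness score is a per-cell cosine between velocities estimated on the
# full data and velocities re-estimated on a subsample. Toy sketch:
import numpy as np

rng = np.random.default_rng(3)
V_full = rng.normal(size=(8, 20))                # velocities from all cells
V_sub = V_full + 0.1 * rng.normal(size=(8, 20))  # re-estimated on a subsample

score = (V_full * V_sub).sum(1) / (
    np.linalg.norm(V_full, axis=1) * np.linalg.norm(V_sub, axis=1)
)  # close to 1 where the subsample reproduces the full-data estimate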
# -- fragment of scvelo's VelocityGraph.compute_cosines --
# Per cell i (t0, t1, neighs_idx, and the moments matrix m are set earlier
# in the loop body):
if t0 >= 0 and t1 > 0:
    # cells whose source time equals this cell's target time join the candidates
    t1_idx = np.where(self.t0 == t1)[0]
    if len(t1_idx) > len(neighs_idx):
        t1_idx = np.random.choice(t1_idx, len(neighs_idx), replace=False)
    if len(t1_idx) > 0:
        neighs_idx = np.unique(np.concatenate([neighs_idx, t1_idx]))

dX = self.X[neighs_idx] - self.X[i, None]  # 60% of runtime
if self.sqrt_transform:
    dX = np.sqrt(np.abs(dX)) * np.sign(dX)
val = cosine_correlation(dX, self.V[i])  # 40% of runtime

if self.compute_uncertainties:
    dX /= norm(dX)[:, None]
    uncertainties.extend(np.nansum(dX ** 2 * m[i][None, :], 1))

vals.extend(val)
rows.extend(np.ones(len(neighs_idx)) * i)
cols.extend(neighs_idx)

if self.report:
    progress.update()

# After the loop: assemble positive and negative correlations into CSR matrices.
if self.report:
    progress.finish()

vals = np.hstack(vals)
vals[np.isnan(vals)] = 0  # undefined correlations count as no transition
self.graph, self.graph_neg = vals_to_csr(
    vals, rows, cols, shape=(n_obs, n_obs), split_negative=True
)
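# cosine_correlation, used above, compares each candidate displacement row of dX
# with the cell's velocity vector. A plausible minimal reading, assuming the
# displacement rows are centered before the cosine; this is a sketch, not
# scvelo's exact implementation:
import numpy as np

def centered_cosine(dX, v):
    # center each displacement row, then take its cosine with the velocity
    dX = dX - dX.mean(1)[:, None]
    return dX.dot(v) / (np.linalg.norm(dX, axis=1) * np.linalg.norm(v))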
# -- fragment of scvelo's velocity_embedding, continued from above: autoscale
#    and store the result --
if autoscale:
    V_emb /= 3 * quiver_autoscale(X_emb, V_emb)

if f"{vkey}_params" in adata.uns.keys():
    adata.uns[f"{vkey}_params"]["embeddings"] = (
        []
        if "embeddings" not in adata.uns[f"{vkey}_params"]
        else list(adata.uns[f"{vkey}_params"]["embeddings"])
    )
    adata.uns[f"{vkey}_params"]["embeddings"].extend([basis])

vkey += f"_{basis}"  # e.g. "velocity_umap"
adata.obsm[vkey] = V_emb
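# Hedged end-to-end usage sketch tying the fragments together; assumes scvelo's
# public API (scv.pp.*, scv.tl.*) and its bundled pancreas demo dataset:
import scvelo as scv

adata = scv.datasets.pancreas()
scv.pp.filter_and_normalize(adata)
scv.pp.moments(adata)                           # neighbors + first/second moments
scv.tl.velocity(adata)                          # adds adata.layers["velocity"]
scv.tl.velocity_graph(adata)                    # cosine-correlation graph
scv.tl.velocity_embedding(adata, basis="umap")  # adds adata.obsm["velocity_umap"]
scv.tl.velocity_confidence(adata)               # adds velocity_length / _confidence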