Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment — tail of a call that constructs a velocity-graph
# object (the enclosing `def` and the call's opening line are outside this
# view). All keyword arguments are forwarded unchanged from the caller.
xkey=xkey,
tkey=tkey,
basis=basis,
n_neighbors=n_neighbors,
approx=approx,
n_recurse_neighbors=n_recurse_neighbors,
random_neighbors_at_max=random_neighbors_at_max,
sqrt_transform=sqrt_transform,
gene_subset=gene_subset,
compute_uncertainties=compute_uncertainties,
report=True,
mode_neighbors=mode_neighbors,
)
# A string `basis` restricts the graph to embedding coordinates; warn that
# full expression space (basis=None) is the unbiased choice.
if isinstance(basis, str):
logg.warn(
f"The velocity graph is computed on {basis} embedding coordinates.\n"
f"  Consider computing the graph in an unbiased manner \n"
f"  on full expression space by not specifying basis.\n"
)
logg.info("computing velocity graph", r=True)
# Compute cosine correlations between velocities and cell-to-cell
# transitions, then persist results on the AnnData object:
vgraph.compute_cosines()
# positive / negative parts of the transition graph
adata.uns[f"{vkey}_graph"] = vgraph.graph
adata.uns[f"{vkey}_graph_neg"] = vgraph.graph_neg
# uncertainties are only present if compute_uncertainties was requested
if vgraph.uncertainties is not None:
adata.uns[f"{vkey}_graph_uncertainties"] = vgraph.uncertainties
# per-cell probability of staying in place (self-transition)
adata.obs[f"{vkey}_self_transition"] = vgraph.self_prob
# NOTE(review): fragment — body of a first-order moments routine; the
# enclosing `def` (and its parameters: data, copy, n_neighbors, n_pcs,
# mode, method, use_rep) is outside this view.
adata = data.copy() if copy else data
# Normalize only if the spliced/unspliced layers still look like raw counts.
layers = [layer for layer in {"spliced", "unspliced"} if layer in adata.layers]
if any([not_yet_normalized(adata.layers[layer]) for layer in layers]):
normalize_per_cell(adata)
# (Re)compute the kNN graph when more neighbors are requested than stored.
if n_neighbors is not None and n_neighbors > get_n_neighs(adata):
if use_rep is None:
use_rep = "X_pca"
neighbors(
adata, n_neighbors=n_neighbors, use_rep=use_rep, n_pcs=n_pcs, method=method
)
verify_neighbors(adata)
# Moments require both count layers; skip gracefully otherwise.
if "spliced" not in adata.layers.keys() or "unspliced" not in adata.layers.keys():
logg.warn("Skipping moments, because un/spliced counts were not found.")
else:
logg.info(f"computing moments based on {mode}", r=True)
connectivities = get_connectivities(
adata, mode, n_neighbors=n_neighbors, recurse_neighbors=False
)
# First moments (neighborhood-smoothed means) of spliced/unspliced counts:
# sparse matrix product connectivities @ counts, densified via `.A`.
adata.layers["Ms"] = (
csr_matrix.dot(connectivities, csr_matrix(adata.layers["spliced"]))
.astype(np.float32)
.A
)
adata.layers["Mu"] = (
csr_matrix.dot(connectivities, csr_matrix(adata.layers["unspliced"]))
.astype(np.float32)
.A
)
# NOTE(review): fragment — tail of a graph-layout function signature plus
# the start of its body; both the `def` line and the remainder of the
# ForceAtlas2 call are cut off outside this view.
adj_tree=None,
root=0,
layout_kwds=None,
):
import networkx as nx
# Seed both numpy and stdlib RNGs so layouts are reproducible.
np.random.seed(random_state)
random.seed(random_state)
nx_g_solid = nx.Graph(adjacency_solid)
if layout is None:
layout = "fr"
# Prefer ForceAtlas2 ('fa') if the optional fa2 package is importable,
# otherwise fall back to Fruchterman-Reingold ('fr').
if layout == "fa":
try:
import fa2
# NOTE(review): bare `except:` catches everything (including
# KeyboardInterrupt/SystemExit); should be `except ImportError:`.
except:
# NOTE(review): the first two concatenated string literals are
# missing a separating space ("'fr'.To use") in the emitted warning.
logg.warn(
"Package 'fa2' is not installed, falling back to layout 'fr'."
"To use the faster and better ForceAtlas2 layout, "
"install package 'fa2' (`pip install fa2`)."
)
layout = "fr"
if layout == "fa":
# Start from the given positions when provided, else random 2-D coords.
init_coords = (
np.random.random((adjacency_solid.shape[0], 2))
if init_pos is None
else init_pos.copy()
)
# ForceAtlas2 configuration (call truncated in this view).
forceatlas2 = fa2.ForceAtlas2(
outboundAttractionDistribution=False,
linLogMode=False,
adjustSizes=False,
edgeWeightInfluence=1.0,
# NOTE(review): fragment — tail of a filtering/normalization recipe; the
# enclosing `def` is outside this view.
filter_genes_dispersion(adata, n_top_genes=n_top_genes, flavor=flavor)
# Heuristic: if X still matches the spliced layer on the first 10 rows,
# X looks unlogarithmized and log-transforming is advisable.
log_advised = (
np.allclose(adata.X[:10].sum(), adata.layers["spliced"][:10].sum())
if "spliced" in adata.layers.keys()
else True
)
if log and log_advised:
log1p(adata)
# NOTE(review): `log and log_advised` is tested twice in a row; the two
# branches could be merged into a single `if` body.
if log and log_advised:
logg.info("Logarithmized X.")
elif log and not log_advised:
logg.warn("Did not modify X as it looks preprocessed already.")
elif log_advised and not log:
logg.warn("Consider logarithmizing X with `scv.pp.log1p` for better results.")
return adata if copy else None
If set to None, the layers {'X', 'spliced', 'unspliced'} are considered for
normalization upon testing whether they have already been normalized
(by checking type of entries: int -> unprocessed, float -> processed).
copy: `bool` (default: `False`)
Return a copy of `adata` instead of updating it.
**kwargs:
Keyword arguments passed to pp.normalize_per_cell (e.g. counts_per_cell).
Returns
-------
Returns or updates `adata` depending on `copy`.
"""
# NOTE(review): fragment — body of a filter-and-normalize recipe whose
# `def` line and docstring head are outside this view.
adata = data.copy() if copy else data
# Warn (but proceed) when the expected count layers are absent.
if "spliced" not in adata.layers.keys() or "unspliced" not in adata.layers.keys():
logg.warn("Could not find spliced / unspliced counts.")
# Gene filtering by the various min-count/min-cell thresholds.
filter_genes(
adata,
min_counts=min_counts,
min_counts_u=min_counts_u,
min_cells=min_cells,
min_cells_u=min_cells_u,
min_shared_counts=min_shared_counts,
min_shared_cells=min_shared_cells,
)
# An explicit layer selection forces normalization of those layers
# unless the caller already set `enforce`.
if layers_normalize is not None and "enforce" not in kwargs:
kwargs["enforce"] = True
normalize_per_cell(adata, layers=layers_normalize, **kwargs)
if n_top_genes is not None:
velocity vectors for each individual cell
variance_velocity: `.layers`
velocity vectors for the cell variances
velocity_offset, velocity_beta, velocity_gamma, velocity_r2: `.var`
parameters
"""
# NOTE(review): fragment — body of a velocity-estimation routine; the
# `def` line, docstring head, and the branch started at the last visible
# line are all outside this view.
adata = data.copy() if copy else data
# First-order moments (Ms/Mu) are required unless raw counts are used.
if not use_raw and 'Ms' not in adata.layers.keys(): moments(adata)
logg.info('computing velocities', r=True)
strings_to_categoricals(adata)
# The dynamical model needs fitted kinetics ('fit_alpha'); otherwise
# fall back to the stochastic model.
if mode is None or (mode == 'dynamical' and 'fit_alpha' not in adata.var.keys()):
mode = 'stochastic'
logg.warn('Falling back to stochastic model. For the dynamical model run tl.recover_dynamics first.')
if mode in {'dynamical', 'dynamical_residuals'}:
from .dynamical_model_utils import mRNA, vectorize, get_reads, get_vars, get_divergence
# Restrict to genes with successfully fitted kinetic parameters.
gene_subset = ~np.isnan(adata.var['fit_alpha'].values)
vdata = adata[:, gene_subset]
alpha, beta, gamma, scaling, t_ = get_vars(vdata)
# Reuse the connectivity/latent-time settings from recover_dynamics;
# caller kwargs override the defaults assembled here.
connect = not adata.uns['recover_dynamics']['use_raw']
kwargs_ = {'kernel_width': None, 'normalized': True, 'var_scale': True, 'reg_par': None, 'min_confidence': 1e-2,
'constraint_time_increments': False, 'fit_steady_states': True, 'fit_basal_transcription': None,
'use_connectivities': connect, 'time_connectivities': connect, 'use_latent_time': use_latent_time}
kwargs_.update(adata.uns['recover_dynamics'])
kwargs_.update(**kwargs)
if 'residuals' in mode: