if 'neighbors' not in adata.uns:
    raise ValueError(
        'You need to run `pp.neighbors` first '
        'to compute a neighborhood graph.'
    )
if adjacency is None:
adjacency = adata.uns['neighbors']['connectivities']
if restrict_to is not None:
restrict_key, restrict_categories = restrict_to
adjacency, restrict_indices = restrict_adjacency(
adata,
restrict_key,
restrict_categories,
adjacency,
)
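# Usage sketch (hypothetical category labels): `restrict_to` re-runs the
# clustering only on cells in the named categories of an existing grouping:
#     sc.tl.louvain(adata, restrict_to=('louvain', ['0', '1']))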
if flavor in {'vtraag', 'igraph'}:
if flavor == 'igraph' and resolution is not None:
logg.warning(
'`resolution` parameter has no effect for flavor "igraph"'
)
if directed and flavor == 'igraph':
directed = False
if not directed:
    logg.debug(' using the undirected graph')
g = _utils.get_igraph_from_adjacency(adjacency, directed=directed)
if use_weights:
weights = np.array(g.es["weight"]).astype(np.float64)
else:
weights = None
if flavor == 'vtraag':
import louvain
if partition_type is None:
partition_type = louvain.RBConfigurationVertexPartition
if resolution is not None:
partition_kwargs["resolution_parameter"] = resolution
def paul15() -> AnnData:
"""\
Development of Myeloid Progenitors [Paul15]_.
Non-logarithmized raw data.
The data was sent by email from the Amit Lab. An R version for
loading the data can be found at
https://github.com/theislab/scAnalysisTutorial
Returns
-------
Annotated data matrix.
"""
logg.warning(
'In Scanpy 0.*, this returned logarithmized data. '
'Now it returns non-logarithmized data.'
)
import h5py
filename = settings.datasetdir / 'paul15/paul15.h5'
backup_url = 'http://falexwolf.de/data/paul15.h5'
_utils.check_presence_download(filename, backup_url)
with h5py.File(filename, 'r') as f:
X = f['data.debatched'][()]
gene_names = f['data.debatched_rownames'][()].astype(str)
cell_names = f['data.debatched_colnames'][()].astype(str)
clusters = f['cluster.id'][()].flatten().astype(int)
infogenes_names = f['info.genes_strings'][()].astype(str)
# each row has to correspond to an observation, therefore transpose
adata = AnnData(X.transpose())
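# A hedged sketch of the natural continuation (the exact original lines are
# not shown in this excerpt): attach the loaded arrays as annotations.
adata.obs_names = cell_names
adata.var_names = gene_names
adata.obs['paul15_clusters'] = clusters.astype(str)
adata = adata[:, infogenes_names]  # keep only the informative genes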
>>> import scanpy.external as sce
>>> import phate
>>> tree_data, tree_clusters = phate.tree.gen_dla(
... n_dim=100,
... n_branch=20,
... branch_length=100,
... )
>>> tree_data.shape
(2000, 100)
>>> adata = AnnData(tree_data)
>>> sce.tl.phate(adata, k=5, a=20, t=150)
>>> adata.obsm['X_phate'].shape
(2000, 2)
>>> sce.pl.phate(adata)
"""
start = logg.info('computing PHATE')
adata = adata.copy() if copy else adata
verbosity = settings.verbosity if verbose is None else verbose
verbose = verbosity if isinstance(verbosity, bool) else verbosity >= 2
n_jobs = settings.n_jobs if n_jobs is None else n_jobs
try:
import phate
except ImportError:
raise ImportError(
'You need to install the package `phate`: please run `pip install '
'--user phate` in a terminal.'
)
X_phate = phate.PHATE(
n_components=n_components,
k=k,
a=a,
n_landmark=n_landmark,
# remaining keyword arguments are elided in this excerpt
).fit_transform(adata.X)
if X.shape[0] < 4096:
X = pairwise_distances(X, metric=metric, **metric_kwds)
metric = 'precomputed'
knn_indices, knn_distances, forest = compute_neighbors_umap(
X, n_neighbors, random_state, metric=metric, metric_kwds=metric_kwds)
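# knn_indices / knn_distances have shape (n_obs, n_neighbors): for each cell,
# the indices of its nearest neighbors and the matching distances; `forest` is
# the random-projection forest built by the approximate search (may be empty).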
# very cautious here
try:
    if forest:
        self._rp_forest = _make_forest_dict(forest)
except Exception:
    # storing the forest is best-effort; ignore failures
    pass
# write indices as attributes
if write_knn_indices:
self.knn_indices = knn_indices
self.knn_distances = knn_distances
start_connect = logg.debug('computed neighbors', time=start_neighbors)
if not use_dense_distances or method in {'umap', 'rapids'}:
# we need self._distances also for method == 'gauss' if we didn't
# use dense distances
self._distances, self._connectivities = _compute_connectivities_umap(
knn_indices,
knn_distances,
self._adata.shape[0],
self.n_neighbors,
)
# overwrite the umap connectivities if method is 'gauss'
# self._distances is unaffected by this
if method == 'gauss':
self._compute_connectivities_diffmap()
logg.debug('computed connectivities', time=start_connect)
self._number_connected_components = 1
if issparse(self._connectivities):
    # a hedged sketch of the natural continuation: count the connected
    # components of the neighbor graph
    from scipy.sparse.csgraph import connected_components
    self._connected_components = connected_components(self._connectivities)
    self._number_connected_components = self._connected_components[0]
smp, var = self._normalize_indices(index)
X = self.X[smp, var]
smp_ann = self.smp[smp]
var_ann = self.var[var]
assert smp_ann.shape[0] == X.shape[0], (smp, smp_ann)
assert var_ann.shape[0] == X.shape[1], (var, var_ann)
add_ann = self.add.copy()
# slice sparse matrices of shape n_smps × n_smps stored in self.add
if not (isinstance(smp, slice) and
smp.start is None and smp.step is None and smp.stop is None):
raised_warning = False
for k, v in self.add.items(): # TODO: make sure this really works as expected
if isinstance(v, sp.spmatrix) and v.shape == (self.n_smps, self.n_smps):
add_ann[k] = v.tocsc()[:, smp].tocsr()[smp, :]
if not raised_warning:
logg.warn('Slicing adjacency matrices can be dangerous. '
'Consider recomputing the data graph.')
raised_warning = True
adata = AnnData(X, smp_ann, var_ann, add_ann)
return adata
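# Usage sketch (hypothetical data): the warning above fires only when
# `self.add` holds an n_smps × n_smps sparse matrix, e.g. a data graph.
#     adata_sub = adata[[0, 1, 2], :]  # row slice: graph matrices re-sliced
#     adata_sub = adata[:, [0, 1]]     # column slice: graph matrices untouched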
# Palantir normalizations
if not inplace:
adata = adata.copy()
data_df = adata.to_df()
if normalize:
data_df = palantir.preprocess.normalize_counts(data_df)
logg.info('data normalized ...')
if log_transform:
data_df = palantir.preprocess.log_transform(data_df)
logg.info('data log transformed ...')
if filter_low:
data_df = palantir.preprocess.filter_counts_data(data_df)
logg.info(
'data filtered for low counts:\n'
'\tcell_min_molecules=1000\n'
'\tgenes_min_cells=10'
)
if normalize or log_transform or filter_low:
adata.uns['palantir_norm_data'] = data_df
# Processing
logg.info('PCA in progress ...')
pca_projections, var_r = palantir.utils.run_pca(data_df)
adata.uns['palantir_pca_results'] = dict(
pca_projections=pca_projections,
variance_ratio=var_r,
)
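# A sketch of the usual next step in Palantir's documented workflow (hedged;
# not necessarily this function's exact code): diffusion maps on the PCA
# projections, then the multiscale space used downstream.
dm_res = palantir.utils.run_diffusion_maps(pca_projections)
ms_data = palantir.utils.determine_multiscale_space(dm_res)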
`terminal_states` parameter.
>>> start_cell = 'Run5_164698952452459'
>>> pr_res = d.palantir.core.run_palantir(d.ms_data, start_cell, num_waypoints=500)
>>> palantir.plot.plot_palantir_results(pr_res, d.tsne)
.. note::
A `start_cell` must be defined for every data set. The start cell for
this dataset was chosen based on high expression of CD34.
For a further demonstration of Palantir visualizations, see the notebook
`Palantir_sample_notebook.ipynb`_.
It provides a comprehensive guide to drawing *gene expression trends*, among other things.
"""
logg.info('Palantir diffusion maps')
try:
import palantir
except ImportError:
raise ImportError(
'\nplease install palantir: \n\n'
'\tgit clone git://github.com/dpeerlab/Palantir.git\n'
'\tcd Palantir\n'
'\tsudo -H pip3 install .'
)
# Palantir normalizations
if not inplace:
adata = adata.copy()
data_df = adata.to_df()
def dpt_timeseries(
    adata: AnnData,
    color_map: Union[str, Colormap] = None,
    show: Optional[bool] = None,
    save: Optional[bool] = None,
    as_heatmap: bool = True,
):
"""\
Heatmap of pseudotime series.
Parameters
----------
as_heatmap
Plot the timeseries as heatmap.
"""
if adata.n_vars > 100:
    logg.warning(
        'Plotting more than 100 genes may take a while; '
        'consider selecting only highly variable genes, for example.'
    )
# only if number of genes is not too high
if as_heatmap:
# plot time series as heatmap, as in Haghverdi et al. (2016), Fig. 1d
timeseries_as_heatmap(
adata.X[adata.obs['dpt_order_indices'].values],
var_names=adata.var_names,
highlights_x=adata.uns['dpt_changepoints'],
color_map=color_map,
)
else:
# plot time series as gene expression vs time
timeseries(
adata.X[adata.obs['dpt_order_indices'].values],
)
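# Usage sketch (assumes `sc.tl.dpt` has been run, so that 'dpt_order_indices'
# and 'dpt_changepoints' are present):
#     sc.pl.dpt_timeseries(adata)                    # heatmap, as in Fig. 1d
#     sc.pl.dpt_timeseries(adata, as_heatmap=False)  # expression vs pseudotime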
try:
path1 = [str(x) for x in nx.shortest_path(g1, int(r), int(s))]
except nx.NetworkXNoPath:
path1 = None
try:
path2 = [str(x) for x in nx.shortest_path(g2, int(r2), int(s2))]
except nx.NetworkXNoPath:
path2 = None
if path1 is None and path2 is None:
# consistent behavior
n_paths += 1
n_agreeing_paths += 1
n_steps += 1
n_agreeing_steps += 1
logg.debug('there are no connecting paths in both graphs')
continue
elif path1 is None or path2 is None:
# non-consistent result
n_paths += 1
n_steps += 1
continue
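# map the longer of the two paths through the group association and compare it
# against the shorter one; ties go to path1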
if len(path1) >= len(path2):
path_mapped = [asso_groups1[l] for l in path1]
path_compare = path2
path_compare_id = 2
path_compare_orig_names = [[orig_names2[int(s)] for s in l] for l in path_compare]
path_mapped_orig_names = [[orig_names2[int(s)] for s in l] for l in path_mapped]
else:
    path_mapped = [asso_groups2[l] for l in path2]
    path_compare = path1
    path_compare_id = 1
    path_compare_orig_names = [[orig_names1[int(s)] for s in l] for l in path_compare]
    path_mapped_orig_names = [[orig_names1[int(s)] for s in l] for l in path_mapped]
def _init_iroot(self):
self.iroot = None
# set iroot directly
if 'iroot' in self._adata.uns:
if self._adata.uns['iroot'] >= self._adata.n_obs:
logg.warning(
f'Root cell index {self._adata.uns["iroot"]} does not '
f'exist for {self._adata.n_obs} samples. It’s ignored.'
)
else:
self.iroot = self._adata.uns['iroot']
return
# set iroot via xroot
xroot = None
if 'xroot' in self._adata.uns:
    xroot = self._adata.uns['xroot']
elif 'xroot' in self._adata.var:
    xroot = self._adata.var['xroot']
# see whether we can set self.iroot using the full data matrix
if xroot is not None and xroot.size == self._adata.shape[1]:
self._set_iroot_via_xroot(xroot)
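# A minimal sketch of what `_set_iroot_via_xroot` plausibly does (assumption:
# pick the observation whose expression profile is closest to `xroot`; a dense
# `X` is assumed for simplicity):
def _set_iroot_via_xroot_sketch(self, xroot):
    # squared Euclidean distance of every cell to the root expression vector
    dists = np.sum((np.asarray(self._adata.X) - xroot) ** 2, axis=1)
    self.iroot = int(np.argmin(dists))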