# init coordinates
if init_pos in adata.obsm.keys():
    init_coords = adata.obsm[init_pos]
elif init_pos == 'paga' or init_pos:
    # any other truthy value (not an `.obsm` key) also triggers PAGA-based init
    init_coords = get_init_pos_from_paga(
        adata, adjacency, random_state=random_state
    )
else:
    np.random.seed(random_state)
    init_coords = np.random.random((adjacency.shape[0], 2))
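
# Minimal sketch of the random fallback above: seeding NumPy's global RNG
# makes the (n_nodes, 2) starting coordinates reproducible across runs
# (the 5-node graph size here is hypothetical).
import numpy as np

np.random.seed(0)
coords_a = np.random.random((5, 2))
np.random.seed(0)
coords_b = np.random.random((5, 2))
assert np.array_equal(coords_a, coords_b)  # identical for the same seed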
# see whether fa2 is installed
if layout == 'fa':
    try:
        from fa2 import ForceAtlas2
    except ImportError:
        logg.warning(
            "Package 'fa2' is not installed, falling back to layout 'fr'. "
            "To use the faster and better ForceAtlas2 layout, "
            "install package 'fa2' (`pip install fa2`)."
        )
        layout = 'fr'
# actual drawing
if layout == 'fa':
    # constructor completed with fa2's documented default arguments
    forceatlas2 = ForceAtlas2(
        # Behavior alternatives
        outboundAttractionDistribution=False,  # Dissuade hubs
        linLogMode=False,  # NOT IMPLEMENTED
        adjustSizes=False,  # Prevent overlap (NOT IMPLEMENTED)
        edgeWeightInfluence=1.0,
        # Performance
        jitterTolerance=1.0,  # Tolerance
        barnesHutOptimize=True,
        barnesHutTheta=1.2,
        # Tuning
        scalingRatio=2.0,
        strongGravityMode=False,
        gravity=1.0,
        # Log
        verbose=False,
    )
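    # The layout step then looks roughly like this (a sketch: `iterations`
    # is assumed to be a parameter in scope; fa2 returns one (x, y) tuple
    # per node of `adjacency`).
    positions = forceatlas2.forceatlas2(
        adjacency, pos=init_coords, iterations=iterations
    )
    positions = np.array(positions)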
nx.draw_networkx_edges(
    g_dir, pos, ax=ax, width=widths, edge_color='black', arrowsize=arrowsize
)
if export_to_gexf:
    if isinstance(colors[0], tuple):
        from matplotlib.colors import rgb2hex
        colors = [rgb2hex(c) for c in colors]
    for count, n in enumerate(nx_g_solid.nodes()):
        # `.nodes` (the removed `.node` breaks on networkx >= 2.4)
        nx_g_solid.nodes[count]['label'] = str(node_labels[count])
        nx_g_solid.nodes[count]['color'] = str(colors[count])
        nx_g_solid.nodes[count]['viz'] = dict(position=dict(
            x=1000 * pos[count][0],
            y=1000 * pos[count][1],
            z=0,
        ))
    filename = settings.writedir / 'paga_graph.gexf'
    logg.warning(f'exporting to {filename}')
    settings.writedir.mkdir(parents=True, exist_ok=True)
    nx.write_gexf(nx_g_solid, filename)
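    # Hedged round-trip check: the exported GEXF file can be read back
    # with networkx, e.g. to verify it before importing into Gephi.
    g_check = nx.read_gexf(str(filename))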
ax.set_frame_on(frameon)
ax.set_xticks([])
ax.set_yticks([])
# group sizes
if groups_key is not None and groups_key + '_sizes' in adata.uns:
    groups_sizes = adata.uns[groups_key + '_sizes']
else:
    groups_sizes = np.ones(len(node_labels))
base_scale_scatter = 2000
base_pie_size = (
    base_scale_scatter / (np.sqrt(adjacency_solid.shape[0]) + 10) * node_size_scale
)
median_group_size = np.median(groups_sizes)
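
# Quick numeric check of the scaling above (hypothetical node counts):
# pie sizes shrink roughly with the square root of the graph size.
for n_nodes in (10, 90, 1000):
    print(n_nodes, 2000 / (np.sqrt(n_nodes) + 10))  # ~152, ~103, ~48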
def _check_var_names_type(var_names, var_group_labels, var_group_positions):
    """
    Checks if `var_names` is a dict. If this is the case, then set the
    correct values for `var_group_labels` and `var_group_positions`.

    Returns
    -------
    var_names, var_group_labels, var_group_positions
    """
    if isinstance(var_names, cabc.Mapping):
        if var_group_labels is not None or var_group_positions is not None:
            logg.warning(
                "`var_names` is a dictionary. This will reset the current "
                "values of `var_group_labels` and `var_group_positions`."
            )
        var_group_labels = []
        _var_names = []
        var_group_positions = []
        start = 0
        for label, vars_list in var_names.items():
            if isinstance(vars_list, str):
                vars_list = [vars_list]
            # use list() in case vars_list is a numpy array or pandas series
            _var_names.extend(list(vars_list))
            var_group_labels.append(label)
            var_group_positions.append((start, start + len(vars_list) - 1))
            start += len(vars_list)
        var_names = _var_names
    elif isinstance(var_names, str):
        var_names = [var_names]
    return var_names, var_group_labels, var_group_positions


def _reorder_categories_after_dendrogram(
    var_names, categories, categories_idx_ordered,
    var_group_labels=None, var_group_positions=None,
):
    # simplified signature sketch: in scanpy this helper receives the
    # AnnData object and derives `categories` and `categories_idx_ordered`
    # from the stored dendrogram information
    if var_group_labels is not None and var_group_positions is not None:
        if list(var_group_labels) == list(categories):
            positions_ordered = []
            labels_ordered = []
            position_start = 0
            var_names_idx_ordered = []
            for idx in categories_idx_ordered:
                position = var_group_positions[idx]
                _var_names = var_names[position[0] : position[1] + 1]
                var_names_idx_ordered.extend(
                    range(position[0], position[1] + 1)
                )
                positions_ordered.append(
                    (position_start, position_start + len(_var_names) - 1)
                )
                position_start += len(_var_names)
                labels_ordered.append(var_group_labels[idx])
            var_group_labels = labels_ordered
            var_group_positions = positions_ordered
        else:
            logg.warning(
                "Groups are not reordered because the `groupby` categories "
                "and the `var_group_labels` are different.\n"
                f"categories: {_format_first_three_categories(categories)}\n"
                f"var_group_labels: {_format_first_three_categories(var_group_labels)}"
            )
            var_names_idx_ordered = None  # avoid NameError in the return below
    else:
        var_names_idx_ordered = None
    return dict(
        categories_idx_ordered=categories_idx_ordered,
        var_names_idx_ordered=var_names_idx_ordered,
        var_group_labels=var_group_labels,
        var_group_positions=var_group_positions,
    )
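
# Usage sketch for `_check_var_names_type` (hypothetical marker genes):
# a mapping is flattened into a plain list plus derived labels/positions.
var_names, labels, positions = _check_var_names_type(
    {'T cells': ['CD3D', 'CD3E'], 'B cells': 'MS4A1'},
    var_group_labels=None,
    var_group_positions=None,
)
# var_names -> ['CD3D', 'CD3E', 'MS4A1']
# labels    -> ['T cells', 'B cells']
# positions -> [(0, 1), (2, 2)]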
def _tail_mean_estimate(adata, mask, model='rough'):
    # Method ZINB will be implemented soon
    if model not in {'rough', 'zinb'}:
        model = 'rough'
        logg.warning(
            'Model should be either rough or zinb (zero-inflated negative binomial)'
        )
    X = adata.X[mask, :]
    n_cells = X.shape[0]
    n_genes = X.shape[1]
    means = np.zeros((n_genes,))
    if model == 'rough':
        if issparse(X):
            X = X.tocsc()  # ensure X.data is grouped gene-by-gene (column-major)
            n_nonzero_elements = X.getnnz(axis=0)
            # more efficient to index into the flattened data directly; since
            # the matrix is highly sparse, this poses no memory issue
            fulldata = X.data
            left = 0
            right = 0
            for i, j in enumerate(n_nonzero_elements):
                right = right + j
                if right > left:  # skip genes without nonzero entries
                    means[i] = np.mean(fulldata[left:right])
                left = right
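
# Tiny check of the nonzero-mean loop above (hypothetical 3x2 matrix):
# in CSC layout, X.data is ordered column by column, so slicing by the
# per-column nonzero counts recovers each gene's nonzero values.
from scipy.sparse import csc_matrix
X_demo = csc_matrix(np.array([[0., 2.], [4., 0.], [0., 6.]]))
# X_demo.getnnz(axis=0) -> array([1, 2]); nonzero means per gene: [4.0, 4.0]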
# this fragment assumes scanpy's `logg` and `settings` are in scope
from matplotlib import rcParams
from matplotlib import pyplot as pl


def savefig(writekey, dpi=None, ext=None):
    """Save current figure to file.

    The `filename` is generated as follows:
    filename = settings.figdir / (writekey + settings.plot_suffix + '.' + settings.file_format_figs)
    """
    if dpi is None:
        # in notebooks, the internal figures are also influenced by
        # 'savefig.dpi', so only warn instead of silently overriding it
        if not isinstance(rcParams['savefig.dpi'], str) and rcParams['savefig.dpi'] < 150:
            if settings._low_resolution_warning:
                logg.warning(
                    'You are using a low resolution (dpi<150) for saving figures.\n'
                    'Consider running `set_figure_params(dpi_save=...)`, which will '
                    "adjust `matplotlib.rcParams['savefig.dpi']`"
                )
                settings._low_resolution_warning = False
        else:
            dpi = rcParams['savefig.dpi']
    settings.figdir.mkdir(parents=True, exist_ok=True)
    if ext is None:
        ext = settings.file_format_figs
    filename = settings.figdir / f'{writekey}{settings.plot_suffix}.{ext}'
    # output the message at warning level; it's really important for the user
    logg.warning(f'saving figure to file {filename}')
    pl.savefig(filename, dpi=dpi, bbox_inches='tight')
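
# Hedged usage sketch: with settings.figdir == Path('./figures'),
# plot_suffix == '' and file_format_figs == 'pdf', the call below
# writes './figures/umap_clusters.pdf' ('umap_clusters' is hypothetical).
savefig('umap_clusters', dpi=300)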
which = 'LM' if sort == 'decrease' else 'SM'
# it pays off to increase the stability with a bit more precision
matrix = matrix.astype(np.float64)
evals, evecs = scipy.sparse.linalg.eigsh(
    matrix, k=n_comps, which=which, ncv=ncv
)
evals, evecs = evals.astype(np.float32), evecs.astype(np.float32)
if sort == 'decrease':
    evals = evals[::-1]
    evecs = evecs[:, ::-1]
logg.info(
    '    eigenvalues of transition matrix\n'
    '    {}'.format(str(evals).replace('\n', '\n    '))
)
if self._number_connected_components > len(evals) / 2:
    logg.warning('Transition matrix has many disconnected components!')
self._eigen_values = evals
self._eigen_basis = evecs
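
# Standalone illustration of the decomposition above (toy symmetric
# matrix, not scanpy data): eigsh with which='LM' returns eigenvalues
# in ascending order, hence the reversal for a decreasing sort.
import numpy as np
import scipy.sparse
import scipy.sparse.linalg

m = scipy.sparse.diags([3.0, 2.0, 1.0, 0.5]).tocsc()
evals_demo, evecs_demo = scipy.sparse.linalg.eigsh(m, k=2, which='LM')
print(evals_demo[::-1])  # [3. 2.]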
# parameters passed to either tSNE implementation below; `perplexity` and
# `random_state` are assumed to be parameters in scope
params_sklearn = dict(
    perplexity=perplexity,
    random_state=random_state,
    early_exaggeration=early_exaggeration,
    learning_rate=learning_rate,
)
n_jobs = settings.n_jobs if n_jobs is None else n_jobs
# deal with different tSNE implementations
X_tsne = None
if n_jobs >= 1 and use_fast_tsne:
    try:
        from MulticoreTSNE import MulticoreTSNE as TSNE
        tsne = TSNE(n_jobs=n_jobs, **params_sklearn)
        logg.info("    using the 'MulticoreTSNE' package by Ulyanov (2017)")
        # need to transform to float64 for MulticoreTSNE
        X_tsne = tsne.fit_transform(X.astype('float64'))
    except ImportError:
        logg.warning(
            'Consider installing the package MulticoreTSNE '
            '(https://github.com/DmitryUlyanov/Multicore-TSNE). '
            'Even for n_jobs=1 this speeds up the computation considerably '
            'and might yield better converged results.'
        )
if X_tsne is None:
    from sklearn.manifold import TSNE
    from . import _tsne_fix  # fix by D. DeTomaso for sklearn < 0.19
    # unfortunately, sklearn does not allow to set a minimum number
    # of iterations for barnes-hut tSNE
    tsne = TSNE(**params_sklearn)
    logg.info('    using sklearn.manifold.TSNE with a fix by D. DeTomaso')
    X_tsne = tsne.fit_transform(X)
# update AnnData instance
adata.obsm['X_tsne'] = X_tsne  # annotate samples with tSNE coordinates
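
# Typical high-level call, assuming the excerpt above is scanpy's
# `sc.tl.tsne`: the embedding ends up in `adata.obsm['X_tsne']` and can
# be plotted with `sc.pl.tsne` (names follow the scanpy public API).
import scanpy as sc
adata = sc.datasets.pbmc68k_reduced()  # small example dataset bundled with scanpy
sc.tl.tsne(adata, n_jobs=4)
sc.pl.tsne(adata)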