Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
_indices = snap.particle_indices(particle_type)
else:
_indices = [snap.particle_indices(particle_type)]
for type_indices in _indices:
if len(type_indices) == 0:
continue
if chunk_size is not None:
n_chunks = int(len(type_indices) / chunk_size)
array_chunks = np.array_split(type_indices, n_chunks)
else:
n_chunks = 1
array_chunks = [type_indices]
if verbose and n_chunks > 1:
logger.info(f'Number of chunks: {n_chunks}', flush=True)
logger.info(f'Chunk size: {chunk_size}', flush=True)
for idx, indices in enumerate(array_chunks):
if verbose:
if n_chunks > 1:
logger.info(
f'Finding neighbours for chunk: {idx}...', flush=True
)
else:
logger.info(f'Finding neighbours...', flush=True)
_neighbours = snap.get_many_neighbours(indices)
neighbours = List()
for neigh in _neighbours:
neighbours.append(np.array(neigh))
if verbose:
def next(self, number: int = 1):
"""Visualize next snap."""
idx = self._where + number
if idx < len(self):
self._fn(idx)
self._where += number
else:
logger.info('Too far forward. Going to last snap.')
self._fn(len(self) - 1)
self._where = len(self) - 1
ax=ax,
**kwargs,
)
images += ax.images
else:
images = [image for ax in fig.axes for image in ax.images]
if text is not None:
_text = ax.text(
0.9, 0.9, text[0], ha='right', transform=ax.transAxes, **text_kwargs
)
if tqdm is not None:
pbar = tqdm(total=len(snaps))
else:
logger.info(
'progress bar not available\ntry pip install tqdm --or-- conda install tqdm'
)
def animate(idx):
if tqdm is not None:
pbar.update(n=1)
_kwargs = {k: v for k, v in kwargs.items() if k in _interp_kwargs}
for quantity, x, y, extent, image in zip(quantities, xs, ys, extents, images):
if extent == (-1, -1, -1, -1):
_extent = get_extent_from_percentile(snaps[idx], x, y)
else:
_extent = extent
interp_data = interpolate(
snap=snaps[idx],
interp=interp,
quantity=quantity,
lines = ax.lines
if text is not None:
texts = [
ax.text(
0.9, 0.9, text[0], ha='right', transform=ax.transAxes, **text_kwargs
)
]
else:
lines = [line for ax in fig.axes for line in ax.lines]
if text is not None:
texts = [text for ax in fig.axes for text in ax.texts]
if tqdm is not None:
pbar = tqdm(total=len(snaps))
else:
logger.info(
'progress bar not available\ntry pip install tqdm --or-- conda install tqdm'
)
def animate(idx):
if tqdm is not None:
pbar.update(n=1)
subsnaps = snaps[idx].subsnaps_as_list()
num_subsnaps = len(subsnaps)
for idxi, y in enumerate(ys):
_xlim, _ylim = (0.0, 0.0), (0.0, 0.0)
for idxj, subsnap in enumerate(subsnaps):
line = lines[idxi * num_subsnaps + idxj]
_x = subsnap[x]
_y = subsnap[y]
line.set_data(_x, _y)
if adaptive_limits:
for idx, indices in enumerate(array_chunks):
if verbose:
if n_chunks > 1:
logger.info(
f'Finding neighbours for chunk: {idx}...', flush=True
)
else:
logger.info(f'Finding neighbours...', flush=True)
_neighbours = snap.get_many_neighbours(indices)
neighbours = List()
for neigh in _neighbours:
neighbours.append(np.array(neigh))
if verbose:
if n_chunks > 1:
logger.info(
f'Summing over neighbours for chunk: {idx}...', flush=True
)
else:
logger.info(f'Summing over neighbours...', flush=True)
result[indices] = compute_function(
indices=indices,
neighbours=neighbours,
type_indices=type_indices,
position=position,
smoothing_length=smoothing_length,
mass=mass,
kernel_function=kernel_function,
kernel_gradient_function=kernel_gradient_function,
verbose=verbose,
**compute_function_kwargs,
def prev(self, number: int = 1):
"""Visualize previous snap."""
idx = self._where - number
if idx > 0:
self._fn(idx)
self._where -= number
else:
logger.info('Too far back. Going to first snap.')
self._fn(0)
self._where = 0
ind = {key: self.particle_indices(key) for key in self.particle_type}
for idx, neigh in enumerate(neighbours):
if self['type'][idx] == self.particle_type['gas']:
_neighbours[idx] = ind['gas'][neigh]
elif self['type'][idx] == self.particle_type['dust']:
_neighbours[idx] = ind['dust'][self['sub_type'][idx] - 1][neigh]
elif self['type'][idx] == self.particle_type['boundary']:
_neighbours[idx] = ind['boundary'][self['sub_type'][idx] - 1][neigh]
elif self['type'][idx] == self.particle_type['star']:
_neighbours[idx] = ind['star'][neigh]
elif self['type'][idx] == self.particle_type['darkmatter']:
_neighbours[idx] = ind['darkmatter'][neigh]
elif self['type'][idx] == self.particle_type['bulge']:
_neighbours[idx] = ind['bulge'][neigh]
logger.info(' Done!', flush=True)
return _neighbours
lines = ax.lines
if text is not None:
texts = [
ax.text(
0.9, 0.9, text[0], ha='right', transform=ax.transAxes, **text_kwargs
)
]
else:
lines = [line for ax in fig.axes for line in ax.lines]
if text is not None:
texts = [text for ax in fig.axes for text in ax.texts]
if tqdm is not None:
pbar = tqdm(total=len(profiles))
else:
logger.info(
'progress bar not available\ntry pip install tqdm --or-- conda install tqdm'
)
def animate(idx):
if tqdm is not None:
pbar.update(n=1)
for line, quantity in zip(lines, quantities):
x, y = profiles[idx]['radius'], profiles[idx][quantity]
line.set_data(x, y)
if adaptive_limits:
ax = line.axes
ylim = _get_range(y, (0, 0))
ax.set(ylim=ylim)
if text is not None:
for _text in texts:
_text.set_text(text[idx])
'Wendland C4'.
chunk_size : optional
The size of chunks, in terms of particle number, for neighbour
finding. If the chunk size is too large then the neighbour
finding algorithm (scipy.spatial.cKDTree.query_ball_point) runs
out of memory. Default is None.
verbose : optional
If True, print progress. Default is False.
Returns
-------
ndarray
The derivative of the quantity.
"""
end = '\n' if verbose else ''
logger.info(
f'Calculating {derivative}... may take some time...', end=end, flush=True
)
if derivative not in ('grad', 'div', 'curl'):
raise ValueError('derivative must be in ("grad", "div", "curl")')
density = snap['density']
quantity_array: ndarray = snap[quantity]
compute_function = _compute_derivative
compute_function_kwargs = {
'density': density,
'quantity_array': quantity_array,
}
if derivative == 'grad':