Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _assign_update_info(spike_ids, old_spike_clusters, new_spike_clusters):
old_clusters = _unique(old_spike_clusters)
new_clusters = _unique(new_spike_clusters)
largest_old_cluster = np.bincount(old_spike_clusters).argmax()
descendants = list(set(zip(old_spike_clusters, new_spike_clusters)))
update_info = UpdateInfo(
description='assign',
spike_ids=list(spike_ids),
spike_clusters=list(new_spike_clusters),
added=list(new_clusters),
deleted=list(old_clusters),
descendants=descendants,
largest_old_cluster=int(largest_old_cluster),
)
return update_info
spike_ids = _as_array(spike_ids)
if len(new_spike_clusters) == 1 and len(spike_ids) > 1:
new_spike_clusters = np.ones(len(spike_ids), dtype=np.int64) * new_spike_clusters[0]
old_spike_clusters = self._spike_clusters[spike_ids]
assert len(spike_ids) == len(old_spike_clusters)
assert len(new_spike_clusters) == len(spike_ids)
# Update the spikes per cluster structure.
old_clusters = _unique(old_spike_clusters)
# NOTE: shortcut to a merge if this assignment is effectively a merge
# i.e. if all spikes are assigned to a single cluster.
# The fact that spike selection has been previously extended to
# whole clusters is critical here.
new_clusters = _unique(new_spike_clusters)
if len(new_clusters) == 1:
return self._do_merge(spike_ids, old_clusters, new_clusters[0])
# We return the UpdateInfo structure.
up = _assign_update_info(spike_ids, old_spike_clusters, new_spike_clusters)
# We update the new cluster id (strictly increasing during a session).
self._new_cluster_id = max(self._new_cluster_id, max(up.added) + 1)
# We make the assignments.
self._spike_clusters[spike_ids] = new_spike_clusters
# OPTIM: we update spikes_per_cluster manually.
new_spc = _spikes_per_cluster(new_spike_clusters, spike_ids)
self._update_cluster_ids(to_remove=old_clusters, to_add=new_spc)
return up
def _extend_spikes(spike_ids, spike_clusters):
"""Return all spikes belonging to the clusters containing the specified
spikes."""
# We find the spikes belonging to modified clusters.
# What are the old clusters that are modified by the assignment?
old_spike_clusters = spike_clusters[spike_ids]
unique_clusters = _unique(old_spike_clusters)
# Now we take all spikes from these clusters.
changed_spike_ids = _spikes_in_clusters(spike_clusters, unique_clusters)
# These are the new spikes that need to be reassigned.
extended_spike_ids = np.setdiff1d(changed_spike_ids, spike_ids, assume_unique=True)
return extended_spike_ids
def make_depths(self):
"""Make spikes.depths.npy and clusters.depths.npy."""
p = self.dir_path
channel_positions = self.model.channel_positions
assert channel_positions.ndim == 2
cluster_channels = np.load(p / 'clusters.peakChannel.npy')
assert cluster_channels.ndim == 1
n_clusters = cluster_channels.shape[0]
spike_clusters = self.model.spike_clusters
assert spike_clusters.ndim == 1
n_spikes = spike_clusters.shape[0]
self.cluster_ids = _unique(self.model.spike_clusters)
clusters_depths = channel_positions[cluster_channels, 1]
assert clusters_depths.shape == (n_clusters,)
spike_clusters_rel = _index_of(spike_clusters, self.cluster_ids)
assert spike_clusters_rel.max() < clusters_depths.shape[0]
spikes_depths = clusters_depths[spike_clusters_rel]
assert spikes_depths.shape == (n_spikes,)
np.save(p / 'spikes.depths.npy', spikes_depths)
np.save(p / 'clusters.depths.npy', clusters_depths)
def _assign_update_info(spike_ids, old_spike_clusters, new_spike_clusters):
old_clusters = _unique(old_spike_clusters)
new_clusters = _unique(new_spike_clusters)
largest_old_cluster = np.bincount(old_spike_clusters).argmax()
descendants = list(set(zip(old_spike_clusters, new_spike_clusters)))
update_info = UpdateInfo(
description='assign',
spike_ids=list(spike_ids),
spike_clusters=list(new_spike_clusters),
added=list(new_clusters),
deleted=list(old_clusters),
descendants=descendants,
largest_old_cluster=int(largest_old_cluster),
)
return update_info