uniq_subchain = np.unique(sort_subchain)
diff_subchain = np.ediff1d(sort_subchain)
# Indices where the sorted chainID changes mark the start of a new group.
marks = np.where(diff_subchain > 0)[0] + 1
marks = np.concatenate(([0], marks, [calc]))
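# marks[i]:marks[i+1] now brackets the particles belonging to uniq_subchain[i]
# in the sorted arrays.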
for i, u in enumerate(uniq_subchain):
self.bulk_vel[u] = np.sum(vel[marks[i]:marks[i + 1]], axis=0)
del vel, subchain, sort_subchain
del diff_subchain
# Bring it together, and divide by the previously computed total mass
# of each halo.
self.bulk_vel = self.comm.mpi_allreduce(self.bulk_vel, op='sum')
for groupID in xrange(self.group_count):
self.bulk_vel[groupID] = \
self.bulk_vel[groupID] / self.Tot_M[groupID]
yt_counters("bulk vel. computing")
# Now calculate the RMS velocity of the groups in parallel, very
# similarly to the bulk velocity and re-using some of the arrays.
yt_counters("rms vel computing")
rms_vel_temp = np.zeros((self.group_count, 2), dtype='float64')
if calc:
vel = np.empty((calc, 3), dtype='float64')
vel[:, 0] = xv[select] * ms
vel[:, 1] = yv[select] * ms
vel[:, 2] = zv[select] * ms
vel = vel[sort]
for i, u in enumerate(uniq_subchain):
# This finds the sum locally.
rms_vel_temp[u][0] = np.sum(((vel[marks[i]:marks[i + 1]] - \
self.bulk_vel[u]) / self.Tot_M[u]) ** 2.)
# I could use self.group_sizes...
rms_vel_temp[u][1] = marks[i + 1] - marks[i]
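# rms_vel_temp[u] holds (local sum of squared deviations from the bulk
# velocity, local particle count) for group u.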
select = (self.chainID != -1)
calc = len(np.where(select == True)[0])
loc = np.empty((calc, 3), dtype='float64')
if self.tree == 'F':
loc[:, 0] = np.concatenate((self.xpos, self.xpos_pad))[select]
loc[:, 1] = np.concatenate((self.ypos, self.ypos_pad))[select]
loc[:, 2] = np.concatenate((self.zpos, self.zpos_pad))[select]
self.__max_memory()
del self.xpos_pad, self.ypos_pad, self.zpos_pad
elif self.tree == 'C':
loc = self.pos[select]
subchain = self.chainID[select]
# First we need to find the maximum density point for all groups.
# I think this will be faster than several vector operations that need
# to pull the entire chainID array out of memory several times.
yt_counters("max dens point")
max_dens_point = np.zeros((self.group_count,4),dtype='float64')
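# Each row will hold (density, x, y, z) for the group's densest particle.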
for i,part in enumerate(np.arange(self.size)[select]):
groupID = self.chainID[part]
if part < self.real_size:
real_index = self.index[part]
else:
real_index = self.index_pad[part - self.real_size]
if real_index == self.densest_in_group_real_index[groupID]:
max_dens_point[groupID] = np.array([self.density[part], \
loc[i, 0], loc[i, 1], loc[i, 2]])
del self.index, self.index_pad, self.densest_in_group_real_index
# Now we broadcast this, effectively, with an allsum. Even though
# some groups are on multiple tasks, there is only one densest_in_chain
# and only that task contributed above.
self.max_dens_point = self.comm.mpi_allreduce(max_dens_point, op='sum')
del max_dens_point
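# Aside: the "allsum as broadcast" trick described above, as a minimal mpi4py
# sketch.  Illustrative only -- mpi4py is an assumption for this example and is
# not the wrapper behind self.comm; the function name is invented.
def _example_allsum_broadcast():
    from mpi4py import MPI
    import numpy as np
    comm = MPI.COMM_WORLD
    local = np.zeros(comm.size, dtype='float64')
    # Each rank fills only the slot it "owns"; everyone else leaves it at zero.
    local[comm.rank] = 10.0 * comm.rank
    glob = np.zeros_like(local)
    # Summing zeros plus a single owner's value is equivalent to a broadcast.
    comm.Allreduce(local, glob, op=MPI.SUM)
    return glob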
def _translate_groupIDs(self, group_count):
"""
Using the maps, convert the particle chainIDs into their locally-final
groupIDs.
"""
yt_counters("translate_groupIDs")
self.I_own = set([])
for i in xrange(int(self.size)):
# Don't translate non-affiliated particles.
if self.chainID[i] == -1: continue
# We want to remove the group tag from padded particles,
# so when we return it to HaloFinding, there is no duplication.
if self.is_inside[i]:
self.chainID[i] = self.reverse_map[self.chainID[i]]
self.I_own.add(self.chainID[i])
else:
self.chainID[i] = -1
del self.is_inside
# Create a densest_in_group, analogous to densest_in_chain.
keys = np.arange(group_count)
vals = np.zeros(group_count)
self.densest_in_group = dict(itertools.izip(keys,vals))
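# Zero placeholders keyed by groupID, mirroring densest_in_chain.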
left = ((points[:,i] < temp_LE[i]) * (points[:,i] < temp_RE[i])) * self.period[i]
right = ((points[:,i] > temp_LE[i]) * (points[:,i] > temp_RE[i])) * self.period[i]
shift_points[:,i] = points[:,i] + left - right
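# Particles lying wholly below (or above) the neighbor's slab are wrapped by
# one box period before the containment test below.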
is_inside = ( (shift_points >= temp_LE).all(axis=1) * \
(shift_points < temp_RE).all(axis=1) )
send_real_indices[neighbor] = real_indices[is_inside].copy()
send_points[neighbor] = shift_points[is_inside].copy()
send_mass[neighbor] = mass[is_inside].copy()
send_size[neighbor] = is_inside.sum()
del points, shift_points, mass, real_indices
yt_counters("Picking padding data to send.")
# Communicate the sizes to send.
self.mine, global_send_count = self.comm.mpi_info_dict(send_size)
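# global_send_count[i][j] is the number of padded particles task i will send
# to task j.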
del send_size
# Initialize the arrays to receive data.
yt_counters("Initalizing recv arrays.")
recv_real_indices = {}
recv_points = {}
recv_mass = {}
recv_size = 0
for opp_neighbor in self.neighbors:
opp_size = global_send_count[opp_neighbor][self.mine]
recv_real_indices[opp_neighbor] = np.empty(opp_size, dtype='int64')
recv_points[opp_neighbor] = np.empty((opp_size, 3), dtype='float64')
recv_mass[opp_neighbor] = np.empty(opp_size, dtype='float64')
recv_size += opp_size
yt_counters("Initalizing recv arrays.")
# Setup the receiving slots.
yt_counters("MPI stuff.")
hooks = []
for opp_neighbor in self.neighbors:
hooks.append(self.comm.mpi_nonblocking_recv(recv_real_indices[opp_neighbor], opp_neighbor))
Tot_M[groupID] += ms[i]
del cc, ms
for groupID in xrange(int(self.group_count)):
# Don't divide by zero.
if groupID in self.I_own:
CoM_M[groupID] /= Tot_M[groupID]
CoM_M[groupID] += self.max_dens_point[groupID,1:4] - np.array([0.5,0.5,0.5])
CoM_M[groupID] *= Tot_M[groupID]
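# Re-weight by the local mass so the allreduce below sums mass-weighted
# centers, which are then divided by the global total mass.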
# Now we find their global values
self.group_sizes = self.comm.mpi_allreduce(size, op='sum')
CoM_M = self.comm.mpi_allreduce(CoM_M, op='sum')
self.Tot_M = self.comm.mpi_allreduce(Tot_M, op='sum')
self.CoM = np.empty((self.group_count,3), dtype='float64')
for groupID in xrange(int(self.group_count)):
self.CoM[groupID] = CoM_M[groupID] / self.Tot_M[groupID]
yt_counters("CoM")
self.__max_memory()
# Now we find the maximum radius for all groups.
yt_counters("max radius")
max_radius = np.zeros(self.group_count, dtype='float64')
if calc:
com = self.CoM[subchain]
rad = np.fabs(com - loc)
dist = (np.minimum(rad, self.period - rad)**2.).sum(axis=1)
dist = dist[sort]
for i, u in enumerate(uniq_subchain):
max_radius[u] = np.max(dist[marks[i]:marks[i+1]])
mylog.info('Fraction of particles in this region in groups: %f' % (float(calc)/self.size))
# Find the maximum across all tasks.
self.max_radius = self.comm.mpi_allreduce(max_radius, op='max')
self.max_radius = np.sqrt(self.max_radius)
yt_counters("max radius")
map[i] = new
dic_new[new] = dic
dicri_new[new] = self.densest_in_chain_real_index[i]
new += 1
else:
map[i] = -1
for i in range(self.size):
if self.chainID[i] != -1:
self.chainID[i] = map[self.chainID[i]]
del map
self.densest_in_chain = dic_new.copy()
del dic_new
self.densest_in_chain_real_index = dicri_new.copy()
del dicri_new
self.__max_memory()
yt_counters("preconnect pregrouping.")
mylog.info("Preconnected %d chains." % removed)
yt_counters("preconnect_chains")
return chain_count - removed
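# Aside: the renumbering loop above compacts the surviving chain labels into a
# dense range and sends removed chains to -1.  A minimal vectorized version of
# the same idea (names invented for this sketch):
def _example_compact_labels(labels, keep):
    import numpy as np
    # keep[i] is True if old label i survives preconnection.
    remap = np.full(len(keep), -1, dtype='int64')
    remap[keep] = np.arange(keep.sum())
    out = labels.copy()
    tagged = labels != -1
    # Untagged (-1) entries stay -1; tagged entries follow the map.
    out[tagged] = remap[labels[tagged]]
    return out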
opp_size = global_send_count[opp_neighbor][self.mine]
self.index_pad[so_far:so_far+opp_size] = recv_real_indices[opp_neighbor]
# Clean up immediately to reduce peak memory usage.
del recv_real_indices[opp_neighbor]
self.xpos_pad[so_far:so_far+opp_size] = recv_points[opp_neighbor][:,0]
self.ypos_pad[so_far:so_far+opp_size] = recv_points[opp_neighbor][:,1]
self.zpos_pad[so_far:so_far+opp_size] = recv_points[opp_neighbor][:,2]
del recv_points[opp_neighbor]
self.mass_pad[so_far:so_far+opp_size] = recv_mass[opp_neighbor]
del recv_mass[opp_neighbor]
so_far += opp_size
yt_counters("Processing padded data.")
# The KD-tree node search wants the particles to lie inside the full box,
# not in the expanded, shifted volume (coordinates generally < 0 or > 1),
# so we wrap the padded positions back into the box here.
yt_counters("Flipping coordinates around the periodic boundary.")
self.xpos_pad = self.xpos_pad % self.period[0]
self.ypos_pad = self.ypos_pad % self.period[1]
self.zpos_pad = self.zpos_pad % self.period[2]
yt_counters("Flipping coordinates around the periodic boundary.")
self.size = self.index.size + self.index_pad.size
# Now that we have the full size, initialize the chainID array
self.chainID = np.ones(self.size, dtype='int64') * -1
# Clean up explicitly, but these should be empty dicts by now.
del recv_real_indices, hooks, recv_points, recv_mass
yt_counters("Communicate discriminated padding")
hooks = []
for opp_neighbor in self.neighbors:
hooks.append(self.comm.mpi_nonblocking_recv(recv_real_indices[opp_neighbor], opp_neighbor))
hooks.append(self.comm.mpi_nonblocking_recv(recv_points[opp_neighbor], opp_neighbor))
hooks.append(self.comm.mpi_nonblocking_recv(recv_mass[opp_neighbor], opp_neighbor))
# Let's wait here to be absolutely sure that all the receive buffers
# have been created before any sending happens!
self.comm.barrier()
# Now we send the data.
for neighbor in self.neighbors:
hooks.append(self.comm.mpi_nonblocking_send(send_real_indices[neighbor], neighbor))
hooks.append(self.comm.mpi_nonblocking_send(send_points[neighbor], neighbor))
hooks.append(self.comm.mpi_nonblocking_send(send_mass[neighbor], neighbor))
# Now we use the data, after all the comms are done.
self.comm.mpi_Request_Waitall(hooks)
yt_counters("MPI stuff.")
yt_counters("Processing padded data.")
del send_real_indices, send_points, send_mass
# Now we add the data to ourselves.
self.index_pad = np.empty(recv_size, dtype='int64')
self.xpos_pad = np.empty(recv_size, dtype='float64')
self.ypos_pad = np.empty(recv_size, dtype='float64')
self.zpos_pad = np.empty(recv_size, dtype='float64')
self.mass_pad = np.empty(recv_size, dtype='float64')
so_far = 0
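# Each neighbor's block is copied back to back into the *_pad arrays,
# offset by so_far.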
for opp_neighbor in self.neighbors:
opp_size = global_send_count[opp_neighbor][self.mine]
self.index_pad[so_far:so_far+opp_size] = recv_real_indices[opp_neighbor]
# Clean up immediately to reduce peak memory usage.
del recv_real_indices[opp_neighbor]
self.xpos_pad[so_far:so_far+opp_size] = recv_points[opp_neighbor][:,0]
self.ypos_pad[so_far:so_far+opp_size] = recv_points[opp_neighbor][:,1]