Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Iterable of UVTask objects.
"""
# The task_ids refer to tasks on the flattened meshgrid.
if not isinstance(input_uv, UVData):
raise TypeError("input_uv must be UVData object.")
# Skymodel will now be passed in as a catalog array.
if not isinstance(catalog, SkyModelData):
raise TypeError("catalog must be a SkyModelData object.")
# Splitting the catalog for memory's sake.
Nsrcs_total = catalog.Ncomponents
if Nsky_parts > 1:
Nsky_parts = int(Nsky_parts)
src_iter = [simutils.iter_array_split(s, Nsrcs_total, Nsky_parts)[0]
for s in range(Nsky_parts)]
else:
src_iter = [range(Nsrcs_total)]
# Build the antenna list.
antenna_names = input_uv.antenna_names
antennas = []
antpos_enu, antnums = input_uv.get_ENU_antpos()
for num, antname in enumerate(antenna_names):
if beam_dict is None:
beam_id = 0
else:
beam_id = beam_dict[antname]
antennas.append(Antenna(antname, num, antpos_enu[num], beam_id))
baselines = {}
Ntimes = input_uv.Ntimes
(1) Npus <= Nbltf -- Split by Nbltf, split sources in the task loop for memory's sake.
(2) Nbltf < Npus and Nsrcs > Npus -- Split by Nsrcs only.
(3) Nbltf < Npus and Nsrcs <= Npus -- Split by Nbltf.
- Split by instrument axes here.
- Within the task loop, decide on source chunks and make skymodels on the fly.
"""
Nbltf = Nbls * Ntimes * Nfreqs
split_srcs = False
if (Nbltf < Npus) and (Npus < Nsrcs):
split_srcs = True
if split_srcs:
src_inds, Nsrcs_local = simutils.iter_array_split(rank, Nsrcs, Npus)
task_inds = range(Nbltf)
Ntasks_local = Nbltf
else:
task_inds, Ntasks_local = simutils.iter_array_split(rank, Nbltf, Npus)
src_inds = range(Nsrcs)
Nsrcs_local = Nsrcs
return task_inds, src_inds, Ntasks_local, Nsrcs_local
Nsky_parts = max(Nsky_parts, 1)
if Nsky_parts > Nsrcs:
raise ValueError("Insufficient memory for simulation.")
Ntasks_tot = Ntimes * Nbls * Nfreqs * Nsky_parts
local_task_iter = uvdata_to_task_iter(
task_inds, input_uv, catalog.subselect(src_inds),
beam_list, beam_dict, Nsky_parts=Nsky_parts
)
summed_task_dict = {}
Ntasks_tot = comm.reduce(Ntasks_tot, op=mpi.MPI.MAX, root=0)
if rank == 0 and not quiet:
print("Tasks: ", Ntasks_tot, flush=True)
pbar = simutils.progsteps(maxval=Ntasks_tot)
engine = UVEngine()
count = mpi.Counter()
for task in local_task_iter:
engine.set_task(task)
if task.uvdata_index not in summed_task_dict.keys():
summed_task_dict[task.uvdata_index] = task
if summed_task_dict[task.uvdata_index].visibility_vector is None:
summed_task_dict[task.uvdata_index].visibility_vector = engine.make_visibility()
else:
summed_task_dict[task.uvdata_index].visibility_vector += engine.make_visibility()
count.next()
if rank == 0 and not quiet:
pbar.update(count.current_value())
uv_container.extra_keywords['world'] = input_uv.extra_keywords['world']
Nbls = input_uv.Nbls
Ntimes = input_uv.Ntimes
Nfreqs = input_uv.Nfreqs
Nsrcs = catalog.Ncomponents
task_inds, src_inds, Ntasks_local, Nsrcs_local = _make_task_inds(
Nbls, Ntimes, Nfreqs, Nsrcs, rank, Npus
)
# Construct beam objects from strings
beam_list.set_obj_mode(use_shared_mem=True)
# Estimating required memory to decide how to split source array.
mem_avail = (simutils.get_avail_memory()
- mpi.get_max_node_rss(return_per_node=True) * 2**30)
Npus_node = mpi.node_comm.Get_size()
skymodel_mem_footprint = (
simutils.estimate_skymodel_memory_usage(Nsrcs, catalog.Nfreqs) * Npus_node
)
# Allow up to 50% of available memory for SkyModel data.
skymodel_mem_max = 0.5 * mem_avail
Nsky_parts = np.ceil(skymodel_mem_footprint / float(skymodel_mem_max))
Nsky_parts = max(Nsky_parts, 1)
if Nsky_parts > Nsrcs:
raise ValueError("Insufficient memory for simulation.")
Ntasks_tot = Ntimes * Nbls * Nfreqs * Nsky_parts
Corresponding HEALPix indices for hpmap.
freqs : array_like, float
Frequencies in Hz. Shape (Nfreqs)
Returns
-------
SkyModel
Notes
-----
Currently, this function only converts a HEALPix map with a frequency axis.
"""
Nside = astropy_healpix.npix_to_nside(hpmap.shape[-1])
ra, dec = astropy_healpix.healpix_to_lonlat(indices, Nside)
stokes = np.zeros((4, len(freqs), len(indices)))
stokes[0] = (hpmap.T / simutils.jy2Tsr(freqs,
bm=astropy_healpix.nside_to_pixel_area(Nside).value, mK=False)
).T
sky = SkyModel(indices.astype('str'), ra, dec, stokes, freq_array=freqs, Nfreqs=len(freqs))
return sky
# TODO -- Need a way of passing the spectral_type from catalog to each rank.
# For now, if there are multiple frequencies, assume we have one per simulation frequency.
if self.spectral_type is None:
self.spectral_type = 'flat'
if self.Nfreqs > 1:
self.spectral_type = 'full'
if self.Ncomponents == 1:
self.stokes = self.stokes.reshape(4, Nfreqs, 1)
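# The coherency is a 2x2 matrix giving electric field correlation in Jy.
# The factor of 0.5 that makes the trace sum to I (not 2*I) is presumably
# applied inside simutils.stokes_to_coherency -- confirm there.
# NOTE(review): stokes is (4, Nfreqs, Ncomponents) here, so the result is likely
# (2, 2, Nfreqs, Ncomponents) rather than (2, 2, Ncomponents) -- confirm.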
self.coherency_radec = simutils.stokes_to_coherency(self.stokes)
self.time = None
assert np.all(
[self.Ncomponents == l for l in [self.ra.size, self.dec.size, self.stokes.shape[2]]]
), 'Inconsistent quantity dimensions.'
Interpolation method for frequencies.
Note -- This overrides whatever method may be set on the
UVBeam objects.
Returns
-------
jones_matrix : (2,2) ndarray, dtype float
The first axis is feed, the second axis is vector component
on the sky in az/za.
"""
# get_direction_jones needs to be defined on UVBeam
# 2x2 array of Efield vectors in alt/az
# convert to UVBeam az/za convention
source_za, source_az = simutils.altaz_to_zenithangle_azimuth(
source_alt_az[0], source_alt_az[1]
)
if isinstance(frequency, units.Quantity):
freq = np.array([frequency.to('Hz').value])
else:
freq = np.array([frequency])
if array.beam_list[self.beam_id].data_normalization != 'peak':
array.beam_list[self.beam_id].peak_normalize()
if freq_interp_kind is not None:
array.beam_list[self.beam_id].freq_interp_kind = freq_interp_kind
if interpolation_function is not None:
array.beam_list[self.beam_id].interpolation_function = interpolation_function
Nsrcs = catalog.Ncomponents
task_inds, src_inds, Ntasks_local, Nsrcs_local = _make_task_inds(
Nbls, Ntimes, Nfreqs, Nsrcs, rank, Npus
)
# Construct beam objects from strings
beam_list.set_obj_mode(use_shared_mem=True)
# Estimating required memory to decide how to split source array.
mem_avail = (simutils.get_avail_memory()
- mpi.get_max_node_rss(return_per_node=True) * 2**30)
Npus_node = mpi.node_comm.Get_size()
skymodel_mem_footprint = (
simutils.estimate_skymodel_memory_usage(Nsrcs, catalog.Nfreqs) * Npus_node
)
# Allow up to 50% of available memory for SkyModel data.
skymodel_mem_max = 0.5 * mem_avail
Nsky_parts = np.ceil(skymodel_mem_footprint / float(skymodel_mem_max))
Nsky_parts = max(Nsky_parts, 1)
if Nsky_parts > Nsrcs:
raise ValueError("Insufficient memory for simulation.")
Ntasks_tot = Ntimes * Nbls * Nfreqs * Nsky_parts
local_task_iter = uvdata_to_task_iter(
task_inds, input_uv, catalog.subselect(src_inds),
beam_list, beam_dict, Nsky_parts=Nsky_parts
)