Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
beam_list: :class:`~pyuvsim.BeamList`
BeamList carrying beam model (in object mode).
beam_dict: dict
Map of antenna numbers to index in beam_list.
Yields
------
Iterable of UVTask objects.
"""
# The task_ids refer to tasks on the flattened meshgrid.
if not isinstance(input_uv, UVData):
raise TypeError("input_uv must be UVData object.")
# The sky model must be passed in as a SkyModelData object.
if not isinstance(catalog, SkyModelData):
raise TypeError("catalog must be a SkyModelData object.")
# Splitting the catalog for memory's sake.
Nsrcs_total = catalog.Ncomponents
if Nsky_parts > 1:
Nsky_parts = int(Nsky_parts)
src_iter = [simutils.iter_array_split(s, Nsrcs_total, Nsky_parts)[0]
for s in range(Nsky_parts)]
else:
src_iter = [range(Nsrcs_total)]
# Build the antenna list.
antenna_names = input_uv.antenna_names
antennas = []
antpos_enu, antnums = input_uv.get_ENU_antpos()
for num, antname in enumerate(antenna_names):
if beam_dict is None:
mock_keywords['world'] = 'moon'
else:
localframe = 'altaz'
mock_keywords['world'] = 'earth'
source_coord = SkyCoord(alt=Angle(alts, unit=units.deg), az=Angle(azs, unit=units.deg),
obstime=time, frame=localframe, location=array_location)
icrs_coord = source_coord.transform_to('icrs')
ra = icrs_coord.ra
dec = icrs_coord.dec
names = np.array(['src' + str(si) for si in range(Nsrcs)])
stokes = np.zeros((4, 1, Nsrcs))
stokes[0, :] = fluxes
catalog = pyradiosky.SkyModel(names, ra, dec, stokes, 'flat')
if return_data:
catalog = SkyModelData(catalog)
if get_rank() == 0 and save:
np.savez('mock_catalog_' + arrangement, ra=ra.rad, dec=dec.rad, alts=alts, azs=azs,
fluxes=fluxes)
return catalog, mock_keywords
mpi.start_mpi()
rank = mpi.get_rank()
comm = mpi.get_comm()
input_uv = UVData()
beam_list = None
beam_dict = None
skydata = SkyModelData()
if rank == 0:
input_uv, beam_list, beam_dict = simsetup.initialize_uvdata_from_params(params)
skydata, source_list_name = simsetup.initialize_catalog_from_params(
params, input_uv, return_recarray=False
)
skydata = simsetup.SkyModelData(skydata)
input_uv = comm.bcast(input_uv, root=0)
beam_list = comm.bcast(beam_list, root=0)
beam_dict = comm.bcast(beam_dict, root=0)
skydata.share(root=0)
uv_out = run_uvdata_uvsim(
input_uv, beam_list, beam_dict=beam_dict, catalog=skydata, quiet=quiet
)
if rank == 0:
if isinstance(params, str):
with open(params, 'r') as pfile:
param_dict = yaml.safe_load(pfile)
else:
param_dict = params
Parameters
----------
inds: range or index array
Indices to select along the component axis.
Returns
-------
SkyModelData
A new SkyModelData with Ncomp axes downselected.
Notes
-----
If inds is a range object, this method will avoid copying data in numpy arrays,
such that the returned SkyModelData object carries views into the current object's arrays.
"""
new_sky = SkyModelData()
new_sky.Ncomponents = len(inds)
new_sky.nside = self.nside
new_sky.component_type = self.component_type
if self.name is not None:
new_sky.name = self.name[inds]
if isinstance(inds, range):
new_sky.stokes_I = self.stokes_I[:, slice(inds.start, inds.stop, inds.step)]
else:
new_sky.stokes_I = self.stokes_I[:, inds]
new_sky.ra = self.ra[inds]
new_sky.dec = self.dec[inds]
new_sky.Nfreqs = self.Nfreqs
new_sky.spectral_type = self.spectral_type
if self.reference_frequency is not None:
Returned only if return_uv is True.
"""
if mpi is None:
raise ImportError("You need mpi4py to use the uvsim module. "
"Install it by running pip install pyuvsim[sim] "
"or pip install pyuvsim[all] if you also want the "
"line_profiler installed.")
mpi.start_mpi()
rank = mpi.get_rank()
comm = mpi.get_comm()
input_uv = UVData()
beam_list = None
beam_dict = None
skydata = SkyModelData()
if rank == 0:
input_uv, beam_list, beam_dict = simsetup.initialize_uvdata_from_params(params)
skydata, source_list_name = simsetup.initialize_catalog_from_params(
params, input_uv, return_recarray=False
)
skydata = simsetup.SkyModelData(skydata)
input_uv = comm.bcast(input_uv, root=0)
beam_list = comm.bcast(beam_list, root=0)
beam_dict = comm.bcast(beam_dict, root=0)
skydata.share(root=0)
uv_out = run_uvdata_uvsim(
input_uv, beam_list, beam_dict=beam_dict, catalog=skydata, quiet=quiet
)