Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
parser.add_argument("uvfits2", help="name of second uvfits file to compare to first.")
args = parser.parse_args()

# Validate both paths before any file is read, so a missing input is
# reported immediately.
uvfits_file1 = args.uvfits1
if not op.isfile(uvfits_file1):
    # Format the path variable itself: the values are read from
    # ``args.uvfits1`` / ``args.uvfits2``, so ``args.uvfits_file1`` would
    # raise AttributeError here instead of the intended IOError.
    raise IOError("There is no file named {}".format(uvfits_file1))

uvfits_file2 = args.uvfits2
if not op.isfile(uvfits_file2):
    raise IOError("There is no file named {}".format(uvfits_file2))

# Load each file into its own UVData object.
uv1 = UVData()
uv1.read_uvfits(uvfits_file1)

uv2 = UVData()
uv2.read_uvfits(uvfits_file2)

# The comparison is delegated to UVData's own ``==`` operator.
if uv1 == uv2:
    print("UVData objects from files are equal")
else:
    print("UVData objects from files are not equal")

# Drop the references to both objects once the comparison has been reported.
del uv1
del uv2
parser = argparse.ArgumentParser(description=("Generate basic simulation parameters from uvfits."))
parser.add_argument('file_in', metavar='', type=str, nargs='+')
parser.add_argument('-p', '--paraml_fname', type=str, default='')
args = parser.parse_args()

# Only the first positional input is used.
uv_fname = args.file_in[0]
yaml_fname = args.paraml_fname
if yaml_fname == '':
    # Default output name: the input's base name with its extension replaced
    # by ".yaml". (The original called the non-existent ``os.path.basenem``,
    # which raised AttributeError whenever -p was omitted.) ``splitext`` keeps
    # an extensionless name intact instead of collapsing it to ".yaml".
    yaml_fname = os.path.splitext(os.path.basename(uv_fname))[0] + '.yaml'

# Read with read_data=False; only header-level attributes are used below.
input_uv = UVData()
input_uv.read_uvfits(uv_fname, read_data=False)

# Row 0 of freq_array is used for the frequency axis.
freq_array = input_uv.freq_array[0, :].tolist()
time_array = input_uv.time_array.tolist()

# Start/end values are the first and last entries of each axis as stored in
# the file.
param_dict = dict(
    start_time=time_array[0],
    end_time=time_array[-1],
    Ntimes=input_uv.Ntimes,
    integration_time=input_uv.integration_time,
    start_freq=freq_array[0],
    end_freq=freq_array[-1],
    channel_width=input_uv.channel_width,
)
a.add_argument(
    "--filetype",
    type=str,
    default="uvfits",
    help="filetype, options=['uvfits', 'miriad']",
)

# get args
args = a.parse_args()

# Leave an existing output file alone unless --overwrite was given.
if os.path.exists(args.file_out) and args.overwrite is False:
    print("{} exists. Use --overwrite to overwrite the file.".format(args.file_out))
    sys.exit(0)

# Load the input with the reader that matches the declared file type.
uv_obj = UVData()
if args.filetype not in ("uvfits", "miriad"):
    raise IOError("didn't recognize filetype {}".format(args.filetype))
if args.filetype == "uvfits":
    uv_obj.read_uvfits(args.file_in)
else:
    uv_obj.read_miriad(args.file_in)

# Antenna numbers above 254 are the ones that need a replacement.
too_large = np.nonzero(uv_obj.antenna_numbers > 254)[0]
large_ant_nums = sorted(uv_obj.antenna_numbers[too_large])

# Candidate replacements: every number in 0..254 not already in use.
new_nums = sorted(set(range(255)) - set(uv_obj.antenna_numbers))
if len(new_nums) < len(large_ant_nums):
    raise ValueError("too many antennas in dataset, cannot renumber all below 255")

# Keep the highest free numbers, one for each antenna being renumbered.
new_nums = new_nums[-len(large_ant_nums):]
renumber_dict = dict(zip(large_ant_nums, new_nums))
def write_dataparams_rst(write_file=None):
UV = UVData()
out = "UVData Parameters\n==========================\n"
out += (
"These are the standard attributes of UVData objects.\n\nUnder the hood "
"they are actually properties based on UVParameter objects.\n\nAngle type "
"attributes also have convenience properties named the same thing \nwith "
"'_degrees' appended through which you can get or set the value in "
"degrees.\n\nSimilarly location type attributes (which are given in "
"topocentric xyz coordinates) \nhave convenience properties named the "
"same thing with '_lat_lon_alt' and \n'_lat_lon_alt_degrees' appended "
"through which you can get or set the values using \nlatitude, longitude and "
"altitude values in radians or degrees and meters.\n\n"
)
out += "Required\n----------------\n"
out += (
"These parameters are required to have a sensible UVData object and \n"
"are required for most kinds of uv data files."
'observation.npz contains numpy array of data from the SNAP')
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--config', '-c', dest='config', help='configuration yaml file')
parser.add_argument('--data', '-z', dest='data', help='data npz file')
args = parser.parse_args()

with open(args.config) as configfile:
    # safe_load only builds plain YAML types. A bare yaml.load() with no
    # Loader can construct arbitrary Python objects and is rejected outright
    # by PyYAML >= 6.
    config = yaml.safe_load(configfile)
data = np.load(args.data)

# instantiate uvdata object
data_uv = UVData()

# Axis sizes are taken from the lengths of the arrays stored in the npz file.
data_uv.Ntimes = len(data['times'])
data_uv.Nbls = len(data['ant1_array'])
data_uv.Nblts = data_uv.Ntimes * data_uv.Nbls
data_uv.Nfreqs = len(data['frequencies'])
data_uv.Npols = len(data['polarizations'])
data_uv.vis_units = 'uncalib'
data_uv.Nspws = 1  # !!!Assume a single spectral window.!!!

# The flag and nsample arrays share the shape (Nblts, Nspws, Nfreqs, Npols).
array_shape = (data_uv.Nblts, data_uv.Nspws, data_uv.Nfreqs, data_uv.Npols)

# instantiate flag array with no flags
data_uv.flag_array = np.zeros(array_shape, dtype=bool)

# instantiate nsamples array with equal unity weights.
data_uv.nsample_array = np.ones(array_shape, dtype=float)
#create frequency array
Returns
-------
:class:`~pyuvdata.UVData` instance containing simulated visibilities.
"""
if mpi is None:
raise ImportError("You need mpi4py to use the uvsim module. "
"Install it by running pip install pyuvsim[sim] "
"or pip install pyuvsim[all] if you also want the "
"line_profiler installed.")
mpi.start_mpi()
rank = mpi.get_rank()
comm = mpi.get_comm()
Npus = mpi.get_Npus()
if not isinstance(input_uv, UVData):
raise TypeError("input_uv must be UVData object")
if not ((input_uv.Npols == 4) and (input_uv.polarization_array.tolist() == [-5, -6, -7, -8])):
raise ValueError("input_uv must have XX,YY,XY,YX polarization")
# The root node will initialize our simulation
# Read input file and make uvtask list
if rank == 0 and not quiet:
print('Nbls:', input_uv.Nbls, flush=True)
print('Ntimes:', input_uv.Ntimes, flush=True)
print('Nfreqs:', input_uv.Nfreqs, flush=True)
print('Nsrcs:', catalog.Ncomponents, flush=True)
if rank == 0:
uv_container = simsetup._complete_uvdata(input_uv, inplace=False)
if 'world' in input_uv.extra_keywords:
uv_container.extra_keywords['world'] = input_uv.extra_keywords['world']
obs_params: str or dict
Either an obsparam file name or a dictionary of parameters.
input_uv: :class:`~pyuvdata.UVData`
Used to set location for mock catalogs and for horizon cuts.
return_recarray: bool
Return a recarray instead of a :class:`~pyradiosky.SkyModel` instance.
Default is True.
Returns
-------
skydata: numpy.recarray or :class:`~pyradiosky.SkyModel`
Source catalog filled with data.
source_list_name: str
Catalog identifier for metadata.
"""
if input_uv is not None and not isinstance(input_uv, UVData):
raise TypeError("input_uv must be UVData object")
if isinstance(obs_params, str):
with open(obs_params, 'r') as pfile:
param_dict = yaml.safe_load(pfile)
param_dict['config_path'] = os.path.dirname(obs_params)
else:
param_dict = obs_params
# Parse source selection options
select_options = ['min_flux', 'max_flux', 'horizon_buffer']
source_select_kwds = {}
source_params = param_dict['sources']
if 'catalog' in source_params:
if 'object_name' not in param_dict:
tloc = EarthLocation.from_geocentric(*uvparam_dict['telescope_location'], unit='m')
time = Time(uvparam_dict['time_array'][0], scale='utc', format='jd')
src, _ = create_mock_catalog(time, arrangement='zenith', array_location=tloc)
if 'sources' in param_dict:
source_file_name = os.path.basename(param_dict['sources']['catalog'])
uvparam_dict['object_name'] = '{}_ra{:.4f}_dec{:.4f}'.format(
source_file_name, src.ra.deg[0], src.dec.deg[0]
)
else:
uvparam_dict['object_name'] = 'Unspecified'
else:
uvparam_dict['object_name'] = param_dict['object_name']
uv_obj = UVData()
# use the __iter__ function on UVData to get list of UVParameters on UVData
valid_param_names = [getattr(uv_obj, param).name for param in uv_obj]
for k in valid_param_names:
if k in param_dict:
setattr(uv_obj, k, param_dict[k])
if k in uvparam_dict:
setattr(uv_obj, k, uvparam_dict[k])
bls = np.array(
[
uv_obj.antnums_to_baseline(uv_obj.antenna_numbers[j], uv_obj.antenna_numbers[i])
for i in range(0, uv_obj.Nants_data)
for j in range(i, uv_obj.Nants_data)
]
)
from pyuvsim.data import DATA_PATH as SIM_DATA_PATH
# Read in a uvfits file and automatically generate yaml files.
# Will assume the same beam_id for all antennas for now.
# Command-line options: one or more input files, optional output names for
# the layout csv and telescope config, and the beam file to reference.
parser = argparse.ArgumentParser(
    description="Extracts antenna position info from uvfits."
)
parser.add_argument('file_in', metavar='', type=str, nargs='+')
parser.add_argument('-l', '--layout_csv_name', default=None)
parser.add_argument('-t', '--telescope_config_name', default=None)
parser.add_argument(
    '-b',
    '--beam_filepath',
    type=str,
    default=os.path.join(SIM_DATA_PATH, 'HERA_NicCST.uvbeam'),
)
args = parser.parse_args()

# Only the first input file is read.
uvd = UVData()
uvd.read(args.file_in[0])

# Write the configuration files into the current directory; the generated
# file names are not requested back (return_names=False).
pyuvsim.simsetup.uvdata_to_telescope_config(
    uvd,
    args.beam_filepath,
    layout_csv_name=args.layout_csv_name,
    telescope_config_name=args.telescope_config_name,
    return_names=False,
    path_out='.',
)
if splitext[1] == ".uvh5":
outfilename = splitext[0] + ".uvfits"
elif splitext[1] in [".ms", ".MS"]:
outfilename = splitext[0] + ".uvfits"
elif splitext[1] == ".sav":
outfilename = splitext[0] + ".uvfits"
else:
outfilename = filename + ".uvfits"
else:
outfilename = args.output_filename
if os.path.exists(outfilename) and args.overwrite is False:
print("{} exists, not overwriting...".format(outfilename))
continue
# read in file
UV = pyuvdata.UVData()
UV.read(filename)
if UV.phase_type == "drift":
# phase data
if args.phase_time is not None:
UV.phase_to_time(Time(args.phase_time, format="jd", scale="utc"))
if args.verbose:
print("phasing {} to time {}".format(filename, args.phase_time))
else:
UV.phase_to_time(Time(UV.time_array[0], format="jd", scale="utc"))
if args.verbose:
print("phasing {} to time {}".format(filename, UV.time_array[0]))
# write data
UV.history += history