Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Antennas, times and blt_inds all need to be combined into a set of
# blts indices to keep.
# test for blt_inds presence before adding inds from antennas & times
if blt_inds is not None:
blt_inds = uvutils.get_iterable(blt_inds)
history_update_string += 'baseline-times'
n_selects += 1
if antenna_names is not None:
if antenna_nums is not None:
raise ValueError(
'Only one of antenna_nums and antenna_names can be provided.')
antenna_names = uvutils.get_iterable(antenna_names)
antenna_nums = []
for s in antenna_names:
if s not in self.antenna_names:
raise ValueError(
'Antenna name {a} is not present in the antenna_names array'.format(a=s))
antenna_nums.append(self.antenna_numbers[np.where(
np.array(self.antenna_names) == s)[0]])
if antenna_nums is not None:
antenna_nums = uvutils.get_iterable(antenna_nums)
if n_selects > 0:
history_update_string += ', antennas'
else:
history_update_string += 'antennas'
n_selects += 1
inds1 = np.zeros(0, dtype=np.int)
# Use union (or) to join antenna_names/nums & ant_pairs_nums
ant_blt_inds = np.array(list(set(ant_blt_inds).union(bls_blt_inds)))
else:
ant_blt_inds = bls_blt_inds
if ant_blt_inds is not None:
if blt_inds is not None:
# Use intersection (and) to join antenna_names/nums/ant_pairs_nums with blt_inds
# handled differently because of the time aspect (which is anded with antennas below)
blt_inds = np.array(
list(set(blt_inds).intersection(ant_blt_inds)), dtype=np.int)
else:
blt_inds = ant_blt_inds
if times is not None:
times = uvutils.get_iterable(times)
if n_selects > 0:
history_update_string += ', times'
else:
history_update_string += 'times'
n_selects += 1
time_blt_inds = np.zeros(0, dtype=np.int)
for jd in times:
if jd in self.time_array:
time_blt_inds = np.append(
time_blt_inds, np.where(self.time_array == jd)[0])
else:
raise ValueError(
'Time {t} is not present in the time_array'.format(t=jd))
if blt_inds is not None:
if min(blt_inds) < 0:
raise ValueError('blt_inds contains indices that are negative')
blt_inds = list(sorted(set(list(blt_inds))))
if freq_chans is not None:
freq_chans = uvutils.get_iterable(freq_chans)
if frequencies is None:
frequencies = self.freq_array[0, freq_chans]
else:
frequencies = uvutils.get_iterable(frequencies)
frequencies = np.sort(list(set(frequencies)
| set(self.freq_array[0, freq_chans])))
if frequencies is not None:
frequencies = uvutils.get_iterable(frequencies)
if n_selects > 0:
history_update_string += ', frequencies'
else:
history_update_string += 'frequencies'
n_selects += 1
freq_inds = np.zeros(0, dtype=np.int)
# this works because we only allow one SPW. This will have to be reworked when we support more.
freq_arr_use = self.freq_array[0, :]
for f in frequencies:
if f in freq_arr_use:
freq_inds = np.append(
freq_inds, np.where(freq_arr_use == f)[0])
else:
raise ValueError(
'Frequency {f} is not present in the freq_array'.format(f=f))
freq_ind_separation = freq_inds[1:] - freq_inds[:-1]
if np.min(freq_ind_separation) < np.max(freq_ind_separation):
warnings.warn('Selected frequencies are not evenly spaced. This '
'will make it impossible to write this data out to '
'some file types')
elif np.max(freq_ind_separation) > 1:
warnings.warn('Selected frequencies are not contiguous. This '
'will make it impossible to write this data out to '
'some file types.')
freq_inds = list(sorted(set(list(freq_inds))))
else:
freq_inds = None
if polarizations is not None:
polarizations = uvutils.get_iterable(polarizations)
if n_selects > 0:
history_update_string += ', polarizations'
else:
history_update_string += 'polarizations'
n_selects += 1
pol_inds = np.zeros(0, dtype=np.int)
for p in polarizations:
if isinstance(p, str):
p_num = uvutils.polstr2num(p)
else:
p_num = p
if p_num in self.polarization_array:
pol_inds = np.append(pol_inds, np.where(
self.polarization_array == p_num)[0])
else:
if blt_inds is not None:
if len(blt_inds) == 0:
raise ValueError(
'No baseline-times were found that match criteria')
if max(blt_inds) >= self.Nblts:
raise ValueError(
'blt_inds contains indices that are too large')
if min(blt_inds) < 0:
raise ValueError('blt_inds contains indices that are negative')
blt_inds = list(sorted(set(list(blt_inds))))
if freq_chans is not None:
freq_chans = uvutils.get_iterable(freq_chans)
if frequencies is None:
frequencies = self.freq_array[0, freq_chans]
else:
frequencies = uvutils.get_iterable(frequencies)
frequencies = np.sort(list(set(frequencies)
| set(self.freq_array[0, freq_chans])))
if frequencies is not None:
frequencies = uvutils.get_iterable(frequencies)
if n_selects > 0:
history_update_string += ', frequencies'
else:
history_update_string += 'frequencies'
n_selects += 1
freq_inds = np.zeros(0, dtype=np.int)
raise ValueError(
'No baseline-times were found that match criteria')
if max(blt_inds) >= self.Nblts:
raise ValueError(
'blt_inds contains indices that are too large')
if min(blt_inds) < 0:
raise ValueError('blt_inds contains indices that are negative')
blt_inds = list(sorted(set(list(blt_inds))))
if freq_chans is not None:
freq_chans = uvutils.get_iterable(freq_chans)
if frequencies is None:
frequencies = self.freq_array[0, freq_chans]
else:
frequencies = uvutils.get_iterable(frequencies)
frequencies = np.sort(list(set(frequencies)
| set(self.freq_array[0, freq_chans])))
if frequencies is not None:
frequencies = uvutils.get_iterable(frequencies)
if n_selects > 0:
history_update_string += ', frequencies'
else:
history_update_string += 'frequencies'
n_selects += 1
freq_inds = np.zeros(0, dtype=np.int)
# this works because we only allow one SPW. This will have to be reworked when we support more.
freq_arr_use = self.freq_array[0, :]
for f in frequencies:
if f in freq_arr_use:
if len(key) == 1:
if (key < 5) or (type(key) is str): interpreted as a
polarization number/name, return all blts for that pol.
else: interpreted as a baseline number. Return all times and
polarizations for that baseline.
if len(key) == 2: interpreted as an antenna pair. Return all
times and pols for that baseline.
if len(key) == 3: interpreted as antenna pair and pol (ant1, ant2, pol).
Return all times for that baseline, pol. pol may be a string.
Returns:
blt_ind1: numpy array with blt indices for antenna pair.
blt_ind2: numpy array with blt indices for conjugate antenna pair.
pol_ind: numpy array with polarization indices
"""
key = uvutils.get_iterable(key)
if type(key) is str:
# Single string given, assume it is polarization
pol_ind = np.where(self.polarization_array == uvutils.polstr2num(key))[0]
if len(pol_ind) == 0:
raise KeyError('Polarization {pol} not found in data.'.format(pol=key))
blt_ind1 = np.arange(self.Nblts)
blt_ind2 = np.array([], dtype=np.int64)
elif len(key) == 1:
key = key[0] # For simplicity
if isinstance(key, collections.Iterable):
# Nested tuple. Call function again.
blt_ind1, blt_ind2, pol_ind = self._key2inds(key)
elif key < 5:
# Small number, assume it is a polarization number a la AIPS memo
pol_ind = np.where(self.polarization_array == key)[0]
if len(pol_ind) == 0:
if antenna_names is not None:
if antenna_nums is not None:
raise ValueError(
'Only one of antenna_nums and antenna_names can be provided.')
antenna_names = uvutils.get_iterable(antenna_names)
antenna_nums = []
for s in antenna_names:
if s not in self.antenna_names:
raise ValueError(
'Antenna name {a} is not present in the antenna_names array'.format(a=s))
antenna_nums.append(self.antenna_numbers[np.where(
np.array(self.antenna_names) == s)[0]])
if antenna_nums is not None:
antenna_nums = uvutils.get_iterable(antenna_nums)
if n_selects > 0:
history_update_string += ', antennas'
else:
history_update_string += 'antennas'
n_selects += 1
inds1 = np.zeros(0, dtype=np.int)
inds2 = np.zeros(0, dtype=np.int)
for ant in antenna_nums:
if ant in self.ant_1_array or ant in self.ant_2_array:
wh1 = np.where(self.ant_1_array == ant)[0]
wh2 = np.where(self.ant_2_array == ant)[0]
if len(wh1) > 0:
inds1 = np.append(inds1, list(wh1))
if len(wh2) > 0:
inds2 = np.append(inds2, list(wh2))
else:
if ant_str is not None:
if not (antenna_nums is None and antenna_names is None
and bls is None and polarizations is None):
raise ValueError(
'Cannot provide ant_str with antenna_nums, antenna_names, '
'bls, or polarizations.')
else:
bls, polarizations = self.parse_ants(ant_str)
# Antennas, times and blt_inds all need to be combined into a set of
# blts indices to keep.
# test for blt_inds presence before adding inds from antennas & times
if blt_inds is not None:
blt_inds = uvutils.get_iterable(blt_inds)
history_update_string += 'baseline-times'
n_selects += 1
if antenna_names is not None:
if antenna_nums is not None:
raise ValueError(
'Only one of antenna_nums and antenna_names can be provided.')
antenna_names = uvutils.get_iterable(antenna_names)
antenna_nums = []
for s in antenna_names:
if s not in self.antenna_names:
raise ValueError(
'Antenna name {a} is not present in the antenna_names array'.format(a=s))
antenna_nums.append(self.antenna_numbers[np.where(
np.array(self.antenna_names) == s)[0]])