Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
full = self._float_params[par]
to_set[full] = float(val)
return AnalyticBeam(model, **to_set)
# NOTE(review): this excerpt has lost its leading indentation and its enclosing
# definition is not visible here (`self`, `beam_model` and `use_shared_mem` are
# bound outside the excerpt). Restore the nesting from the upstream source
# before running it; the comments below describe only what the statements do.
#
# Purpose: create a UVBeam from the beamfits file named by `beam_model`,
# optionally spreading its parameter values across MPI ranks, then apply the
# overrides held in `self.uvb_params`, record the file path, and return it.
path = beam_model # beam_model = path to beamfits
uvb = UVBeam()
# MPI branch: requires both the caller's opt-in and an existing world communicator.
if use_shared_mem and (mpi.world_comm is not None):
# Rank-0 check, followed by the file read and peak normalization.
# (Presumably only the root rank performs the read -- confirm the nesting.)
if mpi.rank == 0:
uvb.read_beamfits(path)
uvb.peak_normalize()
# Visit every attribute stored on the beam object.
for key, attr in uvb.__dict__.items():
# Attributes that are not UVParameter instances are skipped.
if not isinstance(attr, parameter.UVParameter):
continue
# '_data_array' is sent with mpi.shared_mem_bcast; every other parameter
# value is sent with the communicator's ordinary bcast. Both use root=0.
if key == '_data_array':
uvb.__dict__[key].value = mpi.shared_mem_bcast(attr.value, root=0)
else:
uvb.__dict__[key].value = mpi.world_comm.bcast(attr.value, root=0)
# NOTE(review): with indentation lost it is unclear whether this barrier runs
# once after the loop or on every iteration -- confirm against upstream.
mpi.world_comm.Barrier()
# Fallback when the MPI branch is not taken: read the file directly.
# NOTE(review): no peak_normalize() call appears after this read, unlike the
# rank-0 read above -- confirm whether that difference is intended.
else:
uvb.read_beamfits(path)
# Copy each entry of self.uvb_params onto the beam as an attribute.
for key, val in self.uvb_params.items():
setattr(uvb, key, val)
# Remember which file the beam came from.
uvb.extra_keywords['beam_path'] = path
return uvb
# NOTE(review): this excerpt has lost its leading indentation, and `args`,
# `min_alt`, `params`, `input_uv`, `beam_dict`, `rank` and `t0` are all bound
# outside the visible lines. Restore the nesting from the upstream source
# before running it.
#
# Purpose: set up a mock-catalog simulation with a single beam file, broadcast
# the inputs over MPI, run the simulation, and have rank 0 report and save the
# elapsed wall-clock time.

# Hard-coded, user-specific beam file path; it is the only entry in beam_list.
beamfile = '/users/alanman/data/alanman/NickFagnoniBeams/HERA_NicCST_fullfreq.uvbeam'
beam_list = [beamfile]
# Keyword overrides merged into params['sources'] below.
# NOTE(review): input_uv.time_array is read here, before input_uv is broadcast
# further down -- presumably these lines sit under a root-rank guard that is
# outside this excerpt; confirm.
mock_keywords = {'mock_arrangement': 'random', 'Nsrcs': args.Nsrcs,
'min_alt': min_alt, 'time': input_uv.time_array[0]}
print("Beam: {}".format(beam_list[0]))
params['sources'].update(**mock_keywords)
# Catalog setup: only the first element of the returned pair is kept.
catalog, _ = simsetup.initialize_catalog_from_params(params)
# Distribute the inputs from root=0: ordinary bcast for the UV object, beam
# list and beam dict; mpi.shared_mem_bcast for the catalog.
comm = mpi.world_comm
input_uv = comm.bcast(input_uv, root=0)
beam_list = comm.bcast(beam_list, root=0)
beam_dict = comm.bcast(beam_dict, root=0)
catalog = mpi.shared_mem_bcast(catalog, root=0)
# Rank 0 announces the start; the flush pushes the message out immediately.
if rank == 0:
print("Starting simulation.")
sys.stdout.flush()
uv_out = uvsim.run_uvdata_uvsim(input_uv, beam_list, beam_dict=beam_dict, catalog=catalog)
# Peak resident set size of this process divided by 1e6.
# NOTE(review): the "GB" in the name assumes ru_maxrss is in kilobytes (the
# Linux convention); other platforms may report different units -- confirm.
memory_usage_GB = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1e6
comm.Barrier()
# The name is rebound to the result of the gather to root=0.
# NOTE(review): presumably a per-rank list on rank 0 and None elsewhere
# (mpi4py semantics) -- confirm `comm` is an mpi4py communicator.
memory_usage_GB = comm.gather(memory_usage_GB, root=0)
# Rank 0 computes the time elapsed since t0, prints it, and writes it as text
# to the file named by args.time_out (opened in 'w' mode).
if rank == 0:
elapsed_time_sec = pytime.time() - t0
print('Elapsed: ' + str(elapsed_time_sec))
with open(args.time_out, 'w') as timefile:
timefile.write(str(elapsed_time_sec))