Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
imap2 = imap.copy()[...,::-1]
oalm = curvedsky.map2alm(imap2.copy(),lmax=lmax)
rmap2 = curvedsky.alm2map(oalm,enmap.empty(shape,wcs),spin=0)
assert np.all(np.isclose(rmap[0],rmap2[0]))
assert np.all(np.isclose(rmap[1],rmap2[1]))
assert np.all(np.isclose(rmap[2],rmap2[2]))
# Flat-sky
px = 2.0
N = 300
shape,iwcs = enmap.geometry(pos=(0,0),res=np.deg2rad(px/60.),shape=(300,300))
shape = (3,) + shape
a = enmap.zeros(shape,iwcs)
a = a[...,::-1]
wcs = a.wcs
seed = 100
imap = enmap.rand_map(shape,wcs,ps_cmb,seed=seed)
kmap = enmap.map2harm(imap.copy())
rmap = enmap.harm2map(kmap,spin=0) # reference map
imap = imap[...,::-1]
kmap = enmap.map2harm(imap.copy())
rmap2 = enmap.harm2map(kmap,spin=0)[...,::-1] # comparison map
assert np.all(np.isclose(rmap[0],rmap2[0]))
assert np.all(np.isclose(rmap[1],rmap2[1],atol=1e0))
assert np.all(np.isclose(rmap[2],rmap2[2],atol=1e0))
if rot:
print("Computing rotated positions")
s1, s2 = rot.split(",")
opos = coordinates.transform(s2, s1, pmap[::-1], pol=ncomp == 3)
pmap[...] = opos[1::-1]
if len(opos) == 3:
psi = -opos[2].copy()
del opos
print("Projecting")
res = curvedsky.alm2map_pos(alm, pmap)
if rot and ncomp == 3:
print("Rotating polarization vectors")
res[1:3] = enmap.rotate_pol(res[1:3], psi)
else:
print("Projecting")
res = enmap.zeros((len(alm),) + shape[-2:], wcs, dtype)
res = curvedsky.alm2map(alm, res)
if return_alm: return res,alm
return res
def hwexpand(mflat, nrow=-1, ncol=-1, transpose=False):
"""Stack the maps in mflat[n,ny,nx] into a single flat map mflat[nrow,ncol,ny,nx]"""
n, ny, nx = mflat.shape
if nrow < 0 and ncol < 0:
ncol = int(np.ceil(n**0.5))
if nrow < 0: nrow = (n+ncol-1)//ncol
if ncol < 0: ncol = (n+nrow-1)//nrow
if not transpose:
omap = enmap.zeros([nrow,ncol,ny,nx],mflat.wcs,mflat.dtype)
omap.reshape(-1,ny,nx)[:n] = mflat
else:
omap = enmap.zeros([ncol,nrow,ny,nx],mflat.wcs,mflat.dtype)
omap.reshape(-1,ny,nx)[:n] = mflat
omap = np.transpose(omap,(1,0,2,3))
return omap
def eval_srcs_loop(posmap, poss, amps, beam, cres, nhit, cell_srcs, dtype=np.float64, op=np.add, verbose=False):
# Loop through each cell
ncy, ncx = nhit.shape
model = enmap.zeros(amps.shape[-1:]+posmap.shape[-2:], posmap.wcs, dtype)
for cy in range(ncy):
for cx in range(ncx):
nsrc = nhit[cy,cx]
if verbose: print("map cell %5d/%d with %5d srcs" % (cy*ncx+cx+1, ncy*ncx, nsrc))
if nsrc == 0: continue
srcs = cell_srcs[cy,cx,:nsrc]
y1,y2 = (cy+0)*cres[0], (cy+1)*cres[0]
x1,x2 = (cx+0)*cres[1], (cx+1)*cres[1]
pixpos = posmap[:,y1:y2,x1:x2]
srcpos = poss[srcs].T # [2,nsrc]
srcamp = amps[srcs].T # [ncomp,nsrc]
r = utils.angdist(pixpos[::-1,None,:,:],srcpos[::-1,:,None,None])
bpix = (r - beam[0,0])/(beam[0,1]-beam[0,0])
# Evaluate the beam at these locations
bval = utils.interpol(beam[1], bpix[None], mode="constant", order=1, mask_nan=False) # [nsrc,ry,rx]
cmodel = srcamp[:,:,None,None]*bval
# simpler later.
wcs = wcsutils.WCS(naxis=2)
wcs.wcs.ctype = ["RA---CAR","DEC--CAR"]
wcs.wcs.crval = [ra_ref,0]
wcs.wcs.cdelt = [360./nx,-180./nytot]
wcs.wcs.crpix = [nx/2.0+1,nytot/2.0+1]
# Then find the subset that includes the dec range we want
y1= utils.nint(wcs.wcs_world2pix(0,decrange[0],0)[1])
y2= utils.nint(wcs.wcs_world2pix(0,decrange[1],0)[1])
y1, y2 = min(y1,y2), max(y1,y2)
# Offset wcs to start at our target range
ny = y2-y1
wcs.wcs.crpix[1] -= y1
# Construct the map. +1 to put extra pixel at pole when we are fullsky
if verbose: print("Allocating shape %s dtype %s intermediate map" % (dims+(ny+1,nx),np.dtype(dtype).char))
tmap = enmap.zeros(dims+(ny+1,nx),wcs,dtype=dtype)
return tmap
if oshape is None:
if res is None: res = min(np.abs(imap.wcs.wcs.cdelt))*utils.degree/2
oshape, owcs = enmap.thumbnail_geometry(r=r, res=res, proj=proj)
# Check if we should be doing polarization rotation
pol_compat = imap.ndim >= 3 and imap.shape[-3] == 3
if pol is None: pol = pol_compat
if pol and not pol_compat: raise ValueError("Polarization rotation requested, but can't interpret map shape %s as IQU map" % (str(imap.shape)))
nsrc = len(coords)
if verbose: print("Extracting %d %dx%d thumbnails from %s map" % (nsrc, oshape[-2], oshape[-1], str(imap.shape)))
opos = enmap.posmap(oshape, owcs)
# Get the pixel area around each of the coordinates
rtot = r + apod
apod_pix = utils.nint(apod/(np.min(np.abs(imap.wcs.wcs.cdelt))*utils.degree))
pixboxes = enmap.neighborhood_pixboxes(imap.shape, imap.wcs, coords, rtot)
# Define our output maps, which we will fill below
omaps = enmap.zeros((nsrc,)+imap.shape[:-2]+oshape, owcs, imap.dtype)
for si, pixbox in enumerate(pixboxes):
if oversample > 1:
# Make the pixbox fft-friendly
for i in range(2):
pixbox[1,i] = pixbox[0,i] + fft.fft_len(pixbox[1,i]-pixbox[0,i], direction="above", factors=[2,3,5])
ithumb = imap.extract_pixbox(pixbox)
if extensive: ithumb /= ithumb.pixsizemap()
ithumb = ithumb.apod(apod_pix, fill="median")
if filter is not None: ithumb = filter(ithumb)
if verbose:
print("%4d/%d %6.2f %6.2f %8.2f %dx%d" % (si+1, nsrc, coords[si,0]/utils.degree, coords[si,1]/utils.degree, np.max(ithumb), ithumb.shape[-2], ithumb.shape[-1]))
# Oversample using fourier if requested. We do this because fourier
# interpolation is better than spline interpolation overall
if oversample > 1:
fshape = utils.nint(np.array(oshape[-2:])*oversample)
ithumb = ithumb.resample(fshape, method="fft")
for i1 in range(0, shape[-2], bsize):
curvedsky.alm2map(kappa_alm, kappa_map[...,i1:i1+bsize,:])
del kappa_alm
if "a" in output: grad_map = enmap.empty((2,)+shape[-2:], wcs, dtype=dtype)
if "u" in output: cmb_raw = enmap.empty(shape, wcs, dtype=dtype)
if "l" in output: cmb_obs = enmap.empty(shape, wcs, dtype=dtype)
# Then loop over dec bands
for i1 in range(0, shape[-2], bsize):
i2 = min(i1+bsize, shape[-2])
lshape, lwcs = enmap.slice_geometry(shape, wcs, (slice(i1,i2),slice(None)))
if "p" in output:
if verbose: print("Computing phi map")
curvedsky.alm2map(phi_alm, phi_map[...,i1:i2,:])
if verbose: print("Computing grad map")
if "a" in output: grad = grad_map[...,i1:i2,:]
else: grad = enmap.zeros((2,)+lshape[-2:], lwcs, dtype=dtype)
curvedsky.alm2map(phi_alm, grad, deriv=True)
if "l" not in output: continue
if verbose: print("Computing observed coordinates")
obs_pos = enmap.posmap(lshape, lwcs)
if verbose: print("Computing alpha map")
raw_pos = enmap.samewcs(offset_by_grad(obs_pos, grad, pol=shape[-3]>1, geodesic=geodesic), obs_pos)
del obs_pos, grad
if "u" in output:
if verbose: print("Computing unlensed map")
curvedsky.alm2map(cmb_alm, cmb_raw[...,i1:i2,:], spin=spin)
if verbose: print("Computing lensed map")
cmb_obs[...,i1:i2,:] = curvedsky.alm2map_pos(cmb_alm, raw_pos[:2], oversample=oversample, spin=spin)
if raw_pos.shape[0] > 2 and np.any(raw_pos[2]):
if verbose: print("Rotating polarization")
cmb_obs[...,i1:i2,:] = enmap.rotate_pol(cmb_obs[...,i1:i2,:], raw_pos[2])
del raw_pos