Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def pixshapemap(shape, wcs, bsize=1000, separable="auto", signed=False):
"""Returns the physical width and heigh of each pixel in the map in radians.
Heavy for big maps. Much faster approaches are possible for known pixelizations."""
if wcsutils.is_plain(wcs):
cdelt = wcs.wcs.cdelt
pshape = np.zeros([2])
pshape[0] = wcs.wcs.cdelt[1]*get_unit(wcs)
pshape[1] = wcs.wcs.cdelt[0]*get_unit(wcs)
if not signed: pshape = np.abs(pshape)
pshape = np.broadcast_to(pshape[:,None,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
elif separable == True or (separable == "auto" and wcsutils.is_cyl(wcs)):
pshape = pixshapes_cyl(shape, wcs, signed=signed)
pshape = np.broadcast_to(pshape[:,:,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
else:
pshape = zeros((2,)+shape[-2:], wcs)
# Loop over blocks in y to reduce memory usage
for i1 in range(0, shape[-2], bsize):
i2 = min(i1+bsize, shape[-2])
pix = np.mgrid[i1:i2+1,:shape[-1]+1]
with utils.nowarn():
y, x = pix2sky(shape, wcs, pix, safe=True, corner=True)
del pix
dy = y[1:,1:]-y[:-1,:-1]
dx = x[1:,1:]-x[:-1,:-1]
if not signed: dy, dx = np.abs(dy), np.abs(dx)
cy = np.cos(y)
bad= cy<= 0
cy[bad] = np.mean(cy[~bad])
dx *= 0.5*(cy[1:,1:]+cy[:-1,:-1])
def pixshapemap(shape, wcs, bsize=1000, separable="auto", signed=False):
"""Returns the physical width and heigh of each pixel in the map in radians.
Heavy for big maps. Much faster approaches are possible for known pixelizations."""
if wcsutils.is_plain(wcs):
cdelt = wcs.wcs.cdelt
pshape = np.zeros([2])
pshape[0] = wcs.wcs.cdelt[1]*get_unit(wcs)
pshape[1] = wcs.wcs.cdelt[0]*get_unit(wcs)
if not signed: pshape = np.abs(pshape)
pshape = np.broadcast_to(pshape[:,None,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
elif separable == True or (separable == "auto" and wcsutils.is_cyl(wcs)):
pshape = pixshapes_cyl(shape, wcs, signed=signed)
pshape = np.broadcast_to(pshape[:,:,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
else:
pshape = zeros((2,)+shape[-2:], wcs)
# Loop over blocks in y to reduce memory usage
for i1 in range(0, shape[-2], bsize):
i2 = min(i1+bsize, shape[-2])
pix = np.mgrid[i1:i2+1,:shape[-1]+1]
with utils.nowarn():
y, x = pix2sky(shape, wcs, pix, safe=True, corner=True)
del pix
dy = y[1:,1:]-y[:-1,:-1]
dx = x[1:,1:]-x[:-1,:-1]
if not signed: dy, dx = np.abs(dy), np.abs(dx)
def rand_gauss_harm(shape, wcs):
"""Mostly equivalent to np.fft.fft2(np.random.standard_normal(shape)),
but avoids the fft by generating the numbers directly in frequency
domain. Does not enforce the symmetry requried for a real map. If box is
passed, the result will be an enmap."""
return ndmap(np.random.standard_normal(shape)+1j*np.random.standard_normal(shape),wcs)
# If any wcs-associated indices are None, then we don't know how to update the
# wcs, and assume the user knows what it's doing
if any([s is None for s in sel2]):
return ndmap(np.ndarray.__getitem__(self, sel), self.wcs)
if len(sel2) > 2:
raise IndexError("too many indices")
# If the wcs slice includes direct indexing, so that wcs
# axes are lost, then degrade to a normal numpy array,
# since this class assumes that the two last axes are
# wcs axes.
if any([type(s) is not slice for s in sel2]):
return np.asarray(self)[sel]
# Otherwise we will return a full ndmap, including a
# (possibly) sliced wcs.
_, wcs = slice_geometry(self.shape[-2:], self.wcs, sel2)
return ndmap(np.ndarray.__getitem__(self, sel), wcs)
def __getslice__(self, a, b=None, c=None): return self[slice(a,b,c)]
def downgrade(emap, factor, op=np.mean):
"""Returns enmap "emap" downgraded by the given integer factor
(may be a list for each direction, or just a number) by averaging
inside pixels."""
fact = np.full(2, 1, dtype=int)
fact[:] = factor
tshape = emap.shape[-2:]//fact*fact
res = op(np.reshape(emap[...,:tshape[0],:tshape[1]],emap.shape[:-2]+(tshape[0]//fact[0],fact[0],tshape[1]//fact[1],fact[1])),(-3,-1))
try: return ndmap(res, emap[...,::fact[0],::fact[1]].wcs)
except AttributeError: return res
def lmap(shape, wcs, oversample=1):
"""Return a map of all the wavenumbers in the fourier transform
of a map with the given shape and wcs."""
ly, lx = laxes(shape, wcs, oversample=oversample)
data = np.empty((2,ly.size,lx.size))
data[0] = ly[:,None]
data[1] = lx[None,:]
return ndmap(data, wcs)
self.threshold = threshold
@property
def ndim(self): return len(self.shape)
@property
def geometry(self): return self.shape, self.wcs
@property
def npix(self): return self.shape[-2]*self.shape[-1]
def __str__(self): return repr(self)
def __repr__(self): return "ndmap_proxy(fname=%s, shape=%s, wcs=%s, dtype=%s)" % (self.fname, str(self.shape), str(self.wcs), str(self.dtype))
def __getslice__(self, a, b=None, c=None): return self[slice(a,b,c)]
def __getitem__(self, sel): raise NotImplementedError("ndmap_proxy must be subclassed")
# Copy over some methos from ndmap
for name in ["sky2pix", "pix2sky", "box", "pixbox_of", "posmap", "pixmap", "lmap", "modlmap", "modrmap", "area", "pixsize", "pixshape",
"pixsizemap", "pixshapemap", "extent", "distance_from", "center", "extract", "extract_pixbox"]:
setattr(ndmap_proxy, name, getattr(ndmap, name))
class ndmap_proxy_fits(ndmap_proxy):
def __init__(self, hdu, wcs, fname="", threshold=1e7, verbose=False):
self.hdu = hdu
self.verbose = verbose
# Note that 'section' is not part of some HDU types, such as CompImageHDU.
self.use_section = hasattr(hdu, 'section')
if self.use_section:
dtype = fix_endian(hdu.section[(slice(0,1),)*hdu.header["NAXIS"]]).dtype
else:
dtype = fix_endian(hdu.data[(slice(0,1),)*hdu.header["NAXIS"]]).dtype
self.stokes_flips = get_stokes_flips(hdu)
def slist(vals):
return ",".join([str(v) for v in vals])
if verbose and np.any(self.stokes_flips) >= 0:
print("Converting index %s for Stokes axis %s from IAU to COSMO in %s" % (
"""Generates a random map with component covariance
cov in harmonic space, where cov is a (comp,comp,l) array or a
(comp,comp,Ny,Nx) array. Despite the name, the map doesn't need
to be isotropic since 2D power spectra are allowed.
If cov.ndim is 4, cov is assumed to be an array of 2D power spectra.
else cov is assumed to be an array of 1D power spectra.
If pixel_units is True, the 2D power spectra is assumed to be in pixel units,
not in steradians."""
if cov.ndim==4:
if not(pixel_units): cov = cov * np.prod(shape[-2:])/area(shape,wcs )
covsqrt = multi_pow(cov, 0.5)
else:
covsqrt = spec2flat(shape, wcs, massage_spectrum(cov, shape), 0.5, mode="constant")
data = map_mul(covsqrt, rand_gauss_harm(shape, wcs))
return ndmap(data, wcs)
def has_wcs(m):
try:
m.wcs
return True
except AttributeError:
return False
if wcs is None:
if has_wcs(arr):
wcs = arr.wcs
elif isinstance(arr, list) and len(arr) > 0 and has_wcs(arr[0]):
wcs = arr[0].wcs
else:
wcs = wcsutils.WCS(naxis=2)
if copy:
arr = np.asanyarray(arr, dtype=dtype).copy()
return ndmap(arr, wcs)