Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_extract(self):
# Tests that extraction is sensible
shape,wcs = enmap.geometry(pos=(0,0),shape=(500,500),res=0.01)
imap = enmap.enmap(np.random.random(shape),wcs)
smap = imap[200:300,200:300]
sshape,swcs = smap.shape,smap.wcs
smap2 = enmap.extract(imap,sshape,swcs)
pixbox = enmap.pixbox_of(imap.wcs,sshape,swcs)
# Do write and read test
filename = "temporary_extract_map.fits" # NOT THREAD SAFE
enmap.write_map(filename,imap)
smap3 = enmap.read_map(filename,pixbox=pixbox)
os.remove(filename)
assert np.all(np.isclose(smap,smap2))
assert np.all(np.isclose(smap,smap3))
assert wcsutils.equal(smap.wcs,smap2.wcs)
assert wcsutils.equal(smap.wcs,smap3.wcs)
def pixshapemap(shape, wcs, bsize=1000, separable="auto", signed=False):
"""Returns the physical width and heigh of each pixel in the map in radians.
Heavy for big maps. Much faster approaches are possible for known pixelizations."""
if wcsutils.is_plain(wcs):
cdelt = wcs.wcs.cdelt
pshape = np.zeros([2])
pshape[0] = wcs.wcs.cdelt[1]*get_unit(wcs)
pshape[1] = wcs.wcs.cdelt[0]*get_unit(wcs)
if not signed: pshape = np.abs(pshape)
pshape = np.broadcast_to(pshape[:,None,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
elif separable == True or (separable == "auto" and wcsutils.is_cyl(wcs)):
pshape = pixshapes_cyl(shape, wcs, signed=signed)
pshape = np.broadcast_to(pshape[:,:,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
else:
pshape = zeros((2,)+shape[-2:], wcs)
# Loop over blocks in y to reduce memory usage
for i1 in range(0, shape[-2], bsize):
i2 = min(i1+bsize, shape[-2])
pix = np.mgrid[i1:i2+1,:shape[-1]+1]
with utils.nowarn():
y, x = pix2sky(shape, wcs, pix, safe=True, corner=True)
del pix
dy = y[1:,1:]-y[:-1,:-1]
dx = x[1:,1:]-x[:-1,:-1]
if not signed: dy, dx = np.abs(dy), np.abs(dx)
cy = np.cos(y)
def topix(pos_off):
unit = utils.degree if not wcsutils.is_plain(map.wcs) else 1.0
pix = map.sky2pix(np.array([float(w) for w in pos_off[:2]])*unit)
pix += np.array([float(w) for w in pos_off[2:]])
return pix[::-1].astype(int)
def skippable(x,y):
Default: Same as arr.
copy : boolean
If true, arr is copied. Otherwise, a referance is kept."""
def has_wcs(m):
try:
m.wcs
return True
except AttributeError:
return False
if wcs is None:
if has_wcs(arr):
wcs = arr.wcs
elif isinstance(arr, list) and len(arr) > 0 and has_wcs(arr[0]):
wcs = arr[0].wcs
else:
wcs = wcsutils.WCS(naxis=2)
if copy:
arr = np.asanyarray(arr, dtype=dtype).copy()
return ndmap(arr, wcs)
def gnomonic_pole_wcs(shape, res):
Ny, Nx = shape[-2:]
wcs = wcsutils.WCS(naxis=2)
wcs.wcs.ctype = ['RA---TAN', 'DEC--TAN']
wcs.wcs.crval = [0., 0.]
wcs.wcs.cdelt[:] = np.rad2deg(res)
wcs.wcs.crpix = [Ny / 2. + 0.5, Nx / 2. + 0.5]
return wcs
def scale_geometry(shape, wcs, scale):
scale = np.zeros(2)+scale
oshape = tuple(shape[:-2])+tuple(utils.nint(shape[-2:]*scale))
owcs = wcsutils.scale(wcs, scale, rowmajor=True)
return oshape, owcs
def __repr__(self):
return "ndmap(%s,%s)" % (np.asarray(self), wcsutils.describe(self.wcs))
def __str__(self): return repr(self)
# Ideally we want to integrate around the real outer edge
# of our patch, which is displaced by half a pixel coordinate
# from the pixel centers, but sometimes those coordinates are
# not valid. The nobcheck should avoid that, but we still include
# them to be safe. For the case where nobcheck avoids invalid values,
# the clip() later cuts off the parts of the pixels that go outside
# bounds. This differs from using the backup points by also handling
# pixels that are offset from the poles by a non-half-integer amount.
for dest_list, test_points in [
(col_lims, [( -0.5, 0.0), ( 0.0, 0.0)]),
(col_lims, [(n1-0.5, 0.0), (n1-1.0, 0.0)]),
(row_lims, [(0.0, -0.5), (0.0, 0.0)]),
(row_lims, [(0.0, n2-0.5), (0.0, n2-1.0)]),
]:
for t in test_points:
if not np.any(np.isnan(wcsutils.nobcheck(wcs).wcs_pix2world([t], 0))):
dest_list.append(np.array(t, float))
break
else:
raise ValueError("Could not identify map_boundary; last test point was %s" % t)
# We want to draw a closed patch connecting the four corners
# of the boundary.
col_lims = [_c[0] for _c in col_lims]
row_lims = [_r[1] for _r in row_lims]
vertices = np.array([
(col_lims[0], row_lims[0]),
(col_lims[1], row_lims[0]),
(col_lims[1], row_lims[1]),
(col_lims[0], row_lims[1]),
(col_lims[0], row_lims[0])])
total = 0
tot_dra = 0
def calc_gridinfo(shape, wcs, steps=[2,2], nstep=[200,200], zenith=False, unit=1):
"""Return an array of line segments representing a coordinate grid
for the given shape and wcs. the steps argument describes the
number of points to use along each meridian."""
if unit in ["d","degree"]: unit = 1.0
elif unit in ["m","arcmin"]: unit = 1.0/60
elif unit in ["s","arcsec"]: unit = 1.0/3600
steps = (np.zeros([2])+steps)*unit
nstep = np.zeros([2],dtype=int)+nstep
gridinfo = Gridinfo()
if wcsutils.is_plain(wcs):
box = np.sort(enmap.box(shape, wcs),0)
start = np.floor(box[0]/steps)*steps
nline = np.floor(box[1]/steps)-np.floor(box[0]/steps)+1
else:
box = np.array([[-90.,0.],[90.,360.]])
start = np.array([-90.,0.])
nline = np.array([180.,360.])/steps+1
gridinfo.lon = []
gridinfo.lat = []
gridinfo.shape = shape[-2:]
gridinfo.wcs = wcs
# Draw lines of longitude
for phi in start[1] + np.arange(nline[1])*steps[1]:
# Loop over theta
pixs = np.array(wcsutils.nobcheck(wcs).wcs_world2pix(phi, np.linspace(box[0,0],box[1,0],nstep[0],endpoint=True), 0)).T
def project(map, shape, wcs, order=3, mode="constant", cval=0.0, force=False, prefilter=True, mask_nan=False, safe=True, bsize=1000):
"""Project the map into a new map given by the specified
shape and wcs, interpolating as necessary. Handles nan
regions in the map by masking them before interpolating.
This uses local interpolation, and will lose information
when downgrading compared to averaging down."""
# Skip expensive operation if map is compatible
if not force:
if wcsutils.equal(map.wcs, wcs) and tuple(shape[-2:]) == tuple(shape[-2:]):
return map.copy()
elif wcsutils.is_compatible(map.wcs, wcs) and mode == "constant":
return extract(map, shape, wcs, cval=cval)
omap = zeros(map.shape[:-2]+shape[-2:], wcs, map.dtype)
# Save memory by looping over rows
for i1 in range(0, shape[-2], bsize):
i2 = min(i1+bsize, shape[-2])
somap = omap[...,i1:i2,:]
pix = map.sky2pix(somap.posmap(), safe=safe)
y1 = max(np.min(pix[0]).astype(int)-3,0)
y2 = min(np.max(pix[0]).astype(int)+3,map.shape[-2])
pix[0] -= y1
somap[:] = utils.interpol(map[...,y1:y2,:], pix, order=order, mode=mode, cval=cval, prefilter=prefilter, mask_nan=mask_nan)
return omap