Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_grid_filter_valid(self):
    """Check the validity index a ``GridFilter`` reports for four swath points.

    An 8x8 'eqc' filter area is combined with a mask that is set in the
    first four rows/columns and in the last four rows/columns. The index
    returned by ``get_valid_index`` for the swath is compared with the
    expected flags.
    """
    lons = np.array([-170, -30, 30, 170])
    lats = np.array([20, -40, 50, -80])
    swath_def = geometry.SwathDefinition(lons, lats)
    filter_area = geometry.AreaDefinition(
        'test', 'test', 'test',
        {'proj': 'eqc', 'lon_0': 0.0, 'lat_0': 0.0},
        8, 8,
        (-20037508.34, -10018754.17, 20037508.34, 10018754.17))
    # Named ``grid_mask`` rather than ``filter`` so the builtin is not shadowed.
    grid_mask = np.array([[1, 1, 1, 1, 0, 0, 0, 0],
                          [1, 1, 1, 1, 0, 0, 0, 0],
                          [1, 1, 1, 1, 0, 0, 0, 0],
                          [1, 1, 1, 1, 0, 0, 0, 0],
                          [0, 0, 0, 0, 1, 1, 1, 1],
                          [0, 0, 0, 0, 1, 1, 1, 1],
                          [0, 0, 0, 0, 1, 1, 1, 1],
                          [0, 0, 0, 0, 1, 1, 1, 1],
                          ])
    grid_filter = geo_filter.GridFilter(filter_area, grid_mask)
    valid_index = grid_filter.get_valid_index(swath_def)
    expected = np.array([1, 0, 0, 1])
    # Without this comparison the test would pass whatever the filter returned.
    self.assertTrue(np.array_equal(valid_index, expected),
                    'Failed to find grid filter')
def test_swath_not_equal(self):
    """Two swaths with the same longitudes but different latitudes are unequal."""
    shared_lons = np.array([1.2, 1.3, 1.4, 1.5])
    first_swath = geometry.SwathDefinition(
        shared_lons, np.array([65.9, 65.86, 65.82, 65.78]))
    second_swath = geometry.SwathDefinition(
        shared_lons, np.array([65.91, 65.85, 65.80, 65.75]))
    # Compare with ``==`` explicitly: the equality operator is what is under test.
    compared_equal = first_swath == second_swath
    self.assertFalse(compared_equal,
                     'swath_defs are not expected to be equal')
def test_overlaps(self):
    """Test if two areas overlap.

    The first pair of swaths is expected to overlap in both directions.
    The second pair covers separate longitude ranges at latitude 89
    (0..135 and 180..-45) and is expected not to overlap in either direction.
    """
    lons1 = np.array([[0, 90], [-90, 180]])
    lats1 = np.array([[89, 89], [89, 89]])
    area1 = geometry.SwathDefinition(lons1, lats1)

    lons2 = np.array([[45, 135], [-45, -135]])
    lats2 = np.array([[89, 89], [89, 89]])
    area2 = geometry.SwathDefinition(lons2, lats2)

    self.assertTrue(area1.overlaps(area2))
    self.assertTrue(area2.overlaps(area1))

    lons1 = np.array([[0, 45], [135, 90]])
    lats1 = np.array([[89, 89], [89, 89]])
    area1 = geometry.SwathDefinition(lons1, lats1)

    lons2 = np.array([[180, -135], [-45, -90]])
    lats2 = np.array([[89, 89], [89, 89]])
    area2 = geometry.SwathDefinition(lons2, lats2)

    # The second pair must actually be checked; building the swaths alone
    # verifies nothing.
    self.assertFalse(area1.overlaps(area2))
    self.assertFalse(area2.overlaps(area1))
def patch_geometry():
    """Swap the geometry definition classes for their fake counterparts.

    The current ``geometry.AreaDefinition`` and ``geometry.SwathDefinition``
    are stored on the module as ``OldAreaDefinition`` and
    ``OldSwathDefinition``; ``FakeAreaDefinition`` and ``FakeSwathDefinition``
    then take their place.
    """
    # The right-hand side is evaluated before any assignment happens, so the
    # real class is captured before it is replaced.
    geometry.OldAreaDefinition, geometry.AreaDefinition = (
        geometry.AreaDefinition, FakeAreaDefinition)
    geometry.OldSwathDefinition, geometry.SwathDefinition = (
        geometry.SwathDefinition, FakeSwathDefinition)
def test_self_map_multi(self):
    """Gauss-resample a three-channel swath onto itself and check each channel.

    ``self.tb37v`` is stacked three times as columns and resampled with
    ``kd_tree.resample_gauss`` using the same swath as source and target.
    Exactly one warning containing 'Possible more' must be recorded during
    the call, and every output channel must sum to the reference value.
    """
    data = np.column_stack((self.tb37v, self.tb37v, self.tb37v))
    swath_def = geometry.SwathDefinition(lons=self.lons, lats=self.lats)

    # The former ``sys.version_info < (2, 6)`` fallback is gone: it only
    # existed for interpreters without ``warnings.catch_warnings``.
    with warnings.catch_warnings(record=True) as caught:
        res = kd_tree.resample_gauss(swath_def, data, swath_def,
                                     radius_of_influence=70000,
                                     sigmas=[56500, 56500, 56500])
        self.assertEqual(len(caught), 1,
                         'Failed to create neighbour radius warning')
        self.assertTrue('Possible more' in str(caught[0].message),
                        'Failed to create correct neighbour radius warning')

    # All three channels hold the same input, so they share one reference sum.
    for channel in range(3):
        self.assertAlmostEqual(
            res[:, channel].sum() / 100., 668848.082208, 1,
            msg='Failed self mapping swath multi for channel %d' % (channel + 1))
Description of parameter `lon` (the default is -76.392900).
radius : type
Description of parameter `radius` (the default is 12e3).
Returns
-------
type
Description of returned object.
"""
from pyresample import utils, geometry
from numpy import arange, vstack, ones
lons = array()
grid1 = geometry.SwathDefinition(lons=arr.longitude, lats=arr.latitude)
grid2 = geometry.SwathDefinition(lons=vstack([lon]), lats=vstack([lat]))
row, col = utils.generate_nearest_neighbour_linesample_arrays(
grid1, grid2, radius)
row = row.flatten()
col = col.flatten()
return arr.sel(x=col).sel(y=row).squeeze()
else:
array = calibrate_refl(subdata, uncertainty, indices)
for (i, idx) in enumerate(indices):
satscene[band_names[idx]] = array[i]
# Get the orbit number
mda = data.attributes()["CoreMetadata.0"]
orbit_idx = mda.index("ORBITNUMBER")
satscene.orbit = mda[orbit_idx + 111:orbit_idx + 116]
# # Get the geolocation
lat, lon = get_lat_lon(satscene, None)
from pyresample import geometry
satscene.area = geometry.SwathDefinition(lons=lon, lats=lat)
# Trimming out dead sensor lines (detectors) on aqua:
# (in addition channel 21 is noisy)
if satscene.satname == "aqua":
for band in ["6", "27", "36"]:
if not satscene[band].is_loaded() or satscene[band].data.mask.all():
continue
width = satscene[band].data.shape[1]
height = satscene[band].data.shape[0]
indices = satscene[band].data.mask.sum(1) < width
if indices.sum() == height:
continue
satscene[band] = satscene[band].data[indices, :]
satscene[band].area = geometry.SwathDefinition(
lons=satscene.area.lons[indices,:],
lats=satscene.area.lats[indices,:])
cols_full = np.arange(shape[1])
rows_full = np.arange(shape[0])
satint = SatelliteInterpolator((lons, lats),
(row_indices,
column_indices),
(rows_full, cols_full))
#satint.fill_borders("y", "x")
lons, lats = satint.interpolate()
try:
from pyresample import geometry
lons = np.ma.masked_array(lons, nodata_mask)
lats = np.ma.masked_array(lats, nodata_mask)
scene.area = geometry.SwathDefinition(lons=lons,
lats=lats)
except ImportError:
scene.area = None
scene.lat = lats
scene.lon = lons
LOG.info("Loading PPS parameters done.")
if minlon > maxlon:
in_lon = np.concatenate((lon[minlon:], lon[0:maxlon]))
in_data = np.concatenate((z[minlat:maxlat, minlon:],
z[minlat:maxlat, 0:maxlon]), 1)
else:
in_lon = lon[minlon:maxlon]
in_data = z[minlat:maxlat, minlon:maxlon]
res = in_data.transpose() * -1
lats, lons = np.meshgrid(lat[minlat:maxlat], in_lon)
orig_def = pyresample.geometry.SwathDefinition(
lons=lons, lats=lats)
target_def = pyresample.geometry.SwathDefinition(
lons=target_lon.astype(np.float64),
lats=target_lat.astype(np.float64))
data = pyresample.kd_tree.resample_nearest(
orig_def, res,
target_def,
radius_of_influence=500000,
fill_value=None,
nprocs=4)
def do_save(filename, data):
    """Save ``data`` to ``filename`` with ``np.save``, filling masked entries.

    Parameters
    ----------
    filename : str or file-like
        Destination passed straight to ``np.save``.
    data : numpy.ma.MaskedArray or array-like
        Array to store. Masked entries are replaced by the array's fill
        value; input without a ``filled`` method is stored unchanged.
    """
    # ``np.ma.filled`` calls ``data.filled()`` when that method exists and
    # otherwise passes the input through, so plain ndarrays no longer fail.
    np.save(filename, np.ma.filled(data))
if not os.path.isdir(CACHE_DIR):
os.makedirs(CACHE_DIR)
else:
raise TypeError('orig_lons and orig_lats variable either a DataArray or numpy.ndarray. \n'
'Found type(orig_lons) = %s and type(orig_lats) = %s' %
(type(orig_lons), type(orig_lats)))
if type(orig_field) == xr.core.dataarray.DataArray:
orig_field = orig_field.values
elif type(orig_field) != np.ndarray and \
type(orig_field) != np.ma.core.MaskedArray :
raise TypeError('orig_field must be a type of DataArray, ndarray, or MaskedArray. \n'
'Found type(orig_field) = %s' % type(orig_field))
# prepare for the nearest neighbor mapping
# first define the lat lon points of the original data
orig_grid = pr.geometry.SwathDefinition(lons=orig_lons_1d,
lats=orig_lats_1d)
# the latitudes to which we will we interpolate
num_lats = int((new_grid_max_lat - new_grid_min_lat) / new_grid_delta_lat) + 1
num_lons = int((new_grid_max_lon - new_grid_min_lon) / new_grid_delta_lat) + 1
if (num_lats > 0) and (num_lons > 0):
# linspace is preferred when using floats!
lat_tmp = np.linspace(new_grid_min_lat, new_grid_max_lat, num=int(num_lats))
lon_tmp = np.linspace(new_grid_min_lon, new_grid_max_lon, num=int(num_lons))
new_grid_lon, new_grid_lat = np.meshgrid(lon_tmp, lat_tmp)
# define the lat lon points of the two parts.
new_grid = pr.geometry.GridDefinition(lons=new_grid_lon,
lats=new_grid_lat)