Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_read_raster_window_input_list(cleantopo_br):
    """Reading a list of raster paths must match resampling their mosaic.

    The example process is run at zoom 5 with output metatiling set to 1.
    Every output tile that exists on disk is then used twice for the first
    process tile of zoom 4:

    * mosaicked with ``create_mosaic`` and resampled via
      ``resample_from_array``;
    * passed as a plain list of paths to ``read_raster_window``.

    Both results have to agree in dtype, shape, mask and, within a relative
    tolerance of 1 %, in their values.
    """
    process_zoom = 5
    # Copy the top level and rebuild "output" instead of updating it in place,
    # so the metatiling override never leaks into the fixture's nested dict.
    conf = dict(cleantopo_br.dict)
    conf["output"] = dict(conf["output"], metatiling=1)

    with mapchete.open(conf) as mp:
        mp.batch_process(process_zoom)
        output = mp.config.output
        # Resolve each tile's output path once, then keep only written tiles.
        candidates = (
            (tile, output.get_path(tile))
            for tile in mp.config.output_pyramid.tiles_from_bounds(
                mp.config.bounds, process_zoom
            )
        )
        tiles = [(tile, path) for tile, path in candidates if path_exists(path)]
        upper_tile = next(mp.get_process_tiles(process_zoom - 1))

    # A mosaic of a single tile would not exercise the list input.
    assert len(tiles) > 1

    resampled = resample_from_array(
        in_raster=create_mosaic(
            [(tile, read_raster_window(path, tile)) for tile, path in tiles]
        ),
        out_tile=upper_tile,
    )
    resampled2 = read_raster_window(
        [path for _, path in tiles], upper_tile, src_nodata=0, dst_nodata=0
    )

    assert resampled.dtype == resampled2.dtype
    assert resampled.shape == resampled2.shape
    assert np.array_equal(resampled.mask, resampled2.mask)
    # TODO slight rounding errors occur
    assert np.allclose(resampled, resampled2, rtol=0.01)
# NOTE(review): headerless excerpt of a resample_from_array test. The enclosing
# ``def`` and the definitions of ``in_data``, ``in_tile`` and the first
# ``out_array`` are not visible here; indentation was lost in extraction.
# every element of the previously computed array must equal 1
assert np.all(np.where(out_array == 1, True, False))
# not intersecting tile: the result must be a masked array whose mask is all True
out_tile = BufferedTilePyramid("geodetic").tile(7, 0, 0)
out_array = resample_from_array(in_data, in_tile.affine, out_tile)
assert isinstance(out_array, ma.masked_array)
assert out_array.mask.all()
# data as tuple: a one-element tuple wrapping an array of ones
in_data = (np.ones(in_tile.shape[1:]), )
out_tile = BufferedTilePyramid("geodetic").tile(6, 10, 10)
out_array = resample_from_array(in_data, in_tile.affine, out_tile)
# deprecated: same call with the ``nodataval`` keyword; the result is not checked
resample_from_array(in_data, in_tile.affine, out_tile, nodataval=-9999)
# errors
# a string as input data must raise TypeError
with pytest.raises(TypeError):
in_data = "invalid_type"
resample_from_array(in_data, in_tile.affine, out_tile)
# an array built from only the first shape entry must raise TypeError
with pytest.raises(TypeError):
in_data = np.ones(in_tile.shape[0])
resample_from_array(in_data, in_tile.affine, out_tile)
# NOTE(review): everything from here on repeats the statements above verbatim;
# looks like a duplicated excerpt rather than intentional re-testing — confirm.
out_array = resample_from_array(in_data, in_tile.affine, out_tile)
assert isinstance(out_array, ma.masked_array)
assert out_array.mask.all()
# data as tuple
in_data = (np.ones(in_tile.shape[1:]), )
out_tile = BufferedTilePyramid("geodetic").tile(6, 10, 10)
out_array = resample_from_array(in_data, in_tile.affine, out_tile)
# deprecated
resample_from_array(in_data, in_tile.affine, out_tile, nodataval=-9999)
# errors
with pytest.raises(TypeError):
in_data = "invalid_type"
resample_from_array(in_data, in_tile.affine, out_tile)
with pytest.raises(TypeError):
in_data = np.ones(in_tile.shape[0])
resample_from_array(in_data, in_tile.affine, out_tile)
)
for x in output_tiles
])
])
else:
lower_tiles = [
y for y in chain(*[x.get_children() for x in output_tiles])
]
mosaic = raster.create_mosaic(
[
(lower_tile, self.output_reader.read(lower_tile))
for lower_tile in lower_tiles
],
nodata=self.output_reader.output_params["nodata"]
)
process_data = raster.resample_from_array(
in_raster=mosaic.data,
in_affine=mosaic.affine,
out_tile=self.tile,
resampling=self.config_baselevels["lower"],
nodata=self.output_reader.output_params["nodata"]
)
logger.debug((self.tile.id, "generated from baselevel", str(t)))
return process_data
# performance reasons as for the usual case overview tiles do not need the
# process pyramid pixelbuffers.
tile = self.config_baselevels["tile_pyramid"].tile(*self.tile.id)
# get output_tiles that intersect with process tile
output_tiles = (
list(self.output_reader.pyramid.tiles_from_bounds(tile.bounds, tile.zoom))
if tile.pixelbuffer > self.output_reader.pyramid.pixelbuffer
else self.output_reader.pyramid.intersecting(tile)
)
with Timer() as t:
# resample from parent tile
if baselevel == "higher":
parent_tile = self.tile.get_parent()
process_data = raster.resample_from_array(
self.output_reader.read(parent_tile),
in_affine=parent_tile.affine,
out_tile=self.tile,
resampling=self.config_baselevels["higher"],
nodata=self.output_reader.output_params["nodata"]
)
# resample from children tiles
elif baselevel == "lower":
if self.output_reader.pyramid.pixelbuffer:
lower_tiles = set([
y for y in chain(*[
self.output_reader.pyramid.tiles_from_bounds(
x.bounds, x.zoom + 1
)
for x in output_tiles
])
def _interpolate_from_baselevel(self, tile=None, baselevel=None):
with Timer() as t:
# resample from parent tile
if baselevel == "higher":
parent_tile = tile.get_parent()
process_data = raster.resample_from_array(
in_raster=self.get_raw_output(parent_tile, _baselevel_readonly=True),
in_affine=parent_tile.affine,
out_tile=tile,
resampling=self.config.baselevels["higher"],
nodataval=self.config.output.nodata
)
# resample from children tiles
elif baselevel == "lower":
mosaic, mosaic_affine = raster.create_mosaic([
(
child_tile,
self.get_raw_output(child_tile, _baselevel_readonly=True)
)
for child_tile in self.config.baselevels["tile_pyramid"].tile(
*tile.id
).get_children()