Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_mosaic_antimeridian():
"""Create mosaic using tiles on opposing antimeridian sides."""
zoom = 5
row = 0
pixelbuffer = 5
tp = BufferedTilePyramid("geodetic", pixelbuffer=pixelbuffer)
west = tp.tile(zoom, row, 0)
east = tp.tile(zoom, row, tp.matrix_width(zoom) - 1)
mosaic = create_mosaic([
(west, np.ones(west.shape).astype("uint8")),
(east, np.ones(east.shape).astype("uint8") * 2)
])
assert isinstance(mosaic, ReferencedRaster)
# Huge array gets initialized because the two tiles are on opposing sides of the
# projection area. The below test should pass if the tiles are stitched together next
# to each other.
assert mosaic.data.shape == (1, west.height, west.width * 2 - 2 * pixelbuffer)
assert mosaic.data[0][0][0] == 2
assert mosaic.data[0][0][-1] == 1
# If tiles from opposing sides from Antimeridian are mosaicked it will happen that the
in_tile=tile, in_data=data, out_profile=out_profile,
out_path=path
)
with rasterio.open(path, 'r') as src:
assert src.read().any()
assert src.meta["driver"] == out_profile["driver"]
assert src.transform == tile.affine
if out_profile["compress"]:
assert src.compression == Compression(
out_profile["compress"].upper())
finally:
shutil.rmtree(path, ignore_errors=True)
# with metatiling
tile = BufferedTilePyramid("geodetic", metatiling=4).tile(5, 1, 1)
data = ma.masked_array(np.ones((2, ) + tile.shape))
out_tile = BufferedTilePyramid("geodetic").tile(5, 5, 5)
out_profile = dict(
driver="GTiff", count=2, dtype="uint8", compress="lzw", nodata=0,
height=out_tile.height, width=out_tile.width,
affine=out_tile.affine)
try:
write_raster_window(
in_tile=tile, in_data=data, out_profile=out_profile,
out_tile=out_tile, out_path=path
)
with rasterio.open(path, 'r') as src:
assert src.shape == out_tile.shape
assert src.read().any()
assert src.meta["driver"] == out_profile["driver"]
assert src.transform == out_profile["transform"]
finally:
shutil.rmtree(path, ignore_errors=True)
def test_read_raster_window_reproject(dummy1_3857_tif, minmax_zoom):
"""Read array with read_raster_window."""
zoom = 8
# with reproject
config_raw = minmax_zoom.dict
config_raw["input"].update(file1=dummy1_3857_tif)
config = MapcheteConfig(config_raw)
rasterfile = config.params_at_zoom(zoom)["input"]["file1"]
dummy1_bbox = rasterfile.bbox()
pixelbuffer = 5
tile_pyramid = BufferedTilePyramid("geodetic", pixelbuffer=pixelbuffer)
tiles = list(tile_pyramid.tiles_from_geom(dummy1_bbox, zoom))
# target window out of CRS bounds
band = read_raster_window(
dummy1_3857_tif, tile_pyramid.tile(12, 0, 0))
assert isinstance(band, ma.MaskedArray)
assert band.mask.all()
# not intersecting tile
tiles.append(tile_pyramid.tile(zoom, 1, 1)) # out of CRS bounds
tiles.append(tile_pyramid.tile(zoom, 16, 1)) # out of file bbox
for tile in tiles:
for band in read_raster_window(dummy1_3857_tif, tile):
assert isinstance(band, ma.MaskedArray)
assert band.shape == tile.shape
bands = read_raster_window(dummy1_3857_tif, tile, [1])
assert isinstance(bands, ma.MaskedArray)
assert bands.shape == tile.shape
def test_read_raster_window(dummy1_tif, minmax_zoom):
"""Read array with read_raster_window."""
zoom = 8
# without reproject
config = MapcheteConfig(minmax_zoom.path)
rasterfile = config.params_at_zoom(zoom)["input"]["file1"]
dummy1_bbox = rasterfile.bbox()
pixelbuffer = 5
tile_pyramid = BufferedTilePyramid("geodetic", pixelbuffer=pixelbuffer)
tiles = list(tile_pyramid.tiles_from_geom(dummy1_bbox, zoom))
# add edge tile
tiles.append(tile_pyramid.tile(8, 0, 0))
for tile in tiles:
width, height = tile.shape
for band in read_raster_window(dummy1_tif, tile):
assert isinstance(band, ma.MaskedArray)
assert band.shape == (width, height)
for index in range(1, 4):
band = read_raster_window(dummy1_tif, tile, index)
assert isinstance(band, ma.MaskedArray)
assert band.shape == (width, height)
for index in [None, [1, 2, 3]]:
band = read_raster_window(dummy1_tif, tile, index)
assert isinstance(band, ma.MaskedArray)
assert band.ndim == 3
def test_s3_write_output_data(mp_s3_tmpdir):
"""Write and read output."""
output_params = dict(
grid="geodetic",
format="PNG_hillshade",
path=mp_s3_tmpdir,
pixelbuffer=0,
metatiling=1
)
output = png_hillshade.OutputDataWriter(output_params)
assert output.path == mp_s3_tmpdir
assert output.file_extension == ".png"
tp = BufferedTilePyramid("geodetic")
tile = tp.tile(5, 5, 5)
# get_path
assert output.get_path(tile) == os.path.join(*[
mp_s3_tmpdir, "5", "5", "5"+".png"])
# profile
assert isinstance(output.profile(tile), dict)
# write full array
data = np.ones(tile.shape) * 128
output.write(tile, data)
# tiles_exist
assert output.tiles_exist(tile)
# read
data = output.read(tile)
assert isinstance(data, np.ndarray)
assert not data.mask.any()
def test_process_tile_open(example_mapchete):
"""Raise ValueError on MapcheteProcess.open()."""
config = MapcheteConfig(example_mapchete.path)
tile = BufferedTilePyramid("mercator").tile(7, 1, 1)
user_process = mapchete.MapcheteProcess(
tile=tile,
params=config.params_at_zoom(tile.zoom),
input=config.get_inputs_for_tile(tile),
)
with pytest.raises(ValueError):
user_process.open("nonexisting_id")
def test_read_raster_window_mask(s2_band):
"""No resampling artefacts on mask edges."""
tile = BufferedTilePyramid("geodetic").tile(zoom=13, row=1918, col=8905)
data = read_raster_window(
s2_band, tile, resampling="cubic", src_nodata=0, dst_nodata=0)
assert data.any()
assert not np.where(data == 1, True, False).any()
def write_output_metadata(output_params):
"""Dump output JSON and verify parameters if output metadata exist."""
if "path" in output_params:
metadata_path = os.path.join(output_params["path"], "metadata.json")
logger.debug("check for output %s", metadata_path)
try:
existing_params = read_output_metadata(metadata_path)
logger.debug("%s exists", metadata_path)
logger.debug("existing output parameters: %s", pformat(existing_params))
existing_tp = existing_params["pyramid"]
current_params = params_to_dump(output_params)
logger.debug("current output parameters: %s", pformat(current_params))
current_tp = BufferedTilePyramid(**current_params["pyramid"])
if existing_tp != current_tp: # pragma: no cover
raise MapcheteConfigError(
"pyramid definitions between existing and new output do not match: "
"%s != %s" % (existing_tp, current_tp)
)
existing_format = existing_params["driver"]["format"]
current_format = current_params["driver"]["format"]
if existing_format != current_format: # pragma: no cover
raise MapcheteConfigError(
"existing output format does not match new output format: "
"%s != %s" % (
(existing_format, current_format)
)
)
except FileNotFoundError:
logger.debug("%s does not exist", metadata_path)
def params_to_dump(params):
# in case GridDefinition was not yet initialized
return dict(
pyramid=BufferedTilePyramid(
grid=params["grid"],
tile_size=params.get("tile_size", 256),
metatiling=params.get("metatiling", 1),
pixelbuffer=params.get("pixelbuffer", 0),
).to_dict(),
driver={
k: v
for k, v in params.items()
if k not in ["path", "grid", "pixelbuffer", "metatiling"]
}
)
)
params["pyramid"]["grid"]["shape"] = [1, 2]
if "crs" in grid and isinstance(grid["crs"], str):
crs = CRS.from_string(grid["crs"])
warnings.warn(
DeprecationWarning(
"Deprecated 'srs' found in %s: '%s'. "
"Use WKT representation instead: %s" % (
metadata_json, grid["crs"], pformat(dict(wkt=crs.to_wkt()))
)
)
)
params["pyramid"]["grid"].update(srs=dict(wkt=crs.to_wkt()))
params.update(
pyramid=BufferedTilePyramid(
params["pyramid"]["grid"],
metatiling=params["pyramid"].get("metatiling", 1),
tile_size=params["pyramid"].get("tile_size", 256),
pixelbuffer=params["pyramid"].get("pixelbuffer", 0)
)
)
return params