Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_warped_vrt_set_src_crs(path_rgb_byte_tif, tmpdir):
    """A WarpedVRT over a dataset without a CRS needs an explicit ``src_crs``.

    A copy of the RGB fixture is produced via ``_copy_update_profile`` with
    ``crs=None`` (presumably yielding a file with no CRS -- confirm against
    that helper). Building a WarpedVRT on the copy must fail when only
    ``dst_crs`` is given, and must succeed once the original file's CRS is
    supplied as ``src_crs``.
    """
    crs_less_path = str(tmpdir.join('rgb_byte_crs_unset.tif'))
    _copy_update_profile(path_rgb_byte_tif, crs_less_path, crs=None)

    # Remember the CRS of the untouched fixture to hand back in later.
    with rasterio.open(path_rgb_byte_tif) as reference:
        expected_crs = reference.crs

    with rasterio.open(crs_less_path) as dataset:
        # No source CRS available: constructing the VRT is expected to raise.
        with pytest.raises(Exception):
            with WarpedVRT(dataset, dst_crs=DST_CRS):
                pass

        # Supplying src_crs explicitly makes the same dataset usable.
        with WarpedVRT(dataset, src_crs=expected_crs, dst_crs=DST_CRS) as warped:
            assert warped.src_crs == expected_crs
def test_boundless_masks_read_prohibited(path_rgb_byte_tif):
    """Reading masks from a WarpedVRT with ``boundless=True`` raises ValueError.

    The requested window starts at negative offsets, and the read is expected
    to be rejected with ``ValueError``.
    """
    with rasterio.open(path_rgb_byte_tif) as dataset:
        with WarpedVRT(dataset) as warped:
            with pytest.raises(ValueError):
                read_kwargs = dict(
                    boundless=True,
                    window=Window(-200, -200, 1000, 1000),
                    out_shape=(3, 600, 600),
                )
                warped.read_masks(**read_kwargs)
def test_open_datasets(capfd, path_rgb_byte_tif):
    """The open-dataset dump reflects datasets being opened and closed.

    ``env._dump_open_datasets()`` output is read back from the captured
    stderr stream and checked at three points: with only the source open,
    with a WarpedVRT open on top of it, and after both have been closed.
    """
    with rasterio.Env() as env:

        def dump_report():
            # Trigger the dump and return whatever it wrote to stderr since
            # the previous capture.
            env._dump_open_datasets()
            return capfd.readouterr().err

        with rasterio.open(path_rgb_byte_tif) as dataset:
            report = dump_report()
            assert "1 N GTiff" in report
            assert "1 S GTiff" not in report

            # Opening a VRT on the source changes the reported count.
            with WarpedVRT(dataset):
                assert "2 N GTiff" in dump_report()

        # Source and VRT are closed here; the earlier entry must be gone.
        assert "1 N GTiff" not in dump_report()
xres = (right - left) / dst_width
yres = (top - bottom) / dst_height
dst_transform = affine.Affine(
xres, 0.0, left, 0.0, -yres, top)
# The 'complex' test case hits the affected code path
vrt_options = {'dst_crs': dst_crs}
if complex:
vrt_options.update(
dst_crs=dst_crs,
dst_height=dst_height,
dst_width=dst_width,
dst_transform=dst_transform,
resampling=Resampling.nearest)
with WarpedVRT(src, **vrt_options) as vrt:
rio_shutil.copy(vrt, vrt_path, driver='VRT')
with rasterio.open(vrt_path) as src:
assert src.crs
Defaults to ``None``, in which case the value of `src_nodata` will be
used if provided, or ``0`` otherwise.
Returns
-------
A :py:class:`rasterio.vrt.WarpedVRT` instance with the transformation.
"""
vrt_params = dict(
crs=crs,
resampling=Resampling.bilinear,
src_nodata=src_nodata,
dst_nodata=dst_nodata)
return WarpedVRT(source, **vrt_params)
def _get_vrt(src: DatasetReader, rs_method: int) -> WarpedVRT:
    """Wrap *src* in a WarpedVRT expressed in the raster driver's target CRS.

    Parameters
    ----------
    src
        Open dataset to warp; its ``crs``, ``width``, ``height`` and
        ``bounds`` are used to derive the output grid.
    rs_method
        Resampling method, forwarded as the VRT's ``resampling`` argument.

    Returns
    -------
    A WarpedVRT whose transform, width and height are those returned by
    ``calculate_default_transform`` for ``RasterDriver._TARGET_CRS``.
    """
    # Imported at call time rather than at module level (presumably to avoid
    # an import cycle with the drivers package -- confirm).
    from terracotta.drivers.raster_base import RasterDriver

    dst_crs = RasterDriver._TARGET_CRS
    default_grid = calculate_default_transform(
        src.crs, dst_crs, src.width, src.height, *src.bounds
    )
    grid_transform, grid_width, grid_height = default_grid

    return WarpedVRT(
        src,
        crs=dst_crs,
        resampling=rs_method,
        transform=grid_transform,
        width=grid_width,
        height=grid_height,
    )
forward_band_tags: bool, optional
Forward band tags to output bands.
Ref: https://github.com/cogeotiff/rio-cogeo/issues/19
quiet: bool, optional (default: False)
Mask processing steps.
temporary_compression: str, optional
Compression used for the intermediate file, default is deflate.
"""
if isinstance(indexes, int):
indexes = (indexes,)
config = config or {}
with rasterio.Env(**config):
with ExitStack() as ctx:
if isinstance(source, (DatasetReader, DatasetWriter, WarpedVRT)):
src_dst = source
else:
src_dst = ctx.enter_context(rasterio.open(source))
meta = src_dst.meta
indexes = indexes if indexes else src_dst.indexes
nodata = nodata if nodata is not None else src_dst.nodata
dtype = dtype if dtype else src_dst.dtypes[0]
alpha = utils.has_alpha_band(src_dst)
mask = utils.has_mask_band(src_dst)
if not add_mask and (
(nodata is not None or alpha)
and dst_kwargs.get("compress") in ["JPEG", "jpeg"]
):
warnings.warn(
def _read(
src, indexes, dst_bounds, dst_shape, dst_crs, resampling, src_nodata, dst_nodata
):
height, width = dst_shape[-2:]
if indexes is None:
dst_shape = (len(src.indexes), height, width)
indexes = list(src.indexes)
src_nodata = src.nodata if src_nodata is None else src_nodata
dst_nodata = src.nodata if dst_nodata is None else dst_nodata
dst_left, dst_bottom, dst_right, dst_top = dst_bounds
with WarpedVRT(
src,
crs=dst_crs,
src_nodata=src_nodata,
nodata=dst_nodata,
width=width,
height=height,
transform=affine_from_bounds(
dst_left, dst_bottom, dst_right, dst_top, width, height
),
resampling=Resampling[resampling]
) as vrt:
return vrt.read(
window=vrt.window(*dst_bounds),
out_shape=dst_shape,
indexes=indexes,
masked=True
default_name: str, optional
The name of the data array if none exists. Default is None.
**open_kwargs: kwargs, optional
Optional keyword arguments to pass into rasterio.open().
Returns
-------
:obj:`xarray.Dataset` | :obj:`xarray.DataArray` | List[:obj:`xarray.Dataset`]:
The newly created dataset(s).
"""
parse_coordinates = True if parse_coordinates is None else parse_coordinates
masked = masked or mask_and_scale
vrt_params = None
if isinstance(filename, rasterio.io.DatasetReader):
filename = filename.name
elif isinstance(filename, rasterio.vrt.WarpedVRT):
vrt = filename
filename = vrt.src_dataset.name
vrt_params = dict(
src_crs=vrt.src_crs.to_string(),
crs=vrt.crs.to_string(),
resampling=vrt.resampling,
tolerance=vrt.tolerance,
src_nodata=vrt.src_nodata,
nodata=vrt.nodata,
width=vrt.width,
height=vrt.height,
src_transform=vrt.src_transform,
transform=vrt.transform,
dtype=vrt.working_dtype,
warp_extras=vrt.warp_extras,
)