Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def check_open_with_api(index, time_slices):
with rasterio.Env():
from datacube import Datacube
dc = Datacube(index=index)
input_type_name = 'ls5_nbar_albers'
input_type = dc.index.products.get_by_name(input_type_name)
geobox = geometry.GeoBox(200, 200, Affine(25, 0.0, 638000, 0.0, -25, 6276000), geometry.CRS('EPSG:28355'))
observations = dc.find_datasets(product='ls5_nbar_albers', geopolygon=geobox.extent)
group_by = query_group_by('time')
sources = dc.group_datasets(observations, group_by)
data = dc.load_data(sources, geobox, input_type.measurements.values())
assert data.blue.shape == (time_slices, 200, 200)
chunk_profile = {'time': 1, 'x': 100, 'y': 100}
lazy_data = dc.load_data(sources, geobox, input_type.measurements.values(), dask_chunks=chunk_profile)
assert lazy_data.blue.shape == (time_slices, 200, 200)
assert (lazy_data.blue.load() == data.blue).all()
def rio_geobox(meta):
""" Construct geobox from src.meta of opened rasterio dataset
"""
if 'crs' not in meta or 'transform' not in meta:
return None
h, w = (meta['height'], meta['width'])
crs = dc_crs_from_rio(meta['crs'])
transform = meta['transform']
return GeoBox(w, h, transform, crs)
gbox = gbx.zoom_out(gbox, 0.873)
yy, roi = _read(gbox)
assert roi[0].start > 0 and roi[1].start > 0
assert (yy[0] == -999).all()
yy_expect, _ = rio_slurp(mm.path, gbox)
np.testing.assert_array_equal(yy, yy_expect)
gbox = gbx.zoom_out(mm.gbox[3:-3, 10:-10], 2.1)
yy, roi = _read(gbox)
assert roi_shape(roi) == gbox.shape
assert not (yy == -999).any()
gbox = GeoBox.from_geopolygon(mm.gbox.extent.to_crs(epsg3857).buffer(50),
resolution=mm.gbox.resolution)
assert gbox.extent.contains(mm.gbox.extent.to_crs(epsg3857))
assert gbox.crs != mm.gbox.crs
yy, roi = _read(gbox)
assert roi[0].start > 0 and roi[1].start > 0
assert (yy[0] == -999).all()
gbox = gbx.zoom_out(gbox, 4)
yy, roi = _read(gbox, resampling='average')
nvalid = (yy != -999).sum()
nempty = (yy == -999).sum()
assert nvalid > nempty
def _get_geobox(args):
width = int(args['width'])
height = int(args['height'])
minx, miny, maxx, maxy = map(float, args['bbox'].split(','))
crs = geometry.CRS(args['srs'])
affine = Affine.translation(minx, miny) * Affine.scale((maxx - minx) / width, (maxy - miny) / height)
return geometry.GeoBox(width, height, affine, crs)
def affine_transform_pix(gbox: GeoBox, transform: Affine) -> GeoBox:
"""
Apply affine transform on pixel side.
:param transform: Affine matrix mapping from new pixel coordinate space to
pixel coordinate space of input gbox
:returns: GeoBox of the same pixel shape but covering different region,
pixels in the output gbox relate to input geobox via `transform`
X_old_pix = transform * X_new_pix
"""
H, W = gbox.shape
A = gbox.affine*transform
return GeoBox(W, H, A, gbox.crs)
def rotate(gbox: GeoBox, deg: float) -> GeoBox:
"""
Rotate GeoBox around the center.
It's as if you stick a needle through the center of the GeoBox footprint
and rotate it counter clock wise by supplied number of degrees.
Note that from pixel point of view image rotates the other way. If you have
source image with an arrow pointing right, and you rotate GeoBox 90 degree,
in that view arrow should point down (this is assuming usual case of inverted
y-axis)
"""
h, w = gbox.shape
c0 = gbox.transform*(w*0.5, h*0.5)
A = Affine.rotation(deg, c0)*gbox.transform
return GeoBox(w, h, A, gbox.crs)
from datacube.utils.geometry import CRS, GeoBox
from math import pi
R = 6378137
origin = pi * R
res0 = 2 * pi * R / tile_size
res = res0*(2**(-zoom))
tsz = 2 * pi * R * (2**(-zoom)) # res*tile_size
# maps pixel coord to meters in EPSG:3857
#
transform = Affine(res, 0, tx*tsz - origin,
0, -res, origin - ty*tsz)
return GeoBox(tile_size, tile_size, transform, CRS('epsg:3857'))
def _xarray_geobox(obj):
crs = _norm_crs(_get_crs(obj))
if crs is None:
return None
dims = crs.dimensions
return geometry.GeoBox(obj[dims[1]].size, obj[dims[0]].size, obj.affine, crs)
def _get_data_for_type(self, dataset_type, sources, measurements, geopolygon, slices=None, chunks=None):
dt_data = {}
datasets = list(chain.from_iterable(g for _, g in numpy.ndenumerate(sources)))
if not geopolygon:
geopolygon = get_bounds(datasets, dataset_type.grid_spec.crs)
geobox = geometry.GeoBox.from_geopolygon(geopolygon.to_crs(dataset_type.grid_spec.crs),
dataset_type.grid_spec.resolution)
if slices:
_rename_spatial_keys(slices, geobox.dimensions)
geo_slices = [slices.get(dim, slice(None)) for dim in geobox.dimensions]
geobox = geobox[geo_slices]
for dim, dim_slice in slices.items():
if dim in sources.dims:
sources = sources.isel(dim=dim_slice)
dt_data.update(self._get_data_for_dims(dataset_type, sources, geobox))
dt_data.update(self._get_data_for_measurement(dataset_type, sources, measurements, geobox, dask_chunks=chunks))
return dt_data
if grid_spec is None or grid_spec.crs is None:
raise ValueError("Product has no default CRS. Must specify 'output_crs' and 'resolution'")
crs = grid_spec.crs
if resolution is None:
if grid_spec.resolution is None:
raise ValueError("Product has no default resolution. Must specify 'resolution'")
resolution = grid_spec.resolution
align = align or grid_spec.alignment
if geopolygon is None:
geopolygon = query_geopolygon(**query)
if geopolygon is None:
geopolygon = get_bounds(datasets, crs)
return geometry.GeoBox.from_geopolygon(geopolygon, resolution, crs, align)