Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def sample_geometry():
    """Return the ``json`` attribute of the extent of a fixed sample GeoBox.

    The GeoBox is built with dimensions 40 and 40, a hard-coded affine
    transform and the 'EPSG:3577' CRS.
    """
    transform = Affine(2500, 0.0, 1200000.0, 0.0, -2500, -4300000.0)
    sample_box = geometry.GeoBox(40, 40, transform, geometry.CRS('EPSG:3577'))
    return sample_box.extent.json
def test_pickleable():
    """A polygon must compare equal to itself after a pickle round trip."""
    ring = [(10, 20), (20, 20), (20, 10), (10, 20)]
    original = geometry.polygon(ring, crs=epsg4326)

    # Serialise with the newest protocol available, then restore.
    restored = pickle.loads(pickle.dumps(original, pickle.HIGHEST_PROTOCOL))

    assert original == restored
def test_sinusoidal_comparison(self):
    """Check that differently spelled sinusoidal CRS definitions compare equal.

    Two WKT definitions that differ only in the names given to the
    GEOGCS/DATUM/SPHEROID (and in line breaks), plus a PROJ.4 string with
    the same sphere radius, must all be equal to each other and must
    differ from ``epsg4326``.
    """
    # WKT with descriptive names for the geographic CS, datum and spheroid.
    named_wkt = """PROJCS["unnamed",
GEOGCS["Unknown datum based upon the custom spheroid",
DATUM["Not specified (based on custom spheroid)",
SPHEROID["Custom spheroid",6371007.181,0]],
PRIMEM["Greenwich",0],UNIT["degree",0.0174532925199433]],PROJECTION["Sinusoidal"],
PARAMETER["longitude_of_center",0],PARAMETER["false_easting",0],
PARAMETER["false_northing",0],UNIT["Meter",1]]"""

    # Same parameters, placeholder names.
    unnamed_wkt = """PROJCS["unnamed",GEOGCS["unnamed ellipse",
DATUM["unknown",SPHEROID["unnamed",6371007.181,0]],
PRIMEM["Greenwich",0],UNIT["degree",0.0174532925199433]],PROJECTION["Sinusoidal"],
PARAMETER["longitude_of_center",0],PARAMETER["false_easting",0],
PARAMETER["false_northing",0],UNIT["Meter",1]]"""

    # PROJ.4 spelling of the same projection.
    proj4_text = '+a=6371007.181 +b=6371007.181 +units=m +y_0=0 +proj=sinu +lon_0=0 +no_defs +x_0=0'

    crs_named = geometry.CRS(named_wkt)
    crs_unnamed = geometry.CRS(unnamed_wkt)
    crs_proj4 = geometry.CRS(proj4_text)

    # Pairwise equality between the three spellings.
    assert crs_named == crs_unnamed
    assert crs_named == crs_proj4
    assert crs_unnamed == crs_proj4

    # ...and none of them is the geographic EPSG:4326 reference CRS.
    assert crs_named != epsg4326
# judgement. What it should test is that reading native from the
# ingested product gives exactly the same results as reading into the
# same GeoBox from the original product. Separate to that there
# should be a read test that confirms that what you read from native
# product while changing projection is of expected value
# Make the retrieved data lower res
ss = 100  # downsampling factor: shape is divided by it, pixel size multiplied by it
shape_x = int(GEOTIFF['shape']['x'] / ss)
shape_y = int(GEOTIFF['shape']['y'] / ss)
pixel_x = int(GEOTIFF['pixel_size']['x'] * ss)
pixel_y = int(GEOTIFF['pixel_size']['y'] * ss)
input_type_name = 'ls5_nbar_albers'
input_type = dc.index.products.get_by_name(input_type_name)
# Target GeoBox anchored at the GeoTIFF's 'ul' coordinates, at the coarser
# pixel size and two cells larger than the downsampled shape in each dimension.
# NOTE(review): the "+ 2" presumably makes the box extend past the source
# data so that the trailing pixel checked below is empty -- confirm.
geobox = geometry.GeoBox(shape_x + 2, shape_y + 2,
                         Affine(pixel_x, 0.0, GEOTIFF['ul']['x'], 0.0, pixel_y, GEOTIFF['ul']['y']),
                         geometry.CRS(GEOTIFF['crs']))
# Find the product's datasets over the box extent, group them by time, then
# load every measurement of the product into the box.
observations = dc.find_datasets(product='ls5_nbar_albers', geopolygon=geobox.extent)
group_by = query_group_by('time')
sources = dc.group_datasets(observations, group_by)
data = dc.load_data(sources, geobox, input_type.measurements.values())
# The MD5 digests pin the exact contents of the loaded green and blue arrays.
assert hashlib.md5(data.green.data).hexdigest() == '0f64647bad54db4389fb065b2128025e'
assert hashlib.md5(data.blue.data).hexdigest() == '41a7b50dfe5c4c1a1befbc378225beeb'
# `time_slices` is defined outside the visible lines.
# The last pixel ([-1, -1]) of every time slice of the blue band must be -999
# (presumably the nodata value -- confirm).
for time_slice in range(time_slices):
    assert data.blue.values[time_slice][-1, -1] == -999
def test_gridworkflow():
    """ Test GridWorkflow with padding option.

    The visible part of this test only builds the fixtures: a grid
    specification, one fake dataset and a fake index whose eager dataset
    search returns that dataset.
    """
    from mock import MagicMock
    import datetime
    # ----- fake a datacube -----
    # e.g. let there be a dataset that coincides with a grid cell
    fakecrs = geometry.CRS('EPSG:4326')
    grid = 100  # spatial frequency in crs units
    pixel = 10  # square pixel linear dimension in crs units
    # if cell(0,0) has lower left corner at grid origin,
    # and cell indices increase toward upper right,
    # then this will be cell(1,-2).
    gridspec = GridSpec(crs=fakecrs, tile_size=(grid, grid), resolution=(-pixel, pixel))  # e.g. product gridspec
    fakedataset = MagicMock()
    # Extent spans [grid, 2*grid] horizontally and [-2*grid, -grid] vertically.
    # NOTE(review): `bottom` (-grid) is numerically above `top` (-2*grid);
    # presumably geometry.box is insensitive to the order -- confirm.
    fakedataset.extent = geometry.box(left=grid, bottom=-grid, right=2*grid, top=-2*grid, crs=fakecrs)
    # `t` keeps a handle on the dataset's centre time; it is not used again in
    # the visible lines.
    fakedataset.center_time = t = datetime.datetime(2001, 2, 15)
    fakeindex = PickableMock()
    fakeindex._db = None
    fakeindex.datasets.get_field_names.return_value = ['time']  # permit query on time
    # Every eager dataset search yields exactly the one fake dataset.
    fakeindex.datasets.search_eager.return_value = [fakedataset]
def check_open_with_api(index):
    """Load the 'ls5_nbar_albers' product through the Datacube API and check its shape.

    A Datacube is created on top of ``index``; datasets of the product that
    fall in the extent of a fixed GeoBox (dimensions 200 and 200, 'EPSG:3577')
    are found, grouped by time and loaded with all of the product's
    measurements. The ``blue`` variable of the result must have shape
    ``(1, 200, 200)``.

    :param index: index object handed to ``Datacube(index=...)``.
    """
    from datacube import Datacube

    cube = Datacube(index=index)

    product_name = 'ls5_nbar_albers'
    product = cube.index.products.get_by_name(product_name)

    transform = Affine(25, 0.0, 1500000, 0.0, -25, -3900000)
    target_box = GeoBox(200, 200, transform, geometry.CRS('EPSG:3577'))

    found = cube.find_datasets(product=product_name, geopolygon=target_box.extent)
    grouped = cube.group_datasets(found, query_group_by('time'))
    loaded = cube.load_data(grouped, target_box, product.measurements.values())

    assert loaded.blue.shape == (1, 200, 200)
# Mapping of layer name -> layer configuration. Each entry carries:
#   'product' : name of the product the layer refers to
#   'bands'   : tuple of three band names
#               (presumably in red, green, blue display order -- confirm)
#   'extents' : a geometry.box(100, -50, 160, 0) in EPSG:4326
#   'time'    : 'start' / 'end' datetimes and a 'period' timedelta
# All three visible entries share the same extents and time settings and
# differ only in product and band names.
LAYER_SPEC = {
    'ls8_nbar_rgb': {
        'product': 'ls8_nbar_albers',
        'bands': ('red', 'green', 'blue'),
        'extents': geometry.box(100, -50, 160, 0, crs=geometry.CRS('EPSG:4326')),
        'time': {
            'start': datetime(2013, 1, 1),
            'end': datetime(2017, 1, 1),
            'period': timedelta(days=0)
        }
    },
    'ls8_l1t_rgb': {
        'product': 'ls8_l1t_scene',
        'bands': ('red', 'green', 'blue'),
        'extents': geometry.box(100, -50, 160, 0, crs=geometry.CRS('EPSG:4326')),
        'time': {
            'start': datetime(2013, 1, 1),
            'end': datetime(2017, 1, 1),
            'period': timedelta(days=0)
        }
    },
    'modis_mcd43a4_rgb': {
        'product': 'modis_mcd43a4_tile',
        # Band names here follow the product's own naming instead of colour names.
        'bands': ('Nadir_Reflectance_Band1', 'Nadir_Reflectance_Band4', 'Nadir_Reflectance_Band3'),
        'extents': geometry.box(100, -50, 160, 0, crs=geometry.CRS('EPSG:4326')),
        'time': {
            'start': datetime(2013, 1, 1),
            'end': datetime(2017, 1, 1),
            'period': timedelta(days=0)
        }
    }
(input_coords['left'], input_coords['top'])]
return geometry.line(points, crs=input_crs)
else:
if input_coords['top'] == input_coords['bottom']:
points = [(input_coords['left'], input_coords['top']),
(input_coords['right'], input_coords['top'])]
return geometry.line(points, crs=input_crs)
else:
points = [
(input_coords['left'], input_coords['top']),
(input_coords['right'], input_coords['top']),
(input_coords['right'], input_coords['bottom']),
(input_coords['left'], input_coords['bottom']),
(input_coords['left'], input_coords['top'])
]
return geometry.polygon(points, crs=input_crs)
return None
def polygon_from_sources_extents(sources, geobox):
    """Return the simplified part of ``geobox``'s extent covered by ``sources``.

    Each source extent is converted to the CRS of ``geobox``, the converted
    extents are unioned, and the union is intersected with the extent of
    ``geobox``. The result is simplified with a tolerance of 1% of the
    smallest absolute value in ``geobox.resolution``.

    :param sources: iterable of objects exposing an ``extent`` with ``to_crs``.
    :param geobox: object exposing ``crs``, ``extent`` and ``resolution``.
    :return: the simplified intersection geometry.
    """
    reprojected = (src.extent.to_crs(geobox.crs) for src in sources)
    coverage = geometry.unary_union(reprojected)
    footprint = geobox.extent.intersection(coverage)

    # Tie the simplification tolerance to the finest pixel dimension.
    finest_step = min(abs(step) for step in geobox.resolution)
    return footprint.simplify(tolerance=finest_step * 0.01)