# NOTE: promotional text from a code-scanning tool export; kept as a comment
# so the module remains syntactically valid.
# "Secure your code as it's written. Use Snyk Code to scan source code in
# minutes - no build needed - and fix issues immediately."
def test_auto_detect(driver_path, provider):
    """Driver class is inferred from the path alone, and instances are cached."""
    from terracotta import drivers

    detected = drivers.get_driver(driver_path)
    assert DRIVER_CLASSES[provider] == detected.__class__.__name__
    # Passing the provider explicitly must hand back the same cached instance.
    assert detected is drivers.get_driver(driver_path, provider=provider)
def test_metadata_cache_insertion(tmpdir, raster_file):
    """Driver accessors populate the private in-memory metadata cache."""
    from terracotta import drivers

    dbfile = tmpdir.join('test.sqlite')
    driver = drivers.get_driver(str(dbfile), provider='sqlite')
    driver.create(('some', 'keys'))
    driver.insert(['some', 'value'], str(raster_file))

    cache = driver._metadata_cache

    # Each accessor should leave its result behind under the expected cache key.
    driver.key_names
    assert ('keys', driver) in cache

    driver.db_version
    assert ('db_version', driver) in cache

    driver.get_datasets(page=0, limit=10)
    assert ('datasets', driver, None, 0, 10) in cache
def test_compute_metadata_nocrick_nodata(big_raster_file_nodata, monkeypatch):
    """Chunked metadata computation without crick warns and stays accurate
    on a raster with nodata values.

    NOTE(review): renamed from ``test_compute_metadata_nocrick`` — a second
    test further down this file uses that identical name, so this definition
    was shadowed and never collected by pytest.
    """
    with rasterio.open(str(big_raster_file_nodata)) as src:
        data = src.read(1, masked=True)
        valid_data = np.ma.masked_invalid(data).compressed()
        # NOTE(review): convex_hull is computed but never asserted — looks
        # like a dropped geometry comparison; confirm against the original test.
        convex_hull = convex_hull_exact(src)

    from terracotta import exceptions
    import terracotta.drivers.raster_base

    with monkeypatch.context() as m:
        # Pretend the optional crick dependency is missing; the fallback
        # path must emit a PerformanceWarning.
        m.setattr(terracotta.drivers.raster_base, 'has_crick', False)
        with pytest.warns(exceptions.PerformanceWarning):
            mtd = terracotta.drivers.raster_base.RasterDriver.compute_metadata(
                str(big_raster_file_nodata), use_chunks=True
            )

    # compare against exact statistics computed above
    np.testing.assert_allclose(mtd['valid_percentage'], 100 * valid_data.size / data.size)
    np.testing.assert_allclose(mtd['range'], (valid_data.min(), valid_data.max()))
    np.testing.assert_allclose(mtd['mean'], valid_data.mean())
    np.testing.assert_allclose(mtd['stdev'], valid_data.std())

    # allow error of 1%, since we only compute approximate quantiles
    np.testing.assert_allclose(
        mtd['percentiles'],
        np.percentile(valid_data, np.arange(1, 100)),
        rtol=2e-2
    )
def test_compute_metadata_nocrick(big_raster_file, monkeypatch):
    """Chunked metadata computation without crick warns and stays accurate.

    Fixes a copy-paste corruption in the original: the percentile
    ``assert_allclose`` call was left unclosed and the entire function body
    was duplicated inline, which is a syntax error. This is the single
    reconstructed body.
    """
    with rasterio.open(str(big_raster_file)) as src:
        data = src.read(1)
        valid_data = data[np.isfinite(data) & (data != src.nodata)]
        dataset_shape = list(rasterio.features.dataset_features(
            src, bidx=1, as_mask=True, geographic=True
        ))

    # NOTE(review): convex_hull is computed but never asserted — looks like a
    # dropped geometry comparison; confirm against the original test.
    convex_hull = MultiPolygon([shape(s['geometry']) for s in dataset_shape]).convex_hull

    from terracotta import exceptions
    import terracotta.drivers.raster_base

    with monkeypatch.context() as m:
        # Pretend the optional crick dependency is missing; the fallback
        # path must emit a PerformanceWarning.
        m.setattr(terracotta.drivers.raster_base, 'has_crick', False)
        with pytest.warns(exceptions.PerformanceWarning):
            mtd = terracotta.drivers.raster_base.RasterDriver.compute_metadata(
                str(big_raster_file), use_chunks=True
            )

    # compare against exact statistics computed above
    np.testing.assert_allclose(mtd['valid_percentage'], 100 * valid_data.size / data.size)
    np.testing.assert_allclose(mtd['range'], (valid_data.min(), valid_data.max()))
    np.testing.assert_allclose(mtd['mean'], valid_data.mean())
    np.testing.assert_allclose(mtd['stdev'], valid_data.std())

    # allow error of 1%, since we only compute approximate quantiles
    np.testing.assert_allclose(
        mtd['percentiles'],
        np.percentile(valid_data, np.arange(1, 100)),
        rtol=2e-2
    )
def test_invalid_insertion(monkeypatch, driver_path, provider, raster_file):
    """A failing metadata computation aborts insert unless metadata is skipped.

    Fix: the original fetched ``db.get_datasets()`` into a local that was
    never asserted, so the test did not actually verify the final DB state.
    """
    from terracotta import drivers

    db = drivers.get_driver(driver_path, provider=provider)
    db.create(('keyname',))

    def throw(*args, **kwargs):
        raise NotImplementedError()

    with monkeypatch.context() as m:
        m.setattr(db, 'compute_metadata', throw)

        # skip_metadata=True never calls compute_metadata, so this succeeds
        db.insert(['bar'], str(raster_file), skip_metadata=True)

        # without skipping, the monkeypatched failure must propagate
        with pytest.raises(NotImplementedError):
            db.insert(['foo'], str(raster_file), skip_metadata=False)

        datasets = db.get_datasets()
        # only the skip_metadata insertion should have landed
        assert ('bar',) in datasets
        assert ('foo',) not in datasets
def test_creation_invalid_description(driver_path, provider):
    """Describing a key that is not being created must raise ValueError."""
    from terracotta import drivers

    driver = drivers.get_driver(driver_path, provider=provider)
    with pytest.raises(ValueError):
        driver.create(('some', 'keynames'), key_descriptions={'unknown_key': 'blah'})
def test_multiprocessing_fallback(driver_path, provider, raster_file, monkeypatch):
    """Tiles are still served when ProcessPoolExecutor cannot be created."""
    import concurrent.futures
    from importlib import reload
    from terracotta import drivers

    def raise_oserror(*args, **kwargs):
        raise OSError('monkeypatched')

    with monkeypatch.context() as m:
        # Make pool creation blow up, then reload the module so it picks
        # up the patched executor and falls back to serial execution.
        m.setattr(concurrent.futures, 'ProcessPoolExecutor', raise_oserror)

        import terracotta.drivers.raster_base
        reload(terracotta.drivers.raster_base)

        db = drivers.get_driver(driver_path, provider=provider)
        db.create(('some', 'keynames'))
        db.insert(['some', 'value'], str(raster_file))
        db.insert(['some', 'other_value'], str(raster_file))

        tile_a = db.get_raster_tile(['some', 'value'], tile_size=(256, 256))
        assert tile_a.shape == (256, 256)

        tile_b = db.get_raster_tile(['some', 'other_value'], tile_size=(256, 256))
        assert tile_b.shape == (256, 256)

        # identical source raster -> identical tiles
        np.testing.assert_array_equal(tile_a, tile_b)
def test_creation(driver_path, provider):
    """A freshly created database reports its keys and holds no datasets."""
    from terracotta import drivers

    driver = drivers.get_driver(driver_path, provider=provider)
    key_names = ('some', 'keynames')
    driver.create(key_names)

    assert key_names == driver.key_names
    assert {} == driver.get_datasets()
def test_insertion_and_retrieval(driver_path, provider, raster_file):
    """An inserted raster is listed in get_datasets and carries full metadata."""
    from terracotta import drivers

    driver = drivers.get_driver(driver_path, provider=provider)
    driver.create(('some', 'keynames'))
    driver.insert(['some', 'value'], str(raster_file))

    datasets = driver.get_datasets()
    assert [('some', 'value')] == list(datasets.keys())
    assert str(raster_file) == datasets[('some', 'value')]

    metadata = driver.get_metadata(('some', 'value'))
    assert all(k in metadata for k in METADATA_KEYS)