Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ingest(raster_file, tmpdir):
from terracotta.scripts import cli
outfile = tmpdir / 'out.sqlite'
input_pattern = str(raster_file.dirpath('{name}.tif'))
runner = CliRunner()
result = runner.invoke(cli.cli, ['ingest', input_pattern, '-o', str(outfile)])
assert result.exit_code == 0
assert outfile.check()
from terracotta import get_driver
driver = get_driver(str(outfile), provider='sqlite')
assert driver.key_names == ('name',)
assert driver.get_datasets() == {('img',): str(raster_file)}
def _s3_db_factory(keys, datasets=None):
from terracotta import get_driver
with tempfile.TemporaryDirectory() as tmpdir:
dbfile = Path(tmpdir) / 'tc.sqlite'
driver = get_driver(dbfile)
driver.create(keys)
if datasets:
for keys, path in datasets.items():
driver.insert(keys, path)
with open(dbfile, 'rb') as f:
db_bytes = f.read()
conn = boto3.resource('s3')
conn.create_bucket(Bucket=bucketname)
s3 = boto3.client('s3')
s3.put_object(Bucket=bucketname, Key='tc.sqlite', Body=db_bytes)
return f's3://{bucketname}/tc.sqlite'
def test_remote_database_cache(s3_db_factory, raster_file, monkeypatch):
keys = ('some', 'keys')
dbpath = s3_db_factory(keys)
from terracotta import get_driver
driver = get_driver(dbpath)
with monkeypatch.context() as m:
# replace TTL cache timer by manual timer
m.setattr(driver, '_checkdb_cache', TTLCache(maxsize=1, ttl=1, timer=Timer()))
assert len(driver._checkdb_cache) == 0
with driver.connect():
assert driver.key_names == keys
assert driver.get_datasets() == {}
modification_date = os.path.getmtime(driver.path)
s3_db_factory(keys, datasets={('some', 'value'): str(raster_file)})
# no change yet
assert driver.get_datasets() == {}
assert os.path.getmtime(driver.path) == modification_date
def test_colormap_consistency(use_testdb, testdb, raster_file_xyz,
stretch_range, cmap_name):
"""Test consistency between /colormap and images returned by /singleband"""
import terracotta
from terracotta.xyz import get_tile_data
from terracotta.handlers import singleband, colormap
ds_keys = ['val21', 'x', 'val22']
# get image with applied stretch and colormap
raw_img = singleband.singleband(ds_keys, raster_file_xyz, stretch_range=stretch_range,
colormap=cmap_name)
img_data = np.asarray(Image.open(raw_img).convert('RGBA'))
# get raw data to compare to
driver = terracotta.get_driver(testdb)
with driver.connect():
tile_data = get_tile_data(driver, ds_keys, tile_xyz=raster_file_xyz,
tile_size=img_data.shape[:2])
# make sure all pixel values are included in colormap
num_values = stretch_range[1] - stretch_range[0] + 1
# get colormap for given stretch
cmap = colormap.colormap(colormap=cmap_name, stretch_range=stretch_range,
num_values=num_values)
cmap = dict(row.values() for row in cmap)
# test nodata
nodata_mask = tile_data.mask
assert np.all(img_data[nodata_mask, -1] == 0)
def test_datasets_handler(testdb, use_testdb):
import terracotta
from terracotta.handlers import datasets
driver = terracotta.get_driver(str(testdb))
keys = driver.key_names
assert datasets.datasets()
assert datasets.datasets() == [dict(zip(keys, pair)) for pair in driver.get_datasets().keys()]
# check key order
assert all(tuple(ds.keys()) == keys for ds in datasets.datasets())
def test_remote_database(s3_db_factory):
keys = ('some', 'keys')
dbpath = s3_db_factory(keys)
from terracotta import get_driver
driver = get_driver(dbpath)
assert driver.key_names == keys
config=None, database_provider=None):
from terracotta import get_driver, update_settings
from terracotta.flask_api import run_app
if config is not None:
update_settings(config)
if (database is None) == (raster_pattern is None):
raise click.UsageError('Either --database or --raster-pattern must be given')
if database is None:
dbfile = tempfile.NamedTemporaryFile(suffix='.sqlite', delete=False)
dbfile.close()
keys, raster_files = raster_pattern
driver = get_driver(dbfile.name, provider='sqlite')
with driver.connect():
driver.create(keys)
for key, filepath in raster_files.items():
driver.insert(key, filepath, compute_metadata=False)
else:
driver = get_driver(database, provider=database_provider)
run_app(driver, debug=debug, profile=profile, preview=not no_browser)
def datasets(some_keys: Mapping[str, str] = None,
page: int = 0, limit: int = 500) -> 'List[OrderedDict[str, str]]':
"""List all available key combinations"""
settings = get_settings()
driver = get_driver(settings.DRIVER_PATH, provider=settings.DRIVER_PROVIDER)
with driver.connect():
dataset_keys = driver.get_datasets(
where=some_keys, page=page, limit=limit
).keys()
key_names = driver.key_names
return [OrderedDict(zip(key_names, ds_keys)) for ds_keys in dataset_keys]
def create_database(raster_pattern, output_file, overwrite=False, skip_metadata=False):
"""Create a new raster database from a collection of raster files.
This command only supports the creation of an SQLite database without any additional metadata.
For more sophisticated use cases use the Python API.
"""
import tqdm
from terracotta import get_driver
if output_file.is_file() and not overwrite:
click.confirm(f'Output file {output_file} exists. Continue?', abort=True)
keys, raster_files = raster_pattern
driver = get_driver(output_file)
pbar = tqdm.tqdm(raster_files.items())
with driver.connect():
driver.create(keys, drop_if_exists=True)
for key, filepath in pbar:
pbar.set_postfix({'file': filepath})
driver.insert(key, filepath, compute_metadata=not skip_metadata)